1
2
3
4
5
6
7
8
9
10
11
12
13
14 package ch.qos.logback.core.encoder;
15
16 import static ch.qos.logback.core.CoreConstants.BYTES_PER_INT;
17 import static ch.qos.logback.core.encoder.ObjectStreamEncoder.START_PEBBLE;
18 import static ch.qos.logback.core.encoder.ObjectStreamEncoder.STOP_PEBBLE;
19
20 import java.io.IOException;
21 import java.io.InputStream;
22 import java.io.ObjectInputStream;
23 import java.util.ArrayList;
24 import java.util.List;
25
26
27
28
29
30
31
32
33 public class EventObjectInputStream<E> extends InputStream {
34
35 NonClosableInputStream ncis;
36 List<E> buffer = new ArrayList<E>();
37
38 int index = 0;
39
40 EventObjectInputStream(InputStream is) throws IOException {
41 this.ncis = new NonClosableInputStream(is);
42 }
43
44 @Override
45 public int read() throws IOException {
46 throw new UnsupportedOperationException(
47 "Only the readEvent method is supported.");
48 }
49
50
51
52
53 public int available() throws IOException {
54 return ncis.available();
55 }
56
57 public E readEvent() throws IOException {
58
59 E event = getFromBuffer();
60 if (event != null) {
61 return event;
62 }
63
64 internalReset();
65 int count = readHeader();
66 if(count == -1) {
67 return null;
68 }
69 readPayload(count);
70 readFooter(count);
71 return getFromBuffer();
72 }
73
74 private void internalReset() {
75 index = 0;
76 buffer.clear();
77 }
78
79 E getFromBuffer() {
80 if (index >= buffer.size()) {
81 return null;
82 }
83 return buffer.get(this.index++);
84 }
85
86 int readHeader() throws IOException {
87 byte[] headerBA = new byte[4 * BYTES_PER_INT];
88
89 int bytesRead = ncis.read(headerBA);
90 if(bytesRead == -1) {
91 return -1;
92 }
93
94
95
96
97 int offset = 0;
98 int startPebble = ByteArrayUtil.readInt(headerBA, offset);
99 if (startPebble != START_PEBBLE) {
100 throw new IllegalStateException(
101 "Does not look like data created by ObjectStreamEncoder");
102 }
103 offset += BYTES_PER_INT;
104 int count = ByteArrayUtil.readInt(headerBA, offset);
105 offset += BYTES_PER_INT;
106 int endPointer = ByteArrayUtil.readInt(headerBA, offset);
107 offset += BYTES_PER_INT;
108 int checksum = ByteArrayUtil.readInt(headerBA, offset);
109 if (checksum != (START_PEBBLE ^ count)) {
110 throw new IllegalStateException("Invalid checksum");
111 }
112 return count;
113 }
114
115 @SuppressWarnings("unchecked")
116 E readEvents(ObjectInputStream ois) throws IOException {
117 E e = null;
118 try {
119 e = (E) ois.readObject();
120 buffer.add(e);
121 } catch (ClassNotFoundException e1) {
122
123 e1.printStackTrace();
124 }
125 return e;
126 }
127
128 void readFooter(int count) throws IOException {
129 byte[] headerBA = new byte[2 * BYTES_PER_INT];
130 ncis.read(headerBA);
131
132 int offset = 0;
133 int stopPebble = ByteArrayUtil.readInt(headerBA, offset);
134 if (stopPebble != STOP_PEBBLE) {
135 throw new IllegalStateException(
136 "Looks like a corrupt stream");
137 }
138 offset += BYTES_PER_INT;
139 int checksum = ByteArrayUtil.readInt(headerBA, offset);
140 if (checksum != (STOP_PEBBLE ^ count)) {
141 throw new IllegalStateException("Invalid checksum");
142 }
143 }
144
145 void readPayload(int count) throws IOException {
146 List<E> eventList = new ArrayList<E>(count);
147 ObjectInputStream ois = new ObjectInputStream(ncis);
148 for (int i = 0; i < count; i++) {
149 E e = (E) readEvents(ois);
150 eventList.add(e);
151 }
152 ois.close();
153 }
154
155 public void close() throws IOException {
156 ncis.realClose();
157 }
158
159 }