1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package ch.qos.logback.core.net;
16
17 import java.io.IOException;
18 import java.io.Serializable;
19 import java.net.ConnectException;
20 import java.net.InetAddress;
21 import java.net.Socket;
22 import java.net.UnknownHostException;
23 import java.util.concurrent.BlockingDeque;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.TimeUnit;
26
27 import javax.net.SocketFactory;
28
29 import ch.qos.logback.core.AppenderBase;
30 import ch.qos.logback.core.spi.PreSerializationTransformer;
31 import ch.qos.logback.core.util.CloseUtil;
32 import ch.qos.logback.core.util.Duration;
33
34
35
36
37
38
39
40
41
42
43
44 public abstract class AbstractSocketAppender<E> extends AppenderBase<E> implements SocketConnector.ExceptionHandler {
45
46
47
48
49 public static final int DEFAULT_PORT = 4560;
50
51
52
53
54 public static final int DEFAULT_RECONNECTION_DELAY = 30000;
55
56
57
58
59
60 public static final int DEFAULT_QUEUE_SIZE = 128;
61
62
63
64
65 private static final int DEFAULT_ACCEPT_CONNECTION_DELAY = 5000;
66
67
68
69
70
71 private static final int DEFAULT_EVENT_DELAY_TIMEOUT = 100;
72
73 private final ObjectWriterFactory objectWriterFactory;
74 private final QueueFactory queueFactory;
75
76 private String remoteHost;
77 private int port = DEFAULT_PORT;
78 private InetAddress address;
79 private Duration reconnectionDelay = new Duration(DEFAULT_RECONNECTION_DELAY);
80 private int queueSize = DEFAULT_QUEUE_SIZE;
81 private int acceptConnectionTimeout = DEFAULT_ACCEPT_CONNECTION_DELAY;
82 private Duration eventDelayLimit = new Duration(DEFAULT_EVENT_DELAY_TIMEOUT);
83
84 private BlockingDeque<E> deque;
85 private String peerId;
86 private SocketConnector connector;
87 private Future<?> task;
88
89 private volatile Socket socket;
90
91
92
93
/**
 * Creates an appender backed by a newly constructed {@link QueueFactory}
 * and {@link ObjectWriterFactory}.
 */
protected AbstractSocketAppender() {
    this(new QueueFactory(), new ObjectWriterFactory());
}
97
98
99
100
101
/**
 * Creates an appender that uses the given factories.
 *
 * @param queueFactory        factory used in {@code start()} to create the event deque
 * @param objectWriterFactory factory used to create the writer for each connected socket
 */
AbstractSocketAppender(QueueFactory queueFactory, ObjectWriterFactory objectWriterFactory) {
    this.queueFactory = queueFactory;
    this.objectWriterFactory = objectWriterFactory;
}
106
107
108
109
110 public void start() {
111 if (isStarted())
112 return;
113 int errorCount = 0;
114 if (port <= 0) {
115 errorCount++;
116 addError("No port was configured for appender" + name
117 + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_port");
118 }
119
120 if (remoteHost == null) {
121 errorCount++;
122 addError("No remote host was configured for appender" + name
123 + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_host");
124 }
125
126 if (queueSize == 0) {
127 addWarn("Queue size of zero is deprecated, use a size of one to indicate synchronous processing");
128 }
129
130 if (queueSize < 0) {
131 errorCount++;
132 addError("Queue size must be greater than zero");
133 }
134
135 if (errorCount == 0) {
136 try {
137 address = InetAddress.getByName(remoteHost);
138 } catch (UnknownHostException ex) {
139 addError("unknown host: " + remoteHost);
140 errorCount++;
141 }
142 }
143
144 if (errorCount == 0) {
145 deque = queueFactory.newLinkedBlockingDeque(queueSize);
146 peerId = "remote peer " + remoteHost + ":" + port + ": ";
147 connector = createConnector(address, port, 0, reconnectionDelay.getMilliseconds());
148 task = getContext().getScheduledExecutorService().submit(new Runnable() {
149 @Override
150 public void run() {
151 connectSocketAndDispatchEvents();
152 }
153 });
154 super.start();
155 }
156 }
157
158
159
160
161 @Override
162 public void stop() {
163 if (!isStarted())
164 return;
165 CloseUtil.closeQuietly(socket);
166 task.cancel(true);
167 super.stop();
168 }
169
170
171
172
173 @Override
174 protected void append(E event) {
175 if (event == null || !isStarted())
176 return;
177
178 try {
179 final boolean inserted = deque.offer(event, eventDelayLimit.getMilliseconds(), TimeUnit.MILLISECONDS);
180 if (!inserted) {
181 addInfo("Dropping event due to timeout limit of [" + eventDelayLimit + "] being exceeded");
182 }
183 } catch (InterruptedException e) {
184 addError("Interrupted while appending event to SocketAppender", e);
185 }
186 }
187
188 private void connectSocketAndDispatchEvents() {
189 try {
190 while (socketConnectionCouldBeEstablished()) {
191 try {
192 ObjectWriter objectWriter = createObjectWriterForSocket();
193 addInfo(peerId + "connection established");
194 dispatchEvents(objectWriter);
195 } catch (javax.net.ssl.SSLHandshakeException she) {
196
197 Thread.sleep(DEFAULT_RECONNECTION_DELAY);
198 } catch (IOException ex) {
199 addInfo(peerId + "connection failed: ", ex);
200 } finally {
201 CloseUtil.closeQuietly(socket);
202 socket = null;
203 addInfo(peerId + "connection closed");
204 }
205 }
206 } catch (InterruptedException ex) {
207 assert true;
208 }
209 addInfo("shutting down");
210 }
211
212 private boolean socketConnectionCouldBeEstablished() throws InterruptedException {
213 return (socket = connector.call()) != null;
214 }
215
216 private ObjectWriter createObjectWriterForSocket() throws IOException {
217 socket.setSoTimeout(acceptConnectionTimeout);
218 ObjectWriter objectWriter = objectWriterFactory.newAutoFlushingObjectWriter(socket.getOutputStream());
219 socket.setSoTimeout(0);
220 return objectWriter;
221 }
222
223 private SocketConnector createConnector(InetAddress address, int port, int initialDelay, long retryDelay) {
224 SocketConnector connector = newConnector(address, port, initialDelay, retryDelay);
225 connector.setExceptionHandler(this);
226 connector.setSocketFactory(getSocketFactory());
227 return connector;
228 }
229
230 private void dispatchEvents(ObjectWriter objectWriter) throws InterruptedException, IOException {
231 while (true) {
232 E event = deque.takeFirst();
233 postProcessEvent(event);
234 Serializable serializableEvent = getPST().transform(event);
235 try {
236 objectWriter.write(serializableEvent);
237 } catch (IOException e) {
238 tryReAddingEventToFrontOfQueue(event);
239 throw e;
240 }
241 }
242 }
243
244 private void tryReAddingEventToFrontOfQueue(E event) {
245 final boolean wasInserted = deque.offerFirst(event);
246 if (!wasInserted) {
247 addInfo("Dropping event due to socket connection error and maxed out deque capacity");
248 }
249 }
250
251
252
253
254 public void connectionFailed(SocketConnector connector, Exception ex) {
255 if (ex instanceof InterruptedException) {
256 addInfo("connector interrupted");
257 } else if (ex instanceof ConnectException) {
258 addInfo(peerId + "connection refused");
259 } else {
260 addInfo(peerId + ex);
261 }
262 }
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277 protected SocketConnector newConnector(InetAddress address, int port, long initialDelay, long retryDelay) {
278 return new DefaultSocketConnector(address, port, initialDelay, retryDelay);
279 }
280
281
282
283
284
285
286 protected SocketFactory getSocketFactory() {
287 return SocketFactory.getDefault();
288 }
289
290
291
292
293
294
295
296 protected abstract void postProcessEvent(E event);
297
298
299
300
301
302
303
304 protected abstract PreSerializationTransformer<E> getPST();
305
306
307
308
309
310 public void setRemoteHost(String host) {
311 remoteHost = host;
312 }
313
314
315
316
317 public String getRemoteHost() {
318 return remoteHost;
319 }
320
321
322
323
324
325 public void setPort(int port) {
326 this.port = port;
327 }
328
329
330
331
332 public int getPort() {
333 return port;
334 }
335
336
337
338
339
340
341
342
343
344 public void setReconnectionDelay(Duration delay) {
345 this.reconnectionDelay = delay;
346 }
347
348
349
350
351 public Duration getReconnectionDelay() {
352 return reconnectionDelay;
353 }
354
355
356
357
358
359
360
361
362
363
364
365
366 public void setQueueSize(int queueSize) {
367 this.queueSize = queueSize;
368 }
369
370
371
372
373 public int getQueueSize() {
374 return queueSize;
375 }
376
377
378
379
380
381
382
383
384 public void setEventDelayLimit(Duration eventDelayLimit) {
385 this.eventDelayLimit = eventDelayLimit;
386 }
387
388
389
390
391 public Duration getEventDelayLimit() {
392 return eventDelayLimit;
393 }
394
395
396
397
398
399
400
401
402
403
404 void setAcceptConnectionTimeout(int acceptConnectionTimeout) {
405 this.acceptConnectionTimeout = acceptConnectionTimeout;
406 }
407
408 }