1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package ch.qos.logback.core.net;
16
17 import java.io.IOException;
18 import java.io.Serializable;
19 import java.net.ConnectException;
20 import java.net.InetAddress;
21 import java.net.Socket;
22 import java.net.UnknownHostException;
23 import java.util.concurrent.BlockingDeque;
24 import java.util.concurrent.Future;
25 import java.util.concurrent.TimeUnit;
26
27 import javax.net.SocketFactory;
28
29 import ch.qos.logback.core.AppenderBase;
30 import ch.qos.logback.core.spi.DeferredProcessingAware;
31 import ch.qos.logback.core.spi.PreSerializationTransformer;
32 import ch.qos.logback.core.util.CloseUtil;
33 import ch.qos.logback.core.util.Duration;
34
35
36
37
38
39
40
41
42
43
44
45 public abstract class AbstractSocketAppender<E> extends AppenderBase<E> implements SocketConnector.ExceptionHandler {
46
47
48
49
50 public static final int DEFAULT_PORT = 4560;
51
52
53
54
55 public static final int DEFAULT_RECONNECTION_DELAY = 30000;
56
57
58
59
60
61 public static final int DEFAULT_QUEUE_SIZE = 128;
62
63
64
65
66 private static final int DEFAULT_ACCEPT_CONNECTION_DELAY = 5000;
67
68
69
70
71
72 private static final int DEFAULT_EVENT_DELAY_TIMEOUT = 100;
73
74 private final ObjectWriterFactory objectWriterFactory;
75 private final QueueFactory queueFactory;
76
77 private String remoteHost;
78 private int port = DEFAULT_PORT;
79 private InetAddress address;
80 private Duration reconnectionDelay = new Duration(DEFAULT_RECONNECTION_DELAY);
81 private int queueSize = DEFAULT_QUEUE_SIZE;
82 private int acceptConnectionTimeout = DEFAULT_ACCEPT_CONNECTION_DELAY;
83 private Duration eventDelayLimit = new Duration(DEFAULT_EVENT_DELAY_TIMEOUT);
84
85 private BlockingDeque<E> deque;
86 private String peerId;
87 private SocketConnector connector;
88 private Future<?> task;
89
90 private volatile Socket socket;
91
92
93
94
95 protected AbstractSocketAppender() {
96 this(new QueueFactory(), new ObjectWriterFactory());
97 }
98
99
100
101
102
103 AbstractSocketAppender(QueueFactory queueFactory, ObjectWriterFactory objectWriterFactory) {
104 this.objectWriterFactory = objectWriterFactory;
105 this.queueFactory = queueFactory;
106 }
107
108
109
110
111 public void start() {
112 if (isStarted())
113 return;
114 int errorCount = 0;
115 if (port <= 0) {
116 errorCount++;
117 addError("No port was configured for appender" + name
118 + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_port");
119 }
120
121 if (remoteHost == null) {
122 errorCount++;
123 addError("No remote host was configured for appender" + name
124 + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_host");
125 }
126
127 if (queueSize == 0) {
128 addWarn("Queue size of zero is deprecated, use a size of one to indicate synchronous processing");
129 }
130
131 if (queueSize < 0) {
132 errorCount++;
133 addError("Queue size must be greater than zero");
134 }
135
136 if (errorCount == 0) {
137 try {
138 address = InetAddress.getByName(remoteHost);
139 } catch (UnknownHostException ex) {
140 addError("unknown host: " + remoteHost);
141 errorCount++;
142 }
143 }
144
145 if (errorCount == 0) {
146 deque = queueFactory.newLinkedBlockingDeque(queueSize);
147 peerId = "remote peer " + remoteHost + ":" + port + ": ";
148 connector = createConnector(address, port, 0, reconnectionDelay.getMilliseconds());
149 task = getContext().getExecutorService().submit(new Runnable() {
150 @Override
151 public void run() {
152 connectSocketAndDispatchEvents();
153 }
154 });
155 super.start();
156 }
157 }
158
159
160
161
162 @Override
163 public void stop() {
164 if (!isStarted())
165 return;
166 CloseUtil.closeQuietly(socket);
167 task.cancel(true);
168 super.stop();
169 }
170
171
172
173
174 @Override
175 protected void append(E event) {
176 if (event == null || !isStarted())
177 return;
178
179 try {
180
181
182 if(event instanceof DeferredProcessingAware) {
183 ((DeferredProcessingAware) event).prepareForDeferredProcessing();
184 }
185
186 final boolean inserted = deque.offer(event, eventDelayLimit.getMilliseconds(), TimeUnit.MILLISECONDS);
187 if (!inserted) {
188 addInfo("Dropping event due to timeout limit of [" + eventDelayLimit + "] being exceeded");
189 }
190 } catch (InterruptedException e) {
191 addError("Interrupted while appending event to SocketAppender", e);
192 }
193 }
194
195 private void connectSocketAndDispatchEvents() {
196 try {
197 while (socketConnectionCouldBeEstablished()) {
198 try {
199 ObjectWriter objectWriter = createObjectWriterForSocket();
200 addInfo(peerId + "connection established");
201 dispatchEvents(objectWriter);
202 } catch (javax.net.ssl.SSLHandshakeException she) {
203
204 Thread.sleep(DEFAULT_RECONNECTION_DELAY);
205 } catch (IOException ex) {
206 addInfo(peerId + "connection failed: ", ex);
207 } finally {
208 CloseUtil.closeQuietly(socket);
209 socket = null;
210 addInfo(peerId + "connection closed");
211 }
212 }
213 } catch (InterruptedException ex) {
214 assert true;
215 }
216 addInfo("shutting down");
217 }
218
219 private boolean socketConnectionCouldBeEstablished() throws InterruptedException {
220 return (socket = connector.call()) != null;
221 }
222
223 private ObjectWriter createObjectWriterForSocket() throws IOException {
224 socket.setSoTimeout(acceptConnectionTimeout);
225 ObjectWriter objectWriter = objectWriterFactory.newAutoFlushingObjectWriter(socket.getOutputStream());
226 socket.setSoTimeout(0);
227 return objectWriter;
228 }
229
230 private SocketConnector createConnector(InetAddress address, int port, int initialDelay, long retryDelay) {
231 SocketConnector connector = newConnector(address, port, initialDelay, retryDelay);
232 connector.setExceptionHandler(this);
233 connector.setSocketFactory(getSocketFactory());
234 return connector;
235 }
236
237 private void dispatchEvents(ObjectWriter objectWriter) throws InterruptedException, IOException {
238 while (true) {
239 E event = deque.takeFirst();
240 postProcessEvent(event);
241 Serializable serializableEvent = getPST().transform(event);
242 try {
243 objectWriter.write(serializableEvent);
244 } catch (IOException e) {
245 tryReAddingEventToFrontOfQueue(event);
246 throw e;
247 }
248 }
249 }
250
251 private void tryReAddingEventToFrontOfQueue(E event) {
252 final boolean wasInserted = deque.offerFirst(event);
253 if (!wasInserted) {
254 addInfo("Dropping event due to socket connection error and maxed out deque capacity");
255 }
256 }
257
258
259
260
261 public void connectionFailed(SocketConnector connector, Exception ex) {
262 if (ex instanceof InterruptedException) {
263 addInfo("connector interrupted");
264 } else if (ex instanceof ConnectException) {
265 addInfo(peerId + "connection refused");
266 } else {
267 addInfo(peerId + ex);
268 }
269 }
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284 protected SocketConnector newConnector(InetAddress address, int port, long initialDelay, long retryDelay) {
285 return new DefaultSocketConnector(address, port, initialDelay, retryDelay);
286 }
287
288
289
290
291
292
293 protected SocketFactory getSocketFactory() {
294 return SocketFactory.getDefault();
295 }
296
297
298
299
300
301
302
303 protected abstract void postProcessEvent(E event);
304
305
306
307
308
309
310
311 protected abstract PreSerializationTransformer<E> getPST();
312
313
314
315
316
317 public void setRemoteHost(String host) {
318 remoteHost = host;
319 }
320
321
322
323
324 public String getRemoteHost() {
325 return remoteHost;
326 }
327
328
329
330
331
332 public void setPort(int port) {
333 this.port = port;
334 }
335
336
337
338
339 public int getPort() {
340 return port;
341 }
342
343
344
345
346
347
348
349
350
351 public void setReconnectionDelay(Duration delay) {
352 this.reconnectionDelay = delay;
353 }
354
355
356
357
358 public Duration getReconnectionDelay() {
359 return reconnectionDelay;
360 }
361
362
363
364
365
366
367
368
369
370
371
372
373 public void setQueueSize(int queueSize) {
374 this.queueSize = queueSize;
375 }
376
377
378
379
380 public int getQueueSize() {
381 return queueSize;
382 }
383
384
385
386
387
388
389
390
391 public void setEventDelayLimit(Duration eventDelayLimit) {
392 this.eventDelayLimit = eventDelayLimit;
393 }
394
395
396
397
398 public Duration getEventDelayLimit() {
399 return eventDelayLimit;
400 }
401
402
403
404
405
406
407
408
409
410
411 void setAcceptConnectionTimeout(int acceptConnectionTimeout) {
412 this.acceptConnectionTimeout = acceptConnectionTimeout;
413 }
414
415 }