View Javadoc
1   /**
2    * Logback: the reliable, generic, fast and flexible logging framework.
3    * Copyright (C) 1999-2015, QOS.ch. All rights reserved.
4    *
5    * This program and the accompanying materials are dual-licensed under
6    * either the terms of the Eclipse Public License v1.0 as published by
7    * the Eclipse Foundation
8    *
9    *   or (per the licensee's choosing)
10   *
11   * under the terms of the GNU Lesser General Public License version 2.1
12   * as published by the Free Software Foundation.
13   */
14  // Contributors: Dan MacDonald <dan@redknee.com>
15  package ch.qos.logback.core.net;
16  
17  import java.io.IOException;
18  import java.io.Serializable;
19  import java.net.ConnectException;
20  import java.net.InetAddress;
21  import java.net.Socket;
22  import java.net.UnknownHostException;
23  import java.util.concurrent.BlockingDeque;
24  import java.util.concurrent.Future;
25  import java.util.concurrent.TimeUnit;
26  
27  import javax.net.SocketFactory;
28  
29  import ch.qos.logback.core.AppenderBase;
30  import ch.qos.logback.core.spi.PreSerializationTransformer;
31  import ch.qos.logback.core.util.CloseUtil;
32  import ch.qos.logback.core.util.Duration;
33  
34  /**
35   * An abstract base for module specific {@code SocketAppender} implementations
36   * in other logback modules.
37   * 
38   * @author Ceki G&uuml;lc&uuml;
39   * @author S&eacute;bastien Pennec
40   * @author Carl Harris
41   * @author Sebastian Gr&ouml;bler
42   */
43  
44  public abstract class AbstractSocketAppender<E> extends AppenderBase<E> implements SocketConnector.ExceptionHandler {
45  
46      /**
47       * The default port number of remote logging server (4560).
48       */
49      public static final int DEFAULT_PORT = 4560;
50  
51      /**
52       * The default reconnection delay (30000 milliseconds or 30 seconds).
53       */
54      public static final int DEFAULT_RECONNECTION_DELAY = 30000;
55  
56      /**
57       * Default size of the deque used to hold logging events that are destined for
58       * the remote peer.
59       */
60      public static final int DEFAULT_QUEUE_SIZE = 128;
61  
62      /**
63       * Default timeout when waiting for the remote server to accept our connection.
64       */
65      private static final int DEFAULT_ACCEPT_CONNECTION_DELAY = 5000;
66  
67      /**
68       * Default timeout for how long to wait when inserting an event into the
69       * BlockingQueue.
70       */
71      private static final int DEFAULT_EVENT_DELAY_TIMEOUT = 100;
72  
73      private final ObjectWriterFactory objectWriterFactory;
74      private final QueueFactory queueFactory;
75  
76      private String remoteHost;
77      private int port = DEFAULT_PORT;
78      private InetAddress address;
79      private Duration reconnectionDelay = new Duration(DEFAULT_RECONNECTION_DELAY);
80      private int queueSize = DEFAULT_QUEUE_SIZE;
81      private int acceptConnectionTimeout = DEFAULT_ACCEPT_CONNECTION_DELAY;
82      private Duration eventDelayLimit = new Duration(DEFAULT_EVENT_DELAY_TIMEOUT);
83  
84      private BlockingDeque<E> deque;
85      private String peerId;
86      private SocketConnector connector;
87      private Future<?> task;
88  
89      private volatile Socket socket;
90  
91      /**
92       * Constructs a new appender.
93       */
94      protected AbstractSocketAppender() {
95          this(new QueueFactory(), new ObjectWriterFactory());
96      }
97  
98      /**
99       * Constructs a new appender using the given {@link QueueFactory} and
100      * {@link ObjectWriterFactory}.
101      */
102     AbstractSocketAppender(QueueFactory queueFactory, ObjectWriterFactory objectWriterFactory) {
103         this.objectWriterFactory = objectWriterFactory;
104         this.queueFactory = queueFactory;
105     }
106 
107     /**
108      * {@inheritDoc}
109      */
110     public void start() {
111         if (isStarted())
112             return;
113         int errorCount = 0;
114         if (port <= 0) {
115             errorCount++;
116             addError("No port was configured for appender" + name
117                     + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_port");
118         }
119 
120         if (remoteHost == null) {
121             errorCount++;
122             addError("No remote host was configured for appender" + name
123                     + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_host");
124         }
125 
126         if (queueSize == 0) {
127             addWarn("Queue size of zero is deprecated, use a size of one to indicate synchronous processing");
128         }
129 
130         if (queueSize < 0) {
131             errorCount++;
132             addError("Queue size must be greater than zero");
133         }
134 
135         if (errorCount == 0) {
136             try {
137                 address = InetAddress.getByName(remoteHost);
138             } catch (UnknownHostException ex) {
139                 addError("unknown host: " + remoteHost);
140                 errorCount++;
141             }
142         }
143 
144         if (errorCount == 0) {
145             deque = queueFactory.newLinkedBlockingDeque(queueSize);
146             peerId = "remote peer " + remoteHost + ":" + port + ": ";
147             connector = createConnector(address, port, 0, reconnectionDelay.getMilliseconds());
148             task = getContext().getScheduledExecutorService().submit(new Runnable() {
149                 @Override
150                 public void run() {
151                     connectSocketAndDispatchEvents();
152                 }
153             });
154             super.start();
155         }
156     }
157 
158     /**
159      * {@inheritDoc}
160      */
161     @Override
162     public void stop() {
163         if (!isStarted())
164             return;
165         CloseUtil.closeQuietly(socket);
166         task.cancel(true);
167         super.stop();
168     }
169 
170     /**
171      * {@inheritDoc}
172      */
173     @Override
174     protected void append(E event) {
175         if (event == null || !isStarted())
176             return;
177 
178         try {
179             final boolean inserted = deque.offer(event, eventDelayLimit.getMilliseconds(), TimeUnit.MILLISECONDS);
180             if (!inserted) {
181                 addInfo("Dropping event due to timeout limit of [" + eventDelayLimit + "] being exceeded");
182             }
183         } catch (InterruptedException e) {
184             addError("Interrupted while appending event to SocketAppender", e);
185         }
186     }
187 
188     private void connectSocketAndDispatchEvents() {
189         try {
190             while (socketConnectionCouldBeEstablished()) {
191                 try {
192                     ObjectWriter objectWriter = createObjectWriterForSocket();
193                     addInfo(peerId + "connection established");
194                     dispatchEvents(objectWriter);
195                 } catch (javax.net.ssl.SSLHandshakeException she) {
196                     // FIXME
197                     Thread.sleep(DEFAULT_RECONNECTION_DELAY);
198                 } catch (IOException ex) {
199                     addInfo(peerId + "connection failed: ", ex);
200                 } finally {
201                     CloseUtil.closeQuietly(socket);
202                     socket = null;
203                     addInfo(peerId + "connection closed");
204                 }
205             }
206         } catch (InterruptedException ex) {
207             assert true; // ok... we'll exit now
208         }
209         addInfo("shutting down");
210     }
211 
212     private boolean socketConnectionCouldBeEstablished() throws InterruptedException {
213         return (socket = connector.call()) != null;
214     }
215 
216     private ObjectWriter createObjectWriterForSocket() throws IOException {
217         socket.setSoTimeout(acceptConnectionTimeout);
218         ObjectWriter objectWriter = objectWriterFactory.newAutoFlushingObjectWriter(socket.getOutputStream());
219         socket.setSoTimeout(0);
220         return objectWriter;
221     }
222 
223     private SocketConnector createConnector(InetAddress address, int port, int initialDelay, long retryDelay) {
224         SocketConnector connector = newConnector(address, port, initialDelay, retryDelay);
225         connector.setExceptionHandler(this);
226         connector.setSocketFactory(getSocketFactory());
227         return connector;
228     }
229 
230     private void dispatchEvents(ObjectWriter objectWriter) throws InterruptedException, IOException {
231         while (true) {
232             E event = deque.takeFirst();
233             postProcessEvent(event);
234             Serializable serializableEvent = getPST().transform(event);
235             try {
236                 objectWriter.write(serializableEvent);
237             } catch (IOException e) {
238                 tryReAddingEventToFrontOfQueue(event);
239                 throw e;
240             }
241         }
242     }
243 
244     private void tryReAddingEventToFrontOfQueue(E event) {
245         final boolean wasInserted = deque.offerFirst(event);
246         if (!wasInserted) {
247             addInfo("Dropping event due to socket connection error and maxed out deque capacity");
248         }
249     }
250 
251     /**
252      * {@inheritDoc}
253      */
254     public void connectionFailed(SocketConnector connector, Exception ex) {
255         if (ex instanceof InterruptedException) {
256             addInfo("connector interrupted");
257         } else if (ex instanceof ConnectException) {
258             addInfo(peerId + "connection refused");
259         } else {
260             addInfo(peerId + ex);
261         }
262     }
263 
264     /**
265      * Creates a new {@link SocketConnector}.
266      * <p>
267      * The default implementation creates an instance of
268      * {@link DefaultSocketConnector}. A subclass may override to provide a
269      * different {@link SocketConnector} implementation.
270      * 
271      * @param address      target remote address
272      * @param port         target remote port
273      * @param initialDelay delay before the first connection attempt
274      * @param retryDelay   delay before a reconnection attempt
275      * @return socket connector
276      */
277     protected SocketConnector newConnector(InetAddress address, int port, long initialDelay, long retryDelay) {
278         return new DefaultSocketConnector(address, port, initialDelay, retryDelay);
279     }
280 
281     /**
282      * Gets the default {@link SocketFactory} for the platform.
283      * <p>
284      * Subclasses may override to provide a custom socket factory.
285      */
286     protected SocketFactory getSocketFactory() {
287         return SocketFactory.getDefault();
288     }
289 
290     /**
291      * Post-processes an event before it is serialized for delivery to the remote
292      * receiver.
293      * 
294      * @param event the event to post-process
295      */
296     protected abstract void postProcessEvent(E event);
297 
298     /**
299      * Get the pre-serialization transformer that will be used to transform each
300      * event into a Serializable object before delivery to the remote receiver.
301      * 
302      * @return transformer object
303      */
304     protected abstract PreSerializationTransformer<E> getPST();
305 
306     /**
307      * The <b>RemoteHost</b> property takes the name of the host where a
308      * corresponding server is running.
309      */
310     public void setRemoteHost(String host) {
311         remoteHost = host;
312     }
313 
314     /**
315      * Returns value of the <b>RemoteHost</b> property.
316      */
317     public String getRemoteHost() {
318         return remoteHost;
319     }
320 
321     /**
322      * The <b>Port</b> property takes a positive integer representing the port where
323      * the server is waiting for connections.
324      */
325     public void setPort(int port) {
326         this.port = port;
327     }
328 
329     /**
330      * Returns value of the <b>Port</b> property.
331      */
332     public int getPort() {
333         return port;
334     }
335 
336     /**
337      * The <b>reconnectionDelay</b> property takes a positive {@link Duration} value
338      * representing the time to wait between each failed connection attempt to the
339      * server. The default value of this option is to 30 seconds.
340      *
341      * <p>
342      * Setting this option to zero turns off reconnection capability.
343      */
344     public void setReconnectionDelay(Duration delay) {
345         this.reconnectionDelay = delay;
346     }
347 
348     /**
349      * Returns value of the <b>reconnectionDelay</b> property.
350      */
351     public Duration getReconnectionDelay() {
352         return reconnectionDelay;
353     }
354 
355     /**
356      * The <b>queueSize</b> property takes a non-negative integer representing the
357      * number of logging events to retain for delivery to the remote receiver. When
358      * the deque size is zero, event delivery to the remote receiver is synchronous.
359      * When the deque size is greater than zero, the {@link #append(Object)} method
360      * returns immediately after enqueing the event, assuming that there is space
361      * available in the deque. Using a non-zero deque length can improve performance
362      * by eliminating delays caused by transient network delays.
363      * 
364      * @param queueSize the deque size to set.
365      */
366     public void setQueueSize(int queueSize) {
367         this.queueSize = queueSize;
368     }
369 
370     /**
371      * Returns the value of the <b>queueSize</b> property.
372      */
373     public int getQueueSize() {
374         return queueSize;
375     }
376 
377     /**
378      * The <b>eventDelayLimit</b> takes a non-negative integer representing the
379      * number of milliseconds to allow the appender to block if the underlying
380      * BlockingQueue is full. Once this limit is reached, the event is dropped.
381      *
382      * @param eventDelayLimit the event delay limit
383      */
384     public void setEventDelayLimit(Duration eventDelayLimit) {
385         this.eventDelayLimit = eventDelayLimit;
386     }
387 
388     /**
389      * Returns the value of the <b>eventDelayLimit</b> property.
390      */
391     public Duration getEventDelayLimit() {
392         return eventDelayLimit;
393     }
394 
395     /**
396      * Sets the timeout that controls how long we'll wait for the remote peer to
397      * accept our connection attempt.
398      * <p>
399      * This property is configurable primarily to support instrumentation for unit
400      * testing.
401      * 
402      * @param acceptConnectionTimeout timeout value in milliseconds
403      */
404     void setAcceptConnectionTimeout(int acceptConnectionTimeout) {
405         this.acceptConnectionTimeout = acceptConnectionTimeout;
406     }
407 
408 }