View Javadoc
1   /**
2    * Logback: the reliable, generic, fast and flexible logging framework.
3    * Copyright (C) 1999-2015, QOS.ch. All rights reserved.
4    *
5    * This program and the accompanying materials are dual-licensed under
6    * either the terms of the Eclipse Public License v1.0 as published by
7    * the Eclipse Foundation
8    *
9    *   or (per the licensee's choosing)
10   *
11   * under the terms of the GNU Lesser General Public License version 2.1
12   * as published by the Free Software Foundation.
13   */
14  // Contributors: Dan MacDonald <dan@redknee.com>
15  package ch.qos.logback.core.net;
16  
17  import java.io.IOException;
18  import java.io.Serializable;
19  import java.net.ConnectException;
20  import java.net.InetAddress;
21  import java.net.Socket;
22  import java.net.UnknownHostException;
23  import java.util.concurrent.BlockingDeque;
24  import java.util.concurrent.Future;
25  import java.util.concurrent.TimeUnit;
26  
27  import javax.net.SocketFactory;
28  
29  import ch.qos.logback.core.AppenderBase;
30  import ch.qos.logback.core.spi.PreSerializationTransformer;
31  import ch.qos.logback.core.util.CloseUtil;
32  import ch.qos.logback.core.util.Duration;
33  
34  /**
35   * An abstract base for module specific {@code SocketAppender}
36   * implementations in other logback modules.
37   * 
38   * @author Ceki G&uuml;lc&uuml;
39   * @author S&eacute;bastien Pennec
40   * @author Carl Harris
41   * @author Sebastian Gr&ouml;bler
42   */
43  
44  public abstract class AbstractSocketAppender<E> extends AppenderBase<E> implements SocketConnector.ExceptionHandler {
45  
46      /**
47       * The default port number of remote logging server (4560).
48       */
49      public static final int DEFAULT_PORT = 4560;
50  
51      /**
52       * The default reconnection delay (30000 milliseconds or 30 seconds).
53       */
54      public static final int DEFAULT_RECONNECTION_DELAY = 30000;
55  
56      /**
57       * Default size of the deque used to hold logging events that are destined
58       * for the remote peer.
59       */
60      public static final int DEFAULT_QUEUE_SIZE = 128;
61  
62      /**
63       * Default timeout when waiting for the remote server to accept our
64       * connection.
65       */
66      private static final int DEFAULT_ACCEPT_CONNECTION_DELAY = 5000;
67  
68      /**
69       * Default timeout for how long to wait when inserting an event into
70       * the BlockingQueue.
71       */
72      private static final int DEFAULT_EVENT_DELAY_TIMEOUT = 100;
73  
74      private final ObjectWriterFactory objectWriterFactory;
75      private final QueueFactory queueFactory;
76  
77      private String remoteHost;
78      private int port = DEFAULT_PORT;
79      private InetAddress address;
80      private Duration reconnectionDelay = new Duration(DEFAULT_RECONNECTION_DELAY);
81      private int queueSize = DEFAULT_QUEUE_SIZE;
82      private int acceptConnectionTimeout = DEFAULT_ACCEPT_CONNECTION_DELAY;
83      private Duration eventDelayLimit = new Duration(DEFAULT_EVENT_DELAY_TIMEOUT);
84  
85      private BlockingDeque<E> deque;
86      private String peerId;
87      private SocketConnector connector;
88      private Future<?> task;
89  
90      private volatile Socket socket;
91  
92      /**
93       * Constructs a new appender.
94       */
95      protected AbstractSocketAppender() {
96          this(new QueueFactory(), new ObjectWriterFactory());
97      }
98  
99      /**
100      * Constructs a new appender using the given {@link QueueFactory} and {@link ObjectWriterFactory}.
101      */
102     AbstractSocketAppender(QueueFactory queueFactory, ObjectWriterFactory objectWriterFactory) {
103         this.objectWriterFactory = objectWriterFactory;
104         this.queueFactory = queueFactory;
105     }
106 
107     /**
108      * {@inheritDoc}
109      */
110     public void start() {
111         if (isStarted())
112             return;
113         int errorCount = 0;
114         if (port <= 0) {
115             errorCount++;
116             addError("No port was configured for appender" + name + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_port");
117         }
118 
119         if (remoteHost == null) {
120             errorCount++;
121             addError("No remote host was configured for appender" + name
122                             + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_host");
123         }
124 
125         if (queueSize == 0) {
126             addWarn("Queue size of zero is deprecated, use a size of one to indicate synchronous processing");
127         }
128 
129         if (queueSize < 0) {
130             errorCount++;
131             addError("Queue size must be greater than zero");
132         }
133 
134         if (errorCount == 0) {
135             try {
136                 address = InetAddress.getByName(remoteHost);
137             } catch (UnknownHostException ex) {
138                 addError("unknown host: " + remoteHost);
139                 errorCount++;
140             }
141         }
142 
143         if (errorCount == 0) {
144             deque = queueFactory.newLinkedBlockingDeque(queueSize);
145             peerId = "remote peer " + remoteHost + ":" + port + ": ";
146             connector = createConnector(address, port, 0, reconnectionDelay.getMilliseconds());
147             task = getContext().getScheduledExecutorService().submit(new Runnable() {
148                 @Override
149                 public void run() {
150                     connectSocketAndDispatchEvents();
151                 }
152             });
153             super.start();
154         }
155     }
156 
157     /**
158      * {@inheritDoc}
159      */
160     @Override
161     public void stop() {
162         if (!isStarted())
163             return;
164         CloseUtil.closeQuietly(socket);
165         task.cancel(true);
166         super.stop();
167     }
168 
169     /**
170      * {@inheritDoc}
171      */
172     @Override
173     protected void append(E event) {
174         if (event == null || !isStarted())
175             return;
176 
177         try {
178             final boolean inserted = deque.offer(event, eventDelayLimit.getMilliseconds(), TimeUnit.MILLISECONDS);
179             if (!inserted) {
180                 addInfo("Dropping event due to timeout limit of [" + eventDelayLimit + "] being exceeded");
181             }
182         } catch (InterruptedException e) {
183             addError("Interrupted while appending event to SocketAppender", e);
184         }
185     }
186 
187     private void connectSocketAndDispatchEvents() {
188         try {
189             while (socketConnectionCouldBeEstablished()) {
190                 try {
191                     ObjectWriter objectWriter = createObjectWriterForSocket();
192                     addInfo(peerId + "connection established");
193                     dispatchEvents(objectWriter);
194                 } catch (IOException ex) {
195                     addInfo(peerId + "connection failed: " + ex);
196                 } finally {
197                     CloseUtil.closeQuietly(socket);
198                     socket = null;
199                     addInfo(peerId + "connection closed");
200                 }
201             }
202         } catch (InterruptedException ex) {
203             assert true; // ok... we'll exit now
204         }
205         addInfo("shutting down");
206     }
207 
208     private boolean socketConnectionCouldBeEstablished() throws InterruptedException {
209         return (socket = connector.call()) != null;
210     }
211 
212     private ObjectWriter createObjectWriterForSocket() throws IOException {
213         socket.setSoTimeout(acceptConnectionTimeout);
214         ObjectWriter objectWriter = objectWriterFactory.newAutoFlushingObjectWriter(socket.getOutputStream());
215         socket.setSoTimeout(0);
216         return objectWriter;
217     }
218 
219     private SocketConnector createConnector(InetAddress address, int port, int initialDelay, long retryDelay) {
220         SocketConnector connector = newConnector(address, port, initialDelay, retryDelay);
221         connector.setExceptionHandler(this);
222         connector.setSocketFactory(getSocketFactory());
223         return connector;
224     }
225 
226     private void dispatchEvents(ObjectWriter objectWriter) throws InterruptedException, IOException {
227         while (true) {
228             E event = deque.takeFirst();
229             postProcessEvent(event);
230             Serializable serializableEvent = getPST().transform(event);
231             try {
232                 objectWriter.write(serializableEvent);
233             } catch (IOException e) {
234                 tryReAddingEventToFrontOfQueue(event);
235                 throw e;
236             }
237         }
238     }
239 
240     private void tryReAddingEventToFrontOfQueue(E event) {
241         final boolean wasInserted = deque.offerFirst(event);
242         if (!wasInserted) {
243             addInfo("Dropping event due to socket connection error and maxed out deque capacity");
244         }
245     }
246 
247     /**
248      * {@inheritDoc}
249      */
250     public void connectionFailed(SocketConnector connector, Exception ex) {
251         if (ex instanceof InterruptedException) {
252             addInfo("connector interrupted");
253         } else if (ex instanceof ConnectException) {
254             addInfo(peerId + "connection refused");
255         } else {
256             addInfo(peerId + ex);
257         }
258     }
259 
260     /**
261      * Creates a new {@link SocketConnector}.
262      * <p>
263      * The default implementation creates an instance of {@link DefaultSocketConnector}.
264      * A subclass may override to provide a different {@link SocketConnector}
265      * implementation.
266      * 
267      * @param address target remote address
268      * @param port target remote port
269      * @param initialDelay delay before the first connection attempt
270      * @param retryDelay delay before a reconnection attempt
271      * @return socket connector
272      */
273     protected SocketConnector newConnector(InetAddress address, int port, long initialDelay, long retryDelay) {
274         return new DefaultSocketConnector(address, port, initialDelay, retryDelay);
275     }
276 
277     /**
278      * Gets the default {@link SocketFactory} for the platform.
279      * <p>
280      * Subclasses may override to provide a custom socket factory.
281      */
282     protected SocketFactory getSocketFactory() {
283         return SocketFactory.getDefault();
284     }
285 
286     /**
287      * Post-processes an event before it is serialized for delivery to the
288      * remote receiver.
289      * @param event the event to post-process
290      */
291     protected abstract void postProcessEvent(E event);
292 
293     /**
294      * Get the pre-serialization transformer that will be used to transform
295      * each event into a Serializable object before delivery to the remote
296      * receiver.
297      * @return transformer object
298      */
299     protected abstract PreSerializationTransformer<E> getPST();
300 
301     /**
302      * The <b>RemoteHost</b> property takes the name of of the host where a corresponding server is running.
303      */
304     public void setRemoteHost(String host) {
305         remoteHost = host;
306     }
307 
308     /**
309      * Returns value of the <b>RemoteHost</b> property.
310      */
311     public String getRemoteHost() {
312         return remoteHost;
313     }
314 
315     /**
316      * The <b>Port</b> property takes a positive integer representing the port
317      * where the server is waiting for connections.
318      */
319     public void setPort(int port) {
320         this.port = port;
321     }
322 
323     /**
324      * Returns value of the <b>Port</b> property.
325      */
326     public int getPort() {
327         return port;
328     }
329 
330     /**
331      * The <b>reconnectionDelay</b> property takes a positive {@link Duration} value
332      * representing the time to wait between each failed connection attempt
333      * to the server. The default value of this option is to 30 seconds.
334      *
335      * <p>
336      * Setting this option to zero turns off reconnection capability.
337      */
338     public void setReconnectionDelay(Duration delay) {
339         this.reconnectionDelay = delay;
340     }
341 
342     /**
343      * Returns value of the <b>reconnectionDelay</b> property.
344      */
345     public Duration getReconnectionDelay() {
346         return reconnectionDelay;
347     }
348 
349     /**
350      * The <b>queueSize</b> property takes a non-negative integer representing
351      * the number of logging events to retain for delivery to the remote receiver.
352      * When the deque size is zero, event delivery to the remote receiver is
353      * synchronous.  When the deque size is greater than zero, the
354      * {@link #append(Object)} method returns immediately after enqueing the
355      * event, assuming that there is space available in the deque.  Using a
356      * non-zero deque length can improve performance by eliminating delays
357      * caused by transient network delays.
358      * 
359      * @param queueSize the deque size to set.
360      */
361     public void setQueueSize(int queueSize) {
362         this.queueSize = queueSize;
363     }
364 
365     /**
366      * Returns the value of the <b>queueSize</b> property.
367      */
368     public int getQueueSize() {
369         return queueSize;
370     }
371 
372     /**
373      * The <b>eventDelayLimit</b> takes a non-negative integer representing the
374      * number of milliseconds to allow the appender to block if the underlying
375      * BlockingQueue is full. Once this limit is reached, the event is dropped.
376      *
377      * @param eventDelayLimit the event delay limit
378      */
379     public void setEventDelayLimit(Duration eventDelayLimit) {
380         this.eventDelayLimit = eventDelayLimit;
381     }
382 
383     /**
384      * Returns the value of the <b>eventDelayLimit</b> property.
385      */
386     public Duration getEventDelayLimit() {
387         return eventDelayLimit;
388     }
389 
390     /**
391      * Sets the timeout that controls how long we'll wait for the remote
392      * peer to accept our connection attempt.
393      * <p>
394      * This property is configurable primarily to support instrumentation
395      * for unit testing.
396      * 
397      * @param acceptConnectionTimeout timeout value in milliseconds
398      */
399     void setAcceptConnectionTimeout(int acceptConnectionTimeout) {
400         this.acceptConnectionTimeout = acceptConnectionTimeout;
401     }
402 
403 }