1   /*
2    * Logback: the reliable, generic, fast and flexible logging framework.
3    * Copyright (C) 1999-2026, QOS.ch. All rights reserved.
4    *
5    * This program and the accompanying materials are dual-licensed under
6    * either the terms of the Eclipse Public License v2.0 as published by
7    * the Eclipse Foundation
8    *
9    *   or (per the licensee's choosing)
10   *
11   * under the terms of the GNU Lesser General Public License version 2.1
12   * as published by the Free Software Foundation.
13   */
14  // Contributors: Dan MacDonald <dan@redknee.com>
15  package ch.qos.logback.core.net;
16  
17  import java.io.IOException;
18  import java.io.Serializable;
19  import java.net.ConnectException;
20  import java.net.InetAddress;
21  import java.net.Socket;
22  import java.net.UnknownHostException;
23  import java.util.concurrent.BlockingDeque;
24  import java.util.concurrent.Future;
25  import java.util.concurrent.TimeUnit;
26  
27  import javax.net.SocketFactory;
28  
29  import ch.qos.logback.core.AppenderBase;
30  import ch.qos.logback.core.spi.DeferredProcessingAware;
31  import ch.qos.logback.core.spi.PreSerializationTransformer;
32  import ch.qos.logback.core.util.CloseUtil;
33  import ch.qos.logback.core.util.Duration;
34  
35  /**
36   * An abstract base for module specific {@code SocketAppender} implementations
37   * in other logback modules.
38   * 
39   * @author Ceki G&uuml;lc&uuml;
40   * @author S&eacute;bastien Pennec
41   * @author Carl Harris
42   * @author Sebastian Gr&ouml;bler
43   */
44  
45  public abstract class AbstractSocketAppender<E> extends AppenderBase<E> implements SocketConnector.ExceptionHandler {
46  
47      /**
48       * The default port number of remote logging server (4560).
49       */
50      public static final int DEFAULT_PORT = 4560;
51  
52      /**
53       * The default reconnection delay (30000 milliseconds or 30 seconds).
54       */
55      public static final int DEFAULT_RECONNECTION_DELAY = 30000;
56  
57      /**
58       * Default size of the deque used to hold logging events that are destined for
59       * the remote peer.
60       */
61      public static final int DEFAULT_QUEUE_SIZE = 128;
62  
63      /**
64       * Default timeout when waiting for the remote server to accept our connection.
65       */
66      private static final int DEFAULT_ACCEPT_CONNECTION_DELAY = 5000;
67  
68      /**
69       * Default timeout for how long to wait when inserting an event into the
70       * BlockingQueue.
71       */
72      private static final int DEFAULT_EVENT_DELAY_TIMEOUT = 100;
73  
74      private final ObjectWriterFactory objectWriterFactory;
75      private final QueueFactory queueFactory;
76  
77      private String remoteHost;
78      private int port = DEFAULT_PORT;
79      private InetAddress address;
80      private Duration reconnectionDelay = new Duration(DEFAULT_RECONNECTION_DELAY);
81      private int queueSize = DEFAULT_QUEUE_SIZE;
82      private int acceptConnectionTimeout = DEFAULT_ACCEPT_CONNECTION_DELAY;
83      private Duration eventDelayLimit = new Duration(DEFAULT_EVENT_DELAY_TIMEOUT);
84  
85      private BlockingDeque<E> deque;
86      private String peerId;
87      private SocketConnector connector;
88      private Future<?> task;
89  
90      private volatile Socket socket;
91  
92      /**
93       * Constructs a new appender.
94       */
95      protected AbstractSocketAppender() {
96          this(new QueueFactory(), new ObjectWriterFactory());
97      }
98  
99      /**
100      * Constructs a new appender using the given {@link QueueFactory} and
101      * {@link ObjectWriterFactory}.
102      */
103     AbstractSocketAppender(QueueFactory queueFactory, ObjectWriterFactory objectWriterFactory) {
104         this.objectWriterFactory = objectWriterFactory;
105         this.queueFactory = queueFactory;
106     }
107 
108     /**
109      * {@inheritDoc}
110      */
111     public void start() {
112         if (isStarted())
113             return;
114         int errorCount = 0;
115         if (port <= 0) {
116             errorCount++;
117             addError("No port was configured for appender" + name
118                     + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_port");
119         }
120 
121         if (remoteHost == null) {
122             errorCount++;
123             addError("No remote host was configured for appender" + name
124                     + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_host");
125         }
126 
127         if (queueSize == 0) {
128             addWarn("Queue size of zero is deprecated, use a size of one to indicate synchronous processing");
129         }
130 
131         if (queueSize < 0) {
132             errorCount++;
133             addError("Queue size must be greater than zero");
134         }
135 
136         if (errorCount == 0) {
137             try {
138                 address = InetAddress.getByName(remoteHost);
139             } catch (UnknownHostException ex) {
140                 addError("unknown host: " + remoteHost);
141                 errorCount++;
142             }
143         }
144 
145         if (errorCount == 0) {
146             deque = queueFactory.newLinkedBlockingDeque(queueSize);
147             peerId = "remote peer " + remoteHost + ":" + port + ": ";
148             connector = createConnector(address, port, 0, reconnectionDelay.getMilliseconds());
149             task = getContext().getExecutorService().submit(new Runnable() {
150                 @Override
151                 public void run() {
152                     connectSocketAndDispatchEvents();
153                 }
154             });
155             super.start();
156         }
157     }
158 
159     /**
160      * {@inheritDoc}
161      */
162     @Override
163     public void stop() {
164         if (!isStarted())
165             return;
166         CloseUtil.closeQuietly(socket);
167         task.cancel(true);
168         super.stop();
169     }
170 
171     /**
172      * {@inheritDoc}
173      */
174     @Override
175     protected void append(E event) {
176         if (event == null || !isStarted())
177             return;
178 
179         try {
180 
181             // otherwise MDC information is not transferred. See also logback/issues/1010
182             if(event instanceof DeferredProcessingAware) {
183                 ((DeferredProcessingAware) event).prepareForDeferredProcessing();
184             }
185 
186             final boolean inserted = deque.offer(event, eventDelayLimit.getMilliseconds(), TimeUnit.MILLISECONDS);
187             if (!inserted) {
188                 addInfo("Dropping event due to timeout limit of [" + eventDelayLimit + "] being exceeded");
189             }
190         } catch (InterruptedException e) {
191             addError("Interrupted while appending event to SocketAppender", e);
192         }
193     }
194 
195     private void connectSocketAndDispatchEvents() {
196         try {
197             while (socketConnectionCouldBeEstablished()) {
198                 try {
199                     ObjectWriter objectWriter = createObjectWriterForSocket();
200                     addInfo(peerId + "connection established");
201                     dispatchEvents(objectWriter);
202                 } catch (javax.net.ssl.SSLHandshakeException she) {
203                     // FIXME
204                     Thread.sleep(DEFAULT_RECONNECTION_DELAY);
205                 } catch (IOException ex) {
206                     addInfo(peerId + "connection failed: ", ex);
207                 } finally {
208                     CloseUtil.closeQuietly(socket);
209                     socket = null;
210                     addInfo(peerId + "connection closed");
211                 }
212             }
213         } catch (InterruptedException ex) {
214             assert true; // ok... we'll exit now
215         }
216         addInfo("shutting down");
217     }
218 
219     private boolean socketConnectionCouldBeEstablished() throws InterruptedException {
220         return (socket = connector.call()) != null;
221     }
222 
223     private ObjectWriter createObjectWriterForSocket() throws IOException {
224         socket.setSoTimeout(acceptConnectionTimeout);
225         ObjectWriter objectWriter = objectWriterFactory.newAutoFlushingObjectWriter(socket.getOutputStream());
226         socket.setSoTimeout(0);
227         return objectWriter;
228     }
229 
230     private SocketConnector createConnector(InetAddress address, int port, int initialDelay, long retryDelay) {
231         SocketConnector connector = newConnector(address, port, initialDelay, retryDelay);
232         connector.setExceptionHandler(this);
233         connector.setSocketFactory(getSocketFactory());
234         return connector;
235     }
236 
237     private void dispatchEvents(ObjectWriter objectWriter) throws InterruptedException, IOException {
238         while (true) {
239             E event = deque.takeFirst();
240             postProcessEvent(event);
241             Serializable serializableEvent = getPST().transform(event);
242             try {
243                 objectWriter.write(serializableEvent);
244             } catch (IOException e) {
245                 tryReAddingEventToFrontOfQueue(event);
246                 throw e;
247             }
248         }
249     }
250 
251     private void tryReAddingEventToFrontOfQueue(E event) {
252         final boolean wasInserted = deque.offerFirst(event);
253         if (!wasInserted) {
254             addInfo("Dropping event due to socket connection error and maxed out deque capacity");
255         }
256     }
257 
258     /**
259      * {@inheritDoc}
260      */
261     public void connectionFailed(SocketConnector connector, Exception ex) {
262         if (ex instanceof InterruptedException) {
263             addInfo("connector interrupted");
264         } else if (ex instanceof ConnectException) {
265             addInfo(peerId + "connection refused");
266         } else {
267             addInfo(peerId + ex);
268         }
269     }
270 
271     /**
272      * Creates a new {@link SocketConnector}.
273      * <p>
274      * The default implementation creates an instance of
275      * {@link DefaultSocketConnector}. A subclass may override to provide a
276      * different {@link SocketConnector} implementation.
277      * 
278      * @param address      target remote address
279      * @param port         target remote port
280      * @param initialDelay delay before the first connection attempt
281      * @param retryDelay   delay before a reconnection attempt
282      * @return socket connector
283      */
284     protected SocketConnector newConnector(InetAddress address, int port, long initialDelay, long retryDelay) {
285         return new DefaultSocketConnector(address, port, initialDelay, retryDelay);
286     }
287 
288     /**
289      * Gets the default {@link SocketFactory} for the platform.
290      * <p>
291      * Subclasses may override to provide a custom socket factory.
292      */
293     protected SocketFactory getSocketFactory() {
294         return SocketFactory.getDefault();
295     }
296 
297     /**
298      * Post-processes an event before it is serialized for delivery to the remote
299      * receiver.
300      * 
301      * @param event the event to post-process
302      */
303     protected abstract void postProcessEvent(E event);
304 
305     /**
306      * Get the pre-serialization transformer that will be used to transform each
307      * event into a Serializable object before delivery to the remote receiver.
308      * 
309      * @return transformer object
310      */
311     protected abstract PreSerializationTransformer<E> getPST();
312 
313     /**
314      * The <b>RemoteHost</b> property takes the name of the host where a
315      * corresponding server is running.
316      */
317     public void setRemoteHost(String host) {
318         remoteHost = host;
319     }
320 
321     /**
322      * Returns value of the <b>RemoteHost</b> property.
323      */
324     public String getRemoteHost() {
325         return remoteHost;
326     }
327 
328     /**
329      * The <b>Port</b> property takes a positive integer representing the port where
330      * the server is waiting for connections.
331      */
332     public void setPort(int port) {
333         this.port = port;
334     }
335 
336     /**
337      * Returns value of the <b>Port</b> property.
338      */
339     public int getPort() {
340         return port;
341     }
342 
343     /**
344      * The <b>reconnectionDelay</b> property takes a positive {@link Duration} value
345      * representing the time to wait between each failed connection attempt to the
346      * server. The default value of this option is to 30 seconds.
347      *
348      * <p>
349      * Setting this option to zero turns off reconnection capability.
350      */
351     public void setReconnectionDelay(Duration delay) {
352         this.reconnectionDelay = delay;
353     }
354 
355     /**
356      * Returns value of the <b>reconnectionDelay</b> property.
357      */
358     public Duration getReconnectionDelay() {
359         return reconnectionDelay;
360     }
361 
362     /**
363      * The <b>queueSize</b> property takes a non-negative integer representing the
364      * number of logging events to retain for delivery to the remote receiver. When
365      * the deque size is zero, event delivery to the remote receiver is synchronous.
366      * When the deque size is greater than zero, the {@link #append(Object)} method
367      * returns immediately after enqueing the event, assuming that there is space
368      * available in the deque. Using a non-zero deque length can improve performance
369      * by eliminating delays caused by transient network delays.
370      * 
371      * @param queueSize the deque size to set.
372      */
373     public void setQueueSize(int queueSize) {
374         this.queueSize = queueSize;
375     }
376 
377     /**
378      * Returns the value of the <b>queueSize</b> property.
379      */
380     public int getQueueSize() {
381         return queueSize;
382     }
383 
384     /**
385      * The <b>eventDelayLimit</b> takes a non-negative integer representing the
386      * number of milliseconds to allow the appender to block if the underlying
387      * BlockingQueue is full. Once this limit is reached, the event is dropped.
388      *
389      * @param eventDelayLimit the event delay limit
390      */
391     public void setEventDelayLimit(Duration eventDelayLimit) {
392         this.eventDelayLimit = eventDelayLimit;
393     }
394 
395     /**
396      * Returns the value of the <b>eventDelayLimit</b> property.
397      */
398     public Duration getEventDelayLimit() {
399         return eventDelayLimit;
400     }
401 
402     /**
403      * Sets the timeout that controls how long we'll wait for the remote peer to
404      * accept our connection attempt.
405      * <p>
406      * This property is configurable primarily to support instrumentation for unit
407      * testing.
408      * 
409      * @param acceptConnectionTimeout timeout value in milliseconds
410      */
411     void setAcceptConnectionTimeout(int acceptConnectionTimeout) {
412         this.acceptConnectionTimeout = acceptConnectionTimeout;
413     }
414 
415 }