/*
 * Logback: the reliable, generic, fast and flexible logging framework.
 * Copyright (C) 1999-2026, QOS.ch. All rights reserved.
 *
 * This program and the accompanying materials are dual-licensed under
 * either the terms of the Eclipse Public License v2.0 as published by
 * the Eclipse Foundation
 *
 *   or (per the licensee's choosing)
 *
 * under the terms of the GNU Lesser General Public License version 2.1
 * as published by the Free Software Foundation.
 */
// Contributors: Dan MacDonald <dan@redknee.com>
015package ch.qos.logback.core.net;
016
017import java.io.IOException;
018import java.io.Serializable;
019import java.net.ConnectException;
020import java.net.InetAddress;
021import java.net.Socket;
022import java.net.UnknownHostException;
023import java.util.concurrent.BlockingDeque;
024import java.util.concurrent.Future;
025import java.util.concurrent.TimeUnit;
026
027import javax.net.SocketFactory;
028
029import ch.qos.logback.core.AppenderBase;
030import ch.qos.logback.core.spi.DeferredProcessingAware;
031import ch.qos.logback.core.spi.PreSerializationTransformer;
032import ch.qos.logback.core.util.CloseUtil;
033import ch.qos.logback.core.util.Duration;
034
035/**
036 * An abstract base for module specific {@code SocketAppender} implementations
037 * in other logback modules.
038 * 
039 * @author Ceki G&uuml;lc&uuml;
040 * @author S&eacute;bastien Pennec
041 * @author Carl Harris
042 * @author Sebastian Gr&ouml;bler
043 */
044
045public abstract class AbstractSocketAppender<E> extends AppenderBase<E> implements SocketConnector.ExceptionHandler {
046
047    /**
048     * The default port number of remote logging server (4560).
049     */
050    public static final int DEFAULT_PORT = 4560;
051
052    /**
053     * The default reconnection delay (30000 milliseconds or 30 seconds).
054     */
055    public static final int DEFAULT_RECONNECTION_DELAY = 30000;
056
057    /**
058     * Default size of the deque used to hold logging events that are destined for
059     * the remote peer.
060     */
061    public static final int DEFAULT_QUEUE_SIZE = 128;
062
063    /**
064     * Default timeout when waiting for the remote server to accept our connection.
065     */
066    private static final int DEFAULT_ACCEPT_CONNECTION_DELAY = 5000;
067
068    /**
069     * Default timeout for how long to wait when inserting an event into the
070     * BlockingQueue.
071     */
072    private static final int DEFAULT_EVENT_DELAY_TIMEOUT = 100;
073
074    private final ObjectWriterFactory objectWriterFactory;
075    private final QueueFactory queueFactory;
076
077    private String remoteHost;
078    private int port = DEFAULT_PORT;
079    private InetAddress address;
080    private Duration reconnectionDelay = new Duration(DEFAULT_RECONNECTION_DELAY);
081    private int queueSize = DEFAULT_QUEUE_SIZE;
082    private int acceptConnectionTimeout = DEFAULT_ACCEPT_CONNECTION_DELAY;
083    private Duration eventDelayLimit = new Duration(DEFAULT_EVENT_DELAY_TIMEOUT);
084
085    private BlockingDeque<E> deque;
086    private String peerId;
087    private SocketConnector connector;
088    private Future<?> task;
089
090    private volatile Socket socket;
091
092    /**
093     * Constructs a new appender.
094     */
095    protected AbstractSocketAppender() {
096        this(new QueueFactory(), new ObjectWriterFactory());
097    }
098
099    /**
100     * Constructs a new appender using the given {@link QueueFactory} and
101     * {@link ObjectWriterFactory}.
102     */
103    AbstractSocketAppender(QueueFactory queueFactory, ObjectWriterFactory objectWriterFactory) {
104        this.objectWriterFactory = objectWriterFactory;
105        this.queueFactory = queueFactory;
106    }
107
108    /**
109     * {@inheritDoc}
110     */
111    public void start() {
112        if (isStarted())
113            return;
114        int errorCount = 0;
115        if (port <= 0) {
116            errorCount++;
117            addError("No port was configured for appender" + name
118                    + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_port");
119        }
120
121        if (remoteHost == null) {
122            errorCount++;
123            addError("No remote host was configured for appender" + name
124                    + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_host");
125        }
126
127        if (queueSize == 0) {
128            addWarn("Queue size of zero is deprecated, use a size of one to indicate synchronous processing");
129        }
130
131        if (queueSize < 0) {
132            errorCount++;
133            addError("Queue size must be greater than zero");
134        }
135
136        if (errorCount == 0) {
137            try {
138                address = InetAddress.getByName(remoteHost);
139            } catch (UnknownHostException ex) {
140                addError("unknown host: " + remoteHost);
141                errorCount++;
142            }
143        }
144
145        if (errorCount == 0) {
146            deque = queueFactory.newLinkedBlockingDeque(queueSize);
147            peerId = "remote peer " + remoteHost + ":" + port + ": ";
148            connector = createConnector(address, port, 0, reconnectionDelay.getMilliseconds());
149            task = getContext().getExecutorService().submit(new Runnable() {
150                @Override
151                public void run() {
152                    connectSocketAndDispatchEvents();
153                }
154            });
155            super.start();
156        }
157    }
158
159    /**
160     * {@inheritDoc}
161     */
162    @Override
163    public void stop() {
164        if (!isStarted())
165            return;
166        CloseUtil.closeQuietly(socket);
167        task.cancel(true);
168        super.stop();
169    }
170
171    /**
172     * {@inheritDoc}
173     */
174    @Override
175    protected void append(E event) {
176        if (event == null || !isStarted())
177            return;
178
179        try {
180
181            // otherwise MDC information is not transferred. See also logback/issues/1010
182            if(event instanceof DeferredProcessingAware) {
183                ((DeferredProcessingAware) event).prepareForDeferredProcessing();
184            }
185
186            final boolean inserted = deque.offer(event, eventDelayLimit.getMilliseconds(), TimeUnit.MILLISECONDS);
187            if (!inserted) {
188                addInfo("Dropping event due to timeout limit of [" + eventDelayLimit + "] being exceeded");
189            }
190        } catch (InterruptedException e) {
191            addError("Interrupted while appending event to SocketAppender", e);
192        }
193    }
194
195    private void connectSocketAndDispatchEvents() {
196        try {
197            while (socketConnectionCouldBeEstablished()) {
198                try {
199                    ObjectWriter objectWriter = createObjectWriterForSocket();
200                    addInfo(peerId + "connection established");
201                    dispatchEvents(objectWriter);
202                } catch (javax.net.ssl.SSLHandshakeException she) {
203                    // FIXME
204                    Thread.sleep(DEFAULT_RECONNECTION_DELAY);
205                } catch (IOException ex) {
206                    addInfo(peerId + "connection failed: ", ex);
207                } finally {
208                    CloseUtil.closeQuietly(socket);
209                    socket = null;
210                    addInfo(peerId + "connection closed");
211                }
212            }
213        } catch (InterruptedException ex) {
214            assert true; // ok... we'll exit now
215        }
216        addInfo("shutting down");
217    }
218
219    private boolean socketConnectionCouldBeEstablished() throws InterruptedException {
220        return (socket = connector.call()) != null;
221    }
222
223    private ObjectWriter createObjectWriterForSocket() throws IOException {
224        socket.setSoTimeout(acceptConnectionTimeout);
225        ObjectWriter objectWriter = objectWriterFactory.newAutoFlushingObjectWriter(socket.getOutputStream());
226        socket.setSoTimeout(0);
227        return objectWriter;
228    }
229
230    private SocketConnector createConnector(InetAddress address, int port, int initialDelay, long retryDelay) {
231        SocketConnector connector = newConnector(address, port, initialDelay, retryDelay);
232        connector.setExceptionHandler(this);
233        connector.setSocketFactory(getSocketFactory());
234        return connector;
235    }
236
237    private void dispatchEvents(ObjectWriter objectWriter) throws InterruptedException, IOException {
238        while (true) {
239            E event = deque.takeFirst();
240            postProcessEvent(event);
241            Serializable serializableEvent = getPST().transform(event);
242            try {
243                objectWriter.write(serializableEvent);
244            } catch (IOException e) {
245                tryReAddingEventToFrontOfQueue(event);
246                throw e;
247            }
248        }
249    }
250
251    private void tryReAddingEventToFrontOfQueue(E event) {
252        final boolean wasInserted = deque.offerFirst(event);
253        if (!wasInserted) {
254            addInfo("Dropping event due to socket connection error and maxed out deque capacity");
255        }
256    }
257
258    /**
259     * {@inheritDoc}
260     */
261    public void connectionFailed(SocketConnector connector, Exception ex) {
262        if (ex instanceof InterruptedException) {
263            addInfo("connector interrupted");
264        } else if (ex instanceof ConnectException) {
265            addInfo(peerId + "connection refused");
266        } else {
267            addInfo(peerId + ex);
268        }
269    }
270
271    /**
272     * Creates a new {@link SocketConnector}.
273     * <p>
274     * The default implementation creates an instance of
275     * {@link DefaultSocketConnector}. A subclass may override to provide a
276     * different {@link SocketConnector} implementation.
277     * 
278     * @param address      target remote address
279     * @param port         target remote port
280     * @param initialDelay delay before the first connection attempt
281     * @param retryDelay   delay before a reconnection attempt
282     * @return socket connector
283     */
284    protected SocketConnector newConnector(InetAddress address, int port, long initialDelay, long retryDelay) {
285        return new DefaultSocketConnector(address, port, initialDelay, retryDelay);
286    }
287
288    /**
289     * Gets the default {@link SocketFactory} for the platform.
290     * <p>
291     * Subclasses may override to provide a custom socket factory.
292     */
293    protected SocketFactory getSocketFactory() {
294        return SocketFactory.getDefault();
295    }
296
297    /**
298     * Post-processes an event before it is serialized for delivery to the remote
299     * receiver.
300     * 
301     * @param event the event to post-process
302     */
303    protected abstract void postProcessEvent(E event);
304
305    /**
306     * Get the pre-serialization transformer that will be used to transform each
307     * event into a Serializable object before delivery to the remote receiver.
308     * 
309     * @return transformer object
310     */
311    protected abstract PreSerializationTransformer<E> getPST();
312
313    /**
314     * The <b>RemoteHost</b> property takes the name of the host where a
315     * corresponding server is running.
316     */
317    public void setRemoteHost(String host) {
318        remoteHost = host;
319    }
320
321    /**
322     * Returns value of the <b>RemoteHost</b> property.
323     */
324    public String getRemoteHost() {
325        return remoteHost;
326    }
327
328    /**
329     * The <b>Port</b> property takes a positive integer representing the port where
330     * the server is waiting for connections.
331     */
332    public void setPort(int port) {
333        this.port = port;
334    }
335
336    /**
337     * Returns value of the <b>Port</b> property.
338     */
339    public int getPort() {
340        return port;
341    }
342
343    /**
344     * The <b>reconnectionDelay</b> property takes a positive {@link Duration} value
345     * representing the time to wait between each failed connection attempt to the
346     * server. The default value of this option is to 30 seconds.
347     *
348     * <p>
349     * Setting this option to zero turns off reconnection capability.
350     */
351    public void setReconnectionDelay(Duration delay) {
352        this.reconnectionDelay = delay;
353    }
354
355    /**
356     * Returns value of the <b>reconnectionDelay</b> property.
357     */
358    public Duration getReconnectionDelay() {
359        return reconnectionDelay;
360    }
361
362    /**
363     * The <b>queueSize</b> property takes a non-negative integer representing the
364     * number of logging events to retain for delivery to the remote receiver. When
365     * the deque size is zero, event delivery to the remote receiver is synchronous.
366     * When the deque size is greater than zero, the {@link #append(Object)} method
367     * returns immediately after enqueing the event, assuming that there is space
368     * available in the deque. Using a non-zero deque length can improve performance
369     * by eliminating delays caused by transient network delays.
370     * 
371     * @param queueSize the deque size to set.
372     */
373    public void setQueueSize(int queueSize) {
374        this.queueSize = queueSize;
375    }
376
377    /**
378     * Returns the value of the <b>queueSize</b> property.
379     */
380    public int getQueueSize() {
381        return queueSize;
382    }
383
384    /**
385     * The <b>eventDelayLimit</b> takes a non-negative integer representing the
386     * number of milliseconds to allow the appender to block if the underlying
387     * BlockingQueue is full. Once this limit is reached, the event is dropped.
388     *
389     * @param eventDelayLimit the event delay limit
390     */
391    public void setEventDelayLimit(Duration eventDelayLimit) {
392        this.eventDelayLimit = eventDelayLimit;
393    }
394
395    /**
396     * Returns the value of the <b>eventDelayLimit</b> property.
397     */
398    public Duration getEventDelayLimit() {
399        return eventDelayLimit;
400    }
401
402    /**
403     * Sets the timeout that controls how long we'll wait for the remote peer to
404     * accept our connection attempt.
405     * <p>
406     * This property is configurable primarily to support instrumentation for unit
407     * testing.
408     * 
409     * @param acceptConnectionTimeout timeout value in milliseconds
410     */
411    void setAcceptConnectionTimeout(int acceptConnectionTimeout) {
412        this.acceptConnectionTimeout = acceptConnectionTimeout;
413    }
414
415}