001/**
002 * Logback: the reliable, generic, fast and flexible logging framework.
003 * Copyright (C) 1999-2015, QOS.ch. All rights reserved.
004 *
005 * This program and the accompanying materials are dual-licensed under
006 * either the terms of the Eclipse Public License v1.0 as published by
007 * the Eclipse Foundation
008 *
009 *   or (per the licensee's choosing)
010 *
011 * under the terms of the GNU Lesser General Public License version 2.1
012 * as published by the Free Software Foundation.
013 */
014// Contributors: Dan MacDonald <dan@redknee.com>
015package ch.qos.logback.core.net;
016
017import java.io.IOException;
018import java.io.Serializable;
019import java.net.ConnectException;
020import java.net.InetAddress;
021import java.net.Socket;
022import java.net.UnknownHostException;
023import java.util.concurrent.BlockingDeque;
024import java.util.concurrent.Future;
025import java.util.concurrent.TimeUnit;
026
027import javax.net.SocketFactory;
028
029import ch.qos.logback.core.AppenderBase;
030import ch.qos.logback.core.spi.PreSerializationTransformer;
031import ch.qos.logback.core.util.CloseUtil;
032import ch.qos.logback.core.util.Duration;
033
034/**
035 * An abstract base for module specific {@code SocketAppender} implementations
036 * in other logback modules.
037 * 
038 * @author Ceki G&uuml;lc&uuml;
039 * @author S&eacute;bastien Pennec
040 * @author Carl Harris
041 * @author Sebastian Gr&ouml;bler
042 */
043
044public abstract class AbstractSocketAppender<E> extends AppenderBase<E> implements SocketConnector.ExceptionHandler {
045
046    /**
047     * The default port number of remote logging server (4560).
048     */
049    public static final int DEFAULT_PORT = 4560;
050
051    /**
052     * The default reconnection delay (30000 milliseconds or 30 seconds).
053     */
054    public static final int DEFAULT_RECONNECTION_DELAY = 30000;
055
056    /**
057     * Default size of the deque used to hold logging events that are destined for
058     * the remote peer.
059     */
060    public static final int DEFAULT_QUEUE_SIZE = 128;
061
062    /**
063     * Default timeout when waiting for the remote server to accept our connection.
064     */
065    private static final int DEFAULT_ACCEPT_CONNECTION_DELAY = 5000;
066
067    /**
068     * Default timeout for how long to wait when inserting an event into the
069     * BlockingQueue.
070     */
071    private static final int DEFAULT_EVENT_DELAY_TIMEOUT = 100;
072
073    private final ObjectWriterFactory objectWriterFactory;
074    private final QueueFactory queueFactory;
075
076    private String remoteHost;
077    private int port = DEFAULT_PORT;
078    private InetAddress address;
079    private Duration reconnectionDelay = new Duration(DEFAULT_RECONNECTION_DELAY);
080    private int queueSize = DEFAULT_QUEUE_SIZE;
081    private int acceptConnectionTimeout = DEFAULT_ACCEPT_CONNECTION_DELAY;
082    private Duration eventDelayLimit = new Duration(DEFAULT_EVENT_DELAY_TIMEOUT);
083
084    private BlockingDeque<E> deque;
085    private String peerId;
086    private SocketConnector connector;
087    private Future<?> task;
088
089    private volatile Socket socket;
090
091    /**
092     * Constructs a new appender.
093     */
094    protected AbstractSocketAppender() {
095        this(new QueueFactory(), new ObjectWriterFactory());
096    }
097
098    /**
099     * Constructs a new appender using the given {@link QueueFactory} and
100     * {@link ObjectWriterFactory}.
101     */
102    AbstractSocketAppender(QueueFactory queueFactory, ObjectWriterFactory objectWriterFactory) {
103        this.objectWriterFactory = objectWriterFactory;
104        this.queueFactory = queueFactory;
105    }
106
107    /**
108     * {@inheritDoc}
109     */
110    public void start() {
111        if (isStarted())
112            return;
113        int errorCount = 0;
114        if (port <= 0) {
115            errorCount++;
116            addError("No port was configured for appender" + name
117                    + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_port");
118        }
119
120        if (remoteHost == null) {
121            errorCount++;
122            addError("No remote host was configured for appender" + name
123                    + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_host");
124        }
125
126        if (queueSize == 0) {
127            addWarn("Queue size of zero is deprecated, use a size of one to indicate synchronous processing");
128        }
129
130        if (queueSize < 0) {
131            errorCount++;
132            addError("Queue size must be greater than zero");
133        }
134
135        if (errorCount == 0) {
136            try {
137                address = InetAddress.getByName(remoteHost);
138            } catch (UnknownHostException ex) {
139                addError("unknown host: " + remoteHost);
140                errorCount++;
141            }
142        }
143
144        if (errorCount == 0) {
145            deque = queueFactory.newLinkedBlockingDeque(queueSize);
146            peerId = "remote peer " + remoteHost + ":" + port + ": ";
147            connector = createConnector(address, port, 0, reconnectionDelay.getMilliseconds());
148            task = getContext().getExecutorService().submit(new Runnable() {
149                @Override
150                public void run() {
151                    connectSocketAndDispatchEvents();
152                }
153            });
154            super.start();
155        }
156    }
157
158    /**
159     * {@inheritDoc}
160     */
161    @Override
162    public void stop() {
163        if (!isStarted())
164            return;
165        CloseUtil.closeQuietly(socket);
166        task.cancel(true);
167        super.stop();
168    }
169
170    /**
171     * {@inheritDoc}
172     */
173    @Override
174    protected void append(E event) {
175        if (event == null || !isStarted())
176            return;
177
178        try {
179            final boolean inserted = deque.offer(event, eventDelayLimit.getMilliseconds(), TimeUnit.MILLISECONDS);
180            if (!inserted) {
181                addInfo("Dropping event due to timeout limit of [" + eventDelayLimit + "] being exceeded");
182            }
183        } catch (InterruptedException e) {
184            addError("Interrupted while appending event to SocketAppender", e);
185        }
186    }
187
188    private void connectSocketAndDispatchEvents() {
189        try {
190            while (socketConnectionCouldBeEstablished()) {
191                try {
192                    ObjectWriter objectWriter = createObjectWriterForSocket();
193                    addInfo(peerId + "connection established");
194                    dispatchEvents(objectWriter);
195                } catch (javax.net.ssl.SSLHandshakeException she) {
196                    // FIXME
197                    Thread.sleep(DEFAULT_RECONNECTION_DELAY);
198                } catch (IOException ex) {
199                    addInfo(peerId + "connection failed: ", ex);
200                } finally {
201                    CloseUtil.closeQuietly(socket);
202                    socket = null;
203                    addInfo(peerId + "connection closed");
204                }
205            }
206        } catch (InterruptedException ex) {
207            assert true; // ok... we'll exit now
208        }
209        addInfo("shutting down");
210    }
211
212    private boolean socketConnectionCouldBeEstablished() throws InterruptedException {
213        return (socket = connector.call()) != null;
214    }
215
216    private ObjectWriter createObjectWriterForSocket() throws IOException {
217        socket.setSoTimeout(acceptConnectionTimeout);
218        ObjectWriter objectWriter = objectWriterFactory.newAutoFlushingObjectWriter(socket.getOutputStream());
219        socket.setSoTimeout(0);
220        return objectWriter;
221    }
222
223    private SocketConnector createConnector(InetAddress address, int port, int initialDelay, long retryDelay) {
224        SocketConnector connector = newConnector(address, port, initialDelay, retryDelay);
225        connector.setExceptionHandler(this);
226        connector.setSocketFactory(getSocketFactory());
227        return connector;
228    }
229
230    private void dispatchEvents(ObjectWriter objectWriter) throws InterruptedException, IOException {
231        while (true) {
232            E event = deque.takeFirst();
233            postProcessEvent(event);
234            Serializable serializableEvent = getPST().transform(event);
235            try {
236                objectWriter.write(serializableEvent);
237            } catch (IOException e) {
238                tryReAddingEventToFrontOfQueue(event);
239                throw e;
240            }
241        }
242    }
243
244    private void tryReAddingEventToFrontOfQueue(E event) {
245        final boolean wasInserted = deque.offerFirst(event);
246        if (!wasInserted) {
247            addInfo("Dropping event due to socket connection error and maxed out deque capacity");
248        }
249    }
250
251    /**
252     * {@inheritDoc}
253     */
254    public void connectionFailed(SocketConnector connector, Exception ex) {
255        if (ex instanceof InterruptedException) {
256            addInfo("connector interrupted");
257        } else if (ex instanceof ConnectException) {
258            addInfo(peerId + "connection refused");
259        } else {
260            addInfo(peerId + ex);
261        }
262    }
263
264    /**
265     * Creates a new {@link SocketConnector}.
266     * <p>
267     * The default implementation creates an instance of
268     * {@link DefaultSocketConnector}. A subclass may override to provide a
269     * different {@link SocketConnector} implementation.
270     * 
271     * @param address      target remote address
272     * @param port         target remote port
273     * @param initialDelay delay before the first connection attempt
274     * @param retryDelay   delay before a reconnection attempt
275     * @return socket connector
276     */
277    protected SocketConnector newConnector(InetAddress address, int port, long initialDelay, long retryDelay) {
278        return new DefaultSocketConnector(address, port, initialDelay, retryDelay);
279    }
280
281    /**
282     * Gets the default {@link SocketFactory} for the platform.
283     * <p>
284     * Subclasses may override to provide a custom socket factory.
285     */
286    protected SocketFactory getSocketFactory() {
287        return SocketFactory.getDefault();
288    }
289
290    /**
291     * Post-processes an event before it is serialized for delivery to the remote
292     * receiver.
293     * 
294     * @param event the event to post-process
295     */
296    protected abstract void postProcessEvent(E event);
297
298    /**
299     * Get the pre-serialization transformer that will be used to transform each
300     * event into a Serializable object before delivery to the remote receiver.
301     * 
302     * @return transformer object
303     */
304    protected abstract PreSerializationTransformer<E> getPST();
305
306    /**
307     * The <b>RemoteHost</b> property takes the name of the host where a
308     * corresponding server is running.
309     */
310    public void setRemoteHost(String host) {
311        remoteHost = host;
312    }
313
314    /**
315     * Returns value of the <b>RemoteHost</b> property.
316     */
317    public String getRemoteHost() {
318        return remoteHost;
319    }
320
321    /**
322     * The <b>Port</b> property takes a positive integer representing the port where
323     * the server is waiting for connections.
324     */
325    public void setPort(int port) {
326        this.port = port;
327    }
328
329    /**
330     * Returns value of the <b>Port</b> property.
331     */
332    public int getPort() {
333        return port;
334    }
335
336    /**
337     * The <b>reconnectionDelay</b> property takes a positive {@link Duration} value
338     * representing the time to wait between each failed connection attempt to the
339     * server. The default value of this option is to 30 seconds.
340     *
341     * <p>
342     * Setting this option to zero turns off reconnection capability.
343     */
344    public void setReconnectionDelay(Duration delay) {
345        this.reconnectionDelay = delay;
346    }
347
348    /**
349     * Returns value of the <b>reconnectionDelay</b> property.
350     */
351    public Duration getReconnectionDelay() {
352        return reconnectionDelay;
353    }
354
355    /**
356     * The <b>queueSize</b> property takes a non-negative integer representing the
357     * number of logging events to retain for delivery to the remote receiver. When
358     * the deque size is zero, event delivery to the remote receiver is synchronous.
359     * When the deque size is greater than zero, the {@link #append(Object)} method
360     * returns immediately after enqueing the event, assuming that there is space
361     * available in the deque. Using a non-zero deque length can improve performance
362     * by eliminating delays caused by transient network delays.
363     * 
364     * @param queueSize the deque size to set.
365     */
366    public void setQueueSize(int queueSize) {
367        this.queueSize = queueSize;
368    }
369
370    /**
371     * Returns the value of the <b>queueSize</b> property.
372     */
373    public int getQueueSize() {
374        return queueSize;
375    }
376
377    /**
378     * The <b>eventDelayLimit</b> takes a non-negative integer representing the
379     * number of milliseconds to allow the appender to block if the underlying
380     * BlockingQueue is full. Once this limit is reached, the event is dropped.
381     *
382     * @param eventDelayLimit the event delay limit
383     */
384    public void setEventDelayLimit(Duration eventDelayLimit) {
385        this.eventDelayLimit = eventDelayLimit;
386    }
387
388    /**
389     * Returns the value of the <b>eventDelayLimit</b> property.
390     */
391    public Duration getEventDelayLimit() {
392        return eventDelayLimit;
393    }
394
395    /**
396     * Sets the timeout that controls how long we'll wait for the remote peer to
397     * accept our connection attempt.
398     * <p>
399     * This property is configurable primarily to support instrumentation for unit
400     * testing.
401     * 
402     * @param acceptConnectionTimeout timeout value in milliseconds
403     */
404    void setAcceptConnectionTimeout(int acceptConnectionTimeout) {
405        this.acceptConnectionTimeout = acceptConnectionTimeout;
406    }
407
408}