/**
 * Logback: the reliable, generic, fast and flexible logging framework.
 * Copyright (C) 1999-2015, QOS.ch. All rights reserved.
 *
 * This program and the accompanying materials are dual-licensed under
 * either the terms of the Eclipse Public License v1.0 as published by
 * the Eclipse Foundation
 *
 *   or (per the licensee's choosing)
 *
 * under the terms of the GNU Lesser General Public License version 2.1
 * as published by the Free Software Foundation.
 */
014package ch.qos.logback.classic.net;
015
016import java.io.EOFException;
017import java.io.IOException;
018import java.io.ObjectInputStream;
019import java.net.ConnectException;
020import java.net.InetAddress;
021import java.net.Socket;
022import java.net.UnknownHostException;
023import java.util.concurrent.ExecutionException;
024import java.util.concurrent.Future;
025import java.util.concurrent.RejectedExecutionException;
026
027import javax.net.SocketFactory;
028
029import ch.qos.logback.classic.Logger;
030import ch.qos.logback.classic.LoggerContext;
031import ch.qos.logback.classic.net.server.HardenedLoggingEventInputStream;
032import ch.qos.logback.classic.spi.ILoggingEvent;
033import ch.qos.logback.core.net.DefaultSocketConnector;
034import ch.qos.logback.core.net.AbstractSocketAppender;
035import ch.qos.logback.core.net.SocketConnector;
036import ch.qos.logback.core.util.CloseUtil;
037
038/**
039 * A component that receives serialized {@link ILoggingEvent} objects from a
040 * remote appender over a {@link Socket}.
041 *
042 * @author Carl Harris
043 */
044public class SocketReceiver extends ReceiverBase implements Runnable, SocketConnector.ExceptionHandler {
045
046    private static final int DEFAULT_ACCEPT_CONNECTION_DELAY = 5000;
047
048    private String remoteHost;
049    private InetAddress address;
050    private int port;
051    private int reconnectionDelay;
052    private int acceptConnectionTimeout = DEFAULT_ACCEPT_CONNECTION_DELAY;
053
054    private String receiverId;
055    private volatile Socket socket;
056    private Future<Socket> connectorTask;
057
058    /**
059     * {@inheritDoc}
060     */
061    protected boolean shouldStart() {
062        int errorCount = 0;
063        if (port == 0) {
064            errorCount++;
065            addError("No port was configured for receiver. "
066                    + "For more information, please visit http://logback.qos.ch/codes.html#receiver_no_port");
067        }
068
069        if (remoteHost == null) {
070            errorCount++;
071            addError("No host name or address was configured for receiver. "
072                    + "For more information, please visit http://logback.qos.ch/codes.html#receiver_no_host");
073        }
074
075        if (reconnectionDelay == 0) {
076            reconnectionDelay = AbstractSocketAppender.DEFAULT_RECONNECTION_DELAY;
077        }
078
079        if (errorCount == 0) {
080            try {
081                address = InetAddress.getByName(remoteHost);
082            } catch (UnknownHostException ex) {
083                addError("unknown host: " + remoteHost);
084                errorCount++;
085            }
086        }
087
088        if (errorCount == 0) {
089            receiverId = "receiver " + remoteHost + ":" + port + ": ";
090        }
091
092        return errorCount == 0;
093    }
094
095    /**
096     * {@inheritDoc}
097     */
098    protected void onStop() {
099        if (socket != null) {
100            CloseUtil.closeQuietly(socket);
101        }
102    }
103
104    @Override
105    protected Runnable getRunnableTask() {
106        return this;
107    }
108
109    /**
110     * {@inheritDoc}
111     */
112    public void run() {
113        try {
114            LoggerContext lc = (LoggerContext) getContext();
115            while (!Thread.currentThread().isInterrupted()) {
116                SocketConnector connector = createConnector(address, port, 0, reconnectionDelay);
117                connectorTask = activateConnector(connector);
118                if (connectorTask == null) {
119                    break;
120                }
121                socket = waitForConnectorToReturnASocket();
122                if (socket == null)
123                    break;
124                dispatchEvents(lc);
125            }
126        } catch (InterruptedException ex) {
127            assert true; // ok... we'll exit now
128        }
129        addInfo("shutting down");
130    }
131
132    private SocketConnector createConnector(InetAddress address, int port, int initialDelay, int retryDelay) {
133        SocketConnector connector = newConnector(address, port, initialDelay, retryDelay);
134        connector.setExceptionHandler(this);
135        connector.setSocketFactory(getSocketFactory());
136        return connector;
137    }
138
139    private Future<Socket> activateConnector(SocketConnector connector) {
140        try {
141            return getContext().getScheduledExecutorService().submit(connector);
142        } catch (RejectedExecutionException ex) {
143            return null;
144        }
145    }
146
147    private Socket waitForConnectorToReturnASocket() throws InterruptedException {
148        try {
149            Socket s = connectorTask.get();
150            connectorTask = null;
151            return s;
152        } catch (ExecutionException e) {
153            return null;
154        }
155    }
156
157    private void dispatchEvents(LoggerContext lc) {
158        ObjectInputStream ois = null;
159        try {
160            socket.setSoTimeout(acceptConnectionTimeout);
161            ois = new HardenedLoggingEventInputStream(socket.getInputStream());
162            socket.setSoTimeout(0);
163            addInfo(receiverId + "connection established");
164            while (true) {
165                ILoggingEvent event = (ILoggingEvent) ois.readObject();
166                Logger remoteLogger = lc.getLogger(event.getLoggerName());
167                if (remoteLogger.isEnabledFor(event.getLevel())) {
168                    remoteLogger.callAppenders(event);
169                }
170            }
171        } catch (EOFException ex) {
172            addInfo(receiverId + "end-of-stream detected");
173        } catch (IOException ex) {
174            addInfo(receiverId + "connection failed: " + ex);
175        } catch (ClassNotFoundException ex) {
176            addInfo(receiverId + "unknown event class: " + ex);
177        } finally {
178            CloseUtil.closeQuietly(ois);
179            CloseUtil.closeQuietly(socket);
180            socket = null;
181            addInfo(receiverId + "connection closed");
182        }
183    }
184
185    /**
186     * {@inheritDoc}
187     */
188    public void connectionFailed(SocketConnector connector, Exception ex) {
189        if (ex instanceof InterruptedException) {
190            addInfo("connector interrupted");
191        } else if (ex instanceof ConnectException) {
192            addInfo(receiverId + "connection refused");
193        } else {
194            addInfo(receiverId + ex);
195        }
196    }
197
198    protected SocketConnector newConnector(InetAddress address, int port, int initialDelay, int retryDelay) {
199        return new DefaultSocketConnector(address, port, initialDelay, retryDelay);
200    }
201
202    protected SocketFactory getSocketFactory() {
203        return SocketFactory.getDefault();
204    }
205
206    public void setRemoteHost(String remoteHost) {
207        this.remoteHost = remoteHost;
208    }
209
210    public void setPort(int port) {
211        this.port = port;
212    }
213
214    public void setReconnectionDelay(int reconnectionDelay) {
215        this.reconnectionDelay = reconnectionDelay;
216    }
217
218    public void setAcceptConnectionTimeout(int acceptConnectionTimeout) {
219        this.acceptConnectionTimeout = acceptConnectionTimeout;
220    }
221
222}