001/** 002 * Logback: the reliable, generic, fast and flexible logging framework. 003 * Copyright (C) 1999-2015, QOS.ch. All rights reserved. 004 * 005 * This program and the accompanying materials are dual-licensed under 006 * either the terms of the Eclipse Public License v1.0 as published by 007 * the Eclipse Foundation 008 * 009 * or (per the licensee's choosing) 010 * 011 * under the terms of the GNU Lesser General Public License version 2.1 012 * as published by the Free Software Foundation. 013 */ 014// Contributors: Dan MacDonald <dan@redknee.com> 015package ch.qos.logback.core.net; 016 017import java.io.IOException; 018import java.io.Serializable; 019import java.net.ConnectException; 020import java.net.InetAddress; 021import java.net.Socket; 022import java.net.UnknownHostException; 023import java.util.concurrent.BlockingDeque; 024import java.util.concurrent.Future; 025import java.util.concurrent.TimeUnit; 026 027import javax.net.SocketFactory; 028 029import ch.qos.logback.core.AppenderBase; 030import ch.qos.logback.core.spi.PreSerializationTransformer; 031import ch.qos.logback.core.util.CloseUtil; 032import ch.qos.logback.core.util.Duration; 033 034/** 035 * An abstract base for module specific {@code SocketAppender} implementations 036 * in other logback modules. 037 * 038 * @author Ceki Gülcü 039 * @author Sébastien Pennec 040 * @author Carl Harris 041 * @author Sebastian Gröbler 042 */ 043 044public abstract class AbstractSocketAppender<E> extends AppenderBase<E> implements SocketConnector.ExceptionHandler { 045 046 /** 047 * The default port number of remote logging server (4560). 048 */ 049 public static final int DEFAULT_PORT = 4560; 050 051 /** 052 * The default reconnection delay (30000 milliseconds or 30 seconds). 053 */ 054 public static final int DEFAULT_RECONNECTION_DELAY = 30000; 055 056 /** 057 * Default size of the deque used to hold logging events that are destined for 058 * the remote peer. 059 */ 060 public static final int DEFAULT_QUEUE_SIZE = 128; 061 062 /** 063 * Default timeout when waiting for the remote server to accept our connection. 064 */ 065 private static final int DEFAULT_ACCEPT_CONNECTION_DELAY = 5000; 066 067 /** 068 * Default timeout for how long to wait when inserting an event into the 069 * BlockingQueue. 070 */ 071 private static final int DEFAULT_EVENT_DELAY_TIMEOUT = 100; 072 073 private final ObjectWriterFactory objectWriterFactory; 074 private final QueueFactory queueFactory; 075 076 private String remoteHost; 077 private int port = DEFAULT_PORT; 078 private InetAddress address; 079 private Duration reconnectionDelay = new Duration(DEFAULT_RECONNECTION_DELAY); 080 private int queueSize = DEFAULT_QUEUE_SIZE; 081 private int acceptConnectionTimeout = DEFAULT_ACCEPT_CONNECTION_DELAY; 082 private Duration eventDelayLimit = new Duration(DEFAULT_EVENT_DELAY_TIMEOUT); 083 084 private BlockingDeque<E> deque; 085 private String peerId; 086 private SocketConnector connector; 087 private Future<?> task; 088 089 private volatile Socket socket; 090 091 /** 092 * Constructs a new appender. 093 */ 094 protected AbstractSocketAppender() { 095 this(new QueueFactory(), new ObjectWriterFactory()); 096 } 097 098 /** 099 * Constructs a new appender using the given {@link QueueFactory} and 100 * {@link ObjectWriterFactory}. 101 */ 102 AbstractSocketAppender(QueueFactory queueFactory, ObjectWriterFactory objectWriterFactory) { 103 this.objectWriterFactory = objectWriterFactory; 104 this.queueFactory = queueFactory; 105 } 106 107 /** 108 * {@inheritDoc} 109 */ 110 public void start() { 111 if (isStarted()) 112 return; 113 int errorCount = 0; 114 if (port <= 0) { 115 errorCount++; 116 addError("No port was configured for appender" + name 117 + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_port"); 118 } 119 120 if (remoteHost == null) { 121 errorCount++; 122 addError("No remote host was configured for appender" + name 123 + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_host"); 124 } 125 126 if (queueSize == 0) { 127 addWarn("Queue size of zero is deprecated, use a size of one to indicate synchronous processing"); 128 } 129 130 if (queueSize < 0) { 131 errorCount++; 132 addError("Queue size must be greater than zero"); 133 } 134 135 if (errorCount == 0) { 136 try { 137 address = InetAddress.getByName(remoteHost); 138 } catch (UnknownHostException ex) { 139 addError("unknown host: " + remoteHost); 140 errorCount++; 141 } 142 } 143 144 if (errorCount == 0) { 145 deque = queueFactory.newLinkedBlockingDeque(queueSize); 146 peerId = "remote peer " + remoteHost + ":" + port + ": "; 147 connector = createConnector(address, port, 0, reconnectionDelay.getMilliseconds()); 148 task = getContext().getExecutorService().submit(new Runnable() { 149 @Override 150 public void run() { 151 connectSocketAndDispatchEvents(); 152 } 153 }); 154 super.start(); 155 } 156 } 157 158 /** 159 * {@inheritDoc} 160 */ 161 @Override 162 public void stop() { 163 if (!isStarted()) 164 return; 165 CloseUtil.closeQuietly(socket); 166 task.cancel(true); 167 super.stop(); 168 } 169 170 /** 171 * {@inheritDoc} 172 */ 173 @Override 174 protected void append(E event) { 175 if (event == null || !isStarted()) 176 return; 177 178 try { 179 final boolean inserted = deque.offer(event, eventDelayLimit.getMilliseconds(), TimeUnit.MILLISECONDS); 180 if (!inserted) { 181 addInfo("Dropping event due to timeout limit of [" + eventDelayLimit + "] being exceeded"); 182 } 183 } catch (InterruptedException e) { 184 addError("Interrupted while appending event to SocketAppender", e); 185 } 186 } 187 188 private void connectSocketAndDispatchEvents() { 189 try { 190 while (socketConnectionCouldBeEstablished()) { 191 try { 192 ObjectWriter objectWriter = createObjectWriterForSocket(); 193 addInfo(peerId + "connection established"); 194 dispatchEvents(objectWriter); 195 } catch (javax.net.ssl.SSLHandshakeException she) { 196 // FIXME 197 Thread.sleep(DEFAULT_RECONNECTION_DELAY); 198 } catch (IOException ex) { 199 addInfo(peerId + "connection failed: ", ex); 200 } finally { 201 CloseUtil.closeQuietly(socket); 202 socket = null; 203 addInfo(peerId + "connection closed"); 204 } 205 } 206 } catch (InterruptedException ex) { 207 assert true; // ok... we'll exit now 208 } 209 addInfo("shutting down"); 210 } 211 212 private boolean socketConnectionCouldBeEstablished() throws InterruptedException { 213 return (socket = connector.call()) != null; 214 } 215 216 private ObjectWriter createObjectWriterForSocket() throws IOException { 217 socket.setSoTimeout(acceptConnectionTimeout); 218 ObjectWriter objectWriter = objectWriterFactory.newAutoFlushingObjectWriter(socket.getOutputStream()); 219 socket.setSoTimeout(0); 220 return objectWriter; 221 } 222 223 private SocketConnector createConnector(InetAddress address, int port, int initialDelay, long retryDelay) { 224 SocketConnector connector = newConnector(address, port, initialDelay, retryDelay); 225 connector.setExceptionHandler(this); 226 connector.setSocketFactory(getSocketFactory()); 227 return connector; 228 } 229 230 private void dispatchEvents(ObjectWriter objectWriter) throws InterruptedException, IOException { 231 while (true) { 232 E event = deque.takeFirst(); 233 postProcessEvent(event); 234 Serializable serializableEvent = getPST().transform(event); 235 try { 236 objectWriter.write(serializableEvent); 237 } catch (IOException e) { 238 tryReAddingEventToFrontOfQueue(event); 239 throw e; 240 } 241 } 242 } 243 244 private void tryReAddingEventToFrontOfQueue(E event) { 245 final boolean wasInserted = deque.offerFirst(event); 246 if (!wasInserted) { 247 addInfo("Dropping event due to socket connection error and maxed out deque capacity"); 248 } 249 } 250 251 /** 252 * {@inheritDoc} 253 */ 254 public void connectionFailed(SocketConnector connector, Exception ex) { 255 if (ex instanceof InterruptedException) { 256 addInfo("connector interrupted"); 257 } else if (ex instanceof ConnectException) { 258 addInfo(peerId + "connection refused"); 259 } else { 260 addInfo(peerId + ex); 261 } 262 } 263 264 /** 265 * Creates a new {@link SocketConnector}. 266 * <p> 267 * The default implementation creates an instance of 268 * {@link DefaultSocketConnector}. A subclass may override to provide a 269 * different {@link SocketConnector} implementation. 270 * 271 * @param address target remote address 272 * @param port target remote port 273 * @param initialDelay delay before the first connection attempt 274 * @param retryDelay delay before a reconnection attempt 275 * @return socket connector 276 */ 277 protected SocketConnector newConnector(InetAddress address, int port, long initialDelay, long retryDelay) { 278 return new DefaultSocketConnector(address, port, initialDelay, retryDelay); 279 } 280 281 /** 282 * Gets the default {@link SocketFactory} for the platform. 283 * <p> 284 * Subclasses may override to provide a custom socket factory. 285 */ 286 protected SocketFactory getSocketFactory() { 287 return SocketFactory.getDefault(); 288 } 289 290 /** 291 * Post-processes an event before it is serialized for delivery to the remote 292 * receiver. 293 * 294 * @param event the event to post-process 295 */ 296 protected abstract void postProcessEvent(E event); 297 298 /** 299 * Get the pre-serialization transformer that will be used to transform each 300 * event into a Serializable object before delivery to the remote receiver. 301 * 302 * @return transformer object 303 */ 304 protected abstract PreSerializationTransformer<E> getPST(); 305 306 /** 307 * The <b>RemoteHost</b> property takes the name of the host where a 308 * corresponding server is running. 309 */ 310 public void setRemoteHost(String host) { 311 remoteHost = host; 312 } 313 314 /** 315 * Returns value of the <b>RemoteHost</b> property. 316 */ 317 public String getRemoteHost() { 318 return remoteHost; 319 } 320 321 /** 322 * The <b>Port</b> property takes a positive integer representing the port where 323 * the server is waiting for connections. 324 */ 325 public void setPort(int port) { 326 this.port = port; 327 } 328 329 /** 330 * Returns value of the <b>Port</b> property. 331 */ 332 public int getPort() { 333 return port; 334 } 335 336 /** 337 * The <b>reconnectionDelay</b> property takes a positive {@link Duration} value 338 * representing the time to wait between each failed connection attempt to the 339 * server. The default value of this option is to 30 seconds. 340 * 341 * <p> 342 * Setting this option to zero turns off reconnection capability. 343 */ 344 public void setReconnectionDelay(Duration delay) { 345 this.reconnectionDelay = delay; 346 } 347 348 /** 349 * Returns value of the <b>reconnectionDelay</b> property. 350 */ 351 public Duration getReconnectionDelay() { 352 return reconnectionDelay; 353 } 354 355 /** 356 * The <b>queueSize</b> property takes a non-negative integer representing the 357 * number of logging events to retain for delivery to the remote receiver. When 358 * the deque size is zero, event delivery to the remote receiver is synchronous. 359 * When the deque size is greater than zero, the {@link #append(Object)} method 360 * returns immediately after enqueing the event, assuming that there is space 361 * available in the deque. Using a non-zero deque length can improve performance 362 * by eliminating delays caused by transient network delays. 363 * 364 * @param queueSize the deque size to set. 365 */ 366 public void setQueueSize(int queueSize) { 367 this.queueSize = queueSize; 368 } 369 370 /** 371 * Returns the value of the <b>queueSize</b> property. 372 */ 373 public int getQueueSize() { 374 return queueSize; 375 } 376 377 /** 378 * The <b>eventDelayLimit</b> takes a non-negative integer representing the 379 * number of milliseconds to allow the appender to block if the underlying 380 * BlockingQueue is full. Once this limit is reached, the event is dropped. 381 * 382 * @param eventDelayLimit the event delay limit 383 */ 384 public void setEventDelayLimit(Duration eventDelayLimit) { 385 this.eventDelayLimit = eventDelayLimit; 386 } 387 388 /** 389 * Returns the value of the <b>eventDelayLimit</b> property. 390 */ 391 public Duration getEventDelayLimit() { 392 return eventDelayLimit; 393 } 394 395 /** 396 * Sets the timeout that controls how long we'll wait for the remote peer to 397 * accept our connection attempt. 398 * <p> 399 * This property is configurable primarily to support instrumentation for unit 400 * testing. 401 * 402 * @param acceptConnectionTimeout timeout value in milliseconds 403 */ 404 void setAcceptConnectionTimeout(int acceptConnectionTimeout) { 405 this.acceptConnectionTimeout = acceptConnectionTimeout; 406 } 407 408}