001/* 002 * Logback: the reliable, generic, fast and flexible logging framework. 003 * Copyright (C) 1999-2026, QOS.ch. All rights reserved. 004 * 005 * This program and the accompanying materials are dual-licensed under 006 * either the terms of the Eclipse Public License v2.0 as published by 007 * the Eclipse Foundation 008 * 009 * or (per the licensee's choosing) 010 * 011 * under the terms of the GNU Lesser General Public License version 2.1 012 * as published by the Free Software Foundation. 013 */ 014// Contributors: Dan MacDonald <dan@redknee.com> 015package ch.qos.logback.core.net; 016 017import java.io.IOException; 018import java.io.Serializable; 019import java.net.ConnectException; 020import java.net.InetAddress; 021import java.net.Socket; 022import java.net.UnknownHostException; 023import java.util.concurrent.BlockingDeque; 024import java.util.concurrent.Future; 025import java.util.concurrent.TimeUnit; 026 027import javax.net.SocketFactory; 028 029import ch.qos.logback.core.AppenderBase; 030import ch.qos.logback.core.spi.DeferredProcessingAware; 031import ch.qos.logback.core.spi.PreSerializationTransformer; 032import ch.qos.logback.core.util.CloseUtil; 033import ch.qos.logback.core.util.Duration; 034 035/** 036 * An abstract base for module specific {@code SocketAppender} implementations 037 * in other logback modules. 038 * 039 * @author Ceki Gülcü 040 * @author Sébastien Pennec 041 * @author Carl Harris 042 * @author Sebastian Gröbler 043 */ 044 045public abstract class AbstractSocketAppender<E> extends AppenderBase<E> implements SocketConnector.ExceptionHandler { 046 047 /** 048 * The default port number of remote logging server (4560). 049 */ 050 public static final int DEFAULT_PORT = 4560; 051 052 /** 053 * The default reconnection delay (30000 milliseconds or 30 seconds). 054 */ 055 public static final int DEFAULT_RECONNECTION_DELAY = 30000; 056 057 /** 058 * Default size of the deque used to hold logging events that are destined for 059 * the remote peer. 060 */ 061 public static final int DEFAULT_QUEUE_SIZE = 128; 062 063 /** 064 * Default timeout when waiting for the remote server to accept our connection. 065 */ 066 private static final int DEFAULT_ACCEPT_CONNECTION_DELAY = 5000; 067 068 /** 069 * Default timeout for how long to wait when inserting an event into the 070 * BlockingQueue. 071 */ 072 private static final int DEFAULT_EVENT_DELAY_TIMEOUT = 100; 073 074 private final ObjectWriterFactory objectWriterFactory; 075 private final QueueFactory queueFactory; 076 077 private String remoteHost; 078 private int port = DEFAULT_PORT; 079 private InetAddress address; 080 private Duration reconnectionDelay = new Duration(DEFAULT_RECONNECTION_DELAY); 081 private int queueSize = DEFAULT_QUEUE_SIZE; 082 private int acceptConnectionTimeout = DEFAULT_ACCEPT_CONNECTION_DELAY; 083 private Duration eventDelayLimit = new Duration(DEFAULT_EVENT_DELAY_TIMEOUT); 084 085 private BlockingDeque<E> deque; 086 private String peerId; 087 private SocketConnector connector; 088 private Future<?> task; 089 090 private volatile Socket socket; 091 092 /** 093 * Constructs a new appender. 094 */ 095 protected AbstractSocketAppender() { 096 this(new QueueFactory(), new ObjectWriterFactory()); 097 } 098 099 /** 100 * Constructs a new appender using the given {@link QueueFactory} and 101 * {@link ObjectWriterFactory}. 102 */ 103 AbstractSocketAppender(QueueFactory queueFactory, ObjectWriterFactory objectWriterFactory) { 104 this.objectWriterFactory = objectWriterFactory; 105 this.queueFactory = queueFactory; 106 } 107 108 /** 109 * {@inheritDoc} 110 */ 111 public void start() { 112 if (isStarted()) 113 return; 114 int errorCount = 0; 115 if (port <= 0) { 116 errorCount++; 117 addError("No port was configured for appender" + name 118 + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_port"); 119 } 120 121 if (remoteHost == null) { 122 errorCount++; 123 addError("No remote host was configured for appender" + name 124 + " For more information, please visit http://logback.qos.ch/codes.html#socket_no_host"); 125 } 126 127 if (queueSize == 0) { 128 addWarn("Queue size of zero is deprecated, use a size of one to indicate synchronous processing"); 129 } 130 131 if (queueSize < 0) { 132 errorCount++; 133 addError("Queue size must be greater than zero"); 134 } 135 136 if (errorCount == 0) { 137 try { 138 address = InetAddress.getByName(remoteHost); 139 } catch (UnknownHostException ex) { 140 addError("unknown host: " + remoteHost); 141 errorCount++; 142 } 143 } 144 145 if (errorCount == 0) { 146 deque = queueFactory.newLinkedBlockingDeque(queueSize); 147 peerId = "remote peer " + remoteHost + ":" + port + ": "; 148 connector = createConnector(address, port, 0, reconnectionDelay.getMilliseconds()); 149 task = getContext().getExecutorService().submit(new Runnable() { 150 @Override 151 public void run() { 152 connectSocketAndDispatchEvents(); 153 } 154 }); 155 super.start(); 156 } 157 } 158 159 /** 160 * {@inheritDoc} 161 */ 162 @Override 163 public void stop() { 164 if (!isStarted()) 165 return; 166 CloseUtil.closeQuietly(socket); 167 task.cancel(true); 168 super.stop(); 169 } 170 171 /** 172 * {@inheritDoc} 173 */ 174 @Override 175 protected void append(E event) { 176 if (event == null || !isStarted()) 177 return; 178 179 try { 180 181 // otherwise MDC information is not transferred. See also logback/issues/1010 182 if(event instanceof DeferredProcessingAware) { 183 ((DeferredProcessingAware) event).prepareForDeferredProcessing(); 184 } 185 186 final boolean inserted = deque.offer(event, eventDelayLimit.getMilliseconds(), TimeUnit.MILLISECONDS); 187 if (!inserted) { 188 addInfo("Dropping event due to timeout limit of [" + eventDelayLimit + "] being exceeded"); 189 } 190 } catch (InterruptedException e) { 191 addError("Interrupted while appending event to SocketAppender", e); 192 } 193 } 194 195 private void connectSocketAndDispatchEvents() { 196 try { 197 while (socketConnectionCouldBeEstablished()) { 198 try { 199 ObjectWriter objectWriter = createObjectWriterForSocket(); 200 addInfo(peerId + "connection established"); 201 dispatchEvents(objectWriter); 202 } catch (javax.net.ssl.SSLHandshakeException she) { 203 // FIXME 204 Thread.sleep(DEFAULT_RECONNECTION_DELAY); 205 } catch (IOException ex) { 206 addInfo(peerId + "connection failed: ", ex); 207 } finally { 208 CloseUtil.closeQuietly(socket); 209 socket = null; 210 addInfo(peerId + "connection closed"); 211 } 212 } 213 } catch (InterruptedException ex) { 214 assert true; // ok... we'll exit now 215 } 216 addInfo("shutting down"); 217 } 218 219 private boolean socketConnectionCouldBeEstablished() throws InterruptedException { 220 return (socket = connector.call()) != null; 221 } 222 223 private ObjectWriter createObjectWriterForSocket() throws IOException { 224 socket.setSoTimeout(acceptConnectionTimeout); 225 ObjectWriter objectWriter = objectWriterFactory.newAutoFlushingObjectWriter(socket.getOutputStream()); 226 socket.setSoTimeout(0); 227 return objectWriter; 228 } 229 230 private SocketConnector createConnector(InetAddress address, int port, int initialDelay, long retryDelay) { 231 SocketConnector connector = newConnector(address, port, initialDelay, retryDelay); 232 connector.setExceptionHandler(this); 233 connector.setSocketFactory(getSocketFactory()); 234 return connector; 235 } 236 237 private void dispatchEvents(ObjectWriter objectWriter) throws InterruptedException, IOException { 238 while (true) { 239 E event = deque.takeFirst(); 240 postProcessEvent(event); 241 Serializable serializableEvent = getPST().transform(event); 242 try { 243 objectWriter.write(serializableEvent); 244 } catch (IOException e) { 245 tryReAddingEventToFrontOfQueue(event); 246 throw e; 247 } 248 } 249 } 250 251 private void tryReAddingEventToFrontOfQueue(E event) { 252 final boolean wasInserted = deque.offerFirst(event); 253 if (!wasInserted) { 254 addInfo("Dropping event due to socket connection error and maxed out deque capacity"); 255 } 256 } 257 258 /** 259 * {@inheritDoc} 260 */ 261 public void connectionFailed(SocketConnector connector, Exception ex) { 262 if (ex instanceof InterruptedException) { 263 addInfo("connector interrupted"); 264 } else if (ex instanceof ConnectException) { 265 addInfo(peerId + "connection refused"); 266 } else { 267 addInfo(peerId + ex); 268 } 269 } 270 271 /** 272 * Creates a new {@link SocketConnector}. 273 * <p> 274 * The default implementation creates an instance of 275 * {@link DefaultSocketConnector}. A subclass may override to provide a 276 * different {@link SocketConnector} implementation. 277 * 278 * @param address target remote address 279 * @param port target remote port 280 * @param initialDelay delay before the first connection attempt 281 * @param retryDelay delay before a reconnection attempt 282 * @return socket connector 283 */ 284 protected SocketConnector newConnector(InetAddress address, int port, long initialDelay, long retryDelay) { 285 return new DefaultSocketConnector(address, port, initialDelay, retryDelay); 286 } 287 288 /** 289 * Gets the default {@link SocketFactory} for the platform. 290 * <p> 291 * Subclasses may override to provide a custom socket factory. 292 */ 293 protected SocketFactory getSocketFactory() { 294 return SocketFactory.getDefault(); 295 } 296 297 /** 298 * Post-processes an event before it is serialized for delivery to the remote 299 * receiver. 300 * 301 * @param event the event to post-process 302 */ 303 protected abstract void postProcessEvent(E event); 304 305 /** 306 * Get the pre-serialization transformer that will be used to transform each 307 * event into a Serializable object before delivery to the remote receiver. 308 * 309 * @return transformer object 310 */ 311 protected abstract PreSerializationTransformer<E> getPST(); 312 313 /** 314 * The <b>RemoteHost</b> property takes the name of the host where a 315 * corresponding server is running. 316 */ 317 public void setRemoteHost(String host) { 318 remoteHost = host; 319 } 320 321 /** 322 * Returns value of the <b>RemoteHost</b> property. 323 */ 324 public String getRemoteHost() { 325 return remoteHost; 326 } 327 328 /** 329 * The <b>Port</b> property takes a positive integer representing the port where 330 * the server is waiting for connections. 331 */ 332 public void setPort(int port) { 333 this.port = port; 334 } 335 336 /** 337 * Returns value of the <b>Port</b> property. 338 */ 339 public int getPort() { 340 return port; 341 } 342 343 /** 344 * The <b>reconnectionDelay</b> property takes a positive {@link Duration} value 345 * representing the time to wait between each failed connection attempt to the 346 * server. The default value of this option is to 30 seconds. 347 * 348 * <p> 349 * Setting this option to zero turns off reconnection capability. 350 */ 351 public void setReconnectionDelay(Duration delay) { 352 this.reconnectionDelay = delay; 353 } 354 355 /** 356 * Returns value of the <b>reconnectionDelay</b> property. 357 */ 358 public Duration getReconnectionDelay() { 359 return reconnectionDelay; 360 } 361 362 /** 363 * The <b>queueSize</b> property takes a non-negative integer representing the 364 * number of logging events to retain for delivery to the remote receiver. When 365 * the deque size is zero, event delivery to the remote receiver is synchronous. 366 * When the deque size is greater than zero, the {@link #append(Object)} method 367 * returns immediately after enqueing the event, assuming that there is space 368 * available in the deque. Using a non-zero deque length can improve performance 369 * by eliminating delays caused by transient network delays. 370 * 371 * @param queueSize the deque size to set. 372 */ 373 public void setQueueSize(int queueSize) { 374 this.queueSize = queueSize; 375 } 376 377 /** 378 * Returns the value of the <b>queueSize</b> property. 379 */ 380 public int getQueueSize() { 381 return queueSize; 382 } 383 384 /** 385 * The <b>eventDelayLimit</b> takes a non-negative integer representing the 386 * number of milliseconds to allow the appender to block if the underlying 387 * BlockingQueue is full. Once this limit is reached, the event is dropped. 388 * 389 * @param eventDelayLimit the event delay limit 390 */ 391 public void setEventDelayLimit(Duration eventDelayLimit) { 392 this.eventDelayLimit = eventDelayLimit; 393 } 394 395 /** 396 * Returns the value of the <b>eventDelayLimit</b> property. 397 */ 398 public Duration getEventDelayLimit() { 399 return eventDelayLimit; 400 } 401 402 /** 403 * Sets the timeout that controls how long we'll wait for the remote peer to 404 * accept our connection attempt. 405 * <p> 406 * This property is configurable primarily to support instrumentation for unit 407 * testing. 408 * 409 * @param acceptConnectionTimeout timeout value in milliseconds 410 */ 411 void setAcceptConnectionTimeout(int acceptConnectionTimeout) { 412 this.acceptConnectionTimeout = acceptConnectionTimeout; 413 } 414 415}