001/** 002 * Logback: the reliable, generic, fast and flexible logging framework. 003 * Copyright (C) 1999-2015, QOS.ch. All rights reserved. 004 * 005 * This program and the accompanying materials are dual-licensed under 006 * either the terms of the Eclipse Public License v1.0 as published by 007 * the Eclipse Foundation 008 * 009 * or (per the licensee's choosing) 010 * 011 * under the terms of the GNU Lesser General Public License version 2.1 012 * as published by the Free Software Foundation. 013 */ 014package ch.qos.logback.core; 015 016import ch.qos.logback.core.spi.AppenderAttachable; 017import ch.qos.logback.core.spi.AppenderAttachableImpl; 018import ch.qos.logback.core.util.InterruptUtil; 019 020import java.util.ArrayList; 021import java.util.Iterator; 022import java.util.List; 023import java.util.concurrent.ArrayBlockingQueue; 024import java.util.concurrent.BlockingQueue; 025 026/** 027 * This appender and derived classes, log events asynchronously. In order to 028 * avoid loss of logging events, this appender should be closed. It is the 029 * user's responsibility to close appenders, typically at the end of the 030 * application lifecycle. 031 * <p> 032 * This appender buffers events in a {@link BlockingQueue}. {@link Worker} 033 * thread created by this appender takes events from the head of the queue, and 034 * dispatches them to the single appender attached to this appender. 035 * <p> 036 * Please refer to the 037 * <a href="http://logback.qos.ch/manual/appenders.html#AsyncAppender">logback 038 * manual</a> for further information about this appender. 039 * </p> 040 * 041 * @param <E> 042 * @author Ceki Gülcü 043 * @author Torsten Juergeleit 044 * @since 1.0.4 045 */ 046public class AsyncAppenderBase<E> extends UnsynchronizedAppenderBase<E> implements AppenderAttachable<E> { 047 048 AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<E>(); 049 BlockingQueue<E> blockingQueue; 050 051 /** 052 * The default buffer size. 053 */ 054 public static final int DEFAULT_QUEUE_SIZE = 256; 055 int queueSize = DEFAULT_QUEUE_SIZE; 056 057 int appenderCount = 0; 058 059 static final int UNDEFINED = -1; 060 int discardingThreshold = UNDEFINED; 061 boolean neverBlock = false; 062 063 Worker worker = new Worker(); 064 065 /** 066 * The default maximum queue flush time allowed during appender stop. If the 067 * worker takes longer than this time it will exit, discarding any remaining 068 * items in the queue 069 */ 070 public static final int DEFAULT_MAX_FLUSH_TIME = 1000; 071 int maxFlushTime = DEFAULT_MAX_FLUSH_TIME; 072 073 /** 074 * Is the eventObject passed as parameter discardable? The base class's 075 * implementation of this method always returns 'false' but sub-classes may (and 076 * do) override this method. 077 * <p> 078 * Note that only if the buffer is nearly full are events discarded. Otherwise, 079 * when the buffer is "not full" all events are logged. 080 * 081 * @param eventObject 082 * @return - true if the event can be discarded, false otherwise 083 */ 084 protected boolean isDiscardable(E eventObject) { 085 return false; 086 } 087 088 /** 089 * Pre-process the event prior to queueing. The base class does no 090 * pre-processing but subclasses can override this behavior. 091 * 092 * @param eventObject 093 */ 094 protected void preprocess(E eventObject) { 095 } 096 097 @Override 098 public void start() { 099 if (isStarted()) 100 return; 101 if (appenderCount == 0) { 102 addError("No attached appenders found."); 103 return; 104 } 105 if (queueSize < 1) { 106 addError("Invalid queue size [" + queueSize + "]"); 107 return; 108 } 109 blockingQueue = new ArrayBlockingQueue<E>(queueSize); 110 111 if (discardingThreshold == UNDEFINED) 112 discardingThreshold = queueSize / 5; 113 addInfo("Setting discardingThreshold to " + discardingThreshold); 114 worker.setDaemon(true); 115 worker.setName("AsyncAppender-Worker-" + getName()); 116 // make sure this instance is marked as "started" before staring the worker 117 // Thread 118 super.start(); 119 worker.start(); 120 } 121 122 @Override 123 public void stop() { 124 if (!isStarted()) 125 return; 126 127 // mark this appender as stopped so that Worker can also processPriorToRemoval 128 // if it is invoking 129 // aii.appendLoopOnAppenders 130 // and sub-appenders consume the interruption 131 super.stop(); 132 133 // interrupt the worker thread so that it can terminate. Note that the 134 // interruption can be consumed by sub-appenders 135 worker.interrupt(); 136 137 InterruptUtil interruptUtil = new InterruptUtil(context); 138 139 try { 140 interruptUtil.maskInterruptFlag(); 141 142 worker.join(maxFlushTime); 143 144 // check to see if the thread ended and if not add a warning message 145 if (worker.isAlive()) { 146 addWarn("Max queue flush timeout (" + maxFlushTime + " ms) exceeded. Approximately " 147 + blockingQueue.size() + " queued events were possibly discarded."); 148 } else { 149 addInfo("Queue flush finished successfully within timeout."); 150 } 151 152 } catch (InterruptedException e) { 153 int remaining = blockingQueue.size(); 154 addError("Failed to join worker thread. " + remaining + " queued events may be discarded.", e); 155 } finally { 156 interruptUtil.unmaskInterruptFlag(); 157 } 158 } 159 160 @Override 161 protected void append(E eventObject) { 162 if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) { 163 return; 164 } 165 preprocess(eventObject); 166 put(eventObject); 167 } 168 169 private boolean isQueueBelowDiscardingThreshold() { 170 return (blockingQueue.remainingCapacity() < discardingThreshold); 171 } 172 173 private void put(E eventObject) { 174 if (neverBlock) { 175 blockingQueue.offer(eventObject); 176 } else { 177 putUninterruptibly(eventObject); 178 } 179 } 180 181 private void putUninterruptibly(E eventObject) { 182 boolean interrupted = false; 183 try { 184 while (true) { 185 try { 186 blockingQueue.put(eventObject); 187 break; 188 } catch (InterruptedException e) { 189 interrupted = true; 190 } 191 } 192 } finally { 193 if (interrupted) { 194 Thread.currentThread().interrupt(); 195 } 196 } 197 } 198 199 public int getQueueSize() { 200 return queueSize; 201 } 202 203 public void setQueueSize(int queueSize) { 204 this.queueSize = queueSize; 205 } 206 207 public int getDiscardingThreshold() { 208 return discardingThreshold; 209 } 210 211 public void setDiscardingThreshold(int discardingThreshold) { 212 this.discardingThreshold = discardingThreshold; 213 } 214 215 public int getMaxFlushTime() { 216 return maxFlushTime; 217 } 218 219 public void setMaxFlushTime(int maxFlushTime) { 220 this.maxFlushTime = maxFlushTime; 221 } 222 223 /** 224 * Returns the number of elements currently in the blocking queue. 225 * 226 * @return number of elements currently in the queue. 227 */ 228 public int getNumberOfElementsInQueue() { 229 return blockingQueue.size(); 230 } 231 232 public void setNeverBlock(boolean neverBlock) { 233 this.neverBlock = neverBlock; 234 } 235 236 public boolean isNeverBlock() { 237 return neverBlock; 238 } 239 240 /** 241 * The remaining capacity available in the blocking queue. 242 * <p> 243 * See also {@link java.util.concurrent.BlockingQueue#remainingCapacity() 244 * BlockingQueue#remainingCapacity()} 245 * 246 * @return the remaining capacity 247 * 248 */ 249 public int getRemainingCapacity() { 250 return blockingQueue.remainingCapacity(); 251 } 252 253 public void addAppender(Appender<E> newAppender) { 254 if (appenderCount == 0) { 255 appenderCount++; 256 addInfo("Attaching appender named [" + newAppender.getName() + "] to AsyncAppender."); 257 aai.addAppender(newAppender); 258 } else { 259 addWarn("One and only one appender may be attached to AsyncAppender."); 260 addWarn("Ignoring additional appender named [" + newAppender.getName() + "]"); 261 } 262 } 263 264 public Iterator<Appender<E>> iteratorForAppenders() { 265 return aai.iteratorForAppenders(); 266 } 267 268 public Appender<E> getAppender(String name) { 269 return aai.getAppender(name); 270 } 271 272 public boolean isAttached(Appender<E> eAppender) { 273 return aai.isAttached(eAppender); 274 } 275 276 public void detachAndStopAllAppenders() { 277 aai.detachAndStopAllAppenders(); 278 } 279 280 public boolean detachAppender(Appender<E> eAppender) { 281 return aai.detachAppender(eAppender); 282 } 283 284 public boolean detachAppender(String name) { 285 return aai.detachAppender(name); 286 } 287 288 class Worker extends Thread { 289 290 public void run() { 291 AsyncAppenderBase<E> parent = AsyncAppenderBase.this; 292 AppenderAttachableImpl<E> aai = parent.aai; 293 294 // loop while the parent is started 295 while (parent.isStarted()) { 296 try { 297 List<E> elements = new ArrayList<E>(); 298 E e0 = parent.blockingQueue.take(); 299 elements.add(e0); 300 parent.blockingQueue.drainTo(elements); 301 for (E e : elements) { 302 aai.appendLoopOnAppenders(e); 303 } 304 } catch (InterruptedException e1) { 305 // exit if interrupted 306 break; 307 } 308 } 309 310 addInfo("Worker thread will flush remaining events before exiting. "); 311 312 for (E e : parent.blockingQueue) { 313 aai.appendLoopOnAppenders(e); 314 parent.blockingQueue.remove(e); 315 } 316 317 aai.detachAndStopAllAppenders(); 318 } 319 } 320}