View Javadoc
1   /**
2    * Logback: the reliable, generic, fast and flexible logging framework.
3    * Copyright (C) 1999-2015, QOS.ch. All rights reserved.
4    *
5    * This program and the accompanying materials are dual-licensed under
6    * either the terms of the Eclipse Public License v1.0 as published by
7    * the Eclipse Foundation
8    *
9    *   or (per the licensee's choosing)
10   *
11   * under the terms of the GNU Lesser General Public License version 2.1
12   * as published by the Free Software Foundation.
13   */
14  package ch.qos.logback.core;
15  
16  import ch.qos.logback.core.spi.AppenderAttachable;
17  import ch.qos.logback.core.spi.AppenderAttachableImpl;
18  import ch.qos.logback.core.util.InterruptUtil;
19  
20  import java.util.Iterator;
21  import java.util.concurrent.ArrayBlockingQueue;
22  import java.util.concurrent.BlockingQueue;
23  
24  /**
25   * This appender and derived classes, log events asynchronously.  In order to avoid loss of logging events, this
26   * appender should be closed. It is the user's  responsibility to close appenders, typically at the end of the
27   * application lifecycle.
28   * <p>
29   * This appender buffers events in a {@link BlockingQueue}. {@link Worker} thread created by this appender takes
30   * events from the head of the queue, and dispatches them to the single appender attached to this appender.
31   * <p>
32   * Please refer to the <a href="http://logback.qos.ch/manual/appenders.html#AsyncAppender">logback manual</a> for
33   * further information about this appender.</p>
34   *
35   * @param <E>
36   * @author Ceki G&uuml;lc&uuml;
37   * @author Torsten Juergeleit
38   * @since 1.0.4
39   */
40  public class AsyncAppenderBase<E> extends UnsynchronizedAppenderBase<E> implements AppenderAttachable<E> {
41  
42      AppenderAttachableImpl<E> aai = new AppenderAttachableImpl<E>();
43      BlockingQueue<E> blockingQueue;
44  
45      /**
46       * The default buffer size.
47       */
48      public static final int DEFAULT_QUEUE_SIZE = 256;
49      int queueSize = DEFAULT_QUEUE_SIZE;
50  
51      int appenderCount = 0;
52  
53      static final int UNDEFINED = -1;
54      int discardingThreshold = UNDEFINED;
55      boolean neverBlock = false;
56  
57      Worker worker = new Worker();
58  
59      /**
60       * The default maximum queue flush time allowed during appender stop. If the 
61       * worker takes longer than this time it will exit, discarding any remaining 
62       * items in the queue
63       */
64      public static final int DEFAULT_MAX_FLUSH_TIME = 1000;
65      int maxFlushTime = DEFAULT_MAX_FLUSH_TIME;
66  
67      /**
68       * Is the eventObject passed as parameter discardable? The base class's implementation of this method always returns
69       * 'false' but sub-classes may (and do) override this method.
70       * <p>
71       * Note that only if the buffer is nearly full are events discarded. Otherwise, when the buffer is "not full"
72       * all events are logged.
73       *
74       * @param eventObject
75       * @return - true if the event can be discarded, false otherwise
76       */
77      protected boolean isDiscardable(E eventObject) {
78          return false;
79      }
80  
81      /**
82       * Pre-process the event prior to queueing. The base class does no pre-processing but sub-classes can
83       * override this behavior.
84       *
85       * @param eventObject
86       */
87      protected void preprocess(E eventObject) {
88      }
89  
90      @Override
91      public void start() {
92          if (isStarted())
93              return;
94          if (appenderCount == 0) {
95              addError("No attached appenders found.");
96              return;
97          }
98          if (queueSize < 1) {
99              addError("Invalid queue size [" + queueSize + "]");
100             return;
101         }
102         blockingQueue = new ArrayBlockingQueue<E>(queueSize);
103 
104         if (discardingThreshold == UNDEFINED)
105             discardingThreshold = queueSize / 5;
106         addInfo("Setting discardingThreshold to " + discardingThreshold);
107         worker.setDaemon(true);
108         worker.setName("AsyncAppender-Worker-" + getName());
109         // make sure this instance is marked as "started" before staring the worker Thread
110         super.start();
111         worker.start();
112     }
113 
114     @Override
115     public void stop() {
116         if (!isStarted())
117             return;
118 
119         // mark this appender as stopped so that Worker can also processPriorToRemoval if it is invoking
120         // aii.appendLoopOnAppenders
121         // and sub-appenders consume the interruption
122         super.stop();
123 
124         // interrupt the worker thread so that it can terminate. Note that the interruption can be consumed
125         // by sub-appenders
126         worker.interrupt();
127 
128         InterruptUtil interruptUtil = new InterruptUtil(context);
129 
130         try {
131             interruptUtil.maskInterruptFlag();
132 
133             worker.join(maxFlushTime);
134 
135             // check to see if the thread ended and if not add a warning message
136             if (worker.isAlive()) {
137                 addWarn("Max queue flush timeout (" + maxFlushTime + " ms) exceeded. Approximately " + blockingQueue.size()
138                                 + " queued events were possibly discarded.");
139             } else {
140                 addInfo("Queue flush finished successfully within timeout.");
141             }
142 
143         } catch (InterruptedException e) {
144             int remaining = blockingQueue.size();
145             addError("Failed to join worker thread. " + remaining + " queued events may be discarded.", e);
146         } finally {
147             interruptUtil.unmaskInterruptFlag();
148         }
149     }
150 
151 
152 
153 
154 
155     @Override
156     protected void append(E eventObject) {
157         if (isQueueBelowDiscardingThreshold() && isDiscardable(eventObject)) {
158             return;
159         }
160         preprocess(eventObject);
161         put(eventObject);
162     }
163 
164     private boolean isQueueBelowDiscardingThreshold() {
165         return (blockingQueue.remainingCapacity() < discardingThreshold);
166     }
167 
168     private void put(E eventObject) {
169         if (neverBlock) {
170             blockingQueue.offer(eventObject);
171         } else {
172             putUninterruptibly(eventObject);
173         }
174     }
175 
176     private void putUninterruptibly(E eventObject) {
177         boolean interrupted = false;
178         try {
179             while (true) {
180                 try {
181                     blockingQueue.put(eventObject);
182                     break;
183                 } catch (InterruptedException e) {
184                     interrupted = true;
185                 }
186             }
187         } finally {
188             if (interrupted) {
189                 Thread.currentThread().interrupt();
190             }
191         }
192     }
193 
194     public int getQueueSize() {
195         return queueSize;
196     }
197 
198     public void setQueueSize(int queueSize) {
199         this.queueSize = queueSize;
200     }
201 
202     public int getDiscardingThreshold() {
203         return discardingThreshold;
204     }
205 
206     public void setDiscardingThreshold(int discardingThreshold) {
207         this.discardingThreshold = discardingThreshold;
208     }
209 
210     public int getMaxFlushTime() {
211         return maxFlushTime;
212     }
213 
214     public void setMaxFlushTime(int maxFlushTime) {
215         this.maxFlushTime = maxFlushTime;
216     }
217 
218     /**
219      * Returns the number of elements currently in the blocking queue.
220      *
221      * @return number of elements currently in the queue.
222      */
223     public int getNumberOfElementsInQueue() {
224         return blockingQueue.size();
225     }
226 
227     public void setNeverBlock(boolean neverBlock) {
228         this.neverBlock = neverBlock;
229     }
230 
231     public boolean isNeverBlock() {
232         return neverBlock;
233     }
234 
235     /**
236      * The remaining capacity available in the blocking queue.
237      * <p>
238      * See also {@link java.util.concurrent.BlockingQueue#remainingCapacity() BlockingQueue#remainingCapacity()}
239      *
240      * @return the remaining capacity
241      * 
242      */
243     public int getRemainingCapacity() {
244         return blockingQueue.remainingCapacity();
245     }
246 
247     public void addAppender(Appender<E> newAppender) {
248         if (appenderCount == 0) {
249             appenderCount++;
250             addInfo("Attaching appender named [" + newAppender.getName() + "] to AsyncAppender.");
251             aai.addAppender(newAppender);
252         } else {
253             addWarn("One and only one appender may be attached to AsyncAppender.");
254             addWarn("Ignoring additional appender named [" + newAppender.getName() + "]");
255         }
256     }
257 
258     public Iterator<Appender<E>> iteratorForAppenders() {
259         return aai.iteratorForAppenders();
260     }
261 
262     public Appender<E> getAppender(String name) {
263         return aai.getAppender(name);
264     }
265 
266     public boolean isAttached(Appender<E> eAppender) {
267         return aai.isAttached(eAppender);
268     }
269 
270     public void detachAndStopAllAppenders() {
271         aai.detachAndStopAllAppenders();
272     }
273 
274     public boolean detachAppender(Appender<E> eAppender) {
275         return aai.detachAppender(eAppender);
276     }
277 
278     public boolean detachAppender(String name) {
279         return aai.detachAppender(name);
280     }
281 
282     class Worker extends Thread {
283 
284         public void run() {
285             AsyncAppenderBase<E> parent = AsyncAppenderBase.this;
286             AppenderAttachableImpl<E> aai = parent.aai;
287 
288             // loop while the parent is started
289             while (parent.isStarted()) {
290                 try {
291                     E e = parent.blockingQueue.take();
292                     aai.appendLoopOnAppenders(e);
293                 } catch (InterruptedException ie) {
294                     break;
295                 }
296             }
297 
298             addInfo("Worker thread will flush remaining events before exiting. ");
299 
300             for (E e : parent.blockingQueue) {
301                 aai.appendLoopOnAppenders(e);
302                 parent.blockingQueue.remove(e);
303             }
304 
305             aai.detachAndStopAllAppenders();
306         }
307     }
308 }