View Javadoc
1   /**
2    * Logback: the reliable, generic, fast and flexible logging framework.
3    * Copyright (C) 1999-2015, QOS.ch. All rights reserved.
4    *
5    * This program and the accompanying materials are dual-licensed under
6    * either the terms of the Eclipse Public License v1.0 as published by
7    * the Eclipse Foundation
8    *
9    *   or (per the licensee's choosing)
10   *
11   * under the terms of the GNU Lesser General Public License version 2.1
12   * as published by the Free Software Foundation.
13   */
14  package ch.qos.logback.core.net.server;
15  
16  import java.io.IOException;
17  import java.util.ArrayList;
18  import java.util.Collection;
19  import java.util.concurrent.Executor;
20  import java.util.concurrent.RejectedExecutionException;
21  import java.util.concurrent.locks.Lock;
22  import java.util.concurrent.locks.ReentrantLock;
23  
24  import ch.qos.logback.core.spi.ContextAwareBase;
25  
26  /**
27   * A concurrent {@link ServerRunner}.
28   * <p>
29   * An instance of this object is created with a {@link ServerListener} and an
30   * {@link java.util.concurrent.Executor Executor}. On invocation of the
31   * {@code start()} method, it passes itself to the given {@code Executor} and
32   * returns immediately. On invocation of its {@link #run()} method by the
33   * {@link Executor} it begins accepting client connections via its
34   * {@code ServerListener}. As each new {@link Client} is accepted, the client is
35   * configured with the runner's LoggingContext and is then passed to the {@code 
36   * Executor} for concurrent execution of the client's service loop.
37   * <p>
38   * On invocation of the {@link #stop()} method, the runner closes the listener
39   * and each of the connected clients (by invoking {@link Client#close()}
40   * effectively interrupting any blocked I/O calls and causing these concurrent
41   * subtasks to exit gracefully). This ensures that before the {@link #stop()}
42   * method returns (1) all I/O resources have been released and (2) all of the
43   * threads of the {@code Executor} are idle.
44   *
45   * @author Carl Harris
46   */
47  public abstract class ConcurrentServerRunner<T extends Client> extends ContextAwareBase
48          implements Runnable, ServerRunner<T> {
49  
50      private final Lock clientsLock = new ReentrantLock();
51  
52      private final Collection<T> clients = new ArrayList<T>();
53  
54      private final ServerListener<T> listener;
55      private final Executor executor;
56  
57      private boolean running;
58  
59      /**
60       * Constructs a new server runner.
61       * 
62       * @param listener the listener from which the server will accept new clients
63       * @param executor an executor that will facilitate execution of the listening
64       *                 and client-handling tasks; while any {@link Executor} is
65       *                 allowed here, outside of unit testing the only reasonable
66       *                 choice is a bounded thread pool of some kind.
67       */
68      public ConcurrentServerRunner(ServerListener<T> listener, Executor executor) {
69          this.listener = listener;
70          this.executor = executor;
71      }
72  
73      /**
74       * {@inheritDoc}
75       */
76      public boolean isRunning() {
77          return running;
78      }
79  
80      protected void setRunning(boolean running) {
81          this.running = running;
82      }
83  
84      /**
85       * {@inheritDoc}
86       */
87      public void stop() throws IOException {
88          listener.close();
89          accept(new ClientVisitor<T>() {
90              public void visit(T client) {
91                  client.close();
92              }
93          });
94      }
95  
96      /**
97       * {@inheritDoc}
98       */
99      public void accept(ClientVisitor<T> visitor) {
100         Collection<T> clients = copyClients();
101         for (T client : clients) {
102             try {
103                 visitor.visit(client);
104             } catch (RuntimeException ex) {
105                 addError(client + ": " + ex);
106             }
107         }
108     }
109 
110     /**
111      * Creates a copy of the collection of all clients that are presently being
112      * tracked by the server.
113      * 
114      * @return collection of client objects
115      */
116     private Collection<T> copyClients() {
117         clientsLock.lock();
118         try {
119             Collection<T> copy = new ArrayList<T>(clients);
120             return copy;
121         } finally {
122             clientsLock.unlock();
123         }
124     }
125 
126     /**
127      * {@inheritDoc}
128      */
129     public void run() {
130         setRunning(true);
131         try {
132             addInfo("listening on " + listener);
133             while (!Thread.currentThread().isInterrupted()) {
134                 T client = listener.acceptClient();
135                 if (!configureClient(client)) {
136                     addError(client + ": connection dropped");
137                     client.close();
138                     continue;
139                 }
140                 try {
141                     executor.execute(new ClientWrapper(client));
142                 } catch (RejectedExecutionException ex) {
143                     addError(client + ": connection dropped");
144                     client.close();
145                 }
146             }
147         } catch (InterruptedException ex) {
148             assert true; // ok... we'll shut down
149         } catch (Exception ex) {
150             addError("listener: " + ex);
151         }
152 
153         setRunning(false);
154         addInfo("shutting down");
155         listener.close();
156     }
157 
158     /**
159      * Configures a connected client.
160      * <p>
161      * A subclass implements this method to perform any necessary configuration of
162      * the client object before its {@link Client#run()} method is invoked.
163      * 
164      * @param client the subject client
165      * @return {@code true} if configuration was successful; if the return value is
166      *         {@code false} the client connection will be dropped
167      */
168     protected abstract boolean configureClient(T client);
169 
170     /**
171      * Adds a client to the collection of those being tracked by the server.
172      * 
173      * @param client the client to add
174      */
175     private void addClient(T client) {
176         clientsLock.lock();
177         try {
178             clients.add(client);
179         } finally {
180             clientsLock.unlock();
181         }
182     }
183 
184     /**
185      * Removes a client from the collection of those being tracked by the server.
186      * 
187      * @param client the client to remote
188      */
189     private void removeClient(T client) {
190         clientsLock.lock();
191         try {
192             clients.remove(client);
193         } finally {
194             clientsLock.unlock();
195         }
196     }
197 
198     /**
199      * A wrapper for a {@link Client} responsible for ensuring that client tracking
200      * is performed properly.
201      */
202     private class ClientWrapper implements Client {
203 
204         private final T delegate;
205 
206         public ClientWrapper(T client) {
207             this.delegate = client;
208         }
209 
210         public void run() {
211             addClient(delegate);
212             try {
213                 delegate.run();
214             } finally {
215                 removeClient(delegate);
216             }
217         }
218 
219         public void close() {
220             delegate.close();
221         }
222 
223     }
224 
225 }