View Javadoc
1   /**
2    * Logback: the reliable, generic, fast and flexible logging framework.
3    * Copyright (C) 1999-2015, QOS.ch. All rights reserved.
4    *
5    * This program and the accompanying materials are dual-licensed under
6    * either the terms of the Eclipse Public License v1.0 as published by
7    * the Eclipse Foundation
8    *
9    *   or (per the licensee's choosing)
10   *
11   * under the terms of the GNU Lesser General Public License version 2.1
12   * as published by the Free Software Foundation.
13   */
14  package ch.qos.logback.core.net.server;
15  
16  import java.io.IOException;
17  import java.util.ArrayList;
18  import java.util.Collection;
19  import java.util.concurrent.Executor;
20  import java.util.concurrent.RejectedExecutionException;
21  import java.util.concurrent.locks.Lock;
22  import java.util.concurrent.locks.ReentrantLock;
23  
24  import ch.qos.logback.core.spi.ContextAwareBase;
25  
26  /**
27   * A concurrent {@link ServerRunner}.
28   * <p>
29   * An instance of this object is created with a {@link ServerListener} and
30   * an {@link java.util.concurrent.Executor Executor}.  On invocation of the {@code start()} method, it
31   * passes itself to the given {@code Executor} and returns immediately.  On
32   * invocation of its {@link #run()} method by the {@link Executor} it begins 
33   * accepting client connections via its {@code ServerListener}.  As each
34   * new {@link Client} is accepted, the client is configured with the 
35   * runner's LoggingContext and is then passed to the {@code 
36   * Executor} for concurrent execution of the client's service loop.     
37   * <p>
38   * On invocation of the {@link #stop()} method, the runner closes the listener
39   * and each of the connected clients (by invoking {@link Client#close()} 
40   * effectively interrupting any blocked I/O calls and causing these concurrent
41   * subtasks to exit gracefully).  This ensures that before the {@link #stop()}
42   * method returns (1) all I/O resources have been released and (2) all 
43   * of the threads of the {@code Executor} are idle.
44   *
45   * @author Carl Harris
46   */
47  public abstract class ConcurrentServerRunner<T extends Client> extends ContextAwareBase implements Runnable, ServerRunner<T> {
48  
49      private final Lock clientsLock = new ReentrantLock();
50  
51      private final Collection<T> clients = new ArrayList<T>();
52  
53      private final ServerListener<T> listener;
54      private final Executor executor;
55  
56      private boolean running;
57  
58      /**
59       * Constructs a new server runner.
60       * @param listener the listener from which the server will accept new
61       *    clients
62       * @param executor a executor that will facilitate execution of the
63       *    listening and client-handling tasks; while any {@link Executor}
64       *    is allowed here, outside of unit testing the only reasonable choice
65       *    is a bounded thread pool of some kind.
66       */
67      public ConcurrentServerRunner(ServerListener<T> listener, Executor executor) {
68          this.listener = listener;
69          this.executor = executor;
70      }
71  
72      /**
73       * {@inheritDoc}
74       */
75      public boolean isRunning() {
76          return running;
77      }
78  
79      protected void setRunning(boolean running) {
80          this.running = running;
81      }
82  
83      /**
84       * {@inheritDoc}
85       */
86      public void stop() throws IOException {
87          listener.close();
88          accept(new ClientVisitor<T>() {
89              public void visit(T client) {
90                  client.close();
91              }
92          });
93      }
94  
95      /**
96       * {@inheritDoc}
97       */
98      public void accept(ClientVisitor<T> visitor) {
99          Collection<T> clients = copyClients();
100         for (T client : clients) {
101             try {
102                 visitor.visit(client);
103             } catch (RuntimeException ex) {
104                 addError(client + ": " + ex);
105             }
106         }
107     }
108 
109     /**
110      * Creates a copy of the collection of all clients that are presently
111      * being tracked by the server.
112      * @return collection of client objects
113      */
114     private Collection<T> copyClients() {
115         clientsLock.lock();
116         try {
117             Collection<T> copy = new ArrayList<T>(clients);
118             return copy;
119         } finally {
120             clientsLock.unlock();
121         }
122     }
123 
124     /**
125      * {@inheritDoc}
126      */
127     public void run() {
128         setRunning(true);
129         try {
130             addInfo("listening on " + listener);
131             while (!Thread.currentThread().isInterrupted()) {
132                 T client = listener.acceptClient();
133                 if (!configureClient(client)) {
134                     addError(client + ": connection dropped");
135                     client.close();
136                     continue;
137                 }
138                 try {
139                     executor.execute(new ClientWrapper(client));
140                 } catch (RejectedExecutionException ex) {
141                     addError(client + ": connection dropped");
142                     client.close();
143                 }
144             }
145         } catch (InterruptedException ex) {
146             assert true; // ok... we'll shut down
147         } catch (Exception ex) {
148             addError("listener: " + ex);
149         }
150 
151         setRunning(false);
152         addInfo("shutting down");
153         listener.close();
154     }
155 
156     /**
157      * Configures a connected client.
158      * <p>
159      * A subclass implements this method to perform any necessary configuration
160      * of the client object before its {@link Client#run()} method is invoked.
161      * 
162      * @param client the subject client
163      * @return {@code true} if configuration was successful; if the return
164      *    value is {@code false} the client connection will be dropped
165      */
166     protected abstract boolean configureClient(T client);
167 
168     /**
169      * Adds a client to the collection of those being tracked by the server.
170      * @param client the client to add
171      */
172     private void addClient(T client) {
173         clientsLock.lock();
174         try {
175             clients.add(client);
176         } finally {
177             clientsLock.unlock();
178         }
179     }
180 
181     /**
182      * Removes a client from the collection of those being tracked by the server.
183      * @param client the client to remote
184      */
185     private void removeClient(T client) {
186         clientsLock.lock();
187         try {
188             clients.remove(client);
189         } finally {
190             clientsLock.unlock();
191         }
192     }
193 
194     /**
195      * A wrapper for a {@link Client} responsible for ensuring that client
196      * tracking is performed properly.
197      */
198     private class ClientWrapper implements Client {
199 
200         private final T delegate;
201 
202         public ClientWrapper(T client) {
203             this.delegate = client;
204         }
205 
206         public void run() {
207             addClient(delegate);
208             try {
209                 delegate.run();
210             } finally {
211                 removeClient(delegate);
212             }
213         }
214 
215         public void close() {
216             delegate.close();
217         }
218 
219     }
220 
221 }