1 /**
2 * Logback: the reliable, generic, fast and flexible logging framework.
3 * Copyright (C) 1999-2015, QOS.ch. All rights reserved.
4 *
5 * This program and the accompanying materials are dual-licensed under
6 * either the terms of the Eclipse Public License v1.0 as published by
7 * the Eclipse Foundation
8 *
9 * or (per the licensee's choosing)
10 *
11 * under the terms of the GNU Lesser General Public License version 2.1
12 * as published by the Free Software Foundation.
13 */
14 package ch.qos.logback.core.net.server;
15
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.concurrent.Executor;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.util.concurrent.locks.Lock;
22 import java.util.concurrent.locks.ReentrantLock;
23
24 import ch.qos.logback.core.spi.ContextAwareBase;
25
26 /**
27 * A concurrent {@link ServerRunner}.
28 * <p>
29 * An instance of this object is created with a {@link ServerListener} and an
30 * {@link java.util.concurrent.Executor Executor}. On invocation of the
31 * {@code start()} method, it passes itself to the given {@code Executor} and
32 * returns immediately. On invocation of its {@link #run()} method by the
33 * {@link Executor} it begins accepting client connections via its
34 * {@code ServerListener}. As each new {@link Client} is accepted, the client is
35 * configured with the runner's LoggingContext and is then passed to the {@code
36 * Executor} for concurrent execution of the client's service loop.
37 * <p>
38 * On invocation of the {@link #stop()} method, the runner closes the listener
39 * and each of the connected clients (by invoking {@link Client#close()}
40 * effectively interrupting any blocked I/O calls and causing these concurrent
41 * subtasks to exit gracefully). This ensures that before the {@link #stop()}
42 * method returns (1) all I/O resources have been released and (2) all of the
43 * threads of the {@code Executor} are idle.
44 *
45 * @author Carl Harris
46 */
47 public abstract class ConcurrentServerRunner<T extends Client> extends ContextAwareBase
48 implements Runnable, ServerRunner<T> {
49
50 private final Lock clientsLock = new ReentrantLock();
51
52 private final Collection<T> clients = new ArrayList<T>();
53
54 private final ServerListener<T> listener;
55 private final Executor executor;
56
57 private boolean running;
58
59 /**
60 * Constructs a new server runner.
61 *
62 * @param listener the listener from which the server will accept new clients
63 * @param executor an executor that will facilitate execution of the listening
64 * and client-handling tasks; while any {@link Executor} is
65 * allowed here, outside of unit testing the only reasonable
66 * choice is a bounded thread pool of some kind.
67 */
68 public ConcurrentServerRunner(ServerListener<T> listener, Executor executor) {
69 this.listener = listener;
70 this.executor = executor;
71 }
72
73 /**
74 * {@inheritDoc}
75 */
76 public boolean isRunning() {
77 return running;
78 }
79
80 protected void setRunning(boolean running) {
81 this.running = running;
82 }
83
84 /**
85 * {@inheritDoc}
86 */
87 public void stop() throws IOException {
88 listener.close();
89 accept(new ClientVisitor<T>() {
90 public void visit(T client) {
91 client.close();
92 }
93 });
94 }
95
96 /**
97 * {@inheritDoc}
98 */
99 public void accept(ClientVisitor<T> visitor) {
100 Collection<T> clients = copyClients();
101 for (T client : clients) {
102 try {
103 visitor.visit(client);
104 } catch (RuntimeException ex) {
105 addError(client + ": " + ex);
106 }
107 }
108 }
109
110 /**
111 * Creates a copy of the collection of all clients that are presently being
112 * tracked by the server.
113 *
114 * @return collection of client objects
115 */
116 private Collection<T> copyClients() {
117 clientsLock.lock();
118 try {
119 Collection<T> copy = new ArrayList<T>(clients);
120 return copy;
121 } finally {
122 clientsLock.unlock();
123 }
124 }
125
126 /**
127 * {@inheritDoc}
128 */
129 public void run() {
130 setRunning(true);
131 try {
132 addInfo("listening on " + listener);
133 while (!Thread.currentThread().isInterrupted()) {
134 T client = listener.acceptClient();
135 if (!configureClient(client)) {
136 addError(client + ": connection dropped");
137 client.close();
138 continue;
139 }
140 try {
141 executor.execute(new ClientWrapper(client));
142 } catch (RejectedExecutionException ex) {
143 addError(client + ": connection dropped");
144 client.close();
145 }
146 }
147 } catch (InterruptedException ex) {
148 assert true; // ok... we'll shut down
149 } catch (Exception ex) {
150 addError("listener: " + ex);
151 }
152
153 setRunning(false);
154 addInfo("shutting down");
155 listener.close();
156 }
157
158 /**
159 * Configures a connected client.
160 * <p>
161 * A subclass implements this method to perform any necessary configuration of
162 * the client object before its {@link Client#run()} method is invoked.
163 *
164 * @param client the subject client
165 * @return {@code true} if configuration was successful; if the return value is
166 * {@code false} the client connection will be dropped
167 */
168 protected abstract boolean configureClient(T client);
169
170 /**
171 * Adds a client to the collection of those being tracked by the server.
172 *
173 * @param client the client to add
174 */
175 private void addClient(T client) {
176 clientsLock.lock();
177 try {
178 clients.add(client);
179 } finally {
180 clientsLock.unlock();
181 }
182 }
183
184 /**
185 * Removes a client from the collection of those being tracked by the server.
186 *
187 * @param client the client to remote
188 */
189 private void removeClient(T client) {
190 clientsLock.lock();
191 try {
192 clients.remove(client);
193 } finally {
194 clientsLock.unlock();
195 }
196 }
197
198 /**
199 * A wrapper for a {@link Client} responsible for ensuring that client tracking
200 * is performed properly.
201 */
202 private class ClientWrapper implements Client {
203
204 private final T delegate;
205
206 public ClientWrapper(T client) {
207 this.delegate = client;
208 }
209
210 public void run() {
211 addClient(delegate);
212 try {
213 delegate.run();
214 } finally {
215 removeClient(delegate);
216 }
217 }
218
219 public void close() {
220 delegate.close();
221 }
222
223 }
224
225 }