1
2
3
4
5
6
7
8
9
10
11
12
13
14 package ch.qos.logback.core.net.server;
15
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.Collection;
19 import java.util.concurrent.Executor;
20 import java.util.concurrent.RejectedExecutionException;
21 import java.util.concurrent.locks.Lock;
22 import java.util.concurrent.locks.ReentrantLock;
23
24 import ch.qos.logback.core.spi.ContextAwareBase;
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47 public abstract class ConcurrentServerRunner<T extends Client> extends ContextAwareBase
48 implements Runnable, ServerRunner<T> {
49
50 private final Lock clientsLock = new ReentrantLock();
51
52 private final Collection<T> clients = new ArrayList<T>();
53
54 private final ServerListener<T> listener;
55 private final Executor executor;
56
57 private boolean running;
58
59
60
61
62
63
64
65
66
67
68 public ConcurrentServerRunner(ServerListener<T> listener, Executor executor) {
69 this.listener = listener;
70 this.executor = executor;
71 }
72
73
74
75
76 public boolean isRunning() {
77 return running;
78 }
79
80 protected void setRunning(boolean running) {
81 this.running = running;
82 }
83
84
85
86
87 public void stop() throws IOException {
88 listener.close();
89 accept(new ClientVisitor<T>() {
90 public void visit(T client) {
91 client.close();
92 }
93 });
94 }
95
96
97
98
99 public void accept(ClientVisitor<T> visitor) {
100 Collection<T> clients = copyClients();
101 for (T client : clients) {
102 try {
103 visitor.visit(client);
104 } catch (RuntimeException ex) {
105 addError(client + ": " + ex);
106 }
107 }
108 }
109
110
111
112
113
114
115
116 private Collection<T> copyClients() {
117 clientsLock.lock();
118 try {
119 Collection<T> copy = new ArrayList<T>(clients);
120 return copy;
121 } finally {
122 clientsLock.unlock();
123 }
124 }
125
126
127
128
129 public void run() {
130 setRunning(true);
131 try {
132 addInfo("listening on " + listener);
133 while (!Thread.currentThread().isInterrupted()) {
134 T client = listener.acceptClient();
135 if (!configureClient(client)) {
136 addError(client + ": connection dropped");
137 client.close();
138 continue;
139 }
140 try {
141 executor.execute(new ClientWrapper(client));
142 } catch (RejectedExecutionException ex) {
143 addError(client + ": connection dropped");
144 client.close();
145 }
146 }
147 } catch (InterruptedException ex) {
148 assert true;
149 } catch (Exception ex) {
150 addError("listener: " + ex);
151 }
152
153 setRunning(false);
154 addInfo("shutting down");
155 listener.close();
156 }
157
158
159
160
161
162
163
164
165
166
167
168 protected abstract boolean configureClient(T client);
169
170
171
172
173
174
175 private void addClient(T client) {
176 clientsLock.lock();
177 try {
178 clients.add(client);
179 } finally {
180 clientsLock.unlock();
181 }
182 }
183
184
185
186
187
188
189 private void removeClient(T client) {
190 clientsLock.lock();
191 try {
192 clients.remove(client);
193 } finally {
194 clientsLock.unlock();
195 }
196 }
197
198
199
200
201
202 private class ClientWrapper implements Client {
203
204 private final T delegate;
205
206 public ClientWrapper(T client) {
207 this.delegate = client;
208 }
209
210 public void run() {
211 addClient(delegate);
212 try {
213 delegate.run();
214 } finally {
215 removeClient(delegate);
216 }
217 }
218
219 public void close() {
220 delegate.close();
221 }
222
223 }
224
225 }