001/** 002 * Logback: the reliable, generic, fast and flexible logging framework. 003 * Copyright (C) 1999-2015, QOS.ch. All rights reserved. 004 * 005 * This program and the accompanying materials are dual-licensed under 006 * either the terms of the Eclipse Public License v1.0 as published by 007 * the Eclipse Foundation 008 * 009 * or (per the licensee's choosing) 010 * 011 * under the terms of the GNU Lesser General Public License version 2.1 012 * as published by the Free Software Foundation. 013 */ 014package ch.qos.logback.core.net.server; 015 016import java.io.IOException; 017import java.util.ArrayList; 018import java.util.Collection; 019import java.util.concurrent.Executor; 020import java.util.concurrent.RejectedExecutionException; 021import java.util.concurrent.locks.Lock; 022import java.util.concurrent.locks.ReentrantLock; 023 024import ch.qos.logback.core.spi.ContextAwareBase; 025 026/** 027 * A concurrent {@link ServerRunner}. 028 * <p> 029 * An instance of this object is created with a {@link ServerListener} and an 030 * {@link java.util.concurrent.Executor Executor}. On invocation of the 031 * {@code start()} method, it passes itself to the given {@code Executor} and 032 * returns immediately. On invocation of its {@link #run()} method by the 033 * {@link Executor} it begins accepting client connections via its 034 * {@code ServerListener}. As each new {@link Client} is accepted, the client is 035 * configured with the runner's LoggingContext and is then passed to the {@code 036 * Executor} for concurrent execution of the client's service loop. 037 * <p> 038 * On invocation of the {@link #stop()} method, the runner closes the listener 039 * and each of the connected clients (by invoking {@link Client#close()} 040 * effectively interrupting any blocked I/O calls and causing these concurrent 041 * subtasks to exit gracefully). This ensures that before the {@link #stop()} 042 * method returns (1) all I/O resources have been released and (2) all of the 043 * threads of the {@code Executor} are idle. 044 * 045 * @author Carl Harris 046 */ 047public abstract class ConcurrentServerRunner<T extends Client> extends ContextAwareBase 048 implements Runnable, ServerRunner<T> { 049 050 private final Lock clientsLock = new ReentrantLock(); 051 052 private final Collection<T> clients = new ArrayList<T>(); 053 054 private final ServerListener<T> listener; 055 private final Executor executor; 056 057 private boolean running; 058 059 /** 060 * Constructs a new server runner. 061 * 062 * @param listener the listener from which the server will accept new clients 063 * @param executor an executor that will facilitate execution of the listening 064 * and client-handling tasks; while any {@link Executor} is 065 * allowed here, outside of unit testing the only reasonable 066 * choice is a bounded thread pool of some kind. 067 */ 068 public ConcurrentServerRunner(ServerListener<T> listener, Executor executor) { 069 this.listener = listener; 070 this.executor = executor; 071 } 072 073 /** 074 * {@inheritDoc} 075 */ 076 public boolean isRunning() { 077 return running; 078 } 079 080 protected void setRunning(boolean running) { 081 this.running = running; 082 } 083 084 /** 085 * {@inheritDoc} 086 */ 087 public void stop() throws IOException { 088 listener.close(); 089 accept(new ClientVisitor<T>() { 090 public void visit(T client) { 091 client.close(); 092 } 093 }); 094 } 095 096 /** 097 * {@inheritDoc} 098 */ 099 public void accept(ClientVisitor<T> visitor) { 100 Collection<T> clients = copyClients(); 101 for (T client : clients) { 102 try { 103 visitor.visit(client); 104 } catch (RuntimeException ex) { 105 addError(client + ": " + ex); 106 } 107 } 108 } 109 110 /** 111 * Creates a copy of the collection of all clients that are presently being 112 * tracked by the server. 113 * 114 * @return collection of client objects 115 */ 116 private Collection<T> copyClients() { 117 clientsLock.lock(); 118 try { 119 Collection<T> copy = new ArrayList<T>(clients); 120 return copy; 121 } finally { 122 clientsLock.unlock(); 123 } 124 } 125 126 /** 127 * {@inheritDoc} 128 */ 129 public void run() { 130 setRunning(true); 131 try { 132 addInfo("listening on " + listener); 133 while (!Thread.currentThread().isInterrupted()) { 134 T client = listener.acceptClient(); 135 if (!configureClient(client)) { 136 addError(client + ": connection dropped"); 137 client.close(); 138 continue; 139 } 140 try { 141 executor.execute(new ClientWrapper(client)); 142 } catch (RejectedExecutionException ex) { 143 addError(client + ": connection dropped"); 144 client.close(); 145 } 146 } 147 } catch (InterruptedException ex) { 148 assert true; // ok... we'll shut down 149 } catch (Exception ex) { 150 addError("listener: " + ex); 151 } 152 153 setRunning(false); 154 addInfo("shutting down"); 155 listener.close(); 156 } 157 158 /** 159 * Configures a connected client. 160 * <p> 161 * A subclass implements this method to perform any necessary configuration of 162 * the client object before its {@link Client#run()} method is invoked. 163 * 164 * @param client the subject client 165 * @return {@code true} if configuration was successful; if the return value is 166 * {@code false} the client connection will be dropped 167 */ 168 protected abstract boolean configureClient(T client); 169 170 /** 171 * Adds a client to the collection of those being tracked by the server. 172 * 173 * @param client the client to add 174 */ 175 private void addClient(T client) { 176 clientsLock.lock(); 177 try { 178 clients.add(client); 179 } finally { 180 clientsLock.unlock(); 181 } 182 } 183 184 /** 185 * Removes a client from the collection of those being tracked by the server. 186 * 187 * @param client the client to remote 188 */ 189 private void removeClient(T client) { 190 clientsLock.lock(); 191 try { 192 clients.remove(client); 193 } finally { 194 clientsLock.unlock(); 195 } 196 } 197 198 /** 199 * A wrapper for a {@link Client} responsible for ensuring that client tracking 200 * is performed properly. 201 */ 202 private class ClientWrapper implements Client { 203 204 private final T delegate; 205 206 public ClientWrapper(T client) { 207 this.delegate = client; 208 } 209 210 public void run() { 211 addClient(delegate); 212 try { 213 delegate.run(); 214 } finally { 215 removeClient(delegate); 216 } 217 } 218 219 public void close() { 220 delegate.close(); 221 } 222 223 } 224 225}