001/**
002 * Logback: the reliable, generic, fast and flexible logging framework.
003 * Copyright (C) 1999-2015, QOS.ch. All rights reserved.
004 *
005 * This program and the accompanying materials are dual-licensed under
006 * either the terms of the Eclipse Public License v1.0 as published by
007 * the Eclipse Foundation
008 *
009 *   or (per the licensee's choosing)
010 *
011 * under the terms of the GNU Lesser General Public License version 2.1
012 * as published by the Free Software Foundation.
013 */
014package ch.qos.logback.core.net.server;
015
016import java.io.IOException;
017import java.util.ArrayList;
018import java.util.Collection;
019import java.util.concurrent.Executor;
020import java.util.concurrent.RejectedExecutionException;
021import java.util.concurrent.locks.Lock;
022import java.util.concurrent.locks.ReentrantLock;
023
024import ch.qos.logback.core.spi.ContextAwareBase;
025
026/**
027 * A concurrent {@link ServerRunner}.
028 * <p>
029 * An instance of this object is created with a {@link ServerListener} and an
030 * {@link java.util.concurrent.Executor Executor}. On invocation of the
031 * {@code start()} method, it passes itself to the given {@code Executor} and
032 * returns immediately. On invocation of its {@link #run()} method by the
033 * {@link Executor} it begins accepting client connections via its
034 * {@code ServerListener}. As each new {@link Client} is accepted, the client is
035 * configured with the runner's LoggingContext and is then passed to the {@code 
036 * Executor} for concurrent execution of the client's service loop.
037 * <p>
038 * On invocation of the {@link #stop()} method, the runner closes the listener
039 * and each of the connected clients (by invoking {@link Client#close()}
040 * effectively interrupting any blocked I/O calls and causing these concurrent
041 * subtasks to exit gracefully). This ensures that before the {@link #stop()}
042 * method returns (1) all I/O resources have been released and (2) all of the
043 * threads of the {@code Executor} are idle.
044 *
045 * @author Carl Harris
046 */
047public abstract class ConcurrentServerRunner<T extends Client> extends ContextAwareBase
048        implements Runnable, ServerRunner<T> {
049
050    private final Lock clientsLock = new ReentrantLock();
051
052    private final Collection<T> clients = new ArrayList<T>();
053
054    private final ServerListener<T> listener;
055    private final Executor executor;
056
057    private boolean running;
058
059    /**
060     * Constructs a new server runner.
061     * 
062     * @param listener the listener from which the server will accept new clients
063     * @param executor an executor that will facilitate execution of the listening
064     *                 and client-handling tasks; while any {@link Executor} is
065     *                 allowed here, outside of unit testing the only reasonable
066     *                 choice is a bounded thread pool of some kind.
067     */
068    public ConcurrentServerRunner(ServerListener<T> listener, Executor executor) {
069        this.listener = listener;
070        this.executor = executor;
071    }
072
073    /**
074     * {@inheritDoc}
075     */
076    public boolean isRunning() {
077        return running;
078    }
079
080    protected void setRunning(boolean running) {
081        this.running = running;
082    }
083
084    /**
085     * {@inheritDoc}
086     */
087    public void stop() throws IOException {
088        listener.close();
089        accept(new ClientVisitor<T>() {
090            public void visit(T client) {
091                client.close();
092            }
093        });
094    }
095
096    /**
097     * {@inheritDoc}
098     */
099    public void accept(ClientVisitor<T> visitor) {
100        Collection<T> clients = copyClients();
101        for (T client : clients) {
102            try {
103                visitor.visit(client);
104            } catch (RuntimeException ex) {
105                addError(client + ": " + ex);
106            }
107        }
108    }
109
110    /**
111     * Creates a copy of the collection of all clients that are presently being
112     * tracked by the server.
113     * 
114     * @return collection of client objects
115     */
116    private Collection<T> copyClients() {
117        clientsLock.lock();
118        try {
119            Collection<T> copy = new ArrayList<T>(clients);
120            return copy;
121        } finally {
122            clientsLock.unlock();
123        }
124    }
125
126    /**
127     * {@inheritDoc}
128     */
129    public void run() {
130        setRunning(true);
131        try {
132            addInfo("listening on " + listener);
133            while (!Thread.currentThread().isInterrupted()) {
134                T client = listener.acceptClient();
135                if (!configureClient(client)) {
136                    addError(client + ": connection dropped");
137                    client.close();
138                    continue;
139                }
140                try {
141                    executor.execute(new ClientWrapper(client));
142                } catch (RejectedExecutionException ex) {
143                    addError(client + ": connection dropped");
144                    client.close();
145                }
146            }
147        } catch (InterruptedException ex) {
148            assert true; // ok... we'll shut down
149        } catch (Exception ex) {
150            addError("listener: " + ex);
151        }
152
153        setRunning(false);
154        addInfo("shutting down");
155        listener.close();
156    }
157
158    /**
159     * Configures a connected client.
160     * <p>
161     * A subclass implements this method to perform any necessary configuration of
162     * the client object before its {@link Client#run()} method is invoked.
163     * 
164     * @param client the subject client
165     * @return {@code true} if configuration was successful; if the return value is
166     *         {@code false} the client connection will be dropped
167     */
168    protected abstract boolean configureClient(T client);
169
170    /**
171     * Adds a client to the collection of those being tracked by the server.
172     * 
173     * @param client the client to add
174     */
175    private void addClient(T client) {
176        clientsLock.lock();
177        try {
178            clients.add(client);
179        } finally {
180            clientsLock.unlock();
181        }
182    }
183
184    /**
185     * Removes a client from the collection of those being tracked by the server.
186     * 
187     * @param client the client to remote
188     */
189    private void removeClient(T client) {
190        clientsLock.lock();
191        try {
192            clients.remove(client);
193        } finally {
194            clientsLock.unlock();
195        }
196    }
197
198    /**
199     * A wrapper for a {@link Client} responsible for ensuring that client tracking
200     * is performed properly.
201     */
202    private class ClientWrapper implements Client {
203
204        private final T delegate;
205
206        public ClientWrapper(T client) {
207            this.delegate = client;
208        }
209
210        public void run() {
211            addClient(delegate);
212            try {
213                delegate.run();
214            } finally {
215                removeClient(delegate);
216            }
217        }
218
219        public void close() {
220            delegate.close();
221        }
222
223    }
224
225}