1
2
3
4
5
6
7
8
9
10
11
12
13
14 package ch.qos.logback.core.net.server;
15
16 import java.io.IOException;
17 import java.io.ObjectOutputStream;
18 import java.io.OutputStream;
19 import java.io.Serializable;
20 import java.net.Socket;
21 import java.net.SocketException;
22 import java.util.concurrent.BlockingQueue;
23
24 import ch.qos.logback.core.CoreConstants;
25 import ch.qos.logback.core.spi.ContextAwareBase;
26 import ch.qos.logback.core.util.CloseUtil;
27
28
29
30
31
32
33
34 class RemoteReceiverStreamClient extends ContextAwareBase implements RemoteReceiverClient {
35
36 private final String clientId;
37 private final Socket socket;
38 private final OutputStream outputStream;
39
40 private BlockingQueue<Serializable> queue;
41
42
43
44
45
46
47
48 public RemoteReceiverStreamClient(String id, Socket socket) {
49 this.clientId = "client " + id + ": ";
50 this.socket = socket;
51 this.outputStream = null;
52 }
53
54
55
56
57
58
59
60
61
62
63 RemoteReceiverStreamClient(String id, OutputStream outputStream) {
64 this.clientId = "client " + id + ": ";
65 this.socket = null;
66 this.outputStream = outputStream;
67 }
68
69
70
71
72 public void setQueue(BlockingQueue<Serializable> queue) {
73 this.queue = queue;
74 }
75
76
77
78
79 public boolean offer(Serializable event) {
80 if (queue == null) {
81 throw new IllegalStateException("client has no event queue");
82 }
83 return queue.offer(event);
84 }
85
86
87
88
89 public void close() {
90 if (socket == null)
91 return;
92 CloseUtil.closeQuietly(socket);
93 }
94
95
96
97
98 public void run() {
99 addInfo(clientId + "connected");
100
101 ObjectOutputStream oos = null;
102 try {
103 int counter = 0;
104 oos = createObjectOutputStream();
105 while (!Thread.currentThread().isInterrupted()) {
106 try {
107 Serializable event = queue.take();
108 oos.writeObject(event);
109 oos.flush();
110 if (++counter >= CoreConstants.OOS_RESET_FREQUENCY) {
111
112
113 counter = 0;
114 oos.reset();
115 }
116 } catch (InterruptedException ex) {
117 Thread.currentThread().interrupt();
118 }
119 }
120 } catch (SocketException ex) {
121 addInfo(clientId + ex);
122 } catch (IOException ex) {
123 addError(clientId + ex);
124 } catch (RuntimeException ex) {
125 addError(clientId + ex);
126 } finally {
127 if (oos != null) {
128 CloseUtil.closeQuietly(oos);
129 }
130 close();
131 addInfo(clientId + "connection closed");
132 }
133 }
134
135 private ObjectOutputStream createObjectOutputStream() throws IOException {
136 if (socket == null) {
137 return new ObjectOutputStream(outputStream);
138 }
139 return new ObjectOutputStream(socket.getOutputStream());
140 }
141 }