Skip to content

Commit b2be863

Browse files
committed
factor out write path so that easy to replace with other implementation
1 parent 986a885 commit b2be863

File tree

2 files changed

+265
-168
lines changed

2 files changed

+265
-168
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
/*
2+
* Copyright 2019 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.servlet;
18+
19+
import static io.grpc.servlet.ServletServerStream.toHexString;
20+
import static java.util.logging.Level.FINE;
21+
import static java.util.logging.Level.FINEST;
22+
23+
import io.grpc.InternalLogId;
24+
import io.grpc.Status;
25+
import io.grpc.servlet.ServletServerStream.ServletTransportState;
26+
import java.io.IOException;
27+
import java.time.Duration;
28+
import java.util.Queue;
29+
import java.util.concurrent.ConcurrentLinkedQueue;
30+
import java.util.concurrent.atomic.AtomicReference;
31+
import java.util.concurrent.locks.LockSupport;
32+
import java.util.logging.Logger;
33+
import javax.annotation.CheckReturnValue;
34+
import javax.servlet.AsyncContext;
35+
import javax.servlet.ServletOutputStream;
36+
37+
/** Handles write actions from the container thread and the application thread. */
38+
final class AsyncServletOutputStreamWriter {
39+
40+
private static final Logger logger =
41+
Logger.getLogger(AsyncServletOutputStreamWriter.class.getName());
42+
43+
/**
44+
* Memory boundary for write actions.
45+
*
46+
* <pre>
47+
* WriteState curState = writeState.get(); // mark a boundary
48+
* doSomething(); // do something within the boundary
49+
* boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
50+
* if (successful) {
51+
* // state has not changed since
52+
* return;
53+
* } else {
54+
* // state is changed by another thread while doSomething(), need recompute
55+
* }
56+
* </pre>
57+
*
58+
* <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
59+
* application thread (calling {@code runOrBufferActionItem()}) that reads and updates the
60+
* writeState. Only onWritePossible() may turn readyAndEmpty from false to true, and only
61+
* runOrBufferActionItem() may turn it from true to false.
62+
*/
63+
private final AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);
64+
65+
private final ServletOutputStream outputStream;
66+
private final ServletTransportState transportState;
67+
private final InternalLogId logId;
68+
private final ActionItem flushAction;
69+
private final ActionItem completeAction;
70+
71+
/**
72+
* New write actions will be buffered into this queue if the servlet output stream is not ready or
73+
* the queue is not drained.
74+
*/
75+
// SPSC queue would do
76+
private final Queue<ActionItem> writeChain = new ConcurrentLinkedQueue<>();
77+
private volatile Thread parkingThread;
78+
79+
AsyncServletOutputStreamWriter(
80+
AsyncContext asyncContext,
81+
ServletOutputStream outputStream,
82+
ServletTransportState transportState,
83+
InternalLogId logId) {
84+
this.outputStream = outputStream;
85+
this.transportState = transportState;
86+
this.logId = logId;
87+
this.flushAction = () -> {
88+
logger.log(FINEST, "[{0}] flushBuffer", logId);
89+
asyncContext.getResponse().flushBuffer();
90+
};
91+
this.completeAction = () -> {
92+
logger.log(FINE, "[{0}] call is completing", logId);
93+
transportState.runOnTransportThread(
94+
() -> {
95+
transportState.complete();
96+
asyncContext.complete();
97+
logger.log(FINE, "[{0}] call completed", logId);
98+
});
99+
};
100+
}
101+
102+
/** Called from application thread. */
103+
void writeBytes(byte[] bytes, int numBytes) throws IOException {
104+
runOrBufferActionItem(
105+
// write bytes action
106+
() -> {
107+
outputStream.write(bytes, 0, numBytes);
108+
transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes));
109+
if (logger.isLoggable(FINEST)) {
110+
logger.log(
111+
FINEST,
112+
"[{0}] outbound data: length = {1}, bytes = {2}",
113+
new Object[]{logId, numBytes, toHexString(bytes, numBytes)});
114+
}
115+
});
116+
}
117+
118+
/** Called from application thread. */
119+
void flush() throws IOException {
120+
runOrBufferActionItem(flushAction);
121+
}
122+
123+
/** Called from application thread. */
124+
void complete() {
125+
try {
126+
runOrBufferActionItem(completeAction);
127+
} catch (IOException e) {
128+
// actually completeAction does not throw
129+
throw Status.fromThrowable(e).asRuntimeException();
130+
}
131+
}
132+
133+
/** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
134+
void onWritePossible() throws IOException {
135+
logger.log(
136+
FINEST, "[{0}] onWritePossible: ENTRY. The servlet output stream becomes ready", logId);
137+
assureReadyAndEmptyFalse();
138+
while (outputStream.isReady()) {
139+
WriteState curState = writeState.get();
140+
141+
ActionItem actionItem = writeChain.poll();
142+
if (actionItem != null) {
143+
actionItem.run();
144+
continue;
145+
}
146+
147+
if (writeState.compareAndSet(curState, curState.withReadyAndEmpty(true))) {
148+
// state has not changed since.
149+
logger.log(
150+
FINEST,
151+
"[{0}] onWritePossible: EXIT. All data available now is sent out and the servlet output"
152+
+ " stream is still ready",
153+
logId);
154+
return;
155+
}
156+
// else, state changed by another thread (runOrBufferActionItem), need to drain the writeChain
157+
// again
158+
}
159+
logger.log(
160+
FINEST, "[{0}] onWritePossible: EXIT. The servlet output stream becomes not ready", logId);
161+
}
162+
163+
private void runOrBufferActionItem(ActionItem actionItem) throws IOException {
164+
WriteState curState = writeState.get();
165+
if (curState.readyAndEmpty) { // write to the outputStream directly
166+
actionItem.run();
167+
if (!outputStream.isReady()) {
168+
logger.log(FINEST, "[{0}] the servlet output stream becomes not ready", logId);
169+
boolean successful = writeState.compareAndSet(curState, curState.withReadyAndEmpty(false));
170+
assert successful;
171+
LockSupport.unpark(parkingThread);
172+
}
173+
} else { // buffer to the writeChain
174+
writeChain.offer(actionItem);
175+
if (!writeState.compareAndSet(curState, curState.newItemBuffered())) {
176+
// state changed by another thread (onWritePossible)
177+
assert writeState.get().readyAndEmpty;
178+
ActionItem lastItem = writeChain.poll();
179+
if (lastItem != null) {
180+
assert lastItem == actionItem;
181+
runOrBufferActionItem(lastItem);
182+
}
183+
} // state has not changed since
184+
}
185+
}
186+
187+
private void assureReadyAndEmptyFalse() {
188+
// readyAndEmpty should have been set to false already or right now
189+
// It's very very unlikely readyAndEmpty is still true due to a race condition
190+
while (writeState.get().readyAndEmpty) {
191+
parkingThread = Thread.currentThread();
192+
LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
193+
}
194+
parkingThread = null;
195+
}
196+
197+
/** Write actions, e.g. writeBytes, flush, complete. */
198+
@FunctionalInterface
199+
private interface ActionItem {
200+
void run() throws IOException;
201+
}
202+
203+
private static final class WriteState {
204+
205+
static final WriteState DEFAULT = new WriteState(false);
206+
207+
/**
208+
* The servlet output stream is ready and the writeChain is empty.
209+
*
210+
* <p>readyAndEmpty turns from false to true when:
211+
* {@code onWritePossible()} exits while currently there is no more data to write, but the last
212+
* check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
213+
*
214+
* <p>readyAndEmpty turns from false to true when:
215+
* {@code runOrBufferActionItem()} exits while either the action item is written directly to the
216+
* servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
217+
* right after that returns false, or the action item is buffered into the writeChain.
218+
*/
219+
final boolean readyAndEmpty;
220+
221+
WriteState(boolean readyAndEmpty) {
222+
this.readyAndEmpty = readyAndEmpty;
223+
}
224+
225+
/**
226+
* Only {@code onWritePossible()} can set readyAndEmpty to true, and only {@code
227+
* runOrBufferActionItem()} can set it to false.
228+
*/
229+
@CheckReturnValue
230+
WriteState withReadyAndEmpty(boolean readyAndEmpty) {
231+
return new WriteState(readyAndEmpty);
232+
}
233+
234+
/** Only {@code runOrBufferActionItem()} can call it, and will set readyAndEmpty to false. */
235+
@CheckReturnValue
236+
WriteState newItemBuffered() {
237+
return new WriteState(false);
238+
}
239+
}
240+
}

0 commit comments

Comments
 (0)