Skip to content

Commit 3fd4ca8

Browse files
committed
factor out write path so that easy to replace with other implementation
1 parent 986a885 commit 3fd4ca8

File tree

2 files changed

+269
-168
lines changed

2 files changed

+269
-168
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
/*
2+
* Copyright 2019 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc.servlet;
18+
19+
import static io.grpc.servlet.ServletServerStream.toHexString;
20+
import static java.util.logging.Level.FINE;
21+
import static java.util.logging.Level.FINEST;
22+
23+
import io.grpc.InternalLogId;
24+
import io.grpc.Status;
25+
import io.grpc.servlet.ServletServerStream.ServletTransportState;
26+
import java.io.IOException;
27+
import java.time.Duration;
28+
import java.util.Queue;
29+
import java.util.concurrent.ConcurrentLinkedQueue;
30+
import java.util.concurrent.atomic.AtomicReference;
31+
import java.util.concurrent.locks.LockSupport;
32+
import java.util.logging.Logger;
33+
import javax.annotation.CheckReturnValue;
34+
import javax.annotation.Nullable;
35+
import javax.servlet.AsyncContext;
36+
import javax.servlet.ServletOutputStream;
37+
38+
/** Handles write actions from the container thread and the application thread. */
39+
final class AsyncServletOutputStreamWriter {
40+
41+
private static final Logger logger =
42+
Logger.getLogger(AsyncServletOutputStreamWriter.class.getName());
43+
44+
/**
45+
* Memory boundary for write actions.
46+
*
47+
* <pre>
48+
* WriteState curState = writeState.get(); // mark a boundary
49+
* doSomething(); // do something within the boundary
50+
* boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
51+
* if (successful) {
52+
* // state has not changed since
53+
* return;
54+
* } else {
55+
* // state is changed by another thread while doSomething(), need recompute
56+
* }
57+
* </pre>
58+
*
59+
* <p>There are two threads, the container thread (calling {@code onWritePossible()}) and the
60+
* application thread (calling {@code runOrBufferActionItem()}) that read and update the
61+
* writeState. Only onWritePossible() may turn readyAndEmpty from false to true, and only
62+
* runOrBufferActionItem() may turn it from true to false.
63+
*/
64+
private final AtomicReference<WriteState> writeState = new AtomicReference<>(WriteState.DEFAULT);
65+
66+
private final ServletOutputStream outputStream;
67+
private final ServletTransportState transportState;
68+
private final InternalLogId logId;
69+
private final ActionItem flushAction;
70+
private final ActionItem completeAction;
71+
72+
/**
73+
* New write actions will be buffered into this queue if the servlet output stream is not ready or
74+
* the queue is not drained.
75+
*/
76+
// SPSC queue would do
77+
private final Queue<ActionItem> writeChain = new ConcurrentLinkedQueue<>();
78+
// for a theoretical race condition that onWritePossible() is called immediately after isReady()
79+
// returns false and before writeState.compareAndSet()
80+
@Nullable
81+
private volatile Thread parkingThread;
82+
83+
AsyncServletOutputStreamWriter(
84+
AsyncContext asyncContext,
85+
ServletOutputStream outputStream,
86+
ServletTransportState transportState,
87+
InternalLogId logId) {
88+
this.outputStream = outputStream;
89+
this.transportState = transportState;
90+
this.logId = logId;
91+
this.flushAction = () -> {
92+
logger.log(FINEST, "[{0}] flushBuffer", logId);
93+
asyncContext.getResponse().flushBuffer();
94+
};
95+
this.completeAction = () -> {
96+
logger.log(FINE, "[{0}] call is completing", logId);
97+
transportState.runOnTransportThread(
98+
() -> {
99+
transportState.complete();
100+
asyncContext.complete();
101+
logger.log(FINE, "[{0}] call completed", logId);
102+
});
103+
};
104+
}
105+
106+
/** Called from application thread. */
107+
void writeBytes(byte[] bytes, int numBytes) throws IOException {
108+
runOrBufferActionItem(
109+
// write bytes action
110+
() -> {
111+
outputStream.write(bytes, 0, numBytes);
112+
transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes));
113+
if (logger.isLoggable(FINEST)) {
114+
logger.log(
115+
FINEST,
116+
"[{0}] outbound data: length = {1}, bytes = {2}",
117+
new Object[]{logId, numBytes, toHexString(bytes, numBytes)});
118+
}
119+
});
120+
}
121+
122+
/** Called from application thread. */
123+
void flush() throws IOException {
124+
runOrBufferActionItem(flushAction);
125+
}
126+
127+
/** Called from application thread. */
128+
void complete() {
129+
try {
130+
runOrBufferActionItem(completeAction);
131+
} catch (IOException e) {
132+
// actually completeAction does not throw
133+
throw Status.fromThrowable(e).asRuntimeException();
134+
}
135+
}
136+
137+
/** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
138+
void onWritePossible() throws IOException {
139+
logger.log(
140+
FINEST, "[{0}] onWritePossible: ENTRY. The servlet output stream becomes ready", logId);
141+
assureReadyAndEmptyFalse();
142+
while (outputStream.isReady()) {
143+
WriteState curState = writeState.get();
144+
145+
ActionItem actionItem = writeChain.poll();
146+
if (actionItem != null) {
147+
actionItem.run();
148+
continue;
149+
}
150+
151+
if (writeState.compareAndSet(curState, curState.withReadyAndEmpty(true))) {
152+
// state has not changed since.
153+
logger.log(
154+
FINEST,
155+
"[{0}] onWritePossible: EXIT. All data available now is sent out and the servlet output"
156+
+ " stream is still ready",
157+
logId);
158+
return;
159+
}
160+
// else, state changed by another thread (runOrBufferActionItem), need to drain the writeChain
161+
// again
162+
}
163+
logger.log(
164+
FINEST, "[{0}] onWritePossible: EXIT. The servlet output stream becomes not ready", logId);
165+
}
166+
167+
private void runOrBufferActionItem(ActionItem actionItem) throws IOException {
168+
WriteState curState = writeState.get();
169+
if (curState.readyAndEmpty) { // write to the outputStream directly
170+
actionItem.run();
171+
if (!outputStream.isReady()) {
172+
logger.log(FINEST, "[{0}] the servlet output stream becomes not ready", logId);
173+
boolean successful = writeState.compareAndSet(curState, curState.withReadyAndEmpty(false));
174+
assert successful;
175+
LockSupport.unpark(parkingThread);
176+
}
177+
} else { // buffer to the writeChain
178+
writeChain.offer(actionItem);
179+
if (!writeState.compareAndSet(curState, curState.newItemBuffered())) {
180+
// state changed by another thread (onWritePossible)
181+
assert writeState.get().readyAndEmpty;
182+
ActionItem lastItem = writeChain.poll();
183+
if (lastItem != null) {
184+
assert lastItem == actionItem;
185+
runOrBufferActionItem(lastItem);
186+
}
187+
} // state has not changed since
188+
}
189+
}
190+
191+
private void assureReadyAndEmptyFalse() {
192+
// readyAndEmpty should have been set to false already or right now
193+
// It's very very unlikely readyAndEmpty is still true due to a race condition
194+
while (writeState.get().readyAndEmpty) {
195+
parkingThread = Thread.currentThread();
196+
LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
197+
}
198+
parkingThread = null;
199+
}
200+
201+
/** Write actions, e.g. writeBytes, flush, complete. */
202+
@FunctionalInterface
203+
private interface ActionItem {
204+
void run() throws IOException;
205+
}
206+
207+
private static final class WriteState {
208+
209+
static final WriteState DEFAULT = new WriteState(false);
210+
211+
/**
212+
* The servlet output stream is ready and the writeChain is empty.
213+
*
214+
* <p>readyAndEmpty turns from false to true when:
215+
* {@code onWritePossible()} exits while currently there is no more data to write, but the last
216+
* check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
217+
*
218+
* <p>readyAndEmpty turns from false to true when:
219+
* {@code runOrBufferActionItem()} exits while either the action item is written directly to the
220+
* servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
221+
* right after that returns false, or the action item is buffered into the writeChain.
222+
*/
223+
final boolean readyAndEmpty;
224+
225+
WriteState(boolean readyAndEmpty) {
226+
this.readyAndEmpty = readyAndEmpty;
227+
}
228+
229+
/**
230+
* Only {@code onWritePossible()} can set readyAndEmpty to true, and only {@code
231+
* runOrBufferActionItem()} can set it to false.
232+
*/
233+
@CheckReturnValue
234+
WriteState withReadyAndEmpty(boolean readyAndEmpty) {
235+
return new WriteState(readyAndEmpty);
236+
}
237+
238+
/** Only {@code runOrBufferActionItem()} can call it, and will set readyAndEmpty to false. */
239+
@CheckReturnValue
240+
WriteState newItemBuffered() {
241+
return new WriteState(false);
242+
}
243+
}
244+
}

0 commit comments

Comments
 (0)