16
16
17
17
package io .grpc .servlet ;
18
18
19
+ import static com .google .common .base .Preconditions .checkArgument ;
20
+ import static com .google .common .base .Preconditions .checkNotNull ;
21
+ import static io .grpc .servlet .ServletServerStream .toHexString ;
22
+ import static java .util .logging .Level .FINE ;
23
+ import static java .util .logging .Level .FINEST ;
24
+
25
+ import io .grpc .BindableService ;
26
+ import io .grpc .Metadata ;
19
27
import io .grpc .internal .GrpcUtil ;
28
+ import io .grpc .internal .LogId ;
29
+ import io .grpc .internal .ReadableBuffers ;
20
30
import io .grpc .internal .ServerTransportListener ;
31
+ import io .grpc .internal .WritableBufferAllocator ;
32
+ import io .grpc .servlet .ServletServerStream .ByteArrayWritableBuffer ;
33
+ import io .grpc .servlet .ServletServerStream .WriteState ;
21
34
import java .io .IOException ;
35
+ import java .util .Arrays ;
36
+ import java .util .List ;
37
+ import java .util .Queue ;
38
+ import java .util .concurrent .ConcurrentLinkedDeque ;
39
+ import java .util .concurrent .ScheduledExecutorService ;
40
+ import java .util .concurrent .TimeUnit ;
41
+ import java .util .concurrent .atomic .AtomicReference ;
42
+ import java .util .logging .Level ;
43
+ import java .util .logging .Logger ;
22
44
import javax .annotation .PostConstruct ;
23
45
import javax .annotation .PreDestroy ;
46
+ import javax .servlet .AsyncContext ;
47
+ import javax .servlet .ReadListener ;
48
+ import javax .servlet .ServletContext ;
49
+ import javax .servlet .ServletInputStream ;
50
+ import javax .servlet .ServletOutputStream ;
51
+ import javax .servlet .WriteListener ;
24
52
import javax .servlet .http .HttpServletRequest ;
25
53
import javax .servlet .http .HttpServletResponse ;
26
54
27
55
/**
28
56
* An adapter that transforms {@link HttpServletRequest} into gRPC request and lets a gRPC server
29
57
* process it, and transforms the gRPC response into {@link HttpServletResponse}. An adapter can be
30
- * instantiated by {@link Factory#create}. The gRPC server is built from the ServerBuilder provided
31
- * in {@link Factory#create}.
58
+ * instantiated by {@link Factory#create}.
32
59
*
33
60
* <p>In a servlet, calling {@link #doPost(HttpServletRequest, HttpServletResponse)} inside {@link
34
61
* javax.servlet.http.HttpServlet#doPost(HttpServletRequest, HttpServletResponse)} makes the servlet
35
62
* backed by the gRPC server associated with the adapter. The servlet must support Asynchronous
36
63
* Processing and must be deployed to a container that supports servlet 4.0 and enables HTTP/2.
37
64
*/
38
- public interface ServletAdapter {
65
+ public final class ServletAdapter {
66
+
67
+ static final Logger logger = Logger .getLogger (ServletServerStream .class .getName ());
68
+
69
+ private final ServerTransportListener transportListener ;
70
+ private final ScheduledExecutorService scheduler ;
71
+
72
+ ServletAdapter (
73
+ ServerTransportListener transportListener , ScheduledExecutorService scheduler ) {
74
+ this .transportListener = transportListener ;
75
+ this .scheduler = checkNotNull (scheduler , "scheduler" );
76
+ }
39
77
40
78
/**
41
- * Call this method inside {@link javax.servlet.http.HttpServlet#doPost(HttpServletRequest,
42
- * HttpServletResponse)} to serve gRPC POST request.
79
+ * Call this method inside {@link javax.servlet.http.HttpServlet#doGet(HttpServletRequest,
80
+ * HttpServletResponse)} to serve gRPC GET request.
81
+ *
82
+ * <p>Note that in rare case gRPC client sends GET requests.
43
83
*
44
84
* <p>Do not modify {@code req} and {@code resp} before or after calling this method. However,
45
85
* calling {@code resp.setBufferSize()} before invocation is allowed.
46
86
*/
47
- void doPost (HttpServletRequest req , HttpServletResponse resp ) throws IOException ;
87
+ public void doGet (HttpServletRequest req , HttpServletResponse resp ) throws IOException {
88
+ // TODO
89
+ }
48
90
49
91
/**
50
- * Call this method inside {@link javax.servlet.http.HttpServlet#doGet(HttpServletRequest,
51
- * HttpServletResponse)} to serve gRPC GET request.
52
- *
53
- * <p>Note that in rare case gRPC client sends GET requests.
92
+ * Call this method inside {@link javax.servlet.http.HttpServlet#doPost(HttpServletRequest,
93
+ * HttpServletResponse)} to serve gRPC POST request.
54
94
*
55
95
* <p>Do not modify {@code req} and {@code resp} before or after calling this method. However,
56
96
* calling {@code resp.setBufferSize()} before invocation is allowed.
57
97
*/
58
- void doGet (HttpServletRequest req , HttpServletResponse resp ) throws IOException ;
98
+ public void doPost (HttpServletRequest req , HttpServletResponse resp ) throws IOException {
99
+ checkArgument (req .isAsyncSupported (), "servlet does not support asynchronous operation" );
100
+ checkArgument (ServletAdapter .isGrpc (req ), "req is not a gRPC request" );
101
+
102
+ LogId logId = LogId .allocate (getClass ().getName ());
103
+ logger .log (FINE , "[{0}] RPC started" , logId );
104
+
105
+ String method = req .getRequestURI ().substring (1 ); // remove the leading "/"
106
+ Metadata headers = new Metadata ();
107
+
108
+ AtomicReference <WriteState > writeState = new AtomicReference <>(WriteState .DEFAULT );
109
+ AsyncContext asyncCtx = req .startAsync ();
110
+
111
+ ServletOutputStream output = asyncCtx .getResponse ().getOutputStream ();
112
+
113
+ WritableBufferAllocator bufferAllocator =
114
+ capacityHint -> new ByteArrayWritableBuffer (capacityHint );
115
+
116
+ /*
117
+ * The concurrency for pushing and polling on the writeChain is handled by the WriteState state
118
+ * machine, not by the thread-safety of ConcurrentLinkedDeque. Actually the thread-safety of
119
+ * ConcurrentLinkedDeque alone is neither sufficient nor necessary. A plain singly-linked queue
120
+ * would also work with WriteState, but java library only has ConcurrentLinkedDeque.
121
+ */
122
+ Queue <ByteArrayWritableBuffer > writeChain = new ConcurrentLinkedDeque <>();
123
+
124
+ ServletServerStream stream = new ServletServerStream (
125
+ bufferAllocator , asyncCtx , writeState , writeChain , scheduler , logId );
126
+ transportListener .streamCreated (stream , method , headers );
127
+ stream .transportState ().onStreamAllocated ();
128
+
129
+ output .setWriteListener (
130
+ new WriteListener () {
131
+ @ Override
132
+ public void onWritePossible () throws IOException {
133
+ logger .log (FINE , "[{0}] onWritePossible" , logId );
134
+
135
+ WriteState curState = writeState .get ();
136
+ // curState.stillWritePossible should have been set to false already or right now
137
+ while (curState .stillWritePossible ) {
138
+ // it's very unlikely this happens due to a race condition
139
+ Thread .yield ();
140
+ curState = writeState .get ();
141
+ }
142
+
143
+ boolean isReady ;
144
+ while ((isReady = output .isReady ())) {
145
+ curState = writeState .get ();
146
+
147
+ ByteArrayWritableBuffer buffer = writeChain .poll ();
148
+ if (buffer != null ) {
149
+ if (buffer == ByteArrayWritableBuffer .FLUSH ) {
150
+ resp .flushBuffer ();
151
+ } else {
152
+ output .write (buffer .bytes , 0 , buffer .readableBytes ());
153
+ stream .transportState ().onSentBytes (buffer .readableBytes ());
154
+
155
+ if (logger .isLoggable (Level .FINEST )) {
156
+ logger .log (
157
+ Level .FINEST ,
158
+ "[{0}] outbound data: length = {1}, bytes = {2}" ,
159
+ new Object []{
160
+ logId , buffer .readableBytes (),
161
+ toHexString (buffer .bytes , buffer .readableBytes ())});
162
+ }
163
+ }
164
+ continue ;
165
+ }
166
+
167
+ if (writeState .compareAndSet (curState , curState .withStillWritePossible (true ))) {
168
+ logger .log (FINEST , "[{0}] set stillWritePossible to true" , logId );
169
+ // state has not changed since. It's possible a new entry is just enqueued into the
170
+ // writeChain, but this case is handled right after the enqueuing
171
+ break ;
172
+ } // else state changed by another thread, need to drain the writeChain again
173
+ }
174
+
175
+ if (isReady && writeState .get ().trailersSent ) {
176
+ asyncContextComplete (asyncCtx , scheduler );
177
+
178
+ logger .log (FINE , "[{0}] onWritePossible: call complete" , logId );
179
+ }
180
+ }
181
+
182
+ @ Override
183
+ public void onError (Throwable t ) {
184
+ // TODO
185
+ t .printStackTrace ();
186
+ }
187
+ });
188
+
189
+ ServletInputStream input = asyncCtx .getRequest ().getInputStream ();
190
+ input .setReadListener (
191
+ new ReadListener () {
192
+ volatile boolean allDataRead ;
193
+ final byte [] buffer = new byte [4 * 1024 ];
194
+
195
+ @ Override
196
+ public void onDataAvailable () throws IOException {
197
+ logger .log (FINE , "[{0}] onDataAvailable" , logId );
198
+ while (input .isReady ()) {
199
+ int length = input .read (buffer );
200
+ if (length == -1 ) {
201
+ logger .log (FINEST , "[{0}] inbound data: read end of stream" , logId );
202
+ return ;
203
+ } else {
204
+ if (logger .isLoggable (FINEST )) {
205
+ logger .log (
206
+ FINEST ,
207
+ "[{0}] inbound data: length = {1}, bytes = {2}" ,
208
+ new Object []{logId , length , toHexString (buffer , length )});
209
+ }
210
+
211
+ stream
212
+ .transportState ()
213
+ .inboundDataReceived (
214
+ ReadableBuffers .wrap (Arrays .copyOf (buffer , length )), false );
215
+ }
216
+ }
217
+ }
218
+
219
+ @ SuppressWarnings ("FutureReturnValueIgnored" )
220
+ @ Override
221
+ public void onAllDataRead () {
222
+ logger .log (FINE , "[{0}] onAllDataRead" , logId );
223
+ if (input .isFinished () && !allDataRead ) {
224
+ allDataRead = true ;
225
+ ServletContext servletContext = asyncCtx .getRequest ().getServletContext ();
226
+ if (servletContext != null
227
+ && servletContext .getServerInfo ().contains ("GlassFish Server" )
228
+ && servletContext .getServerInfo ().contains ("5.0" )) {
229
+ // Glassfish workaround only:
230
+ // otherwise client may flakily fail with "INTERNAL: Half-closed without a request"
231
+ // for server streaming
232
+ scheduler .schedule (
233
+ () ->
234
+ stream
235
+ .transportState ()
236
+ .inboundDataReceived (ReadableBuffers .wrap (new byte [] {}), true ),
237
+ 1 ,
238
+ TimeUnit .MILLISECONDS );
239
+ } else {
240
+ stream
241
+ .transportState ()
242
+ .inboundDataReceived (ReadableBuffers .wrap (new byte [] {}), true );
243
+ }
244
+ }
245
+ }
246
+
247
+ @ Override
248
+ public void onError (Throwable t ) {
249
+ // TODO
250
+ t .printStackTrace ();
251
+ }
252
+ });
253
+ }
59
254
60
255
/**
61
256
* Call this method before the adapter is in use.
62
257
*/
63
258
@ PostConstruct
64
- default void init () {}
259
+ public void init () {}
65
260
66
261
/**
67
262
* Call this method when the adapter is no longer need.
68
263
*/
69
264
@ PreDestroy
70
- void destroy ();
265
+ public void destroy () {
266
+ transportListener .transportTerminated ();
267
+ }
268
+
269
+ @ SuppressWarnings ("FutureReturnValueIgnored" )
270
+ static void asyncContextComplete (AsyncContext asyncContext , ScheduledExecutorService scheduler ) {
271
+ ServletContext servletContext = asyncContext .getRequest ().getServletContext ();
272
+ if (servletContext != null
273
+ && servletContext .getServerInfo ().contains ("GlassFish Server Open Source Edition 5.0" )) {
274
+ // Glassfish workaround only:
275
+ // otherwise client may receive Encountered end-of-stream mid-frame for
276
+ // server/bidi streaming
277
+ scheduler .schedule (() -> asyncContext .complete (), 100 , TimeUnit .MILLISECONDS );
278
+ return ;
279
+ }
280
+
281
+ asyncContext .complete ();
282
+ }
71
283
72
284
/**
73
285
* Checks whether an incoming {@code HttpServletRequest} may come from a gRPC client.
74
286
*
75
287
* @return true if the request comes from a gRPC client
76
288
*/
77
- static boolean isGrpc (HttpServletRequest request ) {
289
+ public static boolean isGrpc (HttpServletRequest request ) {
78
290
return request .getContentType () != null
79
291
&& request .getContentType ().contains (GrpcUtil .CONTENT_TYPE_GRPC );
80
292
}
81
293
82
294
/** Factory of ServletAdapter. */
83
- final class Factory {
295
+ public static final class Factory {
296
+
297
+ private Factory () {}
84
298
85
299
/**
86
300
* Creates an instance of ServletAdapter. A gRPC server will be built and started with the given
@@ -89,7 +303,18 @@ final class Factory {
89
303
*/
90
304
public static ServletAdapter create (ServletServerBuilder serverBuilder ) {
91
305
ServerTransportListener listener = serverBuilder .buildAndStart ();
92
- return new ServletAdapterImpl (listener , serverBuilder .getScheduledExecutorService ());
306
+ return new ServletAdapter (listener , serverBuilder .getScheduledExecutorService ());
307
+ }
308
+
309
+ /**
310
+ * Creates an instance of ServletAdapter. A gRPC server with the given services and default
311
+ * settings will be built and started. The servlet using this servletAdapter will be backed by
312
+ * the gRPC server.
313
+ */
314
+ public static ServletAdapter create (List <? extends BindableService > services ) {
315
+ ServletServerBuilder serverBuilder = new ServletServerBuilder ();
316
+ services .forEach (service -> serverBuilder .addService (service ));
317
+ return create (serverBuilder );
93
318
}
94
319
}
95
320
}
0 commit comments