diff --git a/RELEASING.md b/RELEASING.md
index f610ab3d6bd..0c7eb846e01 100644
--- a/RELEASING.md
+++ b/RELEASING.md
@@ -47,6 +47,7 @@ $ VERSION_FILES=(
examples/example-gauth/pom.xml
examples/example-kotlin/build.gradle
examples/example-kotlin/android/helloworld/app/build.gradle
+ examples/example-servlet/build.gradle
examples/example-tls/build.gradle
examples/example-tls/pom.xml
)
diff --git a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
index c53c79a564e..3698208946f 100644
--- a/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
+++ b/core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java
@@ -226,7 +226,7 @@ protected void setDeadlineTicker(Deadline.Ticker ticker) {
}
@Override
- public final Server build() {
+ public Server build() {
ServerImpl server = new ServerImpl(
this,
buildTransportServers(getTracerFactories()),
diff --git a/examples/example-servlet/build.gradle b/examples/example-servlet/build.gradle
new file mode 100644
index 00000000000..650193d4bd6
--- /dev/null
+++ b/examples/example-servlet/build.gradle
@@ -0,0 +1,46 @@
+plugins {
+ // ASSUMES GRADLE 2.12 OR HIGHER. Use plugin version 0.7.5 with earlier gradle versions
+ id 'com.google.protobuf' version '0.8.8'
+ // Generate IntelliJ IDEA's .idea & .iml project files
+ id 'idea'
+ id 'war'
+}
+
+repositories {
+ maven { // The google mirror is less flaky than mavenCentral()
+ url "https://maven-central.storage-download.googleapis.com/repos/central/data/" }
+ mavenLocal()
+}
+
+sourceCompatibility = 1.8
+targetCompatibility = 1.8
+
+def grpcVersion = '1.24.0-SNAPSHOT' // CURRENT_GRPC_VERSION
+def protocVersion = '3.9.0'
+
+dependencies {
+ implementation "io.grpc:grpc-protobuf:${grpcVersion}",
+ "io.grpc:grpc-servlet:${grpcVersion}",
+ "io.grpc:grpc-stub:${grpcVersion}"
+
+ providedCompile "javax.annotation:javax.annotation-api:1.2",
+ "javax.servlet:javax.servlet-api:4.0.1"
+}
+
+protobuf {
+ protoc { artifact = "com.google.protobuf:protoc:${protocVersion}" }
+ plugins { grpc { artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" } }
+ generateProtoTasks {
+ all()*.plugins { grpc {} }
+ }
+}
+
+// Inform IDEs like IntelliJ IDEA, Eclipse or NetBeans about the generated code.
+sourceSets {
+ main {
+ java {
+ srcDirs 'build/generated/source/proto/main/grpc'
+ srcDirs 'build/generated/source/proto/main/java'
+ }
+ }
+}
diff --git a/examples/example-servlet/settings.gradle b/examples/example-servlet/settings.gradle
new file mode 100644
index 00000000000..59ef05d47dd
--- /dev/null
+++ b/examples/example-servlet/settings.gradle
@@ -0,0 +1,8 @@
+pluginManagement {
+ repositories {
+ maven { // The google mirror is less flaky than mavenCentral()
+ url "https://maven-central.storage-download.googleapis.com/repos/central/data/"
+ }
+ gradlePluginPortal()
+ }
+}
diff --git a/examples/example-servlet/src/main/java/io/grpc/servlet/examples/helloworld/HelloWorldServlet.java b/examples/example-servlet/src/main/java/io/grpc/servlet/examples/helloworld/HelloWorldServlet.java
new file mode 100644
index 00000000000..a970c26a119
--- /dev/null
+++ b/examples/example-servlet/src/main/java/io/grpc/servlet/examples/helloworld/HelloWorldServlet.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.servlet.examples.helloworld;
+
+import io.grpc.stub.StreamObserver;
+import io.grpc.examples.helloworld.GreeterGrpc;
+import io.grpc.examples.helloworld.HelloReply;
+import io.grpc.examples.helloworld.HelloRequest;
+import io.grpc.servlet.ServletAdapter;
+import io.grpc.servlet.ServletServerBuilder;
+import java.io.IOException;
+import javax.servlet.annotation.WebServlet;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * A servlet that hosts a gRPC server over HTTP/2 and shares the resource URI for the normal servlet
+ * clients over HTTP/1.0+.
+ *
+ *
For creating a servlet that solely serves gRPC services, do not follow this example, simply
+ * extend or register a {@link io.grpc.servlet.GrpcServlet} instead.
+ */
+@WebServlet(urlPatterns = {"/helloworld.Greeter/SayHello"}, asyncSupported = true)
+public class HelloWorldServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+
+ private final ServletAdapter servletAdapter =
+ new ServletServerBuilder().addService(new GreeterImpl()).buildServletAdapter();
+
+ private static final class GreeterImpl extends GreeterGrpc.GreeterImplBase {
+ GreeterImpl() {}
+
+ @Override
+ public void sayHello(HelloRequest req, StreamObserver responseObserver) {
+ HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
+ responseObserver.onNext(reply);
+ responseObserver.onCompleted();
+ }
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest request, HttpServletResponse response)
+ throws IOException {
+ response.setContentType("text/html");
+ response.getWriter().println("Hello World!
");
+ }
+
+ @Override
+ protected void doPost(HttpServletRequest request, HttpServletResponse response)
+ throws IOException {
+ if (ServletAdapter.isGrpc(request)) {
+ servletAdapter.doPost(request, response);
+ } else {
+ response.setContentType("text/html");
+ response.getWriter().println("Hello non-gRPC client!
");
+ }
+ }
+
+ @Override
+ public void destroy() {
+ servletAdapter.destroy();
+ super.destroy();
+ }
+}
diff --git a/examples/example-servlet/src/main/proto/helloworld/helloworld.proto b/examples/example-servlet/src/main/proto/helloworld/helloworld.proto
new file mode 100644
index 00000000000..c60d9416f1f
--- /dev/null
+++ b/examples/example-servlet/src/main/proto/helloworld/helloworld.proto
@@ -0,0 +1,37 @@
+// Copyright 2015 The gRPC Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "io.grpc.examples.helloworld";
+option java_outer_classname = "HelloWorldProto";
+option objc_class_prefix = "HLW";
+
+package helloworld;
+
+// The greeting service definition.
+service Greeter {
+ // Sends a greeting
+ rpc SayHello (HelloRequest) returns (HelloReply) {}
+}
+
+// The request message containing the user's name.
+message HelloRequest {
+ string name = 1;
+}
+
+// The response message containing the greetings
+message HelloReply {
+ string message = 1;
+}
diff --git a/examples/example-servlet/src/main/webapp/WEB-INF/glassfish-web.xml b/examples/example-servlet/src/main/webapp/WEB-INF/glassfish-web.xml
new file mode 100644
index 00000000000..426162a9d13
--- /dev/null
+++ b/examples/example-servlet/src/main/webapp/WEB-INF/glassfish-web.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
diff --git a/examples/example-servlet/src/main/webapp/WEB-INF/jboss-web.xml b/examples/example-servlet/src/main/webapp/WEB-INF/jboss-web.xml
new file mode 100644
index 00000000000..9c83263e0c9
--- /dev/null
+++ b/examples/example-servlet/src/main/webapp/WEB-INF/jboss-web.xml
@@ -0,0 +1,9 @@
+
+
+
+ /
+
diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java
index ae654754895..6d0615b5df6 100644
--- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java
+++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java
@@ -215,10 +215,8 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
protected static final Empty EMPTY = Empty.getDefaultInstance();
- private void startServer() {
- ServerBuilder> builder = getServerBuilder();
+ private void configBuilder(@Nullable ServerBuilder> builder) {
if (builder == null) {
- server = null;
return;
}
testServiceExecutor = Executors.newScheduledThreadPool(2);
@@ -251,6 +249,13 @@ private void startServer() {
if (metricsExpected()) {
assertThat(builder).isInstanceOf(AbstractServerImplBuilder.class);
}
+ }
+
+ protected void startServer(@Nullable ServerBuilder> builder) {
+ if (builder == null) {
+ server = null;
+ return;
+ }
try {
server = builder.build().start();
} catch (IOException ex) {
@@ -303,7 +308,9 @@ public ClientCall interceptCall(
*/
@Before
public void setUp() {
- startServer();
+ ServerBuilder> builder = getServerBuilder();
+ configBuilder(builder);
+ startServer(builder);
channel = createChannel();
blockingStub =
diff --git a/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java
index e66f28cac73..35d863e7ee8 100644
--- a/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/InternalNettyChannelBuilder.java
@@ -16,9 +16,11 @@
package io.grpc.netty;
+import com.google.common.annotations.VisibleForTesting;
import io.grpc.Internal;
import io.grpc.internal.ClientTransportFactory;
import io.grpc.internal.SharedResourcePool;
+import io.grpc.internal.TransportTracer;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
@@ -87,5 +89,11 @@ public static ClientTransportFactory buildTransportFactory(NettyChannelBuilder b
return builder.buildTransportFactory();
}
+ @VisibleForTesting
+ public static void setTransportTracerFactory(
+ NettyChannelBuilder builder, TransportTracer.Factory factory) {
+ builder.setTransportTracerFactory(factory);
+ }
+
private InternalNettyChannelBuilder() {}
}
diff --git a/servlet/build.gradle b/servlet/build.gradle
new file mode 100644
index 00000000000..93622b0af72
--- /dev/null
+++ b/servlet/build.gradle
@@ -0,0 +1,44 @@
+description = "gRPC: Servlet"
+sourceCompatibility = 1.8
+targetCompatibility = 1.8
+
+dependencies {
+ compile project(':grpc-core')
+ compileOnly 'javax.servlet:javax.servlet-api:4.0.1',
+ libraries.javax_annotation // java 9, 10 needs it
+
+ testCompile project(':grpc-stub'),
+ project(':grpc-protobuf'),
+ project(':grpc-servlet'),
+ project(':grpc-netty'),
+ project(':grpc-testing'),
+ project(':grpc-auth'),
+ project(':grpc-interop-testing'),
+ project(':grpc-core').sourceSets.test.output,
+ project(':grpc-netty').sourceSets.test.output,
+ libraries.junit,
+ 'io.undertow:undertow-servlet:2.0.22.Final',
+ 'org.apache.tomcat.embed:tomcat-embed-core:9.0.20'
+}
+
+// Jetty only works with Java 11
+if (JavaVersion.current().isJava11Compatible()) {
+ compileTestJava {
+ sourceCompatibility = "11"
+ targetCompatibility = "11"
+ }
+
+ def jettyVersion = '10.0.0-alpha0'
+ dependencies {
+ testCompile "org.eclipse.jetty:jetty-servlet:${jettyVersion}",
+ "org.eclipse.jetty.http2:http2-server:${jettyVersion}"
+ }
+} else {
+ sourceSets {
+ test {
+ java {
+ exclude '**/Jetty*Test.java'
+ }
+ }
+ }
+}
diff --git a/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java b/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java
new file mode 100644
index 00000000000..b4f38ddf386
--- /dev/null
+++ b/servlet/src/main/java/io/grpc/servlet/AsyncServletOutputStreamWriter.java
@@ -0,0 +1,244 @@
+/*
+ * Copyright 2019 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.servlet;
+
+import static io.grpc.servlet.ServletServerStream.toHexString;
+import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.FINEST;
+
+import io.grpc.InternalLogId;
+import io.grpc.Status;
+import io.grpc.servlet.ServletServerStream.ServletTransportState;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+import java.util.logging.Logger;
+import javax.annotation.CheckReturnValue;
+import javax.annotation.Nullable;
+import javax.servlet.AsyncContext;
+import javax.servlet.ServletOutputStream;
+
+/** Handles write actions from the container thread and the application thread. */
+final class AsyncServletOutputStreamWriter {
+
+ private static final Logger logger =
+ Logger.getLogger(AsyncServletOutputStreamWriter.class.getName());
+
+ /**
+ * Memory boundary for write actions.
+ *
+ *
+ * WriteState curState = writeState.get(); // mark a boundary
+ * doSomething(); // do something within the boundary
+ * boolean successful = writeState.compareAndSet(curState, newState); // try to mark a boundary
+ * if (successful) {
+ * // state has not changed since
+ * return;
+ * } else {
+ * // state is changed by another thread while doSomething(), need recompute
+ * }
+ *
+ *
+ * There are two threads, the container thread (calling {@code onWritePossible()}) and the
+ * application thread (calling {@code runOrBufferActionItem()}) that read and update the
+ * writeState. Only onWritePossible() may turn readyAndEmpty from false to true, and only
+ * runOrBufferActionItem() may turn it from true to false.
+ */
+ private final AtomicReference writeState = new AtomicReference<>(WriteState.DEFAULT);
+
+ private final ServletOutputStream outputStream;
+ private final ServletTransportState transportState;
+ private final InternalLogId logId;
+ private final ActionItem flushAction;
+ private final ActionItem completeAction;
+
+ /**
+ * New write actions will be buffered into this queue if the servlet output stream is not ready or
+ * the queue is not drained.
+ */
+ // SPSC queue would do
+ private final Queue writeChain = new ConcurrentLinkedQueue<>();
+ // for a theoretical race condition that onWritePossible() is called immediately after isReady()
+ // returns false and before writeState.compareAndSet()
+ @Nullable
+ private volatile Thread parkingThread;
+
+ AsyncServletOutputStreamWriter(
+ AsyncContext asyncContext,
+ ServletOutputStream outputStream,
+ ServletTransportState transportState,
+ InternalLogId logId) {
+ this.outputStream = outputStream;
+ this.transportState = transportState;
+ this.logId = logId;
+ this.flushAction = () -> {
+ logger.log(FINEST, "[{0}] flushBuffer", logId);
+ asyncContext.getResponse().flushBuffer();
+ };
+ this.completeAction = () -> {
+ logger.log(FINE, "[{0}] call is completing", logId);
+ transportState.runOnTransportThread(
+ () -> {
+ transportState.complete();
+ asyncContext.complete();
+ logger.log(FINE, "[{0}] call completed", logId);
+ });
+ };
+ }
+
+ /** Called from application thread. */
+ void writeBytes(byte[] bytes, int numBytes) throws IOException {
+ runOrBufferActionItem(
+ // write bytes action
+ () -> {
+ outputStream.write(bytes, 0, numBytes);
+ transportState.runOnTransportThread(() -> transportState.onSentBytes(numBytes));
+ if (logger.isLoggable(FINEST)) {
+ logger.log(
+ FINEST,
+ "[{0}] outbound data: length = {1}, bytes = {2}",
+ new Object[]{logId, numBytes, toHexString(bytes, numBytes)});
+ }
+ });
+ }
+
+ /** Called from application thread. */
+ void flush() throws IOException {
+ runOrBufferActionItem(flushAction);
+ }
+
+ /** Called from application thread. */
+ void complete() {
+ try {
+ runOrBufferActionItem(completeAction);
+ } catch (IOException e) {
+ // actually completeAction does not throw
+ throw Status.fromThrowable(e).asRuntimeException();
+ }
+ }
+
+ /** Called from the container thread {@link javax.servlet.WriteListener#onWritePossible()}. */
+ void onWritePossible() throws IOException {
+ logger.log(
+ FINEST, "[{0}] onWritePossible: ENTRY. The servlet output stream becomes ready", logId);
+ assureReadyAndEmptyFalse();
+ while (outputStream.isReady()) {
+ WriteState curState = writeState.get();
+
+ ActionItem actionItem = writeChain.poll();
+ if (actionItem != null) {
+ actionItem.run();
+ continue;
+ }
+
+ if (writeState.compareAndSet(curState, curState.withReadyAndEmpty(true))) {
+ // state has not changed since.
+ logger.log(
+ FINEST,
+ "[{0}] onWritePossible: EXIT. All data available now is sent out and the servlet output"
+ + " stream is still ready",
+ logId);
+ return;
+ }
+ // else, state changed by another thread (runOrBufferActionItem), need to drain the writeChain
+ // again
+ }
+ logger.log(
+ FINEST, "[{0}] onWritePossible: EXIT. The servlet output stream becomes not ready", logId);
+ }
+
+ private void runOrBufferActionItem(ActionItem actionItem) throws IOException {
+ WriteState curState = writeState.get();
+ if (curState.readyAndEmpty) { // write to the outputStream directly
+ actionItem.run();
+ if (!outputStream.isReady()) {
+ logger.log(FINEST, "[{0}] the servlet output stream becomes not ready", logId);
+ boolean successful = writeState.compareAndSet(curState, curState.withReadyAndEmpty(false));
+ assert successful;
+ LockSupport.unpark(parkingThread);
+ }
+ } else { // buffer to the writeChain
+ writeChain.offer(actionItem);
+ if (!writeState.compareAndSet(curState, curState.newItemBuffered())) {
+ // state changed by another thread (onWritePossible)
+ assert writeState.get().readyAndEmpty;
+ ActionItem lastItem = writeChain.poll();
+ if (lastItem != null) {
+ assert lastItem == actionItem;
+ runOrBufferActionItem(lastItem);
+ }
+ } // state has not changed since
+ }
+ }
+
+ private void assureReadyAndEmptyFalse() {
+ // readyAndEmpty should have been set to false already or right now
+ // It's very very unlikely readyAndEmpty is still true due to a race condition
+ while (writeState.get().readyAndEmpty) {
+ parkingThread = Thread.currentThread();
+ LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
+ }
+ parkingThread = null;
+ }
+
+ /** Write actions, e.g. writeBytes, flush, complete. */
+ @FunctionalInterface
+ private interface ActionItem {
+ void run() throws IOException;
+ }
+
+ private static final class WriteState {
+
+ static final WriteState DEFAULT = new WriteState(false);
+
+ /**
+ * The servlet output stream is ready and the writeChain is empty.
+ *
+ * readyAndEmpty turns from false to true when:
+ * {@code onWritePossible()} exits while currently there is no more data to write, but the last
+ * check of {@link javax.servlet.ServletOutputStream#isReady()} is true.
+ *
+ *
readyAndEmpty turns from false to true when:
+ * {@code runOrBufferActionItem()} exits while either the action item is written directly to the
+ * servlet output stream and the check of {@link javax.servlet.ServletOutputStream#isReady()}
+ * right after that returns false, or the action item is buffered into the writeChain.
+ */
+ final boolean readyAndEmpty;
+
+ WriteState(boolean readyAndEmpty) {
+ this.readyAndEmpty = readyAndEmpty;
+ }
+
+ /**
+ * Only {@code onWritePossible()} can set readyAndEmpty to true, and only {@code
+ * runOrBufferActionItem()} can set it to false.
+ */
+ @CheckReturnValue
+ WriteState withReadyAndEmpty(boolean readyAndEmpty) {
+ return new WriteState(readyAndEmpty);
+ }
+
+ /** Only {@code runOrBufferActionItem()} can call it, and will set readyAndEmpty to false. */
+ @CheckReturnValue
+ WriteState newItemBuffered() {
+ return new WriteState(false);
+ }
+ }
+}
diff --git a/servlet/src/main/java/io/grpc/servlet/GrpcServlet.java b/servlet/src/main/java/io/grpc/servlet/GrpcServlet.java
new file mode 100644
index 00000000000..a73b1fdfe6d
--- /dev/null
+++ b/servlet/src/main/java/io/grpc/servlet/GrpcServlet.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.servlet;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.BindableService;
+import io.grpc.ExperimentalApi;
+import java.io.IOException;
+import java.util.List;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * A simple servlet backed by a gRPC server. Must set {@code asyncSupported} to true. The {@code
+ * /contextRoot/urlPattern} must match the gRPC services' path, which is
+ * "/full-service-name/short-method-name".
+ *
+ *
The API is experimental. The authors would like to know more about the real usecases. Users
+ * are welcome to provide feedback by commenting on
+ * the tracking issue.
+ */
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066")
+public class GrpcServlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+
+ private final ServletAdapter servletAdapter;
+
+ @VisibleForTesting
+ GrpcServlet(ServletAdapter servletAdapter) {
+ this.servletAdapter = servletAdapter;
+ }
+
+ /**
+ * Instantiate the servlet serving the given list of gRPC services. ServerInterceptors can be
+ * added on each gRPC service by {@link
+ * io.grpc.ServerInterceptors#intercept(BindableService, io.grpc.ServerInterceptor...)}
+ */
+ public GrpcServlet(List extends BindableService> bindableServices) {
+ this(loadServices(bindableServices));
+ }
+
+ private static ServletAdapter loadServices(List extends BindableService> bindableServices) {
+ ServletServerBuilder serverBuilder = new ServletServerBuilder();
+ bindableServices.forEach(serverBuilder::addService);
+ return serverBuilder.buildServletAdapter();
+ }
+
+ @Override
+ protected final void doGet(HttpServletRequest request, HttpServletResponse response)
+ throws IOException {
+ servletAdapter.doGet(request, response);
+ }
+
+ @Override
+ protected final void doPost(HttpServletRequest request, HttpServletResponse response)
+ throws IOException {
+ servletAdapter.doPost(request, response);
+ }
+
+ @Override
+ public void destroy() {
+ servletAdapter.destroy();
+ super.destroy();
+ }
+}
diff --git a/servlet/src/main/java/io/grpc/servlet/ServletAdapter.java b/servlet/src/main/java/io/grpc/servlet/ServletAdapter.java
new file mode 100644
index 00000000000..4ca0e32df5f
--- /dev/null
+++ b/servlet/src/main/java/io/grpc/servlet/ServletAdapter.java
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.servlet;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static io.grpc.internal.GrpcUtil.TIMEOUT_KEY;
+import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.FINEST;
+
+import com.google.common.io.BaseEncoding;
+import io.grpc.Attributes;
+import io.grpc.ExperimentalApi;
+import io.grpc.Grpc;
+import io.grpc.InternalLogId;
+import io.grpc.InternalMetadata;
+import io.grpc.Metadata;
+import io.grpc.ServerStreamTracer;
+import io.grpc.Status;
+import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.ReadableBuffers;
+import io.grpc.internal.ServerTransportListener;
+import io.grpc.internal.StatsTraceContext;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Enumeration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Logger;
+import javax.servlet.AsyncContext;
+import javax.servlet.AsyncEvent;
+import javax.servlet.AsyncListener;
+import javax.servlet.ReadListener;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+/**
+ * An adapter that transforms {@link HttpServletRequest} into gRPC request and lets a gRPC server
+ * process it, and transforms the gRPC response into {@link HttpServletResponse}. An adapter can be
+ * instantiated by {@link ServletServerBuilder#buildServletAdapter()}.
+ *
+ *
In a servlet, calling {@link #doPost(HttpServletRequest, HttpServletResponse)} inside {@link
+ * javax.servlet.http.HttpServlet#doPost(HttpServletRequest, HttpServletResponse)} makes the servlet
+ * backed by the gRPC server associated with the adapter. The servlet must support Asynchronous
+ * Processing and must be deployed to a container that supports servlet 4.0 and enables HTTP/2.
+ *
+ *
The API is experimental. The authors would like to know more about the real usecases. Users
+ * are welcome to provide feedback by commenting on
+ * the tracking issue.
+ */
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066")
+public final class ServletAdapter {
+
+ static final Logger logger = Logger.getLogger(ServletAdapter.class.getName());
+
+ private final ServerTransportListener transportListener;
+ private final List extends ServerStreamTracer.Factory> streamTracerFactories;
+ private final int maxInboundMessageSize;
+ private final Attributes attributes;
+
+ ServletAdapter(
+ ServerTransportListener transportListener,
+ List extends ServerStreamTracer.Factory> streamTracerFactories,
+ int maxInboundMessageSize) {
+ this.transportListener = transportListener;
+ this.streamTracerFactories = streamTracerFactories;
+ this.maxInboundMessageSize = maxInboundMessageSize;
+ attributes = transportListener.transportReady(Attributes.EMPTY);
+ }
+
+ /**
+ * Call this method inside {@link javax.servlet.http.HttpServlet#doGet(HttpServletRequest,
+ * HttpServletResponse)} to serve gRPC GET request.
+ *
+ *
This method is currently not impelemented.
+ *
+ *
Note that in rare case gRPC client sends GET requests.
+ *
+ *
Do not modify {@code req} and {@code resp} before or after calling this method. However,
+ * calling {@code resp.setBufferSize()} before invocation is allowed.
+ */
+ public void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ // TODO(zdapeng)
+ }
+
+ /**
+ * Call this method inside {@link javax.servlet.http.HttpServlet#doPost(HttpServletRequest,
+ * HttpServletResponse)} to serve gRPC POST request.
+ *
+ *
Do not modify {@code req} and {@code resp} before or after calling this method. However,
+ * calling {@code resp.setBufferSize()} before invocation is allowed.
+ */
+ public void doPost(HttpServletRequest req, HttpServletResponse resp) throws IOException {
+ checkArgument(req.isAsyncSupported(), "servlet does not support asynchronous operation");
+ checkArgument(ServletAdapter.isGrpc(req), "the request is not a gRPC request");
+
+ InternalLogId logId = InternalLogId.allocate(ServletAdapter.class, null);
+ logger.log(FINE, "[{0}] RPC started", logId);
+
+ AsyncContext asyncCtx = req.startAsync(req, resp);
+
+ String method = req.getRequestURI().substring(1); // remove the leading "/"
+ Metadata headers = getHeaders(req);
+
+ if (logger.isLoggable(FINEST)) {
+ logger.log(FINEST, "[{0}] method: {1}", new Object[] {logId, method});
+ logger.log(FINEST, "[{0}] headers: {1}", new Object[] {logId, headers});
+ }
+
+ Long timeoutNanos = headers.get(TIMEOUT_KEY);
+ if (timeoutNanos == null) {
+ timeoutNanos = 0L;
+ }
+ asyncCtx.setTimeout(TimeUnit.NANOSECONDS.toMillis(timeoutNanos));
+ StatsTraceContext statsTraceCtx =
+ StatsTraceContext.newServerContext(streamTracerFactories, method, headers);
+
+ ServletServerStream stream = new ServletServerStream(
+ asyncCtx,
+ statsTraceCtx,
+ maxInboundMessageSize,
+ attributes.toBuilder()
+ .set(
+ Grpc.TRANSPORT_ATTR_REMOTE_ADDR,
+ new InetSocketAddress(req.getRemoteHost(), req.getRemotePort()))
+ .set(
+ Grpc.TRANSPORT_ATTR_LOCAL_ADDR,
+ new InetSocketAddress(req.getLocalAddr(), req.getLocalPort()))
+ .build(),
+ getAuthority(req),
+ logId);
+
+ transportListener.streamCreated(stream, method, headers);
+ stream.transportState().runOnTransportThread(stream.transportState()::onStreamAllocated);
+
+ asyncCtx.getRequest().getInputStream()
+ .setReadListener(new GrpcReadListener(stream, asyncCtx, logId));
+ asyncCtx.addListener(new GrpcAsycListener(stream, logId));
+ }
+
+ private static Metadata getHeaders(HttpServletRequest req) {
+ Enumeration headerNames = req.getHeaderNames();
+ checkNotNull(
+ headerNames, "Servlet container does not allow HttpServletRequest.getHeaderNames()");
+ List byteArrays = new ArrayList<>();
+ while (headerNames.hasMoreElements()) {
+ String headerName = headerNames.nextElement();
+ Enumeration values = req.getHeaders(headerName);
+ if (values == null) {
+ continue;
+ }
+ while (values.hasMoreElements()) {
+ String value = values.nextElement();
+ if (headerName.endsWith(Metadata.BINARY_HEADER_SUFFIX)) {
+ byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII));
+ byteArrays.add(BaseEncoding.base64().decode(value));
+ } else {
+ byteArrays.add(headerName.getBytes(StandardCharsets.US_ASCII));
+ byteArrays.add(value.getBytes(StandardCharsets.US_ASCII));
+ }
+ }
+ }
+ return InternalMetadata.newMetadata(byteArrays.toArray(new byte[][]{}));
+ }
+
+ private static String getAuthority(HttpServletRequest req) {
+ try {
+ return new URI(req.getRequestURL().toString()).getAuthority();
+ } catch (URISyntaxException e) {
+ logger.log(FINE, "Error getting authority from the request URL {0}" + req.getRequestURL());
+ return req.getServerName() + ":" + req.getServerPort();
+ }
+ }
+
+ /**
+ * Call this method when the adapter is no longer needed. The gRPC server will be terminated.
+ */
+ public void destroy() {
+ transportListener.transportTerminated();
+ }
+
+ private static final class GrpcAsycListener implements AsyncListener {
+ final InternalLogId logId;
+ final ServletServerStream stream;
+
+ GrpcAsycListener(ServletServerStream stream, InternalLogId logId) {
+ this.stream = stream;
+ this.logId = logId;
+ }
+
+ @Override
+ public void onComplete(AsyncEvent event) {}
+
+ @Override
+ public void onTimeout(AsyncEvent event) {
+ if (logger.isLoggable(FINE)) {
+ logger.log(FINE, String.format("[{%s}] Timeout: ", logId), event.getThrowable());
+ }
+ // If the resp is not committed, cancel() to avoid being redirected to an error page.
+ // Else, the container will send RST_STREAM in the end.
+ if (!event.getAsyncContext().getResponse().isCommitted()) {
+ stream.cancel(Status.DEADLINE_EXCEEDED);
+ } else {
+ stream.transportState().runOnTransportThread(
+ () -> stream.transportState().transportReportStatus(Status.DEADLINE_EXCEEDED));
+ }
+ }
+
+ @Override
+ public void onError(AsyncEvent event) {
+ if (logger.isLoggable(FINE)) {
+ logger.log(FINE, String.format("[{%s}] Error: ", logId), event.getThrowable());
+ }
+
+ // If the resp is not committed, cancel() to avoid being redirected to an error page.
+ // Else, the container will send RST_STREAM at the end.
+ if (!event.getAsyncContext().getResponse().isCommitted()) {
+ stream.cancel(Status.fromThrowable(event.getThrowable()));
+ } else {
+ stream.transportState().runOnTransportThread(
+ () -> stream.transportState().transportReportStatus(
+ Status.fromThrowable(event.getThrowable())));
+ }
+ }
+
+ @Override
+ public void onStartAsync(AsyncEvent event) {}
+ }
+
+ private static final class GrpcReadListener implements ReadListener {
+ final ServletServerStream stream;
+ final AsyncContext asyncCtx;
+ final ServletInputStream input;
+ final InternalLogId logId;
+
+ GrpcReadListener(
+ ServletServerStream stream,
+ AsyncContext asyncCtx,
+ InternalLogId logId) throws IOException {
+ this.stream = stream;
+ this.asyncCtx = asyncCtx;
+ input = asyncCtx.getRequest().getInputStream();
+ this.logId = logId;
+ }
+
+ final byte[] buffer = new byte[4 * 1024];
+
+ @Override
+ public void onDataAvailable() throws IOException {
+ logger.log(FINEST, "[{0}] onDataAvailable: ENTRY", logId);
+
+ while (input.isReady()) {
+ int length = input.read(buffer);
+ if (length == -1) {
+ logger.log(FINEST, "[{0}] inbound data: read end of stream", logId);
+ return;
+ } else {
+ if (logger.isLoggable(FINEST)) {
+ logger.log(
+ FINEST,
+ "[{0}] inbound data: length = {1}, bytes = {2}",
+ new Object[] {logId, length, ServletServerStream.toHexString(buffer, length)});
+ }
+
+ byte[] copy = Arrays.copyOf(buffer, length);
+ stream.transportState().runOnTransportThread(
+ () -> stream.transportState().inboundDataReceived(ReadableBuffers.wrap(copy), false));
+ }
+ }
+
+ logger.log(FINEST, "[{0}] onDataAvailable: EXIT", logId);
+ }
+
+ @Override
+ public void onAllDataRead() {
+ logger.log(FINE, "[{0}] onAllDataRead", logId);
+ stream.transportState().runOnTransportThread(() ->
+ stream.transportState().inboundDataReceived(ReadableBuffers.wrap(new byte[] {}), true));
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (logger.isLoggable(FINE)) {
+ logger.log(FINE, String.format("[{%s}] Error: ", logId), t);
+ }
+ // If the resp is not committed, cancel() to avoid being redirected to an error page.
+ // Else, the container will send RST_STREAM at the end.
+ if (!asyncCtx.getResponse().isCommitted()) {
+ stream.cancel(Status.fromThrowable(t));
+ } else {
+ stream.transportState().runOnTransportThread(
+ () -> stream.transportState()
+ .transportReportStatus(Status.fromThrowable(t)));
+ }
+ }
+ }
+
+ /**
+ * Checks whether an incoming {@code HttpServletRequest} may come from a gRPC client.
+ *
+ * @return true if the request comes from a gRPC client
+ */
+ public static boolean isGrpc(HttpServletRequest request) {
+ return request.getContentType() != null
+ && request.getContentType().contains(GrpcUtil.CONTENT_TYPE_GRPC);
+ }
+}
diff --git a/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java b/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java
new file mode 100644
index 00000000000..80054d3fa07
--- /dev/null
+++ b/servlet/src/main/java/io/grpc/servlet/ServletServerBuilder.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.servlet;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.ExperimentalApi;
+import io.grpc.InternalChannelz.SocketStats;
+import io.grpc.InternalInstrumented;
+import io.grpc.InternalLogId;
+import io.grpc.Server;
+import io.grpc.ServerStreamTracer;
+import io.grpc.ServerStreamTracer.Factory;
+import io.grpc.Status;
+import io.grpc.internal.AbstractServerImplBuilder;
+import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.InternalServer;
+import io.grpc.internal.ServerListener;
+import io.grpc.internal.ServerTransport;
+import io.grpc.internal.ServerTransportListener;
+import io.grpc.internal.SharedResourceHolder;
+import java.io.File;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import javax.annotation.concurrent.NotThreadSafe;
+
+/**
+ * Builder to build a gRPC server that can run as a servlet. This is for advanced custom settings.
+ * Normally, users should consider extending the out-of-box {@link GrpcServlet} directly instead.
+ *
+ * The API is experimental. The authors would like to know more about the real usecases. Users
+ * are welcome to provide feedback by commenting on
+ * the tracking issue.
+ */
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5066")
+@NotThreadSafe
+public final class ServletServerBuilder extends AbstractServerImplBuilder {
+ List extends ServerStreamTracer.Factory> streamTracerFactories;
+ int maxInboundMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
+
+ private ScheduledExecutorService scheduler;
+ private boolean internalCaller;
+ private boolean usingCustomScheduler;
+ private InternalServerImpl internalServer;
+
+ /**
+ * Builds a gRPC server that can run as a servlet.
+ *
+ * The returned server will not be started or bound to a port.
+ *
+ *
Users should not call this method directly. Instead users should call
+ * {@link #buildServletAdapter()} which internally will call {@code build()} and {@code start()}
+ * appropriately.
+ *
+ * @throws IllegalStateException if this method is called by users directly
+ */
+ @Override
+ public Server build() {
+ checkState(internalCaller, "build() method should not be called directly by an application");
+ return super.build();
+ }
+
+ /**
+ * Creates a {@link ServletAdapter}.
+ */
+ public ServletAdapter buildServletAdapter() {
+ return new ServletAdapter(buildAndStart(), streamTracerFactories, maxInboundMessageSize);
+ }
+
+ private ServerTransportListener buildAndStart() {
+ try {
+ internalCaller = true;
+ build().start();
+ } catch (IOException e) {
+ // actually this should never happen
+ throw new RuntimeException(e);
+ } finally {
+ internalCaller = false;
+ }
+
+ if (!usingCustomScheduler) {
+ scheduler = SharedResourceHolder.get(GrpcUtil.TIMER_SERVICE);
+ }
+
+ // Create only one "transport" for all requests because it has no knowledge of which request is
+ // associated with which client socket. This "transport" does not do socket connection, the
+ // container does.
+ ServerTransportImpl serverTransport =
+ new ServerTransportImpl(scheduler, usingCustomScheduler);
+ return internalServer.serverListener.transportCreated(serverTransport);
+ }
+
+ @Override
+ protected List extends InternalServer> buildTransportServers(
+ List extends Factory> streamTracerFactories) {
+ checkNotNull(streamTracerFactories, "streamTracerFactories");
+ this.streamTracerFactories = streamTracerFactories;
+ internalServer = new InternalServerImpl();
+ return ImmutableList.of(internalServer);
+ }
+
+ /**
+ * Throws {@code UnsupportedOperationException}. TLS should be configured by the servlet
+ * container.
+ */
+ @Override
+ public ServletServerBuilder useTransportSecurity(File certChain, File privateKey) {
+ throw new UnsupportedOperationException("TLS should be configured by the servlet container");
+ }
+
+ @Override
+ public ServletServerBuilder maxInboundMessageSize(int bytes) {
+ checkArgument(bytes >= 0, "bytes must be >= 0");
+ maxInboundMessageSize = bytes;
+ return this;
+ }
+
+ /**
+ * Provides a custom scheduled executor service to the server builder.
+ *
+ * @return this
+ */
+ public ServletServerBuilder scheduledExecutorService(ScheduledExecutorService scheduler) {
+ this.scheduler = checkNotNull(scheduler, "scheduler");
+ usingCustomScheduler = true;
+ return this;
+ }
+
+ private static final class InternalServerImpl implements InternalServer {
+
+ ServerListener serverListener;
+
+ InternalServerImpl() {}
+
+ @Override
+ public void start(ServerListener listener) {
+ serverListener = listener;
+ }
+
+ @Override
+ public void shutdown() {
+ if (serverListener != null) {
+ serverListener.serverShutdown();
+ }
+ }
+
+ @Override
+ public SocketAddress getListenSocketAddress() {
+ return new SocketAddress() {
+ @Override
+ public String toString() {
+ return "ServletServer";
+ }
+ };
+ }
+
+ @Override
+ public InternalInstrumented getListenSocketStats() {
+ // sockets are managed by the servlet container, grpc is ignorant of that
+ return null;
+ }
+ }
+
+ @VisibleForTesting
+ static final class ServerTransportImpl implements ServerTransport {
+
+ private final InternalLogId logId = InternalLogId.allocate(ServerTransportImpl.class, null);
+ private final ScheduledExecutorService scheduler;
+ private final boolean usingCustomScheduler;
+
+ ServerTransportImpl(
+ ScheduledExecutorService scheduler, boolean usingCustomScheduler) {
+ this.scheduler = checkNotNull(scheduler, "scheduler");
+ this.usingCustomScheduler = usingCustomScheduler;
+ }
+
+ @Override
+ public void shutdown() {
+ if (!usingCustomScheduler) {
+ SharedResourceHolder.release(GrpcUtil.TIMER_SERVICE, scheduler);
+ }
+ }
+
+ @Override
+ public void shutdownNow(Status reason) {
+ shutdown();
+ }
+
+ @Override
+ public ScheduledExecutorService getScheduledExecutorService() {
+ return scheduler;
+ }
+
+ @Override
+ public ListenableFuture getStats() {
+ // does not support instrumentation
+ return null;
+ }
+
+ @Override
+ public InternalLogId getLogId() {
+ return logId;
+ }
+ }
+}
diff --git a/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java b/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java
new file mode 100644
index 00000000000..fd3d68546bf
--- /dev/null
+++ b/servlet/src/main/java/io/grpc/servlet/ServletServerStream.java
@@ -0,0 +1,342 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.servlet;
+
+import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_GRPC;
+import static io.grpc.internal.GrpcUtil.CONTENT_TYPE_KEY;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.util.logging.Level.FINE;
+import static java.util.logging.Level.FINEST;
+import static java.util.logging.Level.WARNING;
+
+import com.google.common.io.BaseEncoding;
+import com.google.common.util.concurrent.MoreExecutors;
+import io.grpc.Attributes;
+import io.grpc.InternalLogId;
+import io.grpc.Metadata;
+import io.grpc.Status;
+import io.grpc.Status.Code;
+import io.grpc.internal.AbstractServerStream;
+import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.SerializingExecutor;
+import io.grpc.internal.StatsTraceContext;
+import io.grpc.internal.TransportFrameUtil;
+import io.grpc.internal.TransportTracer;
+import io.grpc.internal.WritableBuffer;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import java.util.logging.Logger;
+import javax.annotation.Nullable;
+import javax.servlet.AsyncContext;
+import javax.servlet.WriteListener;
+import javax.servlet.http.HttpServletResponse;
+
+final class ServletServerStream extends AbstractServerStream {
+
+ private static final Logger logger = Logger.getLogger(ServletServerStream.class.getName());
+
+ private final ServletTransportState transportState;
+ private final Sink sink = new Sink();
+ private final AsyncContext asyncCtx;
+ private final HttpServletResponse resp;
+ private final Attributes attributes;
+ private final String authority;
+ private final InternalLogId logId;
+ private final AsyncServletOutputStreamWriter writer;
+
+ ServletServerStream(
+ AsyncContext asyncCtx,
+ StatsTraceContext statsTraceCtx,
+ int maxInboundMessageSize,
+ Attributes attributes,
+ String authority,
+ InternalLogId logId) throws IOException {
+ super(ByteArrayWritableBuffer::new, statsTraceCtx);
+ transportState =
+ new ServletTransportState(maxInboundMessageSize, statsTraceCtx, new TransportTracer());
+ this.attributes = attributes;
+ this.authority = authority;
+ this.logId = logId;
+ this.asyncCtx = asyncCtx;
+ this.resp = (HttpServletResponse) asyncCtx.getResponse();
+ resp.getOutputStream().setWriteListener(new GrpcWriteListener());
+ this.writer = new AsyncServletOutputStreamWriter(
+ asyncCtx, resp.getOutputStream(), transportState, logId);
+ }
+
+ @Override
+ protected ServletTransportState transportState() {
+ return transportState;
+ }
+
+ @Override
+ public Attributes getAttributes() {
+ return attributes;
+ }
+
+ @Override
+ public String getAuthority() {
+ return authority;
+ }
+
+ @Override
+ public int streamId() {
+ return -1;
+ }
+
+ @Override
+ protected Sink abstractServerStreamSink() {
+ return sink;
+ }
+
+ private void writeHeadersToServletResponse(Metadata metadata) {
+ // Discard any application supplied duplicates of the reserved headers
+ metadata.discardAll(CONTENT_TYPE_KEY);
+ metadata.discardAll(GrpcUtil.TE_HEADER);
+ metadata.discardAll(GrpcUtil.USER_AGENT_KEY);
+
+ if (logger.isLoggable(FINE)) {
+ logger.log(FINE, "[{0}] writeHeaders {1}", new Object[] {logId, metadata});
+ }
+
+ resp.setStatus(HttpServletResponse.SC_OK);
+ resp.setContentType(CONTENT_TYPE_GRPC);
+
+ byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(metadata);
+ for (int i = 0; i < serializedHeaders.length; i += 2) {
+ resp.addHeader(
+ new String(serializedHeaders[i], StandardCharsets.US_ASCII),
+ new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII));
+ }
+ }
+
+ final class ServletTransportState extends TransportState {
+
+ private final SerializingExecutor transportThreadExecutor =
+ new SerializingExecutor(MoreExecutors.directExecutor());
+
+ private ServletTransportState(
+ int maxMessageSize, StatsTraceContext statsTraceCtx, TransportTracer transportTracer) {
+ super(maxMessageSize, statsTraceCtx, transportTracer);
+ }
+
+ @Override
+ public void runOnTransportThread(Runnable r) {
+ transportThreadExecutor.execute(r);
+ }
+
+ @Override
+ public void bytesRead(int numBytes) {
+ // no-op
+ // no flow control yet
+ }
+
+ @Override
+ public void deframeFailed(Throwable cause) {
+ if (logger.isLoggable(FINE)) {
+ logger.log(FINE, String.format("[{%s}] Exception processing message", logId), cause);
+ }
+ cancel(Status.fromThrowable(cause));
+ }
+ }
+
+ private static final class ByteArrayWritableBuffer implements WritableBuffer {
+
+ private final int capacity;
+ final byte[] bytes;
+ private int index;
+
+ ByteArrayWritableBuffer(int capacityHint) {
+ this.bytes = new byte[min(1024 * 1024, max(4096, capacityHint))];
+ this.capacity = bytes.length;
+ }
+
+ @Override
+ public void write(byte[] src, int srcIndex, int length) {
+ System.arraycopy(src, srcIndex, bytes, index, length);
+ index += length;
+ }
+
+ @Override
+ public void write(byte b) {
+ bytes[index++] = b;
+ }
+
+ @Override
+ public int writableBytes() {
+ return capacity - index;
+ }
+
+ @Override
+ public int readableBytes() {
+ return index;
+ }
+
+ @Override
+ public void release() {}
+ }
+
+ private final class GrpcWriteListener implements WriteListener {
+
+ @Override
+ public void onError(Throwable t) {
+ if (logger.isLoggable(FINE)) {
+ logger.log(FINE, String.format("[{%s}] Error: ", logId), t);
+ }
+
+ // If the resp is not committed, cancel() to avoid being redirected to an error page.
+ // Else, the container will send RST_STREAM at the end.
+ if (!resp.isCommitted()) {
+ cancel(Status.fromThrowable(t));
+ } else {
+ transportState.runOnTransportThread(
+ () -> transportState.transportReportStatus(Status.fromThrowable(t)));
+ }
+ }
+
+ @Override
+ public void onWritePossible() throws IOException {
+ writer.onWritePossible();
+ }
+ }
+
+ private final class Sink implements AbstractServerStream.Sink {
+ final TrailerSupplier trailerSupplier = new TrailerSupplier();
+
+ @Override
+ public void writeHeaders(Metadata headers) {
+ writeHeadersToServletResponse(headers);
+ resp.setTrailerFields(trailerSupplier);
+ try {
+ writer.flush();
+ } catch (IOException e) {
+ logger.log(WARNING, String.format("[{%s}] Exception when flushBuffer", logId), e);
+ cancel(Status.fromThrowable(e));
+ }
+ }
+
+ @Override
+ public void writeFrame(@Nullable WritableBuffer frame, boolean flush, int numMessages) {
+ if (frame == null && !flush) {
+ return;
+ }
+
+ if (logger.isLoggable(FINEST)) {
+ logger.log(
+ FINEST,
+ "[{0}] writeFrame: numBytes = {1}, flush = {2}, numMessages = {3}",
+ new Object[]{logId, frame == null ? 0 : frame.readableBytes(), flush, numMessages});
+ }
+
+ try {
+ if (frame != null) {
+ int numBytes = frame.readableBytes();
+ if (numBytes > 0) {
+ onSendingBytes(numBytes);
+ }
+ writer.writeBytes(((ByteArrayWritableBuffer) frame).bytes, frame.readableBytes());
+ }
+
+ if (flush) {
+ writer.flush();
+ }
+ } catch (IOException e) {
+ logger.log(WARNING, String.format("[{%s}] Exception writing message", logId), e);
+ cancel(Status.fromThrowable(e));
+ }
+ }
+
+ @Override
+ public void writeTrailers(Metadata trailers, boolean headersSent, Status status) {
+ if (logger.isLoggable(FINE)) {
+ logger.log(
+ FINE,
+ "[{0}] writeTrailers: {1}, headersSent = {2}, status = {3}",
+ new Object[] {logId, trailers, headersSent, status});
+ }
+ if (!headersSent) {
+ writeHeadersToServletResponse(trailers);
+ } else {
+ byte[][] serializedHeaders = TransportFrameUtil.toHttp2Headers(trailers);
+ for (int i = 0; i < serializedHeaders.length; i += 2) {
+ String key = new String(serializedHeaders[i], StandardCharsets.US_ASCII);
+ String newValue = new String(serializedHeaders[i + 1], StandardCharsets.US_ASCII);
+ trailerSupplier.get().computeIfPresent(key, (k, v) -> v + "," + newValue);
+ trailerSupplier.get().putIfAbsent(key, newValue);
+ }
+ }
+
+ writer.complete();
+ }
+
+ @Override
+ public void request(int numMessages) {
+ transportState.runOnTransportThread(
+ () -> transportState.requestMessagesFromDeframer(numMessages));
+ }
+
+ @Override
+ public void cancel(Status status) {
+ if (resp.isCommitted() && Code.DEADLINE_EXCEEDED == status.getCode()) {
+ return; // let the servlet timeout, the container will sent RST_STREAM automatically
+ }
+ transportState.runOnTransportThread(() -> transportState.transportReportStatus(status));
+ // There is no way to RST_STREAM with CANCEL code, so write trailers instead
+ close(Status.CANCELLED.withCause(status.asRuntimeException()), new Metadata());
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ transportState.runOnTransportThread(() -> {
+ asyncCtx.complete();
+ countDownLatch.countDown();
+ });
+ try {
+ countDownLatch.await(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private static final class TrailerSupplier implements Supplier