Skip to content

netty: support listening on multiple port #5067

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 4 commits into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions core/src/main/java/io/grpc/InternalChannelz.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ public static final class Builder {
private long callsSucceeded;
private long callsFailed;
private long lastCallStartedNanos;
public List<InternalInstrumented<SocketStats>> listenSockets = Collections.emptyList();
public List<InternalInstrumented<SocketStats>> listenSockets = new ArrayList<>();

public Builder setCallsStarted(long callsStarted) {
this.callsStarted = callsStarted;
Expand All @@ -334,10 +334,11 @@ public Builder setLastCallStartedNanos(long lastCallStartedNanos) {
}

/** Sets the listen sockets. */
public Builder setListenSockets(List<InternalInstrumented<SocketStats>> listenSockets) {
checkNotNull(listenSockets);
this.listenSockets = Collections.unmodifiableList(
new ArrayList<InternalInstrumented<SocketStats>>(listenSockets));
public Builder addListenSockets(List<InternalInstrumented<SocketStats>> listenSockets) {
checkNotNull(listenSockets, "listenSockets");
for (InternalInstrumented<SocketStats> ss : listenSockets) {
this.listenSockets.add(checkNotNull(ss, "null listen socket"));
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.grpc.internal.ObjectPool;
import io.grpc.internal.SharedResourcePool;
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -145,9 +146,9 @@ public InProcessServerBuilder maxInboundMetadataSize(int bytes) {
}

@Override
protected InProcessServer buildTransportServer(
protected List<InProcessServer> buildTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
return new InProcessServer(this, streamTracerFactories);
return Collections.singletonList(new InProcessServer(this, streamTracerFactories));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ protected void setTracingEnabled(boolean value) {
public final Server build() {
ServerImpl server = new ServerImpl(
this,
buildTransportServer(getTracerFactories()),
buildTransportServers(getTracerFactories()),
Context.ROOT);
for (InternalNotifyOnServerBuild notifyTarget : notifyOnBuildList) {
notifyTarget.notifyOnBuild(server);
Expand Down Expand Up @@ -266,7 +266,7 @@ protected final TransportTracer.Factory getTransportTracerFactory() {
*
* @param streamTracerFactories an immutable list of stream tracer factories
*/
protected abstract io.grpc.internal.InternalServer buildTransportServer(
protected abstract List<? extends io.grpc.internal.InternalServer> buildTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories);

private T thisT() {
Expand Down
70 changes: 47 additions & 23 deletions core/src/main/java/io/grpc/internal/ServerImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
Expand Down Expand Up @@ -101,12 +102,12 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
@GuardedBy("lock") private boolean serverShutdownCallbackInvoked;
@GuardedBy("lock") private boolean terminated;
/** Service encapsulating something similar to an accept() socket. */
private final InternalServer transportServer;
private final List<? extends InternalServer> transportServers;
private final Object lock = new Object();
@GuardedBy("lock") private boolean transportServerTerminated;
@GuardedBy("lock") private boolean transportServersTerminated;
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */
@GuardedBy("lock") private final Collection<ServerTransport> transports =
new HashSet<ServerTransport>();
@GuardedBy("lock") private final Set<ServerTransport> transports = new HashSet<>();
@GuardedBy("lock") private int activeTransportServers;

private final Context rootContext;

Expand All @@ -126,14 +127,18 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
*/
ServerImpl(
AbstractServerImplBuilder<?> builder,
InternalServer transportServer,
List<? extends InternalServer> transportServers,
Context rootContext) {
this.executorPool = Preconditions.checkNotNull(builder.executorPool, "executorPool");
this.registry = Preconditions.checkNotNull(builder.registryBuilder.build(), "registryBuilder");
this.fallbackRegistry =
Preconditions.checkNotNull(builder.fallbackRegistry, "fallbackRegistry");
this.transportServer = Preconditions.checkNotNull(transportServer, "transportServer");
this.logId = InternalLogId.allocate("Server", String.valueOf(transportServer.getPort()));
Preconditions.checkNotNull(transportServers, "transportServers");
Preconditions.checkArgument(!transportServers.isEmpty(), "no servers provided");
this.transportServers = new ArrayList<>(transportServers);
// TODO(notcarl): concatenate all listening ports in the Log Id.
this.logId =
InternalLogId.allocate("Server", String.valueOf(transportServers.get(0).getPort()));
// Fork from the passed in context so that it does not propagate cancellation, it only
// inherits values.
this.rootContext = Preconditions.checkNotNull(rootContext, "rootContext").fork();
Expand Down Expand Up @@ -163,8 +168,13 @@ public ServerImpl start() throws IOException {
synchronized (lock) {
checkState(!started, "Already started");
checkState(!shutdown, "Shutting down");
// Start and wait for any port to actually be bound.
transportServer.start(new ServerListenerImpl());
// Start and wait for any ports to actually be bound.

ServerListenerImpl listener = new ServerListenerImpl();
for (InternalServer ts : transportServers) {
ts.start(listener);
activeTransportServers++;
}
executor = Preconditions.checkNotNull(executorPool.getObject(), "executor");
started = true;
return this;
Expand All @@ -176,7 +186,13 @@ public int getPort() {
synchronized (lock) {
checkState(started, "Not started");
checkState(!terminated, "Already terminated");
return transportServer.getPort();
for (InternalServer ts : transportServers) {
int port = ts.getPort();
if (port != -1) {
return port;
}
}
return -1;
}
}

Expand Down Expand Up @@ -211,20 +227,22 @@ public List<ServerServiceDefinition> getMutableServices() {
*/
@Override
public ServerImpl shutdown() {
boolean shutdownTransportServer;
boolean shutdownTransportServers;
synchronized (lock) {
if (shutdown) {
return this;
}
shutdown = true;
shutdownTransportServer = started;
if (!shutdownTransportServer) {
transportServerTerminated = true;
shutdownTransportServers = started;
if (!shutdownTransportServers) {
transportServersTerminated = true;
checkForTermination();
}
}
if (shutdownTransportServer) {
transportServer.shutdown();
if (shutdownTransportServers) {
for (InternalServer ts : transportServers) {
ts.shutdown();
}
}
return this;
}
Expand Down Expand Up @@ -311,7 +329,7 @@ private void transportClosed(ServerTransport transport) {
/** Notify of complete shutdown if necessary. */
private void checkForTermination() {
synchronized (lock) {
if (shutdown && transports.isEmpty() && transportServerTerminated) {
if (shutdown && transports.isEmpty() && transportServersTerminated) {
if (terminated) {
throw new AssertionError("Server already terminated");
}
Expand All @@ -320,13 +338,13 @@ private void checkForTermination() {
if (executor != null) {
executor = executorPool.returnObject(executor);
}
// TODO(carl-mastrangelo): move this outside the synchronized block.
lock.notifyAll();
}
}
}

private final class ServerListenerImpl implements ServerListener {

@Override
public ServerTransportListener transportCreated(ServerTransport transport) {
synchronized (lock) {
Expand All @@ -342,6 +360,11 @@ public void serverShutdown() {
ArrayList<ServerTransport> copiedTransports;
Status shutdownNowStatusCopy;
synchronized (lock) {
activeTransportServers--;
if (activeTransportServers != 0) {
return;
}

// transports collection can be modified during shutdown(), even if we hold the lock, due
// to reentrancy.
copiedTransports = new ArrayList<>(transports);
Expand All @@ -356,7 +379,7 @@ public void serverShutdown() {
}
}
synchronized (lock) {
transportServerTerminated = true;
transportServersTerminated = true;
checkForTermination();
}
}
Expand Down Expand Up @@ -577,9 +600,10 @@ public InternalLogId getLogId() {

@Override
public ListenableFuture<ServerStats> getStats() {
ServerStats.Builder builder
= new ServerStats.Builder()
.setListenSockets(transportServer.getListenSockets());
ServerStats.Builder builder = new ServerStats.Builder();
for (InternalServer ts : transportServers) {
builder.addListenSockets(ts.getListenSockets());
}
serverCallTracer.updateBuilder(builder);
SettableFuture<ServerStats> ret = SettableFuture.create();
ret.set(builder.build());
Expand All @@ -590,7 +614,7 @@ public ListenableFuture<ServerStats> getStats() {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("logId", logId.getId())
.add("transportServer", transportServer)
.add("transportServers", transportServers)
.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;

import com.google.common.collect.Iterables;
import io.grpc.ServerStreamTracer.Factory;
import io.grpc.internal.FakeClock;
import io.grpc.internal.ObjectPool;
Expand Down Expand Up @@ -54,7 +55,8 @@ public void generateName() {
@Test
public void scheduledExecutorService_default() {
InProcessServerBuilder builder = InProcessServerBuilder.forName("foo");
InProcessServer server = builder.buildTransportServer(new ArrayList<Factory>());
InProcessServer server =
Iterables.getOnlyElement(builder.buildTransportServers(new ArrayList<Factory>()));

ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
server.getScheduledExecutorServicePool();
Expand All @@ -78,7 +80,8 @@ public void scheduledExecutorService_custom() {
InProcessServerBuilder builder1 = builder.scheduledExecutorService(scheduledExecutorService);
assertSame(builder, builder1);

InProcessServer server = builder1.buildTransportServer(new ArrayList<Factory>());
InProcessServer server =
Iterables.getOnlyElement(builder1.buildTransportServers(new ArrayList<Factory>()));
ObjectPool<ScheduledExecutorService> scheduledExecutorServicePool =
server.getScheduledExecutorServicePool();

Expand Down
10 changes: 6 additions & 4 deletions core/src/test/java/io/grpc/inprocess/InProcessTransportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.grpc.internal.InternalServer;
import io.grpc.internal.ManagedClientTransport;
import io.grpc.internal.testing.AbstractTransportTest;
import java.util.Collections;
import java.util.List;
import org.junit.Ignore;
import org.junit.Test;
Expand All @@ -35,16 +36,17 @@ public class InProcessTransportTest extends AbstractTransportTest {
private static final String USER_AGENT = "a-testing-user-agent";

@Override
protected InternalServer newServer(List<ServerStreamTracer.Factory> streamTracerFactories) {
protected List<? extends InternalServer> newServer(
List<ServerStreamTracer.Factory> streamTracerFactories) {
InProcessServerBuilder builder = InProcessServerBuilder
.forName(TRANSPORT_NAME)
.maxInboundMetadataSize(GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE);
return new InProcessServer(builder, streamTracerFactories);
return Collections.singletonList(new InProcessServer(builder, streamTracerFactories));
}

@Override
protected InternalServer newServer(
InternalServer server, List<ServerStreamTracer.Factory> streamTracerFactories) {
protected List<? extends InternalServer> newServer(
int port, List<ServerStreamTracer.Factory> streamTracerFactories) {
return newServer(streamTracerFactories);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ static class Builder extends AbstractServerImplBuilder<Builder> {
}

@Override
protected io.grpc.internal.InternalServer buildTransportServer(
protected List<io.grpc.internal.InternalServer> buildTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
throw new UnsupportedOperationException();
}
Expand Down
42 changes: 40 additions & 2 deletions core/src/test/java/io/grpc/internal/ServerImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
Expand Down Expand Up @@ -87,6 +88,7 @@
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -210,6 +212,42 @@ public void noPendingTasks() {
assertEquals(0, timer.numPendingTasks());
}

@Test
public void multiport() throws Exception {
final CountDownLatch starts = new CountDownLatch(2);
final CountDownLatch shutdowns = new CountDownLatch(2);

final class Serv extends SimpleServer {
@Override
public void start(ServerListener listener) throws IOException {
super.start(listener);
starts.countDown();
}

@Override
public void shutdown() {
super.shutdown();
shutdowns.countDown();
}
}

SimpleServer transportServer1 = new Serv();
SimpleServer transportServer2 = new Serv();
assertNull(server);
builder.fallbackHandlerRegistry(fallbackRegistry);
builder.executorPool = executorPool;
server = new ServerImpl(
builder, ImmutableList.of(transportServer1, transportServer2), SERVER_CONTEXT);

server.start();
assertTrue(starts.await(1, TimeUnit.SECONDS));
assertEquals(2, shutdowns.getCount());

server.shutdown();
assertTrue(shutdowns.await(1, TimeUnit.SECONDS));
assertTrue(server.awaitTermination(1, TimeUnit.SECONDS));
}

@Test
public void startStopImmediate() throws IOException {
transportServer = new SimpleServer() {
Expand Down Expand Up @@ -1326,7 +1364,7 @@ private void createServer() {

builder.fallbackHandlerRegistry(fallbackRegistry);
builder.executorPool = executorPool;
server = new ServerImpl(builder, transportServer, SERVER_CONTEXT);
server = new ServerImpl(builder, Collections.singletonList(transportServer), SERVER_CONTEXT);
}

private void verifyExecutorsAcquired() {
Expand Down Expand Up @@ -1411,7 +1449,7 @@ public ListenableFuture<SocketStats> getStats() {
}

private static class Builder extends AbstractServerImplBuilder<Builder> {
@Override protected InternalServer buildTransportServer(
@Override protected List<InternalServer> buildTransportServers(
List<? extends ServerStreamTracer.Factory> streamTracerFactories) {
throw new UnsupportedOperationException();
}
Expand Down
Loading