From 2d1bb21e4dc97dada938ab7d153df61593a30634 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 12 Sep 2023 13:53:19 -0700 Subject: [PATCH] grpc: ensure transports are closed when the channel enters IDLE (#6620) --- clientconn.go | 38 +++++++++++++++++++++++----------- internal/idle/idle_e2e_test.go | 18 ++++++++++++++-- 2 files changed, 42 insertions(+), 14 deletions(-) diff --git a/clientconn.go b/clientconn.go index d53d91d5d9f3..ff7fea102288 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1091,8 +1091,8 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) { ac.cancel() ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx) - // We have to defer here because GracefulClose => Close => onClose, which - // requires locking ac.mu. + // We have to defer here because GracefulClose => onClose, which requires + // locking ac.mu. if ac.transport != nil { defer ac.transport.GracefulClose() ac.transport = nil @@ -1680,16 +1680,7 @@ func (ac *addrConn) tearDown(err error) { ac.updateConnectivityState(connectivity.Shutdown, nil) ac.cancel() ac.curAddr = resolver.Address{} - if err == errConnDrain && curTr != nil { - // GracefulClose(...) may be executed multiple times when - // i) receiving multiple GoAway frames from the server; or - // ii) there are concurrent name resolver/Balancer triggered - // address removal and GoAway. - // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu. - ac.mu.Unlock() - curTr.GracefulClose() - ac.mu.Lock() - } + channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{ Desc: "Subchannel deleted", Severity: channelz.CtInfo, @@ -1703,6 +1694,29 @@ func (ac *addrConn) tearDown(err error) { // being deleted right away. channelz.RemoveEntry(ac.channelzID) ac.mu.Unlock() + + // We have to release the lock before the call to GracefulClose/Close here + // because both of them call onClose(), which requires locking ac.mu. + if curTr != nil { + if err == errConnDrain { + // Close the transport gracefully when the subConn is being shutdown. + // + // GracefulClose() may be executed multiple times if: + // - multiple GoAway frames are received from the server + // - there are concurrent name resolver or balancer triggered + // address removal and GoAway + curTr.GracefulClose() + } else { + // Hard close the transport when the channel is entering idle or is + // being shutdown. In the case where the channel is being shutdown, + // closing of transports is also taken care of by cancelation of cc.ctx. + // But in the case where the channel is entering idle, we need to + // explicitly close the transports here. Instead of distinguishing + // between these two cases, it is simpler to close the transport + // unconditionally here. + curTr.Close(err) + } + } } func (ac *addrConn) getState() connectivity.State { diff --git a/internal/idle/idle_e2e_test.go b/internal/idle/idle_e2e_test.go index 5526a88f58b9..84b4ba7bba34 100644 --- a/internal/idle/idle_e2e_test.go +++ b/internal/idle/idle_e2e_test.go @@ -142,7 +142,8 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) { } // Tests the case where channel idleness is enabled by passing a small value for -// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE. +// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE, and +// the connection to the backend is closed. func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) { // Create a ClientConn with a short idle_timeout. r := manual.NewBuilderWithScheme("whatever") @@ -159,7 +160,8 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) { t.Cleanup(func() { cc.Close() }) // Start a test backend and push an address update via the resolver. - backend := stubserver.StartTestService(t, nil) + lis := testutils.NewListenerWrapper(t, nil) + backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis}) t.Cleanup(backend.Stop) r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}}) @@ -168,6 +170,13 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) { defer cancel() testutils.AwaitState(ctx, t, cc, connectivity.Ready) + // Retrieve the wrapped conn from the listener. + v, err := lis.NewConnCh.Receive(ctx) + if err != nil { + t.Fatalf("Failed to retrieve conn from test listener: %v", err) + } + conn := v.(*testutils.ConnWrapper) + // Verify that the ClientConn moves to IDLE as there is no activity. testutils.AwaitState(ctx, t, cc, connectivity.Idle) @@ -175,6 +184,11 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) { if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil { t.Fatal(err) } + + // Verify that the previously open connection is closed. + if _, err := conn.CloseCh.Receive(ctx); err != nil { + t.Fatalf("Failed when waiting for connection to be closed after channel entered IDLE: %v", err) + } } // Tests the case where channel idleness is enabled by passing a small value for