From b9dd5a7b0d10091916b9736063ba3415e19481bc Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 15 Nov 2023 08:45:15 -0800 Subject: [PATCH 1/6] simplifications for blockingpicker creation and cleanup --- clientconn.go | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/clientconn.go b/clientconn.go index c7bf6849f07e..6d654b6be5f4 100644 --- a/clientconn.go +++ b/clientconn.go @@ -181,6 +181,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * chainUnaryClientInterceptors(cc) chainStreamClientInterceptors(cc) + cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers) + defer func() { if err != nil { cc.Close() @@ -359,13 +361,7 @@ func (cc *ClientConn) exitIdleMode() error { }() cc.idlenessState = ccIdlenessStateExitingIdle - exitedIdle := false - if cc.blockingpicker == nil { - cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers) - } else { - cc.blockingpicker.exitIdleMode() - exitedIdle = true - } + cc.blockingpicker.exitIdleMode() var credsClone credentials.TransportCredentials if creds := cc.dopts.copts.TransportCredentials; creds != nil { @@ -394,9 +390,7 @@ func (cc *ClientConn) exitIdleMode() error { return err } - if exitedIdle { - cc.addTraceEvent("exiting idle mode") - } + cc.addTraceEvent("exiting idle mode") return nil } @@ -1275,9 +1269,7 @@ func (cc *ClientConn) Close() error { // The order of closing matters here since the balancer wrapper assumes the // picker is closed before it is closed. - if pWrapper != nil { - pWrapper.close() - } + pWrapper.close() if bWrapper != nil { bWrapper.close() } From 8e1160351ac36c8760e96c69ffa32cdba29da6be Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 15 Nov 2023 09:13:47 -0800 Subject: [PATCH 2/6] simplifications for balancerWrapper creation and cleanup --- balancer_conn_wrappers.go | 21 ++++++------ clientconn.go | 70 ++++++++++++++++++--------------------- 2 files changed, 43 insertions(+), 48 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index a4411c22bfc8..60fabab03544 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -76,17 +76,14 @@ type ccBalancerWrapper struct { mode ccbMode // Tracks the current mode of the wrapper. } -// newCCBalancerWrapper creates a new balancer wrapper. The underlying balancer -// is not created until the switchTo() method is invoked. +// newCCBalancerWrapper creates a new balancer wrapper in idle state. The +// underlying balancer is not created until the switchTo() method is invoked. func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper { - ctx, cancel := context.WithCancel(context.Background()) ccb := &ccBalancerWrapper{ - cc: cc, - opts: bopts, - serializer: grpcsync.NewCallbackSerializer(ctx), - serializerCancel: cancel, + cc: cc, + opts: bopts, + mode: ccbModeIdle, } - ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts) return ccb } @@ -258,7 +255,7 @@ func (ccb *ccBalancerWrapper) exitIdleMode() { // exitIdleMode(), and since we just created a new serializer, we can be // sure that the below function will be scheduled. done := make(chan struct{}) - ccb.serializer.Schedule(func(_ context.Context) { + ccb.serializer.Schedule(func(context.Context) { defer close(done) ccb.mu.Lock() @@ -271,7 +268,11 @@ func (ccb *ccBalancerWrapper) exitIdleMode() { // Gracefulswitch balancer does not support a switchTo operation after // being closed. Hence we need to create a new one here. - ccb.balancer = gracefulswitch.NewBalancer(ccb, ccb.opts) + opts := ccb.opts + if c := opts.DialCreds; c != nil { + opts.DialCreds = c.Clone() + } + ccb.balancer = gracefulswitch.NewBalancer(ccb, opts) ccb.mode = ccbModeActive channelz.Info(logger, ccb.cc.channelzID, "ccBalancerWrapper: exiting idle mode") diff --git a/clientconn.go b/clientconn.go index 6d654b6be5f4..fb37980bafc1 100644 --- a/clientconn.go +++ b/clientconn.go @@ -160,6 +160,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc.ctx, cc.cancel = context.WithCancel(context.Background()) cc.exitIdleCond = sync.NewCond(&cc.mu) + // Apply dial options disableGlobalOpts := false for _, opt := range opts { if _, ok := opt.(*disableGlobalDialOptions); ok { @@ -177,23 +178,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * for _, opt := range opts { opt.apply(&cc.dopts) } - chainUnaryClientInterceptors(cc) chainStreamClientInterceptors(cc) - cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers) - - defer func() { - if err != nil { - cc.Close() - } - }() - - // Register ClientConn with channelz. - cc.channelzRegistration(target) - - cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) - if err := cc.validateTransportCredentials(); err != nil { return nil, err } @@ -213,6 +200,35 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc.dopts.copts.UserAgent = grpcUA } + // Register ClientConn with channelz. + cc.channelzRegistration(target) + + // Determine the resolver to use. + if err := cc.parseTargetAndFindResolver(); err != nil { + return nil, err + } + if err = cc.determineAuthority(); err != nil { + return nil, err + } + + cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) + cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers) + cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ + DialCreds: cc.dopts.copts.TransportCredentials, + CredsBundle: cc.dopts.copts.CredsBundle, + Dialer: cc.dopts.copts.Dialer, + Authority: cc.authority, + CustomUserAgent: cc.dopts.copts.UserAgent, + ChannelzParentID: cc.channelzID, + Target: cc.parsedTarget, + }) + + defer func() { + if err != nil { + cc.Close() + } + }() + if cc.dopts.timeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout) @@ -237,14 +253,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc.dopts.bs = backoff.DefaultExponential } - // Determine the resolver to use. - if err := cc.parseTargetAndFindResolver(); err != nil { - return nil, err - } - if err = cc.determineAuthority(); err != nil { - return nil, err - } - if cc.dopts.scChan != nil { // Blocking wait for the initial service config. select { @@ -367,19 +375,7 @@ func (cc *ClientConn) exitIdleMode() error { if creds := cc.dopts.copts.TransportCredentials; creds != nil { credsClone = creds.Clone() } - if cc.balancerWrapper == nil { - cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ - DialCreds: credsClone, - CredsBundle: cc.dopts.copts.CredsBundle, - Dialer: cc.dopts.copts.Dialer, - Authority: cc.authority, - CustomUserAgent: cc.dopts.copts.UserAgent, - ChannelzParentID: cc.channelzID, - Target: cc.parsedTarget, - }) - } else { - cc.balancerWrapper.exitIdleMode() - } + cc.balancerWrapper.exitIdleMode() cc.firstResolveEvent = grpcsync.NewEvent() cc.mu.Unlock() @@ -1270,9 +1266,7 @@ func (cc *ClientConn) Close() error { // The order of closing matters here since the balancer wrapper assumes the // picker is closed before it is closed. pWrapper.close() - if bWrapper != nil { - bWrapper.close() - } + bWrapper.close() if rWrapper != nil { rWrapper.close() } From 63eaaf7d0d1ed125ba74e2e2ea0b1fe61efe77dd Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 15 Nov 2023 09:16:40 -0800 Subject: [PATCH 3/6] simplify close further by removing local vars and lock complications --- clientconn.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/clientconn.go b/clientconn.go index fb37980bafc1..5af804fa8ba3 100644 --- a/clientconn.go +++ b/clientconn.go @@ -1257,20 +1257,18 @@ func (cc *ClientConn) Close() error { cc.conns = nil cc.csMgr.updateState(connectivity.Shutdown) - pWrapper := cc.blockingpicker - rWrapper := cc.resolverWrapper - bWrapper := cc.balancerWrapper - idlenessMgr := cc.idlenessMgr + // We can safely unlock and continue to access all fields now as + // cc.conns==nil, preventing any further operations on cc. cc.mu.Unlock() // The order of closing matters here since the balancer wrapper assumes the // picker is closed before it is closed. - pWrapper.close() - bWrapper.close() - if rWrapper != nil { + cc.blockingpicker.close() + cc.balancerWrapper.close() + if rWrapper := cc.resolverWrapper; rWrapper != nil { rWrapper.close() } - if idlenessMgr != nil { + if idlenessMgr := cc.idlenessMgr; idlenessMgr != nil { idlenessMgr.Close() } From b239f60f39c3d93ec5fc7f25e97a579e661d1d75 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 15 Nov 2023 09:26:37 -0800 Subject: [PATCH 4/6] rename blockingpicker to pickerWrapper to match balancerWrapper and resolverWrapper --- balancer_conn_wrappers.go | 2 +- clientconn.go | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 60fabab03544..4b82ff8168e1 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -338,7 +338,7 @@ func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { // case where we wait for ready and then perform an RPC. If the picker is // updated later, we could call the "connecting" picker when the state is // updated, and then call the "ready" picker after the picker gets updated. - ccb.cc.blockingpicker.updatePicker(s.Picker) + ccb.cc.pickerWrapper.updatePicker(s.Picker) ccb.cc.csMgr.updateState(s.ConnectivityState) } diff --git a/clientconn.go b/clientconn.go index 5af804fa8ba3..a47141b50217 100644 --- a/clientconn.go +++ b/clientconn.go @@ -212,7 +212,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelzID) - cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers) + cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers) cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{ DialCreds: cc.dopts.copts.TransportCredentials, CredsBundle: cc.dopts.copts.CredsBundle, @@ -369,7 +369,7 @@ func (cc *ClientConn) exitIdleMode() error { }() cc.idlenessState = ccIdlenessStateExitingIdle - cc.blockingpicker.exitIdleMode() + cc.pickerWrapper.exitIdleMode() var credsClone credentials.TransportCredentials if creds := cc.dopts.copts.TransportCredentials; creds != nil { @@ -417,7 +417,7 @@ func (cc *ClientConn) enterIdleMode() error { // `cc.resolverWrapper`, it makes the code simpler in the wrapper. We should // try to do the same for the balancer and picker wrappers too. cc.resolverWrapper.close() - cc.blockingpicker.enterIdleMode() + cc.pickerWrapper.enterIdleMode() cc.balancerWrapper.enterIdleMode() cc.csMgr.updateState(connectivity.Idle) cc.idlenessState = ccIdlenessStateIdle @@ -645,7 +645,7 @@ type ClientConn struct { // The following provide their own synchronization, and therefore don't // require cc.mu to be held to access them. csMgr *connectivityStateManager - blockingpicker *pickerWrapper + pickerWrapper *pickerWrapper safeConfigSelector iresolver.SafeConfigSelector czData *channelzData retryThrottler atomic.Value // Updated from service config. @@ -900,7 +900,7 @@ func (cc *ClientConn) applyFailingLB(sc *serviceconfig.ParseResult) { err = status.Errorf(codes.Unavailable, "illegal service config type: %T", sc.Config) } cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil}) - cc.blockingpicker.updatePicker(base.NewErrPicker(err)) + cc.pickerWrapper.updatePicker(base.NewErrPicker(err)) cc.csMgr.updateState(connectivity.TransientFailure) } @@ -1164,7 +1164,7 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig { } func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) { - return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{ + return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{ Ctx: ctx, FullMethodName: method, }) @@ -1263,7 +1263,7 @@ func (cc *ClientConn) Close() error { // The order of closing matters here since the balancer wrapper assumes the // picker is closed before it is closed. - cc.blockingpicker.close() + cc.pickerWrapper.close() cc.balancerWrapper.close() if rWrapper := cc.resolverWrapper; rWrapper != nil { rWrapper.close() From 84b7e78fd2e0f644124949b7872f67ceebee15b4 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 15 Nov 2023 09:59:48 -0800 Subject: [PATCH 5/6] clean up channelz in period before Close can happen --- clientconn.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clientconn.go b/clientconn.go index a47141b50217..67d6ce88a50a 100644 --- a/clientconn.go +++ b/clientconn.go @@ -205,9 +205,11 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * // Determine the resolver to use. if err := cc.parseTargetAndFindResolver(); err != nil { + channelz.RemoveEntry(cc.channelzID) return nil, err } if err = cc.determineAuthority(); err != nil { + channelz.RemoveEntry(cc.channelzID) return nil, err } From cab5a88e8d9a236ff0a62c77e716fb985c97d8ec Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Wed, 15 Nov 2023 10:39:13 -0800 Subject: [PATCH 6/6] . --- clientconn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientconn.go b/clientconn.go index 67d6ce88a50a..1e4e74d6cfec 100644 --- a/clientconn.go +++ b/clientconn.go @@ -160,7 +160,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * cc.ctx, cc.cancel = context.WithCancel(context.Background()) cc.exitIdleCond = sync.NewCond(&cc.mu) - // Apply dial options + // Apply dial options. disableGlobalOpts := false for _, opt := range opts { if _, ok := opt.(*disableGlobalDialOptions); ok {