From e9997a0416a1d40e33a49227d447c351dea5ba54 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Mon, 13 May 2024 18:48:00 +0530 Subject: [PATCH 01/13] rls: update picker synchronously upon receipt of configuration update --- balancer/rls/balancer.go | 17 ++- balancer/rls/balancer_test.go | 195 ++++++++++++++++++++++++++++++++++ 2 files changed, 207 insertions(+), 5 deletions(-) diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go index 3ac28271618b..3ee7181494b3 100644 --- a/balancer/rls/balancer.go +++ b/balancer/rls/balancer.go @@ -74,9 +74,10 @@ var ( // Following functions are no-ops in actual code, but can be overridden in // tests to give tests visibility into exactly when certain events happen. - clientConnUpdateHook = func() {} - dataCachePurgeHook = func() {} - resetBackoffHook = func() {} + clientConnUpdateHook = func() {} + dataCachePurgeHook = func() {} + resetBackoffHook = func() {} + entryWithValidBackoffEvicted = func() {} ) func init() { @@ -297,13 +298,19 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error if resizeCache { // If the new config changes reduces the size of the data cache, we // might have to evict entries to get the cache size down to the newly - // specified size. + // specified size. If we do evict an entry with valid backoff timer, + // the new picker needs to be sent to the channel to re-process any + // RPCs queued as a result of this backoff timer. // // And we cannot do this operation above (where we compute the // `resizeCache` boolean) because `cacheMu` needs to be grabbed before // `stateMu` if we are to hold both locks at the same time. b.cacheMu.Lock() - b.dataCache.resize(newCfg.cacheSizeBytes) + evicted := b.dataCache.resize(newCfg.cacheSizeBytes) + if evicted { + b.sendNewPickerLocked() + entryWithValidBackoffEvicted() + } b.cacheMu.Unlock() } return nil diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go index 444b8b99d4a3..75bd16012c29 100644 --- a/balancer/rls/balancer_test.go +++ b/balancer/rls/balancer_test.go @@ -650,6 +650,201 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) { verifyRLSRequest(t, rlsReqCh, true) } +// Test that when a data cache entry is evicted due to config change +// in cache size, the picker is updated accordingly. +func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { + // Create a restartable listener which can close existing connections. + l, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + lis := testutils.NewRestartableListener(l) + + // Override the clientConn update hook to get notified. + clientConnUpdateDone := make(chan struct{}, 1) + origClientConnUpdateHook := clientConnUpdateHook + clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} } + defer func() { clientConnUpdateHook = origClientConnUpdateHook }() + + // Override the cache entry size func, and always return 1. + origEntrySizeFunc := computeDataCacheEntrySize + computeDataCacheEntrySize = func(cacheKey, *cacheEntry) int64 { return 1 } + defer func() { computeDataCacheEntrySize = origEntrySizeFunc }() + + // Override the backoff strategy to return a large backoff which + // will make sure the date cache entry remains in backoff for the + // duration of the test. 
+ origBackoffStrategy := defaultBackoffStrategy + defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout} + defer func() { defaultBackoffStrategy = origBackoffStrategy }() + + // Override the minEvictionDuration to ensure that when the config update + // reduces the cache size, the resize operation is not stopped because + // we find an entry whose minExpiryDuration has not elapsed. + origMinEvictDuration := minEvictDuration + minEvictDuration = time.Duration(0) + defer func() { minEvictDuration = origMinEvictDuration }() + + // Override the entryWithValidBackoffEvicted to ensure we update + // picker when an entry with valid backoff time was evicted. + backOffItemEvicted := make(chan struct{}, 1) + origBackoffItemEvicted := entryWithValidBackoffEvicted + entryWithValidBackoffEvicted = func() { backOffItemEvicted <- struct{}{} } + defer func() { entryWithValidBackoffEvicted = origBackoffItemEvicted }() + + // Register the top-level wrapping balancer which forwards calls to RLS. + topLevelBalancerName := t.Name() + "top-level" + var ccWrapper *testCCWrapper + stub.Register(topLevelBalancerName, stub.BalancerFuncs{ + Init: func(bd *stub.BalancerData) { + ccWrapper = &testCCWrapper{ClientConn: bd.ClientConn} + bd.Data = balancer.Get(Name).Build(ccWrapper, bd.BuildOptions) + }, + ParseConfig: func(sc json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + parser := balancer.Get(Name).(balancer.ConfigParser) + return parser.ParseConfig(sc) + }, + UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error { + bal := bd.Data.(balancer.Balancer) + return bal.UpdateClientConnState(ccs) + }, + Close: func(bd *stub.BalancerData) { + bal := bd.Data.(balancer.Balancer) + bal.Close() + }, + }) + + // Start an RLS server and set the throttler to never throttle requests. + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis) + overrideAdaptiveThrottler(t, neverThrottlingThrottler()) + + // Register an LB policy to act as the child policy for RLS LB policy. + childPolicyName := "test-child-policy" + t.Name() + e2e.RegisterRLSChildPolicy(childPolicyName, nil) + t.Logf("Registered child policy with name %q", childPolicyName) + + // Start a couple of test backends, and set up the fake RLS server to return + // these as targets in the RLS response, based on request keys. + _, backendAddress1 := startBackend(t) + backendCh2, backendAddress2 := startBackend(t) + backendCh3, backendAddress3 := startBackend(t) + rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { + if req.KeyMap["k1"] == "v1" { + return &rlstest.RouteLookupResponse{Err: errors.New("throwing error from control channel for first entry"), Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}} + } + if req.KeyMap["k2"] == "v2" { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}} + } + if req.KeyMap["k3"] == "v3" { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress3}}} + } + return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")} + }) + + // Register a manual resolver and push the RLS service config through it. 
+ r := manual.NewBuilderWithScheme("rls-e2e") + headers := ` + [ + { + "key": "k1", + "names": [ + "n1" + ] + }, + { + "key": "k2", + "names": [ + "n2" + ] + }, + { + "key": "k3", + "names": [ + "n3" + ] + } + ] + ` + scJSON := fmt.Sprintf(` +{ + "loadBalancingConfig": [ + { + "%s": { + "routeLookupConfig": { + "grpcKeybuilders": [{ + "names": [{"service": "grpc.testing.TestService"}], + "headers": %s + }], + "lookupService": "%s", + "cacheSizeBytes": 1000 + }, + "childPolicy": [{"%s": {}}], + "childPolicyConfigTargetFieldName": "Backend" + } + } + ] +}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName) + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON) + r.InitialState(resolver.State{ServiceConfig: sc}) + + cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("create grpc.Dial() failed: %v", err) + } + defer cc.Close() + + <-clientConnUpdateDone + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil) + t.Logf("Verifying if RPC failed when listener is stopped.") + + ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n2", "v2") + makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2) + verifyRLSRequest(t, rlsReqCh, true) + + ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n3", "v3") + makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh3) + verifyRLSRequest(t, rlsReqCh, true) + + // Setting the size to 1 will cause the entries to be + // evicted. + scJSON1 := fmt.Sprintf(` +{ + "loadBalancingConfig": [ + { + "%s": { + "routeLookupConfig": { + "grpcKeybuilders": [{ + "names": [{"service": "grpc.testing.TestService"}], + "headers": %s + }], + "lookupService": "%s", + "cacheSizeBytes": 1 + }, + "childPolicy": [{"%s": {}}], + "childPolicyConfigTargetFieldName": "Backend" + } + } + ] +}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName) + sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1) + r.UpdateState(resolver.State{ServiceConfig: sc1}) + <-clientConnUpdateDone + + // Stop the listener + lis.Stop() + + select { + // Wait for backOffItemEvicted to ensure picker was updated + // synchronously when there was cache resize on config update. + case <-backOffItemEvicted: + case <-ctx.Done(): + t.Error("Error sending picker update on eviction of cache entry with valid backoff: context timed out.") + } +} + // TestDataCachePurging verifies that the LB policy periodically evicts expired // entries from the data cache. 
func (s) TestDataCachePurging(t *testing.T) { From b2f41554bebeab05361a2c2b8763d774ec9e57e4 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Thu, 18 Jul 2024 12:56:48 +0530 Subject: [PATCH 02/13] fix comments: test if updates are synchronous --- balancer/rls/balancer.go | 2 +- balancer/rls/balancer_test.go | 116 ++++++++++++++-------------------- 2 files changed, 50 insertions(+), 68 deletions(-) diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go index 3ee7181494b3..0dfdab048879 100644 --- a/balancer/rls/balancer.go +++ b/balancer/rls/balancer.go @@ -309,7 +309,6 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error evicted := b.dataCache.resize(newCfg.cacheSizeBytes) if evicted { b.sendNewPickerLocked() - entryWithValidBackoffEvicted() } b.cacheMu.Unlock() } @@ -516,6 +515,7 @@ func (b *rlsBalancer) sendNewPickerLocked() { if !b.inhibitPickerUpdates { b.logger.Infof("New balancer.State: %+v", state) b.cc.UpdateState(state) + entryWithValidBackoffEvicted() } else { b.logger.Infof("Delaying picker update: %+v", state) } diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go index 75bd16012c29..f6295253b46a 100644 --- a/balancer/rls/balancer_test.go +++ b/balancer/rls/balancer_test.go @@ -653,13 +653,6 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) { // Test that when a data cache entry is evicted due to config change // in cache size, the picker is updated accordingly. func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { - // Create a restartable listener which can close existing connections. - l, err := testutils.LocalTCPListener() - if err != nil { - t.Fatalf("net.Listen() failed: %v", err) - } - lis := testutils.NewRestartableListener(l) - // Override the clientConn update hook to get notified. clientConnUpdateDone := make(chan struct{}, 1) origClientConnUpdateHook := clientConnUpdateHook @@ -689,7 +682,13 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { // picker when an entry with valid backoff time was evicted. backOffItemEvicted := make(chan struct{}, 1) origBackoffItemEvicted := entryWithValidBackoffEvicted - entryWithValidBackoffEvicted = func() { backOffItemEvicted <- struct{}{} } + entryWithValidBackoffEvicted = func() { + select { + case backOffItemEvicted <- struct{}{}: + default: + // Do nothing if the channel is full + } + } defer func() { entryWithValidBackoffEvicted = origBackoffItemEvicted }() // Register the top-level wrapping balancer which forwards calls to RLS. @@ -715,7 +714,7 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { }) // Start an RLS server and set the throttler to never throttle requests. - rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Register an LB policy to act as the child policy for RLS LB policy. @@ -725,18 +724,17 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { // Start a couple of test backends, and set up the fake RLS server to return // these as targets in the RLS response, based on request keys. 
- _, backendAddress1 := startBackend(t) + backendCh1, backendAddress1 := startBackend(t) backendCh2, backendAddress2 := startBackend(t) - backendCh3, backendAddress3 := startBackend(t) rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { if req.KeyMap["k1"] == "v1" { - return &rlstest.RouteLookupResponse{Err: errors.New("throwing error from control channel for first entry"), Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}} + return &rlstest.RouteLookupResponse{Err: errors.New("throwing error from control channel for first entry")} } if req.KeyMap["k2"] == "v2" { - return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}} + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}} } if req.KeyMap["k3"] == "v3" { - return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress3}}} + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}} } return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")} }) @@ -765,35 +763,41 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { } ] ` - scJSON := fmt.Sprintf(` -{ - "loadBalancingConfig": [ - { - "%s": { - "routeLookupConfig": { - "grpcKeybuilders": [{ - "names": [{"service": "grpc.testing.TestService"}], - "headers": %s - }], - "lookupService": "%s", - "cacheSizeBytes": 1000 - }, - "childPolicy": [{"%s": {}}], - "childPolicyConfigTargetFieldName": "Backend" - } - } - ] -}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName) + + configJSON := ` + { + "loadBalancingConfig": [ + { + "%s": { + "routeLookupConfig": { + "grpcKeybuilders": [{ + "names": [{"service": "grpc.testing.TestService"}], + "headers": %s + }], + "lookupService": "%s", + "cacheSizeBytes": %d + }, + "childPolicy": [{"%s": {}}], + "childPolicyConfigTargetFieldName": "Backend" + } + } + ] + }` + scJSON := fmt.Sprintf(configJSON, topLevelBalancerName, headers, rlsServer.Address, 1000, childPolicyName) sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON) r.InitialState(resolver.State{ServiceConfig: sc}) - cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - t.Fatalf("create grpc.Dial() failed: %v", err) - } - defer cc.Close() - + ccCh := make(chan *grpc.ClientConn, 1) + go func() { + cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return + } + ccCh <- cc + }() <-clientConnUpdateDone + cc := <-ccCh + defer cc.Close() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() @@ -801,48 +805,26 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { t.Logf("Verifying if RPC failed when listener is stopped.") ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n2", "v2") - makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2) + makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh1) verifyRLSRequest(t, rlsReqCh, true) ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n3", "v3") - makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh3) + makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2) verifyRLSRequest(t, rlsReqCh, true) // Setting the size to 1 will 
cause the entries to be // evicted. - scJSON1 := fmt.Sprintf(` -{ - "loadBalancingConfig": [ - { - "%s": { - "routeLookupConfig": { - "grpcKeybuilders": [{ - "names": [{"service": "grpc.testing.TestService"}], - "headers": %s - }], - "lookupService": "%s", - "cacheSizeBytes": 1 - }, - "childPolicy": [{"%s": {}}], - "childPolicyConfigTargetFieldName": "Backend" - } - } - ] -}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName) + scJSON1 := fmt.Sprintf(configJSON, topLevelBalancerName, headers, rlsServer.Address, 1, childPolicyName) sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1) - r.UpdateState(resolver.State{ServiceConfig: sc1}) - <-clientConnUpdateDone - - // Stop the listener - lis.Stop() - + go r.UpdateState(resolver.State{ServiceConfig: sc1}) select { // Wait for backOffItemEvicted to ensure picker was updated // synchronously when there was cache resize on config update. case <-backOffItemEvicted: case <-ctx.Done(): - t.Error("Error sending picker update on eviction of cache entry with valid backoff: context timed out.") + t.Errorf("Error sending picker update on eviction of cache entry with valid backoff: %v", ctx.Err().Error()) } + <-clientConnUpdateDone } // TestDataCachePurging verifies that the LB policy periodically evicts expired From 750332cb064827bc6ad4f8aa1cb72d7d77599757 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Fri, 19 Jul 2024 20:48:35 +0530 Subject: [PATCH 03/13] refactor --- balancer/rls/balancer.go | 11 +++--- balancer/rls/balancer_test.go | 66 +++++++++++++++++------------------ 2 files changed, 37 insertions(+), 40 deletions(-) diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go index 0dfdab048879..ed8da1bc5da7 100644 --- a/balancer/rls/balancer.go +++ b/balancer/rls/balancer.go @@ -74,10 +74,10 @@ var ( // Following functions are no-ops in actual code, but can be overridden in // tests to give tests visibility into exactly when certain events happen. - clientConnUpdateHook = func() {} - dataCachePurgeHook = func() {} - resetBackoffHook = func() {} - entryWithValidBackoffEvicted = func() {} + clientConnUpdateHook = func() {} + dataCachePurgeHook = func() {} + resetBackoffHook = func() {} + newPickerGenerated = func() {} ) func init() { @@ -511,11 +511,10 @@ func (b *rlsBalancer) sendNewPickerLocked() { ConnectivityState: aggregatedState, Picker: picker, } - + newPickerGenerated() if !b.inhibitPickerUpdates { b.logger.Infof("New balancer.State: %+v", state) b.cc.UpdateState(state) - entryWithValidBackoffEvicted() } else { b.logger.Infof("Delaying picker update: %+v", state) } diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go index f6295253b46a..d8ad4a6261fd 100644 --- a/balancer/rls/balancer_test.go +++ b/balancer/rls/balancer_test.go @@ -654,9 +654,14 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) { // in cache size, the picker is updated accordingly. func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { // Override the clientConn update hook to get notified. - clientConnUpdateDone := make(chan struct{}, 1) + clientConnUpdateDone := make(chan struct{}) origClientConnUpdateHook := clientConnUpdateHook - clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} } + clientConnUpdateHook = func() { + select { + case clientConnUpdateDone <- struct{}{}: + default: + } + } defer func() { clientConnUpdateHook = origClientConnUpdateHook }() // Override the cache entry size func, and always return 1. 
@@ -678,19 +683,6 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { minEvictDuration = time.Duration(0) defer func() { minEvictDuration = origMinEvictDuration }() - // Override the entryWithValidBackoffEvicted to ensure we update - // picker when an entry with valid backoff time was evicted. - backOffItemEvicted := make(chan struct{}, 1) - origBackoffItemEvicted := entryWithValidBackoffEvicted - entryWithValidBackoffEvicted = func() { - select { - case backOffItemEvicted <- struct{}{}: - default: - // Do nothing if the channel is full - } - } - defer func() { entryWithValidBackoffEvicted = origBackoffItemEvicted }() - // Register the top-level wrapping balancer which forwards calls to RLS. topLevelBalancerName := t.Name() + "top-level" var ccWrapper *testCCWrapper @@ -728,12 +720,9 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { backendCh2, backendAddress2 := startBackend(t) rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { if req.KeyMap["k1"] == "v1" { - return &rlstest.RouteLookupResponse{Err: errors.New("throwing error from control channel for first entry")} - } - if req.KeyMap["k2"] == "v2" { return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}} } - if req.KeyMap["k3"] == "v3" { + if req.KeyMap["k2"] == "v2" { return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}} } return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")} @@ -754,12 +743,6 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { "names": [ "n2" ] - }, - { - "key": "k3", - "names": [ - "n3" - ] } ] ` @@ -804,27 +787,42 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil) t.Logf("Verifying if RPC failed when listener is stopped.") - ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n2", "v2") + ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1") makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh1) verifyRLSRequest(t, rlsReqCh, true) - ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n3", "v3") + ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n2", "v2") makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2) verifyRLSRequest(t, rlsReqCh, true) - // Setting the size to 1 will cause the entries to be - // evicted. + // Override the newPickerGenerated to measure the number of times + // the picker is generated because state updates can be inhibited. + // + // Note: This needs to be initialised here and not in the beginning + // of the test, as otherwise for every RPC call we make, it'd have + // affected total picker count. + pckrSentBeforeClientConnUpdate := make(chan struct{}, 1) + totalPickerGenerated := 0 + origNewPickerGenerated := newPickerGenerated + newPickerGenerated = func() { + totalPickerGenerated++ + if totalPickerGenerated == 2 { + pckrSentBeforeClientConnUpdate <- struct{}{} + } + } + defer func() { newPickerGenerated = origNewPickerGenerated }() + + // Setting the size to 1 will cause the entries to be evicted. 
scJSON1 := fmt.Sprintf(configJSON, topLevelBalancerName, headers, rlsServer.Address, 1, childPolicyName) sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1) go r.UpdateState(resolver.State{ServiceConfig: sc1}) select { - // Wait for backOffItemEvicted to ensure picker was updated - // synchronously when there was cache resize on config update. - case <-backOffItemEvicted: + case <-pckrSentBeforeClientConnUpdate: + case <-clientConnUpdateDone: + t.Fatalf("Client conn update was completed before picker update.") case <-ctx.Done(): - t.Errorf("Error sending picker update on eviction of cache entry with valid backoff: %v", ctx.Err().Error()) + t.Errorf("client conn update could not complete: %v", ctx.Err().Error()) } - <-clientConnUpdateDone } // TestDataCachePurging verifies that the LB policy periodically evicts expired From 255ff6395cb6da5e15115733b5bc15a98d0364e7 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Tue, 23 Jul 2024 15:34:24 +0530 Subject: [PATCH 04/13] fix: wait for client conn state to complete at the end --- balancer/rls/balancer_test.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go index d8ad4a6261fd..cfdabc251e71 100644 --- a/balancer/rls/balancer_test.go +++ b/balancer/rls/balancer_test.go @@ -656,12 +656,7 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { // Override the clientConn update hook to get notified. clientConnUpdateDone := make(chan struct{}) origClientConnUpdateHook := clientConnUpdateHook - clientConnUpdateHook = func() { - select { - case clientConnUpdateDone <- struct{}{}: - default: - } - } + clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} } defer func() { clientConnUpdateHook = origClientConnUpdateHook }() // Override the cache entry size func, and always return 1. @@ -823,6 +818,10 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { case <-ctx.Done(): t.Errorf("client conn update could not complete: %v", ctx.Err().Error()) } + + // Once picker was updated, wait for client conn update + // to complete. + <-clientConnUpdateDone } // TestDataCachePurging verifies that the LB policy periodically evicts expired From f99b76cff207f938ffe0fc360485ccf8b4264096 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Wed, 24 Jul 2024 15:12:29 +0530 Subject: [PATCH 05/13] fix: remove counter for newPickerGenerated func() --- balancer/rls/balancer_test.go | 53 ++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go index cfdabc251e71..f566c671858b 100644 --- a/balancer/rls/balancer_test.go +++ b/balancer/rls/balancer_test.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "testing" "time" @@ -659,6 +660,19 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} } defer func() { clientConnUpdateHook = origClientConnUpdateHook }() + // Override the newPickerGenerated to measure the number of times + // the picker is generated because state updates can be inhibited. 
+ pckrSentBeforeClientConnUpdate := make(chan struct{}, 1) + // Block udpates until the last configuration update + blkUpdates := atomic.Bool{} + origNewPickerGenerated := newPickerGenerated + newPickerGenerated = func() { + if blkUpdates.Load() == true { + pckrSentBeforeClientConnUpdate <- struct{}{} + } + } + defer func() { newPickerGenerated = origNewPickerGenerated }() + // Override the cache entry size func, and always return 1. origEntrySizeFunc := computeDataCacheEntrySize computeDataCacheEntrySize = func(cacheKey, *cacheEntry) int64 { return 1 } @@ -773,12 +787,19 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { } ccCh <- cc }() - <-clientConnUpdateDone - cc := <-ccCh - defer cc.Close() ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + + select { + case <-clientConnUpdateDone: + case <-ctx.Done(): + t.Fatalf("error waiting for the client conn update on initial configuration udpate: %v", ctx.Err().Error()) + } + + cc := <-ccCh + defer cc.Close() + makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil) t.Logf("Verifying if RPC failed when listener is stopped.") @@ -790,38 +811,26 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2) verifyRLSRequest(t, rlsReqCh, true) - // Override the newPickerGenerated to measure the number of times - // the picker is generated because state updates can be inhibited. - // - // Note: This needs to be initialised here and not in the beginning - // of the test, as otherwise for every RPC call we make, it'd have - // affected total picker count. - pckrSentBeforeClientConnUpdate := make(chan struct{}, 1) - totalPickerGenerated := 0 - origNewPickerGenerated := newPickerGenerated - newPickerGenerated = func() { - totalPickerGenerated++ - if totalPickerGenerated == 2 { - pckrSentBeforeClientConnUpdate <- struct{}{} - } - } - defer func() { newPickerGenerated = origNewPickerGenerated }() - // Setting the size to 1 will cause the entries to be evicted. scJSON1 := fmt.Sprintf(configJSON, topLevelBalancerName, headers, rlsServer.Address, 1, childPolicyName) sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1) + blkUpdates.Store(true) go r.UpdateState(resolver.State{ServiceConfig: sc1}) select { case <-pckrSentBeforeClientConnUpdate: case <-clientConnUpdateDone: t.Fatalf("Client conn update was completed before picker update.") case <-ctx.Done(): - t.Errorf("client conn update could not complete: %v", ctx.Err().Error()) + t.Errorf("error waiting for picker update on receipt of configuration udpate: %v", ctx.Err().Error()) } // Once picker was updated, wait for client conn update // to complete. 
- <-clientConnUpdateDone + select { + case <-clientConnUpdateDone: + case <-ctx.Done(): + t.Errorf("client conn update could not complete: %v", ctx.Err().Error()) + } } // TestDataCachePurging verifies that the LB policy periodically evicts expired From 8037961888286ebdbc8ea9d1d4a7a565a1c846e3 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Thu, 25 Jul 2024 21:49:14 +0530 Subject: [PATCH 06/13] Make pickerUpdateCh blocking --- balancer/rls/balancer_test.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go index f566c671858b..9d71de2e4907 100644 --- a/balancer/rls/balancer_test.go +++ b/balancer/rls/balancer_test.go @@ -662,13 +662,13 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { // Override the newPickerGenerated to measure the number of times // the picker is generated because state updates can be inhibited. - pckrSentBeforeClientConnUpdate := make(chan struct{}, 1) + pickerUpdateCh := make(chan struct{}) // Block udpates until the last configuration update blkUpdates := atomic.Bool{} origNewPickerGenerated := newPickerGenerated newPickerGenerated = func() { if blkUpdates.Load() == true { - pckrSentBeforeClientConnUpdate <- struct{}{} + pickerUpdateCh <- struct{}{} } } defer func() { newPickerGenerated = origNewPickerGenerated }() @@ -816,12 +816,15 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1) blkUpdates.Store(true) go r.UpdateState(resolver.State{ServiceConfig: sc1}) - select { - case <-pckrSentBeforeClientConnUpdate: - case <-clientConnUpdateDone: - t.Fatalf("Client conn update was completed before picker update.") - case <-ctx.Done(): - t.Errorf("error waiting for picker update on receipt of configuration udpate: %v", ctx.Err().Error()) + + for i := 0; i < 2; i++ { + select { + case <-pickerUpdateCh: + case <-clientConnUpdateDone: + t.Fatalf("Client conn update was completed before picker update.") + case <-ctx.Done(): + t.Errorf("error waiting for picker update on receipt of configuration udpate: %v", ctx.Err().Error()) + } } // Once picker was updated, wait for client conn update From 8166fd4858bf274f2f740e37414f16563c77f9eb Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Fri, 2 Aug 2024 12:25:34 +0530 Subject: [PATCH 07/13] fix: improved the logs --- balancer/rls/balancer.go | 7 +++---- balancer/rls/balancer_test.go | 22 ++++++++++++---------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go index ed8da1bc5da7..4b71507bd38b 100644 --- a/balancer/rls/balancer.go +++ b/balancer/rls/balancer.go @@ -77,7 +77,7 @@ var ( clientConnUpdateHook = func() {} dataCachePurgeHook = func() {} resetBackoffHook = func() {} - newPickerGenerated = func() {} + newPickerHook = func() {} ) func init() { @@ -306,8 +306,7 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error // `resizeCache` boolean) because `cacheMu` needs to be grabbed before // `stateMu` if we are to hold both locks at the same time. 
b.cacheMu.Lock() - evicted := b.dataCache.resize(newCfg.cacheSizeBytes) - if evicted { + if b.dataCache.resize(newCfg.cacheSizeBytes) { b.sendNewPickerLocked() } b.cacheMu.Unlock() @@ -511,10 +510,10 @@ func (b *rlsBalancer) sendNewPickerLocked() { ConnectivityState: aggregatedState, Picker: picker, } - newPickerGenerated() if !b.inhibitPickerUpdates { b.logger.Infof("New balancer.State: %+v", state) b.cc.UpdateState(state) + newPickerHook() } else { b.logger.Infof("Delaying picker update: %+v", state) } diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go index 9d71de2e4907..6832ed5641cf 100644 --- a/balancer/rls/balancer_test.go +++ b/balancer/rls/balancer_test.go @@ -660,18 +660,18 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} } defer func() { clientConnUpdateHook = origClientConnUpdateHook }() - // Override the newPickerGenerated to measure the number of times + // Override the newPickerHook to measure the number of times // the picker is generated because state updates can be inhibited. pickerUpdateCh := make(chan struct{}) // Block udpates until the last configuration update blkUpdates := atomic.Bool{} - origNewPickerGenerated := newPickerGenerated - newPickerGenerated = func() { + origNewPickerHook := newPickerHook + newPickerHook = func() { if blkUpdates.Load() == true { pickerUpdateCh <- struct{}{} } } - defer func() { newPickerGenerated = origNewPickerGenerated }() + defer func() { newPickerHook = origNewPickerHook }() // Override the cache entry size func, and always return 1. origEntrySizeFunc := computeDataCacheEntrySize @@ -794,14 +794,16 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { select { case <-clientConnUpdateDone: case <-ctx.Done(): - t.Fatalf("error waiting for the client conn update on initial configuration udpate: %v", ctx.Err().Error()) + t.Fatal("Timed out waiting for the client conn update on initial configuration update") } cc := <-ccCh defer cc.Close() - + // Make an RPC call with empty metadata, which will eventually throw + // the error as no metadata will match from rlsServer response + // callback defined above. This will cause the control channel to + // throw the error and cause the item to get into backoff. 
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil) - t.Logf("Verifying if RPC failed when listener is stopped.") ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1") makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh1) @@ -821,9 +823,9 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { select { case <-pickerUpdateCh: case <-clientConnUpdateDone: - t.Fatalf("Client conn update was completed before picker update.") + t.Fatal("Client conn update was completed before picker update.") case <-ctx.Done(): - t.Errorf("error waiting for picker update on receipt of configuration udpate: %v", ctx.Err().Error()) + t.Fatal("Timed out waiting for picker update upon receiving a configuration update") } } @@ -832,7 +834,7 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { select { case <-clientConnUpdateDone: case <-ctx.Done(): - t.Errorf("client conn update could not complete: %v", ctx.Err().Error()) + t.Fatal("Timed out waiting for client conn update upon receiving a configuration update") } } From 2fd2e48e8c61dbc79977232889694fca3618df58 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Tue, 13 Aug 2024 17:12:09 +0530 Subject: [PATCH 08/13] Ensure we send single picker update on config update --- balancer/rls/balancer.go | 70 +++++++++----- balancer/rls/balancer_test.go | 172 +++++++++++++++++----------------- 2 files changed, 133 insertions(+), 109 deletions(-) diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go index 4b71507bd38b..9ac926bf3ec2 100644 --- a/balancer/rls/balancer.go +++ b/balancer/rls/balancer.go @@ -30,6 +30,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" + estats "google.golang.org/grpc/experimental/stats" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/backoff" @@ -74,10 +75,30 @@ var ( // Following functions are no-ops in actual code, but can be overridden in // tests to give tests visibility into exactly when certain events happen. - clientConnUpdateHook = func() {} - dataCachePurgeHook = func() {} - resetBackoffHook = func() {} - newPickerHook = func() {} + clientConnUpdateHook = func() {} + dataCachePurgeHook = func() {} + resetBackoffHook = func() {} + defaultTargetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.rls.default_target_picks", + Description: "EXPERIMENTAL. Number of LB picks sent to the default target.", + Unit: "pick", + Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"}, + Default: false, + }) + targetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.rls.target_picks", + Description: "EXPERIMENTAL. Number of LB picks sent to each RLS target. Note that if the default target is also returned by the RLS server, RPCs sent to that target from the cache will be counted in this metric, not in grpc.rls.default_target_picks.", + Unit: "pick", + Labels: []string{"grpc.target", "grpc.lb.rls.server_target", "grpc.lb.rls.data_plane_target", "grpc.lb.pick_result"}, + Default: false, + }) + failedPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ + Name: "grpc.lb.rls.failed_picks", + Description: "EXPERIMENTAL. 
Number of LB picks failed due to either a failed RLS request or the RLS channel being throttled.", + Unit: "pick", + Labels: []string{"grpc.target", "grpc.lb.rls.server_target"}, + Default: false, + }) ) func init() { @@ -286,14 +307,7 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error // Update the copy of the config in the LB policy before releasing the lock. b.lbCfg = newCfg - - // Enqueue an event which will notify us when the above update has been - // propagated to all child policies, and the child policies have all - // processed their updates, and we have sent a picker update. - done := make(chan struct{}) - b.updateCh.Put(resumePickerUpdates{done: done}) b.stateMu.Unlock() - <-done if resizeCache { // If the new config changes reduces the size of the data cache, we @@ -306,11 +320,17 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error // `resizeCache` boolean) because `cacheMu` needs to be grabbed before // `stateMu` if we are to hold both locks at the same time. b.cacheMu.Lock() - if b.dataCache.resize(newCfg.cacheSizeBytes) { - b.sendNewPickerLocked() - } + b.dataCache.resize(newCfg.cacheSizeBytes) b.cacheMu.Unlock() } + b.stateMu.Lock() + // Enqueue an event which will notify us when the above update has been + // propagated to all child policies, and the child policies have all + // processed their updates, and we have sent a picker update. + done := make(chan struct{}) + b.updateCh.Put(resumePickerUpdates{done: done}) + b.stateMu.Unlock() + <-done return nil } @@ -495,25 +515,29 @@ func (b *rlsBalancer) sendNewPickerLocked() { if b.defaultPolicy != nil { b.defaultPolicy.acquireRef() } + picker := &rlsPicker{ - kbm: b.lbCfg.kbMap, - origEndpoint: b.bopts.Target.Endpoint(), - lb: b, - defaultPolicy: b.defaultPolicy, - ctrlCh: b.ctrlCh, - maxAge: b.lbCfg.maxAge, - staleAge: b.lbCfg.staleAge, - bg: b.bg, + kbm: b.lbCfg.kbMap, + origEndpoint: b.bopts.Target.Endpoint(), + lb: b, + defaultPolicy: b.defaultPolicy, + ctrlCh: b.ctrlCh, + maxAge: b.lbCfg.maxAge, + staleAge: b.lbCfg.staleAge, + bg: b.bg, + rlsServerTarget: b.lbCfg.lookupService, + grpcTarget: b.bopts.Target.String(), + metricsRecorder: b.bopts.MetricsRecorder, } picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker)) state := balancer.State{ ConnectivityState: aggregatedState, Picker: picker, } + if !b.inhibitPickerUpdates { b.logger.Infof("New balancer.State: %+v", state) b.cc.UpdateState(state) - newPickerHook() } else { b.logger.Infof("Delaying picker update: %+v", state) } diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go index 6832ed5641cf..c7a679eb4c8a 100644 --- a/balancer/rls/balancer_test.go +++ b/balancer/rls/balancer_test.go @@ -24,13 +24,13 @@ import ( "errors" "fmt" "sync" - "sync/atomic" "testing" "time" "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/pickfirst" "google.golang.org/grpc/balancer/rls/internal/test/e2e" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" @@ -38,7 +38,6 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/balancer/stub" - rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" internalserviceconfig "google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/internal/testutils" rlstest "google.golang.org/grpc/internal/testutils/rls" @@ -47,6 +46,8 @@ import ( 
"google.golang.org/grpc/resolver/manual" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/testdata" + + rlspb "google.golang.org/grpc/internal/proto/grpc_lookup_v1" "google.golang.org/protobuf/types/known/durationpb" ) @@ -654,25 +655,19 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) { // Test that when a data cache entry is evicted due to config change // in cache size, the picker is updated accordingly. func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { + // Create a restartable listener which can close existing connections. + l, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + lis := testutils.NewRestartableListener(l) + // Override the clientConn update hook to get notified. - clientConnUpdateDone := make(chan struct{}) + clientConnUpdateDone := make(chan struct{}, 1) origClientConnUpdateHook := clientConnUpdateHook clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} } defer func() { clientConnUpdateHook = origClientConnUpdateHook }() - // Override the newPickerHook to measure the number of times - // the picker is generated because state updates can be inhibited. - pickerUpdateCh := make(chan struct{}) - // Block udpates until the last configuration update - blkUpdates := atomic.Bool{} - origNewPickerHook := newPickerHook - newPickerHook = func() { - if blkUpdates.Load() == true { - pickerUpdateCh <- struct{}{} - } - } - defer func() { newPickerHook = origNewPickerHook }() - // Override the cache entry size func, and always return 1. origEntrySizeFunc := computeDataCacheEntrySize computeDataCacheEntrySize = func(cacheKey, *cacheEntry) int64 { return 1 } @@ -715,7 +710,7 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { }) // Start an RLS server and set the throttler to never throttle requests. - rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Register an LB policy to act as the child policy for RLS LB policy. @@ -725,15 +720,19 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { // Start a couple of test backends, and set up the fake RLS server to return // these as targets in the RLS response, based on request keys. 
- backendCh1, backendAddress1 := startBackend(t) + _, backendAddress1 := startBackend(t) backendCh2, backendAddress2 := startBackend(t) + backendCh3, backendAddress3 := startBackend(t) rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { if req.KeyMap["k1"] == "v1" { - return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}} + return &rlstest.RouteLookupResponse{Err: errors.New("throwing error from control channel for first entry"), Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}} } if req.KeyMap["k2"] == "v2" { return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}} } + if req.KeyMap["k3"] == "v3" { + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress3}}} + } return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")} }) @@ -752,89 +751,90 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { "names": [ "n2" ] + }, + { + "key": "k3", + "names": [ + "n3" + ] } ] ` - - configJSON := ` - { - "loadBalancingConfig": [ - { - "%s": { - "routeLookupConfig": { - "grpcKeybuilders": [{ - "names": [{"service": "grpc.testing.TestService"}], - "headers": %s - }], - "lookupService": "%s", - "cacheSizeBytes": %d - }, - "childPolicy": [{"%s": {}}], - "childPolicyConfigTargetFieldName": "Backend" - } - } - ] - }` - scJSON := fmt.Sprintf(configJSON, topLevelBalancerName, headers, rlsServer.Address, 1000, childPolicyName) + scJSON := fmt.Sprintf(` +{ + "loadBalancingConfig": [ + { + "%s": { + "routeLookupConfig": { + "grpcKeybuilders": [{ + "names": [{"service": "grpc.testing.TestService"}], + "headers": %s + }], + "lookupService": "%s", + "cacheSizeBytes": 1000 + }, + "childPolicy": [{"%s": {}}], + "childPolicyConfigTargetFieldName": "Backend" + } + } + ] +}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName) sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON) r.InitialState(resolver.State{ServiceConfig: sc}) - ccCh := make(chan *grpc.ClientConn, 1) - go func() { - cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return - } - ccCh <- cc - }() + cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("create grpc.Dial() failed: %v", err) + } + defer cc.Close() + + <-clientConnUpdateDone ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - - select { - case <-clientConnUpdateDone: - case <-ctx.Done(): - t.Fatal("Timed out waiting for the client conn update on initial configuration update") - } - - cc := <-ccCh - defer cc.Close() - // Make an RPC call with empty metadata, which will eventually throw - // the error as no metadata will match from rlsServer response - // callback defined above. This will cause the control channel to - // throw the error and cause the item to get into backoff. 
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil) + t.Logf("Verifying if RPC failed when listener is stopped.") - ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1") - makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh1) + ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n2", "v2") + makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2) verifyRLSRequest(t, rlsReqCh, true) - ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n2", "v2") - makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2) + ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n3", "v3") + makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh3) verifyRLSRequest(t, rlsReqCh, true) - // Setting the size to 1 will cause the entries to be evicted. - scJSON1 := fmt.Sprintf(configJSON, topLevelBalancerName, headers, rlsServer.Address, 1, childPolicyName) + testutils.AwaitState(ctx, t, cc, connectivity.Ready) + initialStateCnt := len(ccWrapper.getStates()) + // Setting the size to 1 will cause the entries to be + // evicted. + scJSON1 := fmt.Sprintf(` +{ + "loadBalancingConfig": [ + { + "%s": { + "routeLookupConfig": { + "grpcKeybuilders": [{ + "names": [{"service": "grpc.testing.TestService"}], + "headers": %s + }], + "lookupService": "%s", + "cacheSizeBytes": 1 + }, + "childPolicy": [{"%s": {}}], + "childPolicyConfigTargetFieldName": "Backend" + } + } + ] +}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName) sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1) - blkUpdates.Store(true) - go r.UpdateState(resolver.State{ServiceConfig: sc1}) - - for i := 0; i < 2; i++ { - select { - case <-pickerUpdateCh: - case <-clientConnUpdateDone: - t.Fatal("Client conn update was completed before picker update.") - case <-ctx.Done(): - t.Fatal("Timed out waiting for picker update upon receiving a configuration update") - } - } + r.UpdateState(resolver.State{ServiceConfig: sc1}) + <-clientConnUpdateDone + // Stop the listener + lis.Stop() + finalStateCnt := len(ccWrapper.getStates()) - // Once picker was updated, wait for client conn update - // to complete. 
- select { - case <-clientConnUpdateDone: - case <-ctx.Done(): - t.Fatal("Timed out waiting for client conn update upon receiving a configuration update") + if finalStateCnt != initialStateCnt+1 { + t.Errorf("Unexpected balancer state count: got %v, want %v", finalStateCnt, initialStateCnt) } } @@ -1107,7 +1107,7 @@ func (s) TestUpdateStatePauses(t *testing.T) { } stub.Register(childPolicyName, stub.BalancerFuncs{ Init: func(bd *stub.BalancerData) { - bd.Data = balancer.Get(grpc.PickFirstBalancerName).Build(bd.ClientConn, bd.BuildOptions) + bd.Data = balancer.Get(pickfirst.Name).Build(bd.ClientConn, bd.BuildOptions) }, ParseConfig: func(sc json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { cfg := &childPolicyConfig{} From 6c27bbd651c22714af7e90dc1f65ea7ebbcecb08 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan <159750762+aranjans@users.noreply.github.com> Date: Tue, 13 Aug 2024 17:24:14 +0530 Subject: [PATCH 09/13] fixed typo --- balancer/rls/balancer.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go index 04f86f3a4ac4..1caa0af0257e 100644 --- a/balancer/rls/balancer.go +++ b/balancer/rls/balancer.go @@ -75,9 +75,10 @@ var ( // Following functions are no-ops in actual code, but can be overridden in // tests to give tests visibility into exactly when certain events happen. - clientConnUpdateHook = func() {} - dataCachePurgeHook = func() {} - resetBackoffHook = func() {} + clientConnUpdateHook = func() {} + dataCachePurgeHook = func() {} + resetBackoffHook = func() {} + defaultTargetPicksMetric = estats.RegisterInt64Count(estats.MetricDescriptor{ Name: "grpc.lb.rls.default_target_picks", Description: "EXPERIMENTAL. Number of LB picks sent to the default target.", @@ -175,7 +176,7 @@ type rlsBalancer struct { // default child policy wrapper when a new picker is created. See // sendNewPickerLocked() for details. lastPicker *rlsPicker - // Set during CoUpdateConnState when pushing updates to child policies. + // Set during UpdateClientConnState when pushing updates to child policies. // Prevents state updates from child policies causing new pickers to be sent // up the channel. Cleared after all child policies have processed the // updates sent to them, after which a new picker is sent up the channel. From 9527952807a3bb1e136204a4dc22bacc34b3a63f Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Wed, 14 Aug 2024 21:23:55 +0530 Subject: [PATCH 10/13] fixed earwars comments --- balancer/rls/balancer.go | 2 - balancer/rls/balancer_test.go | 83 +++++++++++++++-------------------- 2 files changed, 35 insertions(+), 50 deletions(-) diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go index 1caa0af0257e..fc19af251ccd 100644 --- a/balancer/rls/balancer.go +++ b/balancer/rls/balancer.go @@ -324,13 +324,11 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error b.dataCache.resize(newCfg.cacheSizeBytes) b.cacheMu.Unlock() } - b.stateMu.Lock() // Enqueue an event which will notify us when the above update has been // propagated to all child policies, and the child policies have all // processed their updates, and we have sent a picker update. 
done := make(chan struct{}) b.updateCh.Put(resumePickerUpdates{done: done}) - b.stateMu.Unlock() <-done return nil } diff --git a/balancer/rls/balancer_test.go b/balancer/rls/balancer_test.go index c7a679eb4c8a..3e69a51f9275 100644 --- a/balancer/rls/balancer_test.go +++ b/balancer/rls/balancer_test.go @@ -655,13 +655,6 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) { // Test that when a data cache entry is evicted due to config change // in cache size, the picker is updated accordingly. func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { - // Create a restartable listener which can close existing connections. - l, err := testutils.LocalTCPListener() - if err != nil { - t.Fatalf("net.Listen() failed: %v", err) - } - lis := testutils.NewRestartableListener(l) - // Override the clientConn update hook to get notified. clientConnUpdateDone := make(chan struct{}, 1) origClientConnUpdateHook := clientConnUpdateHook @@ -710,7 +703,7 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { }) // Start an RLS server and set the throttler to never throttle requests. - rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis) + rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil) overrideAdaptiveThrottler(t, neverThrottlingThrottler()) // Register an LB policy to act as the child policy for RLS LB policy. @@ -720,19 +713,17 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { // Start a couple of test backends, and set up the fake RLS server to return // these as targets in the RLS response, based on request keys. - _, backendAddress1 := startBackend(t) + // Start a couple of test backends, and set up the fake RLS server to return + // these as targets in the RLS response, based on request keys. + backendCh1, backendAddress1 := startBackend(t) backendCh2, backendAddress2 := startBackend(t) - backendCh3, backendAddress3 := startBackend(t) rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse { if req.KeyMap["k1"] == "v1" { - return &rlstest.RouteLookupResponse{Err: errors.New("throwing error from control channel for first entry"), Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}} + return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}} } if req.KeyMap["k2"] == "v2" { return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}} } - if req.KeyMap["k3"] == "v3" { - return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress3}}} - } return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")} }) @@ -751,34 +742,30 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { "names": [ "n2" ] - }, - { - "key": "k3", - "names": [ - "n3" - ] } ] ` - scJSON := fmt.Sprintf(` -{ - "loadBalancingConfig": [ - { - "%s": { - "routeLookupConfig": { - "grpcKeybuilders": [{ - "names": [{"service": "grpc.testing.TestService"}], - "headers": %s - }], - "lookupService": "%s", - "cacheSizeBytes": 1000 - }, - "childPolicy": [{"%s": {}}], - "childPolicyConfigTargetFieldName": "Backend" - } - } - ] -}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName) + + configJSON := ` + { + "loadBalancingConfig": [ + { + "%s": { + "routeLookupConfig": { + "grpcKeybuilders": [{ + "names": [{"service": "grpc.testing.TestService"}], + "headers": %s + }], + "lookupService": "%s", + "cacheSizeBytes": %d + }, + 
"childPolicy": [{"%s": {}}], + "childPolicyConfigTargetFieldName": "Backend" + } + } + ] + }` + scJSON := fmt.Sprintf(configJSON, topLevelBalancerName, headers, rlsServer.Address, 1000, childPolicyName) sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON) r.InitialState(resolver.State{ServiceConfig: sc}) @@ -792,18 +779,20 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() + // Make an RPC call with empty metadata, which will eventually throw + // the error as no metadata will match from rlsServer response + // callback defined above. This will cause the control channel to + // throw the error and cause the item to get into backoff. makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil) - t.Logf("Verifying if RPC failed when listener is stopped.") - ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n2", "v2") - makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2) + ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1") + makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh1) verifyRLSRequest(t, rlsReqCh, true) - ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n3", "v3") - makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh3) + ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n2", "v2") + makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2) verifyRLSRequest(t, rlsReqCh, true) - testutils.AwaitState(ctx, t, cc, connectivity.Ready) initialStateCnt := len(ccWrapper.getStates()) // Setting the size to 1 will cause the entries to be // evicted. @@ -818,7 +807,7 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { "headers": %s }], "lookupService": "%s", - "cacheSizeBytes": 1 + "cacheSizeBytes": 2 }, "childPolicy": [{"%s": {}}], "childPolicyConfigTargetFieldName": "Backend" @@ -829,8 +818,6 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) { sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1) r.UpdateState(resolver.State{ServiceConfig: sc1}) <-clientConnUpdateDone - // Stop the listener - lis.Stop() finalStateCnt := len(ccWrapper.getStates()) if finalStateCnt != initialStateCnt+1 { From 131bf6d8517308900abab9d91aac25f59d6975e0 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan <159750762+aranjans@users.noreply.github.com> Date: Thu, 15 Aug 2024 13:37:01 +0530 Subject: [PATCH 11/13] fixed vet warning --- balancer/rls/balancer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go index 1b4679383ed5..e5ecf9459c00 100644 --- a/balancer/rls/balancer.go +++ b/balancer/rls/balancer.go @@ -340,7 +340,7 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error // `stateMu` if we are to hold both locks at the same time. b.dataCache.resize(newCfg.cacheSizeBytes) } - b.cacheMu.Unlock() + b.cacheMu.Unlock() // Enqueue an event which will notify us when the above update has been // propagated to all child policies, and the child policies have all // processed their updates, and we have sent a picker update. 
From 8ebbac4bf9fa898c57664d0edce28163de41532c Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Thu, 15 Aug 2024 18:40:22 +0530 Subject: [PATCH 12/13] fixed vet error --- balancer/rls/balancer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go index e5ecf9459c00..8928d773d8a0 100644 --- a/balancer/rls/balancer.go +++ b/balancer/rls/balancer.go @@ -340,7 +340,7 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error // `stateMu` if we are to hold both locks at the same time. b.dataCache.resize(newCfg.cacheSizeBytes) } - b.cacheMu.Unlock() + b.cacheMu.Unlock() // Enqueue an event which will notify us when the above update has been // propagated to all child policies, and the child policies have all // processed their updates, and we have sent a picker update. From 09c0dea09829067ad0a984281aae159c11df7bf3 Mon Sep 17 00:00:00 2001 From: Abhishek Ranjan Date: Fri, 16 Aug 2024 11:00:26 +0530 Subject: [PATCH 13/13] Remove redundant comment --- balancer/rls/balancer.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/balancer/rls/balancer.go b/balancer/rls/balancer.go index 8928d773d8a0..5ae4d2e13167 100644 --- a/balancer/rls/balancer.go +++ b/balancer/rls/balancer.go @@ -334,10 +334,6 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error // specified size. If we do evict an entry with valid backoff timer, // the new picker needs to be sent to the channel to re-process any // RPCs queued as a result of this backoff timer. - // - // And we cannot do this operation above (where we compute the - // `resizeCache` boolean) because `cacheMu` needs to be grabbed before - // `stateMu` if we are to hold both locks at the same time. b.dataCache.resize(newCfg.cacheSizeBytes) } b.cacheMu.Unlock()