rls: update picker synchronously on configuration update #7412

Merged · 15 commits · Aug 16, 2024
2 changes: 1 addition & 1 deletion balancer/rls/balancer.go
@@ -75,8 +75,8 @@
// Following functions are no-ops in actual code, but can be overridden in
// tests to give tests visibility into exactly when certain events happen.
clientConnUpdateHook = func() {}
dataCachePurgeHook = func() {}
resetBackoffHook = func() {}

entryWithValidBackoffEvicted = func() {}
)

@@ -309,7 +309,6 @@
evicted := b.dataCache.resize(newCfg.cacheSizeBytes)
if evicted {
b.sendNewPickerLocked()
entryWithValidBackoffEvicted()
}
b.cacheMu.Unlock()
}
@@ -516,6 +515,7 @@
if !b.inhibitPickerUpdates {
b.logger.Infof("New balancer.State: %+v", state)
b.cc.UpdateState(state)
entryWithValidBackoffEvicted()
} else {
b.logger.Infof("Delaying picker update: %+v", state)
}
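
The var block touched above is the seam this PR adjusts: package-level hooks that are no-ops in production code and are overridden by tests to learn exactly when an event happens. A minimal, self-contained sketch of that pattern, using illustrative names rather than the real grpc-go identifiers:

```go
// Sketch only; identifiers are illustrative, not the actual rls symbols.
package rls

import "testing"

// pickerSentHook is a no-op in production code; tests override it to
// observe exactly when a picker update has been handed to the ClientConn.
var pickerSentHook = func() {}

func sendNewPicker() {
	// ... build the picker and push it to the ClientConn ...
	pickerSentHook()
}

func TestObservePickerUpdate(t *testing.T) {
	sent := make(chan struct{}, 1)
	origHook := pickerSentHook
	pickerSentHook = func() {
		// Non-blocking send: the hook must never stall the balancer,
		// even if it fires more often than the test reads the channel.
		select {
		case sent <- struct{}{}:
		default:
		}
	}
	defer func() { pickerSentHook = origHook }()

	sendNewPicker() // in the real test this is driven by a service-config update

	select {
	case <-sent:
	default:
		t.Fatal("picker update was not observed")
	}
}
```

The non-blocking send mirrors the change in balancer_test.go below: the hook must never block the balancer, even if it fires before (or more often than) the test reads the channel.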
116 changes: 49 additions & 67 deletions balancer/rls/balancer_test.go
@@ -653,13 +653,6 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
// Test that when a data cache entry is evicted due to config change
// in cache size, the picker is updated accordingly.
func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
// Create a restartable listener which can close existing connections.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("net.Listen() failed: %v", err)
}
lis := testutils.NewRestartableListener(l)

// Override the clientConn update hook to get notified.
clientConnUpdateDone := make(chan struct{}, 1)
origClientConnUpdateHook := clientConnUpdateHook
@@ -689,7 +682,13 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
// picker when an entry with valid backoff time was evicted.
backOffItemEvicted := make(chan struct{}, 1)
origBackoffItemEvicted := entryWithValidBackoffEvicted
entryWithValidBackoffEvicted = func() { backOffItemEvicted <- struct{}{} }
entryWithValidBackoffEvicted = func() {
select {
case backOffItemEvicted <- struct{}{}:
default:
// Do nothing if the channel is full
}
}
defer func() { entryWithValidBackoffEvicted = origBackoffItemEvicted }()

// Register the top-level wrapping balancer which forwards calls to RLS.
@@ -715,7 +714,7 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
})

// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Register an LB policy to act as the child policy for RLS LB policy.
@@ -725,18 +724,17 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {

// Start a couple of test backends, and set up the fake RLS server to return
// these as targets in the RLS response, based on request keys.
_, backendAddress1 := startBackend(t)
backendCh1, backendAddress1 := startBackend(t)
backendCh2, backendAddress2 := startBackend(t)
backendCh3, backendAddress3 := startBackend(t)
rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
if req.KeyMap["k1"] == "v1" {
return &rlstest.RouteLookupResponse{Err: errors.New("throwing error from control channel for first entry"), Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
return &rlstest.RouteLookupResponse{Err: errors.New("throwing error from control channel for first entry")}
}
if req.KeyMap["k2"] == "v2" {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
}
if req.KeyMap["k3"] == "v3" {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress3}}}
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
}
return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
})
@@ -765,84 +763,68 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
}
]
`
scJSON := fmt.Sprintf(`
{
"loadBalancingConfig": [
{
"%s": {
"routeLookupConfig": {
"grpcKeybuilders": [{
"names": [{"service": "grpc.testing.TestService"}],
"headers": %s
}],
"lookupService": "%s",
"cacheSizeBytes": 1000
},
"childPolicy": [{"%s": {}}],
"childPolicyConfigTargetFieldName": "Backend"
}
}
]
}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName)

configJSON := `
{
"loadBalancingConfig": [
{
"%s": {
"routeLookupConfig": {
"grpcKeybuilders": [{
"names": [{"service": "grpc.testing.TestService"}],
"headers": %s
}],
"lookupService": "%s",
"cacheSizeBytes": %d
},
"childPolicy": [{"%s": {}}],
"childPolicyConfigTargetFieldName": "Backend"
}
}
]
}`
scJSON := fmt.Sprintf(configJSON, topLevelBalancerName, headers, rlsServer.Address, 1000, childPolicyName)
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
r.InitialState(resolver.State{ServiceConfig: sc})

cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("create grpc.Dial() failed: %v", err)
}
defer cc.Close()

ccCh := make(chan *grpc.ClientConn, 1)
go func() {
cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return
}
ccCh <- cc
}()
<-clientConnUpdateDone
cc := <-ccCh
defer cc.Close()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)
t.Logf("Verifying if RPC failed when listener is stopped.")

ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n2", "v2")
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2)
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh1)
verifyRLSRequest(t, rlsReqCh, true)

ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n3", "v3")
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh3)
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2)
verifyRLSRequest(t, rlsReqCh, true)

// Setting the size to 1 will cause the entries to be
// evicted.
scJSON1 := fmt.Sprintf(`
{
"loadBalancingConfig": [
{
"%s": {
"routeLookupConfig": {
"grpcKeybuilders": [{
"names": [{"service": "grpc.testing.TestService"}],
"headers": %s
}],
"lookupService": "%s",
"cacheSizeBytes": 1
},
"childPolicy": [{"%s": {}}],
"childPolicyConfigTargetFieldName": "Backend"
}
}
]
}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName)
scJSON1 := fmt.Sprintf(configJSON, topLevelBalancerName, headers, rlsServer.Address, 1, childPolicyName)
sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1)
r.UpdateState(resolver.State{ServiceConfig: sc1})
<-clientConnUpdateDone

// Stop the listener
lis.Stop()

go r.UpdateState(resolver.State{ServiceConfig: sc1})
select {
// Wait for backOffItemEvicted to ensure picker was updated
// synchronously when there was cache resize on config update.
case <-backOffItemEvicted:
case <-ctx.Done():
t.Error("Error sending picker update on eviction of cache entry with valid backoff: context timed out.")
t.Errorf("Error sending picker update on eviction of cache entry with valid backoff: %v", ctx.Err().Error())
}
<-clientConnUpdateDone
}

// TestDataCachePurging verifies that the LB policy periodically evicts expired
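
To summarize what the reworked test asserts: when a config update shrinks `cacheSizeBytes` and an entry still in backoff is evicted, the balancer pushes a fresh picker synchronously, inside the config-update path, rather than waiting for a later asynchronous event. A toy sketch of that flow, with stand-in types rather than the real rls `dataCache` and balancer:

```go
// Toy model of the synchronous resize path exercised by the test above.
// Types and methods here are stand-ins, not the real rls implementation.
package rls

import "sync"

type toyCache struct{ sizeBytes, usedBytes int64 }

// resize shrinks the cache and reports whether any entry was evicted.
func (c *toyCache) resize(newSize int64) (evicted bool) {
	c.sizeBytes = newSize
	for c.usedBytes > c.sizeBytes {
		// Drop the least recently used entry, even one still in backoff.
		c.usedBytes -= 100 // pretend each entry costs 100 bytes
		evicted = true
	}
	return evicted
}

type toyBalancer struct {
	cacheMu sync.Mutex
	cache   toyCache
}

func (b *toyBalancer) handleConfigUpdate(newCacheSizeBytes int64) {
	b.cacheMu.Lock()
	defer b.cacheMu.Unlock()
	// Evict synchronously; if anything was dropped, the picker is
	// refreshed right here rather than on some later asynchronous event.
	if b.cache.resize(newCacheSizeBytes) {
		b.sendNewPickerLocked()
	}
}

func (b *toyBalancer) sendNewPickerLocked() {
	// ... push a picker that no longer routes to the evicted entries ...
}
```

In the real balancer the eviction check and `sendNewPickerLocked` call happen under `cacheMu`, as the `@@ -309` hunk above shows, and the `entryWithValidBackoffEvicted` test hook now fires right after `cc.UpdateState`, so the test observes the picker update itself rather than the eviction.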