Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

rls: update picker synchronously on configuration update #7412

Merged
merged 15 commits into from
Aug 16, 2024
Merged
Prev Previous commit
Next Next commit
fixed earwars comments
aranjans committed Aug 14, 2024
commit 9527952807a3bb1e136204a4dc22bacc34b3a63f
2 changes: 0 additions & 2 deletions balancer/rls/balancer.go
Original file line number Diff line number Diff line change
@@ -324,13 +324,11 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
b.dataCache.resize(newCfg.cacheSizeBytes)
b.cacheMu.Unlock()
}
b.stateMu.Lock()
// Enqueue an event which will notify us when the above update has been
// propagated to all child policies, and the child policies have all
// processed their updates, and we have sent a picker update.
done := make(chan struct{})
b.updateCh.Put(resumePickerUpdates{done: done})
b.stateMu.Unlock()
<-done
return nil
}
83 changes: 35 additions & 48 deletions balancer/rls/balancer_test.go
Original file line number Diff line number Diff line change
@@ -655,13 +655,6 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
// Test that when a data cache entry is evicted due to config change
// in cache size, the picker is updated accordingly.
func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
// Create a restartable listener which can close existing connections.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("net.Listen() failed: %v", err)
}
lis := testutils.NewRestartableListener(l)

// Override the clientConn update hook to get notified.
clientConnUpdateDone := make(chan struct{}, 1)
origClientConnUpdateHook := clientConnUpdateHook
@@ -710,7 +703,7 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
})

// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis)
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, nil)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Register an LB policy to act as the child policy for RLS LB policy.
@@ -720,19 +713,17 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {

// Start a couple of test backends, and set up the fake RLS server to return
// these as targets in the RLS response, based on request keys.
_, backendAddress1 := startBackend(t)
// Start a couple of test backends, and set up the fake RLS server to return
// these as targets in the RLS response, based on request keys.
backendCh1, backendAddress1 := startBackend(t)
backendCh2, backendAddress2 := startBackend(t)
backendCh3, backendAddress3 := startBackend(t)
rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
if req.KeyMap["k1"] == "v1" {
return &rlstest.RouteLookupResponse{Err: errors.New("throwing error from control channel for first entry"), Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
}
if req.KeyMap["k2"] == "v2" {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
}
if req.KeyMap["k3"] == "v3" {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress3}}}
}
return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
})

@@ -751,34 +742,30 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
"names": [
"n2"
]
},
{
"key": "k3",
"names": [
"n3"
]
}
]
`
scJSON := fmt.Sprintf(`
{
"loadBalancingConfig": [
{
"%s": {
"routeLookupConfig": {
"grpcKeybuilders": [{
"names": [{"service": "grpc.testing.TestService"}],
"headers": %s
}],
"lookupService": "%s",
"cacheSizeBytes": 1000
},
"childPolicy": [{"%s": {}}],
"childPolicyConfigTargetFieldName": "Backend"
}
}
]
}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName)

configJSON := `
{
"loadBalancingConfig": [
{
"%s": {
"routeLookupConfig": {
"grpcKeybuilders": [{
"names": [{"service": "grpc.testing.TestService"}],
"headers": %s
}],
"lookupService": "%s",
"cacheSizeBytes": %d
},
"childPolicy": [{"%s": {}}],
"childPolicyConfigTargetFieldName": "Backend"
}
}
]
}`
scJSON := fmt.Sprintf(configJSON, topLevelBalancerName, headers, rlsServer.Address, 1000, childPolicyName)
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
r.InitialState(resolver.State{ServiceConfig: sc})

@@ -792,18 +779,20 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Make an RPC call with empty metadata, which will eventually throw
// the error as no metadata will match from rlsServer response
// callback defined above. This will cause the control channel to
// throw the error and cause the item to get into backoff.
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)
t.Logf("Verifying if RPC failed when listener is stopped.")

ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n2", "v2")
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2)
ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n1", "v1")
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh1)
verifyRLSRequest(t, rlsReqCh, true)

ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n3", "v3")
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh3)
ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n2", "v2")
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2)
verifyRLSRequest(t, rlsReqCh, true)

testutils.AwaitState(ctx, t, cc, connectivity.Ready)
initialStateCnt := len(ccWrapper.getStates())
// Setting the size to 1 will cause the entries to be
// evicted.
@@ -818,7 +807,7 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
"headers": %s
}],
"lookupService": "%s",
"cacheSizeBytes": 1
"cacheSizeBytes": 2
},
"childPolicy": [{"%s": {}}],
"childPolicyConfigTargetFieldName": "Backend"
@@ -829,8 +818,6 @@ func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1)
r.UpdateState(resolver.State{ServiceConfig: sc1})
<-clientConnUpdateDone
// Stop the listener
lis.Stop()
finalStateCnt := len(ccWrapper.getStates())

if finalStateCnt != initialStateCnt+1 {
Loading