From 86135c37f3830c481ff890625928453b4161bbca Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 15 Aug 2024 12:48:46 -0700 Subject: [PATCH] csds: unskip e2e test (#7502) --- xds/csds/csds_e2e_test.go | 312 ++++++++++++++++++++++++++++++-------- 1 file changed, 247 insertions(+), 65 deletions(-) diff --git a/xds/csds/csds_e2e_test.go b/xds/csds/csds_e2e_test.go index d5611ca97d5e..90dd265a6ce1 100644 --- a/xds/csds/csds_e2e_test.go +++ b/xds/csds/csds_e2e_test.go @@ -22,7 +22,6 @@ import ( "context" "fmt" "io" - "runtime" "slices" "strings" "testing" @@ -69,54 +68,95 @@ func Test(t *testing.T) { // about the callback received by these watchers in the test. We only care // whether CSDS reports the expected state. -type unimplementedListenerWatcher struct{} +type nopListenerWatcher struct{} -func (unimplementedListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { +func (nopListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { onDone.OnDone() } -func (unimplementedListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { +func (nopListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { onDone.OnDone() } -func (unimplementedListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { +func (nopListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { onDone.OnDone() } -type unimplementedRouteConfigWatcher struct{} +type nopRouteConfigWatcher struct{} -func (unimplementedRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { +func (nopRouteConfigWatcher) OnUpdate(_ *xdsresource.RouteConfigResourceData, onDone xdsresource.DoneNotifier) { onDone.OnDone() } -func (unimplementedRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { +func (nopRouteConfigWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { onDone.OnDone() } -func (unimplementedRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { +func (nopRouteConfigWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { onDone.OnDone() } -type unimplementedClusterWatcher struct{} +type nopClusterWatcher struct{} -func (unimplementedClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) { +func (nopClusterWatcher) OnUpdate(_ *xdsresource.ClusterResourceData, onDone xdsresource.DoneNotifier) { onDone.OnDone() } -func (unimplementedClusterWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { +func (nopClusterWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { onDone.OnDone() } -func (unimplementedClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { +func (nopClusterWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { onDone.OnDone() } -type unimplementedEndpointsWatcher struct{} +type nopEndpointsWatcher struct{} -func (unimplementedEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) { +func (nopEndpointsWatcher) OnUpdate(_ *xdsresource.EndpointsResourceData, onDone xdsresource.DoneNotifier) { onDone.OnDone() } -func (unimplementedEndpointsWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { +func (nopEndpointsWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { onDone.OnDone() } -func (unimplementedEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { +func (nopEndpointsWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { onDone.OnDone() } +// This watcher writes the onDone callback on to a channel for the test to +// invoke it when it wants to unblock the next read on the ADS stream in the xDS +// client. This is particularly useful when a resource is NACKed, because the +// go-control-plane management server continuously resends the same resource in +// this case, and applying flow control from these watchers ensures that xDS +// client does not spend all of its time receiving and NACKing updates from the +// management server. This was indeed the case on arm64 (before we had support +// for ADS stream level flow control), and was causing CSDS to not receive any +// updates from the xDS client. +type blockingListenerWatcher struct { + testCtxDone <-chan struct{} // Closed when the test is done. + onDoneCh chan xdsresource.DoneNotifier // Channel to write the onDone callback to. +} + +func newBlockingListenerWatcher(testCtxDone <-chan struct{}) *blockingListenerWatcher { + return &blockingListenerWatcher{ + testCtxDone: testCtxDone, + onDoneCh: make(chan xdsresource.DoneNotifier, 1), + } +} + +func (w *blockingListenerWatcher) OnUpdate(_ *xdsresource.ListenerResourceData, onDone xdsresource.DoneNotifier) { + writeOnDone(w.testCtxDone, w.onDoneCh, onDone) +} +func (w *blockingListenerWatcher) OnError(_ error, onDone xdsresource.DoneNotifier) { + writeOnDone(w.testCtxDone, w.onDoneCh, onDone) +} +func (w *blockingListenerWatcher) OnResourceDoesNotExist(onDone xdsresource.DoneNotifier) { + writeOnDone(w.testCtxDone, w.onDoneCh, onDone) +} + +// writeOnDone attempts to writes the onDone callback on the onDone channel. It +// returns when it can successfully write to the channel or when the test is +// done, which is signalled by testCtxDone being closed. +func writeOnDone(testCtxDone <-chan struct{}, onDoneCh chan xdsresource.DoneNotifier, onDone xdsresource.DoneNotifier) { + select { + case <-testCtxDone: + case onDoneCh <- onDone: + } +} + // Creates a gRPC server and starts serving a CSDS service implementation on it. // Returns the address of the newly created gRPC server. // @@ -170,21 +210,10 @@ func startCSDSClientStream(ctx context.Context, t *testing.T, serverAddr string) // - Configures resources on the management server corresponding to the ones // being watched by the clients, and verifies that the CSDS response reports // resources in ACKED state. -// - Modifies resources on the management server such that some of them are -// expected to be NACKed by the client. Verifies that the CSDS response -// contains some resources in ACKED state and some in NACKED state. // -// For all of the above operations, the test also verifies that the client_scope -// field in the CSDS response is populated appropriately. +// For the above operations, the test also verifies that the client_scope field +// in the CSDS response is populated appropriately. func (s) TestCSDS(t *testing.T) { - // TODO(easwars): Once https://github.com/grpc/grpc/issues/34099 is fixed - // for grpc-go, use resource watchers which are able to control how much we - // read from the management server, and thereby allowing this test to not - // starve in the presence of constant updates from the management server. - if runtime.GOARCH == "arm64" { - t.Skip("Skipping test on arm64 due to https://github.com/envoyproxy/go-control-plane/issues/962") - } - // Spin up a xDS management server on a local port. mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) @@ -277,16 +306,16 @@ func (s) TestCSDS(t *testing.T) { // Register watches on the xDS clients for two resources of each type. for _, xdsC := range []xdsclient.XDSClient{xdsClient1, xdsClient2} { for _, target := range ldsTargets { - xdsresource.WatchListener(xdsC, target, unimplementedListenerWatcher{}) + xdsresource.WatchListener(xdsC, target, nopListenerWatcher{}) } for _, target := range rdsTargets { - xdsresource.WatchRouteConfig(xdsC, target, unimplementedRouteConfigWatcher{}) + xdsresource.WatchRouteConfig(xdsC, target, nopRouteConfigWatcher{}) } for _, target := range cdsTargets { - xdsresource.WatchCluster(xdsC, target, unimplementedClusterWatcher{}) + xdsresource.WatchCluster(xdsC, target, nopClusterWatcher{}) } for _, target := range edsTargets { - xdsresource.WatchEndpoints(xdsC, target, unimplementedEndpointsWatcher{}) + xdsresource.WatchEndpoints(xdsC, target, nopEndpointsWatcher{}) } } @@ -361,50 +390,203 @@ func (s) TestCSDS(t *testing.T) { if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil { t.Fatal(err) } +} + +// Tests CSDS functionality. The test performs the following: +// - Spins up a management server and creates two xDS clients talking to it. +// - Registers one watch on each xDS client, and verifies that the CSDS +// response reports resources in REQUESTED state. +// - Configures two resources on the management server and verifies that the +// CSDS response reports the resources as being in ACKED state. +// - Updates one of two resources on the management server such that it is +// expected to be NACKed by the client. Verifies that the CSDS response +// contains one resource in ACKED state and one in NACKED state. +// +// For the above operations, the test also verifies that the client_scope field +// in the CSDS response is populated appropriately. +// +// This test does a bunch of similar things to the previous test, but has +// reduced complexity because of having to deal with a single resource type. +// This makes is possible to test the NACKing a resource (which results in +// continuous resending of the resource by the go-control-plane management +// server), in an easier and less flaky way. +func (s) TestCSDS_NACK(t *testing.T) { + // Spin up a xDS management server on a local port. + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + + // Create a bootstrap contents pointing to the above management server. + nodeID := uuid.New().String() + bootstrapContents := e2e.DefaultBootstrapContents(t, nodeID, mgmtServer.Address) + + // Create two xDS clients, with different names. These should end up + // creating two different xDS clients. + const xdsClient1Name = "xds-csds-client-1" + xdsClient1, xdsClose1, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: xdsClient1Name, + Contents: bootstrapContents, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer xdsClose1() + const xdsClient2Name = "xds-csds-client-2" + xdsClient2, xdsClose2, err := xdsclient.NewForTesting(xdsclient.OptionsForTesting{ + Name: xdsClient2Name, + Contents: bootstrapContents, + }) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer xdsClose2() - // Update the first resource of each type in the management server to a - // value which is expected to be NACK'ed by the xDS client. - listeners[0].ApiListener = &v3listenerpb.ApiListener{} - routes[0].VirtualHosts = []*v3routepb.VirtualHost{{Routes: []*v3routepb.Route{{}}}} - clusters[0].ClusterDiscoveryType = &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_STATIC} - endpoints[0].Endpoints = []*v3endpointpb.LocalityLbEndpoints{{}} + // Start a CSDS server and create a client stream to it. + addr := startCSDSServer(t) + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + stream := startCSDSClientStream(ctx, t, addr) + + // Verify that the xDS client reports an empty config. + wantNode := &v3corepb.Node{ + Id: nodeID, + UserAgentName: "gRPC Go", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: grpc.Version}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + } + wantResp := &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + ClientScope: xdsClient1Name, + }, + { + Node: wantNode, + ClientScope: xdsClient2Name, + }, + }, + } + if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil { + t.Fatal(err) + } + + // Initialize the xDS resources to be used in this test. + const ldsTarget0, ldsTarget1 = "lds.target.good:0000", "lds.target.good:1111" + listener0 := e2e.DefaultClientListener(ldsTarget0, "rds-name") + listener1 := e2e.DefaultClientListener(ldsTarget1, "rds-name") + listenerAny0 := testutils.MarshalAny(t, listener0) + listenerAny1 := testutils.MarshalAny(t, listener1) + + // Register the watchers, one for each xDS client. + watcher1 := nopListenerWatcher{} + watcher2 := newBlockingListenerWatcher(ctx.Done()) + xdsresource.WatchListener(xdsClient1, ldsTarget0, watcher1) + xdsresource.WatchListener(xdsClient2, ldsTarget1, watcher2) + + // Verify that the xDS client reports the resources as being in "Requested" + // state, and in version "0". + wantResp = &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget0, "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + }, + ClientScope: xdsClient1Name, + }, + { + Node: wantNode, + GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget1, "", v3adminpb.ClientResourceStatus_REQUESTED, nil, nil), + }, + ClientScope: xdsClient2Name, + }, + }, + } + if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil { + t.Fatal(err) + } + + // Configure the management server with two listener resources corresponding + // to the watches registered above. if err := mgmtServer.Update(ctx, e2e.UpdateOptions{ NodeID: nodeID, - Listeners: listeners, - Routes: routes, - Clusters: clusters, - Endpoints: endpoints, + Listeners: []*v3listenerpb.Listener{listener0, listener1}, SkipValidation: true, }); err != nil { t.Fatal(err) } - // Verify that the xDS client reports the first resource of each type as - // being in "NACKed" state, and the second resource of each type to be in - // "ACKed" state. The version for the ACKed resource would be "2", while - // that for the NACKed resource would be "1". In the NACKed resource, the - // version which is NACKed is stored in the ErrorState field. - wantConfigs = []*v3statuspb.ClientConfig_GenericXdsConfig{ - makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, clusterAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}), - makeGenericXdsConfig("type.googleapis.com/envoy.config.cluster.v3.Cluster", cdsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, clusterAnys[1], nil), - makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, endpointAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}), - makeGenericXdsConfig("type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment", edsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, endpointAnys[1], nil), - makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, listenerAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}), - makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, listenerAnys[1], nil), - makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[0], "1", v3adminpb.ClientResourceStatus_NACKED, routeAnys[0], &v3adminpb.UpdateFailureState{VersionInfo: "2"}), - makeGenericXdsConfig("type.googleapis.com/envoy.config.route.v3.RouteConfiguration", rdsTargets[1], "2", v3adminpb.ClientResourceStatus_ACKED, routeAnys[1], nil), + // Verify that the xDS client reports the resources as being in "ACKed" + // state, and in version "1". + wantResp = &v3statuspb.ClientStatusResponse{ + Config: []*v3statuspb.ClientConfig{ + { + Node: wantNode, + GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget0, "1", v3adminpb.ClientResourceStatus_ACKED, listenerAny0, nil), + }, + ClientScope: xdsClient1Name, + }, + { + Node: wantNode, + GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget1, "1", v3adminpb.ClientResourceStatus_ACKED, listenerAny1, nil), + }, + ClientScope: xdsClient2Name, + }, + }, + } + if err := checkClientStatusResponse(ctx, stream, wantResp); err != nil { + t.Fatal(err) + } + + // Unblock reads on the ADS stream by calling the onDone callback sent to + // the watcher. + select { + case <-ctx.Done(): + t.Fatal("Timed out waiting for watch callback") + case onDone := <-watcher2.onDoneCh: + onDone.OnDone() + } + + // Update the second resource with an empty ApiListener field which is + // expected to be NACK'ed by the xDS client. + listener1.ApiListener = nil + if err := mgmtServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listener0, listener1}, + SkipValidation: true, + }); err != nil { + t.Fatal(err) + } + + // Wait for the update to reach the watchers. + select { + case <-ctx.Done(): + t.Fatal("Timed out waiting for watch callback") + case onDone := <-watcher2.onDoneCh: + onDone.OnDone() } + + // Verify that the xDS client reports the first listener resource as being + // ACKed and the second listener resource as being NACKed. The version for + // the ACKed resource would be "2", while that for the NACKed resource would + // be "1". In the NACKed resource, the version which is NACKed is stored in + // the ErrorState field. wantResp = &v3statuspb.ClientStatusResponse{ Config: []*v3statuspb.ClientConfig{ { - Node: wantNode, - GenericXdsConfigs: wantConfigs, - ClientScope: xdsClient1Name, + Node: wantNode, + GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget0, "2", v3adminpb.ClientResourceStatus_ACKED, listenerAny0, nil), + }, + ClientScope: xdsClient1Name, }, { - Node: wantNode, - GenericXdsConfigs: wantConfigs, - ClientScope: xdsClient2Name, + Node: wantNode, + GenericXdsConfigs: []*v3statuspb.ClientConfig_GenericXdsConfig{ + makeGenericXdsConfig("type.googleapis.com/envoy.config.listener.v3.Listener", ldsTarget1, "1", v3adminpb.ClientResourceStatus_NACKED, listenerAny1, &v3adminpb.UpdateFailureState{VersionInfo: "2"}), + }, + ClientScope: xdsClient2Name, }, }, } @@ -475,7 +657,7 @@ func checkClientStatusResponse(ctx context.Context, stream v3statuspbgrpc.Client } lastErr = fmt.Errorf("received unexpected resource dump, diff (-got, +want):\n%s, got: %s\n want:%s", diff, pretty.ToJSON(got), pretty.ToJSON(want)) } - return fmt.Errorf("timeout when waiting for resource dump to reach expected state: %v", lastErr) + return fmt.Errorf("timeout when waiting for resource dump to reach expected state: ctxErr: %v, otherErr: %v", ctx.Err(), lastErr) } func (s) TestCSDSNoXDSClient(t *testing.T) {