From 7c03f1d464c5bbdcdbb31eae282969c4e9ecc6cd Mon Sep 17 00:00:00 2001 From: Matthew Stevenson Date: Tue, 2 Jan 2024 08:41:18 -0800 Subject: [PATCH 1/2] alts: Forward-fix of ALTS queuing of handshake requests. --- credentials/alts/alts_test.go | 9 +++++++-- credentials/alts/internal/handshaker/handshaker.go | 10 ++++------ .../alts/internal/handshaker/handshaker_test.go | 12 ++++++------ 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/credentials/alts/alts_test.go b/credentials/alts/alts_test.go index 20062fe77539..b9dc94783196 100644 --- a/credentials/alts/alts_test.go +++ b/credentials/alts/alts_test.go @@ -44,7 +44,7 @@ import ( ) const ( - defaultTestLongTimeout = 10 * time.Second + defaultTestLongTimeout = 60 * time.Second defaultTestShortTimeout = 10 * time.Millisecond ) @@ -392,17 +392,22 @@ func establishAltsConnection(t *testing.T, handshakerAddress, serverAddress stri ctx, cancel := context.WithTimeout(context.Background(), defaultTestLongTimeout) defer cancel() c := testgrpc.NewTestServiceClient(conn) + success := false for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { _, err = c.UnaryCall(ctx, &testpb.SimpleRequest{}) if err == nil { + success = true break } - if code := status.Code(err); code == codes.Unavailable { + if code := status.Code(err); code == codes.Unavailable || code == codes.DeadlineExceeded { // The server is not ready yet. Try again. continue } t.Fatalf("c.UnaryCall() failed: %v", err) } + if !success { + t.Fatalf("c.UnaryCall() timed out after %v", defaultTestShortTimeout) + } } func startFakeHandshakerService(t *testing.T, wait *sync.WaitGroup) (stop func(), address string) { diff --git a/credentials/alts/internal/handshaker/handshaker.go b/credentials/alts/internal/handshaker/handshaker.go index 64890797f30d..6c867dd85015 100644 --- a/credentials/alts/internal/handshaker/handshaker.go +++ b/credentials/alts/internal/handshaker/handshaker.go @@ -61,8 +61,6 @@ var ( // control number of concurrent created (but not closed) handshakes. clientHandshakes = semaphore.NewWeighted(int64(envconfig.ALTSMaxConcurrentHandshakes)) serverHandshakes = semaphore.NewWeighted(int64(envconfig.ALTSMaxConcurrentHandshakes)) - // errDropped occurs when maxPendingHandshakes is reached. - errDropped = errors.New("maximum number of concurrent ALTS handshakes is reached") // errOutOfBound occurs when the handshake service returns a consumed // bytes value larger than the buffer that was passed to it originally. errOutOfBound = errors.New("handshaker service consumed bytes value is out-of-bound") @@ -156,8 +154,8 @@ func NewServerHandshaker(ctx context.Context, conn *grpc.ClientConn, c net.Conn, // ClientHandshake starts and completes a client ALTS handshake for GCP. Once // done, ClientHandshake returns a secure connection. func (h *altsHandshaker) ClientHandshake(ctx context.Context) (net.Conn, credentials.AuthInfo, error) { - if !clientHandshakes.TryAcquire(1) { - return nil, nil, errDropped + if err := clientHandshakes.Acquire(ctx, 1); err != nil { + return nil, nil, err } defer clientHandshakes.Release(1) @@ -209,8 +207,8 @@ func (h *altsHandshaker) ClientHandshake(ctx context.Context) (net.Conn, credent // ServerHandshake starts and completes a server ALTS handshake for GCP. Once // done, ServerHandshake returns a secure connection. func (h *altsHandshaker) ServerHandshake(ctx context.Context) (net.Conn, credentials.AuthInfo, error) { - if !serverHandshakes.TryAcquire(1) { - return nil, nil, errDropped + if err := serverHandshakes.Acquire(ctx, 1); err != nil { + return nil, nil, err } defer serverHandshakes.Release(1) diff --git a/credentials/alts/internal/handshaker/handshaker_test.go b/credentials/alts/internal/handshaker/handshaker_test.go index 956e0fbd1c20..9f877c48dcf3 100644 --- a/credentials/alts/internal/handshaker/handshaker_test.go +++ b/credentials/alts/internal/handshaker/handshaker_test.go @@ -193,10 +193,10 @@ func (s) TestClientHandshake(t *testing.T) { }() } - // Ensure all errors are expected. + // Ensure that there are no errors. for i := 0; i < testCase.numberOfHandshakes; i++ { - if err := <-errc; err != nil && err != errDropped { - t.Errorf("ClientHandshake() = _, %v, want _, or %v", err, errDropped) + if err := <-errc; err != nil { + t.Errorf("ClientHandshake() = _, %v, want _, ", err) } } @@ -250,10 +250,10 @@ func (s) TestServerHandshake(t *testing.T) { }() } - // Ensure all errors are expected. + // Ensure that there are no errors. for i := 0; i < testCase.numberOfHandshakes; i++ { - if err := <-errc; err != nil && err != errDropped { - t.Errorf("ServerHandshake() = _, %v, want _, or %v", err, errDropped) + if err := <-errc; err != nil { + t.Errorf("ServerHandshake() = _, %v, want _, ", err) } } From c1038357b4d2a705f7d3061707606d236da24359 Mon Sep 17 00:00:00 2001 From: Matthew Stevenson Date: Wed, 10 Jan 2024 08:17:56 -0800 Subject: [PATCH 2/2] Update comment when we get a deadline exceeded error. --- credentials/alts/alts_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/credentials/alts/alts_test.go b/credentials/alts/alts_test.go index b9dc94783196..573e3b025a7c 100644 --- a/credentials/alts/alts_test.go +++ b/credentials/alts/alts_test.go @@ -400,7 +400,8 @@ func establishAltsConnection(t *testing.T, handshakerAddress, serverAddress stri break } if code := status.Code(err); code == codes.Unavailable || code == codes.DeadlineExceeded { - // The server is not ready yet. Try again. + // The server is not ready yet or there were too many concurrent handshakes. + // Try again. continue } t.Fatalf("c.UnaryCall() failed: %v", err)