From 4b6972d195b3aaba3421764c5612dffe4952e9e8 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 18 Apr 2024 09:19:49 -0700 Subject: [PATCH 1/2] transport: misc cleanups --- internal/transport/controlbuf.go | 4 ++-- internal/transport/http2_client.go | 22 ++++++++++------------ internal/transport/http2_server.go | 7 ++++--- internal/transport/transport_test.go | 4 ++-- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/internal/transport/controlbuf.go b/internal/transport/controlbuf.go index ce8fb90655e8..3deadfb4a20c 100644 --- a/internal/transport/controlbuf.go +++ b/internal/transport/controlbuf.go @@ -336,7 +336,7 @@ func (c *controlBuffer) put(it cbItem) error { return err } -func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, error) { +func (c *controlBuffer) executeAndPut(f func() bool, it cbItem) (bool, error) { var wakeUp bool c.mu.Lock() if c.err != nil { @@ -344,7 +344,7 @@ func (c *controlBuffer) executeAndPut(f func(it any) bool, it cbItem) (bool, err return false, c.err } if f != nil { - if !f(it) { // f wasn't successful + if !f() { // f wasn't successful c.mu.Unlock() return false, nil } diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index fe621f991f79..ef461b68f835 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -796,7 +796,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, firstTry := true var ch chan struct{} transportDrainRequired := false - checkForStreamQuota := func(it any) bool { + checkForStreamQuota := func() bool { if t.streamQuota <= 0 { // Can go negative if server decreases it. if firstTry { t.waitingStreams++ @@ -808,15 +808,14 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, t.waitingStreams-- } t.streamQuota-- - h := it.(*headerFrame) - h.streamID = t.nextID + hdr.streamID = t.nextID t.nextID += 2 // Drain client transport if nextID > MaxStreamID which signals gRPC that // the connection is closed and a new one must be created for subsequent RPCs. transportDrainRequired = t.nextID > MaxStreamID - s.id = h.streamID + s.id = hdr.streamID s.fc = &inFlow{limit: uint32(t.initialWindowSize)} t.mu.Lock() if t.state == draining || t.activeStreams == nil { // Can be niled from Close(). @@ -834,13 +833,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, return true } var hdrListSizeErr error - checkForHeaderListSize := func(it any) bool { + checkForHeaderListSize := func() bool { if t.maxSendHeaderListSize == nil { return true } - hdrFrame := it.(*headerFrame) var sz int64 - for _, f := range hdrFrame.hf { + for _, f := range hdr.hf { if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize) return false @@ -849,8 +847,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, return true } for { - success, err := t.controlBuf.executeAndPut(func(it any) bool { - return checkForHeaderListSize(it) && checkForStreamQuota(it) + success, err := t.controlBuf.executeAndPut(func() bool { + return checkForHeaderListSize() && checkForStreamQuota() }, hdr) if err != nil { // Connection closed. @@ -961,7 +959,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2. rst: rst, rstCode: rstCode, } - addBackStreamQuota := func(any) bool { + addBackStreamQuota := func() bool { t.streamQuota++ if t.streamQuota > 0 && t.waitingStreams > 0 { select { @@ -1117,7 +1115,7 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) { // for the transport and the stream based on the current bdp // estimation. func (t *http2Client) updateFlowControl(n uint32) { - updateIWS := func(any) bool { + updateIWS := func() bool { t.initialWindowSize = int32(n) t.mu.Lock() for _, s := range t.activeStreams { @@ -1270,7 +1268,7 @@ func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) { } updateFuncs = append(updateFuncs, updateStreamQuota) } - t.controlBuf.executeAndPut(func(any) bool { + t.controlBuf.executeAndPut(func() bool { for _, f := range updateFuncs { f() } diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index 8a554ce408bf..cab0e2d3d447 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -859,7 +859,7 @@ func (t *http2Server) handleSettings(f *http2.SettingsFrame) { } return nil }) - t.controlBuf.executeAndPut(func(any) bool { + t.controlBuf.executeAndPut(func() bool { for _, f := range updateFuncs { f() } @@ -1013,12 +1013,13 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error { headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) } headerFields = appendHeaderFieldsFromMD(headerFields, s.header) - success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{ + hf := &headerFrame{ streamID: s.id, hf: headerFields, endStream: false, onWrite: t.setResetPingStrikes, - }) + } + success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf) if !success { if err != nil { return err diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 6c1ee6b6283c..c0871a6a9468 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2697,11 +2697,11 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { t.Errorf("Expected settings frame, got %v", fr) } fr, _ = sfr.ReadFrame() - if fr, ok := fr.(*http2.SettingsFrame); !ok && fr.IsAck() { + if fr, ok := fr.(*http2.SettingsFrame); !ok || !fr.IsAck() { t.Errorf("Expected settings ACK frame, got %v", fr) } fr, _ = sfr.ReadFrame() - if fr, ok := fr.(*http2.HeadersFrame); !ok && fr.Flags.Has(http2.FlagHeadersEndStream) { + if fr, ok := fr.(*http2.HeadersFrame); !ok || !fr.Flags.Has(http2.FlagHeadersEndStream) { t.Errorf("Expected Headers frame with END_HEADERS frame, got %v", fr) } close(greetDone) From 1532c7b49d32824e6c002cafff1876df5d53162f Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Thu, 18 Apr 2024 09:27:04 -0700 Subject: [PATCH 2/2] typo --- internal/transport/transport_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index c0871a6a9468..bb305be2220f 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2701,7 +2701,7 @@ func (s) TestClientSendsAGoAwayFrame(t *testing.T) { t.Errorf("Expected settings ACK frame, got %v", fr) } fr, _ = sfr.ReadFrame() - if fr, ok := fr.(*http2.HeadersFrame); !ok || !fr.Flags.Has(http2.FlagHeadersEndStream) { + if fr, ok := fr.(*http2.HeadersFrame); !ok || !fr.Flags.Has(http2.FlagHeadersEndHeaders) { t.Errorf("Expected Headers frame with END_HEADERS frame, got %v", fr) } close(greetDone)