diff --git a/internal/transport/transport.go b/internal/transport/transport.go index fdd6fa86cc15..924ba4f36533 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -616,7 +616,7 @@ func (t *transportReader) ReadHeader(header []byte) (int, error) { t.er = err return 0, err } - t.windowHandler(len(header)) + t.windowHandler(n) return n, nil } diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 4727c3c21814..79f57b58810f 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2845,3 +2845,34 @@ func (s) TestClientCloseReturnsEarlyWhenGoAwayWriteHangs(t *testing.T) { isGreetingDone.Store(true) ct.Close(errors.New("manually closed by client")) } + +// TestReadHeaderMultipleBuffers tests the stream when the gRPC headers are +// split across multiple buffers. It verifies that the reporting of the +// number of bytes read for flow control is correct. +func (s) TestReadHeaderMultipleBuffers(t *testing.T) { + headerLen := 5 + recvBuffer := newRecvBuffer() + recvBuffer.put(recvMsg{buffer: make(mem.SliceBuffer, 3)}) + recvBuffer.put(recvMsg{buffer: make(mem.SliceBuffer, headerLen-3)}) + bytesRead := 0 + s := Stream{ + requestRead: func(int) {}, + trReader: &transportReader{ + reader: &recvBufferReader{ + recv: recvBuffer, + }, + windowHandler: func(i int) { + bytesRead += i + }, + }, + } + + header := make([]byte, headerLen) + err := s.ReadHeader(header) + if err != nil { + t.Fatalf("ReadHeader(%v) = %v", header, err) + } + if bytesRead != headerLen { + t.Errorf("bytesRead = %d, want = %d", bytesRead, headerLen) + } +}