Remove self-imposed limit on max concurrent streams if the server doesn't impose any. #1624

Merged
46 changes: 0 additions & 46 deletions test/end2end_test.go
@@ -3675,52 +3675,6 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
}
}

const defaultMaxStreamsClient = 100

func TestExceedDefaultMaxStreamsLimit(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
if e.name == "handler-tls" {
// The default max stream limit in handler_server is not 100?
continue
}
testExceedDefaultMaxStreamsLimit(t, e)
}
}

func testExceedDefaultMaxStreamsLimit(t *testing.T, e env) {
te := newTest(t, e)
te.declareLogNoise(
"http2Client.notifyError got notified that the client transport was broken",
"Conn.resetTransport failed to create client transport",
"grpc: the connection is closing",
)
	// When maxStream is set to 0 the server doesn't send a settings frame for
// MaxConcurrentStreams, essentially allowing infinite (math.MaxInt32) streams.
// In such a case, there should be a default cap on the client-side.
te.maxStream = 0
te.startServer(&testServer{security: e.security})
defer te.tearDown()

cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)

// Create as many streams as a client can.
for i := 0; i < defaultMaxStreamsClient; i++ {
if _, err := tc.StreamingInputCall(te.ctx); err != nil {
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
}
}

// Trying to create one more should timeout.
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
_, err := tc.StreamingInputCall(ctx)
if err == nil || grpc.Code(err) != codes.DeadlineExceeded {
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
}
}

func TestStreamsQuotaRecovery(t *testing.T) {
defer leakcheck.Check(t)
for _, e := range listTestEnv() {
19 changes: 16 additions & 3 deletions transport/http2_client.go
@@ -904,15 +904,28 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
s.write(recvMsg{err: io.EOF})
}

func (t *http2Client) handleSettings(f *http2.SettingsFrame) {
func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
if f.IsAck() {
return
}
var ss []http2.Setting
isMaxConcurrentStreamsMissing := true
f.ForeachSetting(func(s http2.Setting) error {
if s.ID == http2.SettingMaxConcurrentStreams {
isMaxConcurrentStreamsMissing = false
}
ss = append(ss, s)
return nil
})
if isFirst && isMaxConcurrentStreamsMissing {
// This means server is imposing no limits on
// maximum number of concurrent streams initiated by client.
// So we must remove our self-imposed limit.
ss = append(ss, http2.Setting{
Review comment from a project member:
This seems fishy to me. But not so much this as our settings handling in general.

It seems the way it works right now, we aren't applying any settings until right as we send the ack. This will result in a race if I'm understanding it correctly:

  1. Server and client are in steady state with maximum streams = 5
  2. Client receives a settings frame with a limit of 1. It appends the ack to the control buffer in response.
  3. Client initiates 5 new streams and pushes their headers onto the control buffer, which it thinks is fine because the settings are not applied yet.
  4. Client applies settings and sends settings ack, promising not to initiate more than one stream.
  5. Client sends headers for 5 new streams.

I think we need to make sure settings are applied before queuing the ack message if the settings are more restrictive than they were before. On the other hand, we should queue the ack message and then apply the settings if they are more permissive. And...since it's possible for one setting to become more permissive while another becomes less permissive, we probably need to hold a lock, apply the settings, queue the ack, and then release the lock.

ID: http2.SettingMaxConcurrentStreams,
Val: math.MaxUint32,
})
}
// The settings will be applied once the ack is sent.
t.controlBuf.put(&settings{ack: true, ss: ss})
}
@@ -1111,7 +1124,7 @@ func (t *http2Client) reader() {
t.Close()
return
}
t.handleSettings(sf)
t.handleSettings(sf, true)

// loop to keep reading incoming messages on this transport.
for {
@@ -1144,7 +1157,7 @@
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
t.handleSettings(frame)
t.handleSettings(frame, false)
case *http2.PingFrame:
t.handlePing(frame)
case *http2.GoAwayFrame: