From 43a18f52468fa52e5d3be7c4b93becd63ad0c6a9 Mon Sep 17 00:00:00 2001 From: Mike Jensen Date: Fri, 27 Oct 2023 09:26:07 -0600 Subject: [PATCH] Limit GRPC Active streams (#33936) Originally there was a default limit of 100 max concurrent streams, however in 2017 the GRPC team removed this default: https://github.com/grpc/grpc-go/pull/1624 With the recent HTTP/2 Rapid Reset DoS, it is now being encouraged to re-introduce a limit. The fix requires this value to be configured in fact: https://github.com/grpc/grpc-go/pull/6703 --- lib/auth/grpcserver.go | 2 ++ lib/defaults/defaults.go | 4 ++++ lib/observability/tracing/collector.go | 4 +++- lib/proxy/peer/server.go | 2 ++ lib/service/service.go | 3 +++ lib/teleterm/apiserver/apiserver.go | 5 ++++- 6 files changed, 18 insertions(+), 2 deletions(-) diff --git a/lib/auth/grpcserver.go b/lib/auth/grpcserver.go index 99a31296a7f64..9973b39db57fd 100644 --- a/lib/auth/grpcserver.go +++ b/lib/auth/grpcserver.go @@ -58,6 +58,7 @@ import ( wanlib "github.com/gravitational/teleport/lib/auth/webauthn" "github.com/gravitational/teleport/lib/authz" "github.com/gravitational/teleport/lib/backend" + "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/events" "github.com/gravitational/teleport/lib/httplib" "github.com/gravitational/teleport/lib/joinserver" @@ -5128,6 +5129,7 @@ func NewGRPCServer(cfg GRPCServerConfig) (*GRPCServer, error) { PermitWithoutStream: true, }, ), + grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams), } server := grpc.NewServer(opts...) authServer := &GRPCServer{ diff --git a/lib/defaults/defaults.go b/lib/defaults/defaults.go index ccadb64c3e969..248776773641e 100644 --- a/lib/defaults/defaults.go +++ b/lib/defaults/defaults.go @@ -100,6 +100,10 @@ const ( // By default all users use /bin/bash DefaultShell = "/bin/bash" + // GRPCMaxConcurrentStreams is the max GRPC streams that can be active at a time. Once the limit is reached new + // RPC calls will queue until capacity is available. + GRPCMaxConcurrentStreams = 1000 + // HTTPMaxIdleConns is the max idle connections across all hosts. HTTPMaxIdleConns = 2000 diff --git a/lib/observability/tracing/collector.go b/lib/observability/tracing/collector.go index d30968fe25d75..ea36ee69ced93 100644 --- a/lib/observability/tracing/collector.go +++ b/lib/observability/tracing/collector.go @@ -31,6 +31,8 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/proto" + + "github.com/gravitational/teleport/lib/defaults" ) // Collector is a simple in memory implementation of an OpenTelemetry Collector @@ -75,7 +77,7 @@ func NewCollector(cfg CollectorConfig) (*Collector, error) { c := &Collector{ grpcLn: grpcLn, httpLn: httpLn, - grpcServer: grpc.NewServer(grpc.Creds(creds)), + grpcServer: grpc.NewServer(grpc.Creds(creds), grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams)), tlsConfing: tlsConfig, exportedC: make(chan struct{}, 1), } diff --git a/lib/proxy/peer/server.go b/lib/proxy/peer/server.go index 0952e18a341c9..b71a7636a207d 100644 --- a/lib/proxy/peer/server.go +++ b/lib/proxy/peer/server.go @@ -31,6 +31,7 @@ import ( "github.com/gravitational/teleport/api/metadata" "github.com/gravitational/teleport/api/utils/grpc/interceptors" "github.com/gravitational/teleport/lib/auth" + "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/utils" ) @@ -141,6 +142,7 @@ func NewServer(config ServerConfig) (*Server, error) { MinTime: peerKeepAlive, PermitWithoutStream: true, }), + grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams), ) proto.RegisterProxyServiceServer(server, config.service) diff --git a/lib/service/service.go b/lib/service/service.go index a65b6fa604f80..739d099177446 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -3878,6 +3878,7 @@ func (process *TeleportProcess) initProxyEndpoint(conn *Connector) error { otelgrpc.StreamServerInterceptor(), ), grpc.Creds(credentials.NewTLS(serverTLSConfig)), + grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams), ) process.RegisterCriticalFunc("proxy.ssh", func() error { @@ -5394,6 +5395,7 @@ func (process *TeleportProcess) initPublicGRPCServer( // available for some time. MaxConnectionIdle: 10 * time.Second, }), + grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams), ) joinServiceServer := joinserver.NewJoinServiceGRPCServer(conn.Client) proto.RegisterJoinServiceServer(server, joinServiceServer) @@ -5443,6 +5445,7 @@ func (process *TeleportProcess) initSecureGRPCServer(cfg initSecureGRPCServerCfg grpc.Creds(credentials.NewTLS( copyAndConfigureTLS(serverTLSConfig, process.log, cfg.accessPoint, clusterName), )), + grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams), ) kubeServer, err := kubegrpc.New(kubegrpc.Config{ diff --git a/lib/teleterm/apiserver/apiserver.go b/lib/teleterm/apiserver/apiserver.go index 10625b609e951..897105f2ae057 100644 --- a/lib/teleterm/apiserver/apiserver.go +++ b/lib/teleterm/apiserver/apiserver.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc" api "github.com/gravitational/teleport/gen/proto/go/teleport/lib/teleterm/v1" + "github.com/gravitational/teleport/lib/defaults" "github.com/gravitational/teleport/lib/teleterm/apiserver/handler" "github.com/gravitational/teleport/lib/utils" ) @@ -41,7 +42,9 @@ func New(cfg Config) (*APIServer, error) { } grpcServer := grpc.NewServer(cfg.TshdServerCreds, - grpc.ChainUnaryInterceptor(withErrorHandling(cfg.Log))) + grpc.ChainUnaryInterceptor(withErrorHandling(cfg.Log)), + grpc.MaxConcurrentStreams(defaults.GRPCMaxConcurrentStreams), + ) // Create Terminal service.