diff --git a/src/client.rs b/src/client.rs index 5dd0b0f87..4147e8a46 100644 --- a/src/client.rs +++ b/src/client.rs @@ -326,6 +326,10 @@ pub struct Builder { /// Maximum number of locally reset streams to keep at a time. reset_stream_max: usize, + /// Maximum number of remotely reset streams to allow in the pending + /// accept queue. + pending_accept_reset_stream_max: usize, + /// Initial `Settings` frame to send as part of the handshake. settings: Settings, @@ -634,6 +638,7 @@ impl Builder { max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, + pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX, initial_target_connection_window_size: None, initial_max_send_streams: usize::MAX, settings: Default::default(), @@ -966,6 +971,49 @@ impl Builder { self } + /// Sets the maximum number of pending-accept remotely-reset streams. + /// + /// Streams that have been received by the peer, but not accepted by the + /// user, can also receive a RST_STREAM. This is a legitimate pattern: one + /// could send a request and then shortly after, realize it is not needed, + /// sending a CANCEL. + /// + /// However, since those streams are now "closed", they don't count towards + /// the max concurrent streams. So, they will sit in the accept queue, + /// using memory. + /// + /// When the number of remotely-reset streams sitting in the pending-accept + /// queue reaches this maximum value, a connection error with the code of + /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the + /// `Future`. + /// + /// The default value is currently 20, but could change. + /// + /// # Examples + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::client::*; + /// # use bytes::Bytes; + /// # + /// # async fn doc(my_io: T) + /// # -> Result<((SendRequest, Connection)), h2::Error> + /// # { + /// // `client_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let client_fut = Builder::new() + /// .max_pending_accept_reset_streams(100) + /// .handshake(my_io); + /// # client_fut.await + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self { + self.pending_accept_reset_stream_max = max; + self + } + /// Sets the maximum send buffer size per stream. /// /// Once a stream has buffered up to (or over) the maximum, the stream's @@ -1209,6 +1257,7 @@ where max_send_buffer_size: builder.max_send_buffer_size, reset_stream_duration: builder.reset_stream_duration, reset_stream_max: builder.reset_stream_max, + remote_reset_stream_max: builder.pending_accept_reset_stream_max, settings: builder.settings.clone(), }, ); diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 1fec23102..619973df8 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -14,8 +14,6 @@ use std::task::{Context, Poll}; use std::time::Duration; use tokio::io::{AsyncRead, AsyncWrite}; -const DEFAULT_MAX_REMOTE_RESET_STREAMS: usize = 20; - /// An H2 connection #[derive(Debug)] pub(crate) struct Connection @@ -82,6 +80,7 @@ pub(crate) struct Config { pub max_send_buffer_size: usize, pub reset_stream_duration: Duration, pub reset_stream_max: usize, + pub remote_reset_stream_max: usize, pub settings: frame::Settings, } @@ -120,7 +119,7 @@ where .unwrap_or(false), local_reset_duration: config.reset_stream_duration, local_reset_max: config.reset_stream_max, - remote_reset_max: DEFAULT_MAX_REMOTE_RESET_STREAMS, + remote_reset_max: config.remote_reset_stream_max, remote_init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, remote_max_initiated: config .settings diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 5ec7bf992..d71ee9c42 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -31,6 +31,7 @@ pub type WindowSize = u32; // Constants pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; +pub const DEFAULT_REMOTE_RESET_STREAM_MAX: usize = 20; pub const DEFAULT_RESET_STREAM_MAX: usize = 10; pub const DEFAULT_RESET_STREAM_SECS: u64 = 30; pub const DEFAULT_MAX_SEND_BUFFER_SIZE: usize = 1024 * 400; diff --git a/src/server.rs b/src/server.rs index 6f2455e0b..f1f4cf470 100644 --- a/src/server.rs +++ b/src/server.rs @@ -240,6 +240,10 @@ pub struct Builder { /// Maximum number of locally reset streams to keep at a time. reset_stream_max: usize, + /// Maximum number of remotely reset streams to allow in the pending + /// accept queue. + pending_accept_reset_stream_max: usize, + /// Initial `Settings` frame to send as part of the handshake. settings: Settings, @@ -642,6 +646,7 @@ impl Builder { Builder { reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS), reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, + pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX, settings: Settings::default(), initial_target_connection_window_size: None, max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE, @@ -882,6 +887,49 @@ impl Builder { self } + /// Sets the maximum number of pending-accept remotely-reset streams. + /// + /// Streams that have been received by the peer, but not accepted by the + /// user, can also receive a RST_STREAM. This is a legitimate pattern: one + /// could send a request and then shortly after, realize it is not needed, + /// sending a CANCEL. + /// + /// However, since those streams are now "closed", they don't count towards + /// the max concurrent streams. So, they will sit in the accept queue, + /// using memory. + /// + /// When the number of remotely-reset streams sitting in the pending-accept + /// queue reaches this maximum value, a connection error with the code of + /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the + /// `Future`. + /// + /// The default value is currently 20, but could change. + /// + /// # Examples + /// + /// + /// ``` + /// # use tokio::io::{AsyncRead, AsyncWrite}; + /// # use h2::server::*; + /// # + /// # fn doc(my_io: T) + /// # -> Handshake + /// # { + /// // `server_fut` is a future representing the completion of the HTTP/2 + /// // handshake. + /// let server_fut = Builder::new() + /// .max_pending_accept_reset_streams(100) + /// .handshake(my_io); + /// # server_fut + /// # } + /// # + /// # pub fn main() {} + /// ``` + pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self { + self.pending_accept_reset_stream_max = max; + self + } + /// Sets the maximum send buffer size per stream. /// /// Once a stream has buffered up to (or over) the maximum, the stream's @@ -1312,6 +1360,7 @@ where max_send_buffer_size: self.builder.max_send_buffer_size, reset_stream_duration: self.builder.reset_stream_duration, reset_stream_max: self.builder.reset_stream_max, + remote_reset_stream_max: self.builder.pending_accept_reset_stream_max, settings: self.builder.settings.clone(), }, ); diff --git a/tests/h2-tests/tests/stream_states.rs b/tests/h2-tests/tests/stream_states.rs index 610d3a530..5d86f7b47 100644 --- a/tests/h2-tests/tests/stream_states.rs +++ b/tests/h2-tests/tests/stream_states.rs @@ -200,6 +200,7 @@ async fn reset_streams_dont_grow_memory_continuously() { let (io, mut client) = mock::new(); const N: u32 = 50; + const MAX: usize = 20; let client = async move { let settings = client.assert_server_handshake().await; @@ -212,7 +213,7 @@ async fn reset_streams_dont_grow_memory_continuously() { } tokio::time::timeout( std::time::Duration::from_secs(1), - client.recv_frame(frames::go_away(41).calm()), + client.recv_frame(frames::go_away((MAX * 2 + 1) as u32).calm()), ) .await .expect("client goaway"); @@ -220,6 +221,7 @@ async fn reset_streams_dont_grow_memory_continuously() { let srv = async move { let mut srv = server::Builder::new() + .max_pending_accept_reset_streams(MAX) .handshake::<_, Bytes>(io) .await .expect("handshake");