Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat(transport): Add server graceful shutdown #169

Merged
merged 8 commits into from
Dec 11, 2019
69 changes: 56 additions & 13 deletions tonic/src/transport/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,14 +222,7 @@ impl Server {
Router::new(self.clone(), svc)
}

pub(crate) async fn serve<S>(self, addr: SocketAddr, svc: S) -> Result<(), super::Error>
where
S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<crate::Error> + Send,
{
let interceptor = self.interceptor.clone();
let concurrency_limit = self.concurrency_limit;
fn serve_common<I>(self, addr: SocketAddr) -> hyper::server::Builder<I> {
let init_connection_window_size = self.init_connection_window_size;
let init_stream_window_size = self.init_stream_window_size;
let max_concurrent_streams = self.max_concurrent_streams;
Expand Down Expand Up @@ -261,20 +254,57 @@ impl Server {
}
},
);
hyper::Server::builder(incoming)
.http2_only(true)
.http2_initial_connection_window_size(init_connection_window_size)
.http2_initial_stream_window_size(init_stream_window_size)
.http2_max_concurrent_streams(max_concurrent_streams)
}

pub(crate) async fn serve<S>(self, addr: SocketAddr, svc: S) -> Result<(), super::Error>
where
S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<crate::Error> + Send,
{
let interceptor = self.interceptor.clone();
let concurrency_limit = self.concurrency_limit;

let svc = MakeSvc {
inner: svc,
interceptor,
concurrency_limit,
// timeout,
};
self.serve_common(addr).serve(svc).await.map_err(map_err)?;

hyper::Server::builder(incoming)
.http2_only(true)
.http2_initial_connection_window_size(init_connection_window_size)
.http2_initial_stream_window_size(init_stream_window_size)
.http2_max_concurrent_streams(max_concurrent_streams)
Ok(())
}

pub(crate) async fn serve_with_shutdown<S, F>(
self,
addr: SocketAddr,
svc: S,
signal: F,
) -> Result<(), super::Error>
where
S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<crate::Error> + Send,
F: Future<Output = ()>,
{
let interceptor = self.interceptor.clone();
let concurrency_limit = self.concurrency_limit;

let svc = MakeSvc {
inner: svc,
interceptor,
concurrency_limit,
// timeout,
};
self.serve_common(addr)
.serve(svc)
.with_graceful_shutdown(signal)
.await
.map_err(map_err)?;

Expand Down Expand Up @@ -348,6 +378,19 @@ where
pub async fn serve(self, addr: SocketAddr) -> Result<(), super::Error> {
self.server.serve(addr, self.routes).await
}

/// Consume this [`Server`] creating a future that will execute the server
/// on [`tokio`]'s default executor. And shutdown when the provided signal
/// is received.
///
/// [`Server`]: struct.Server.html
pub async fn serve_with_shutdown<F: Future<Output = ()>>(
self,
addr: SocketAddr,
f: F,
) -> Result<(), super::Error> {
self.server.serve_with_shutdown(addr, self.routes, f).await
}
}

fn map_err(e: impl Into<crate::Error>) -> super::Error {
Expand Down