Skip to content

Commit

Permalink
more work on the scaffolding
Browse files Browse the repository at this point in the history
  • Loading branch information
blind-oracle committed Apr 17, 2024
1 parent 89ea890 commit 363666d
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 69 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ futures-util = "0.3"
hickory-resolver = { version = "0.24", features = ["dns-over-https-rustls", "webpki-roots", "dnssec-ring"] }
http = "1.1"
humantime = "2.1"
hyper = "1.2"
hyper = "1.3"
hyper-util = "0.1"
jemallocator = "0.5"
jemalloc-ctl = "0.5"
Expand All @@ -37,7 +37,8 @@ regex = "1.10"
# TODO switch back when Reqwest upgrades to Rustls 0.23
# https://github.com/seanmonstar/reqwest/pull/2225
#reqwest = { version = "0.12", features = ["rustls-tls"] }
reqwest = { git = "https://github.com/blind-oracle/reqwest.git", features = [
reqwest = { git = "https://github.com/blind-oracle/reqwest.git", default_features = false, features = [
"http2",
"rustls-tls",
"deflate",
"gzip",
Expand Down
14 changes: 11 additions & 3 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,24 @@ pub struct Dns {
#[derive(Args)]
pub struct HttpServer {
/// Where to listen for HTTP
#[clap(long = "listen-http", default_value = "[::1]:8080")]
#[clap(long = "http-server-listen-plain", default_value = "[::1]:8080")]
pub http: SocketAddr,

/// Where to listen for HTTPS
#[clap(long = "listen-https", default_value = "[::1]:8443")]
#[clap(long = "http-server-listen-tls", default_value = "[::1]:8443")]
pub https: SocketAddr,

/// Backlog of incoming connections to set on the listening socket.
#[clap(long, default_value = "8192")]
#[clap(long = "http-server-backlog", default_value = "2048")]
pub backlog: u32,

/// Backlog of incoming connections to set on the listening socket.
#[clap(long = "http-server-http2-max-streams", default_value = "100")]
pub http2_max_streams: u32,

/// How long to wait for the existing connections to finish before shutting down
#[clap(long = "http-server-grace-period", default_value = "30s", value_parser = parse_duration)]
pub grace_period: Duration,
}

#[derive(Args)]
Expand Down
6 changes: 4 additions & 2 deletions src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use tracing::{error, warn};

use crate::{
cli::Cli,
http::{client::ReqwestClient, server::Server},
http::{ReqwestClient, Server},
tls,
};

Expand Down Expand Up @@ -39,6 +39,7 @@ pub async fn main(cli: Cli) -> Result<(), Error> {
cli.http_server.http,
cli.http_server.backlog,
router.clone(),
cli.http_server.grace_period,
None,
)) as Arc<dyn Run>;
runners.push(("http_server".into(), http_server));
Expand All @@ -51,6 +52,7 @@ pub async fn main(cli: Cli) -> Result<(), Error> {
cli.http_server.https,
cli.http_server.backlog,
router,
cli.http_server.grace_period,
Some(rustls_cfg),
)) as Arc<dyn Run>;
runners.push(("https_server".into(), https_server));
Expand All @@ -60,7 +62,7 @@ pub async fn main(cli: Cli) -> Result<(), Error> {
let token = token.child_token();
tracker.spawn(async move {
if let Err(e) = obj.run(token).await {
error!("Runner {name} exited with an error: {e}");
error!("Runner '{name}' exited with an error: {e}");
}
});
}
Expand Down
3 changes: 3 additions & 0 deletions src/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
pub mod client;
pub mod dns;
pub mod server;

pub use client::{Client, ReqwestClient};
pub use server::Server;
66 changes: 51 additions & 15 deletions src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder,
};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpSocket, TcpStream};
use tokio::select;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::{TcpListener, TcpSocket, TcpStream},
select,
};
use tokio_rustls::TlsAcceptor;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tower_service::Service;
Expand All @@ -23,10 +25,11 @@ use crate::core::Run;
trait AsyncReadWrite: AsyncRead + AsyncWrite + Send + Sync + Unpin {}
impl<T: AsyncRead + AsyncWrite + Send + Sync + Unpin> AsyncReadWrite for T {}

pub struct Conn {
struct Conn {
addr: SocketAddr,
remote_addr: SocketAddr,
router: Router,
token: CancellationToken,
tls_acceptor: Option<TlsAcceptor>,
}

Expand Down Expand Up @@ -57,11 +60,35 @@ impl Conn {
});

// Call the service
Builder::new(TokioExecutor::new())
.serve_connection(stream, service)
.await
// It shouldn't really fail since Axum routers are infallible
.map_err(|e| anyhow!("unable to call service: {e}"))?;
let mut builder = Builder::new(TokioExecutor::new());

// Some sensible defaults
// TODO make configurable?
builder
.http2()
.adaptive_window(true)
.max_concurrent_streams(Some(100))
.keep_alive_interval(Some(Duration::from_secs(20)))
.keep_alive_timeout(Duration::from_secs(10));

let conn = builder.serve_connection(stream, service);
pin_mut!(conn);

loop {
select! {
v = conn.as_mut() => {
if let Err(e) = v {
return Err(anyhow!("Unable to serve connection: {e}"));
}

break;
},

() = self.token.cancelled() => {
conn.as_mut().graceful_shutdown();
}
}
}

debug!(
"Server {}: {}: connection finished",
Expand All @@ -77,6 +104,7 @@ pub struct Server {
addr: SocketAddr,
backlog: u32,
router: Router,
grace_period: Duration,
tracker: TaskTracker,
tls_acceptor: Option<TlsAcceptor>,
}
Expand All @@ -86,12 +114,14 @@ impl Server {
addr: SocketAddr,
backlog: u32,
router: Router,
grace_period: Duration,
rustls_cfg: Option<rustls::ServerConfig>,
) -> Self {
Self {
addr,
backlog,
router,
grace_period,
tracker: TaskTracker::new(),
tls_acceptor: rustls_cfg.map(|x| TlsAcceptor::from(Arc::new(x))),
}
Expand All @@ -104,14 +134,20 @@ impl Run for Server {
let listener = listen_tcp_backlog(self.addr, self.backlog)?;
pin_mut!(listener);

warn!(
"Server {}: running (TLS: {})",
self.addr,
self.tls_acceptor.is_some()
);

loop {
select! {
() = token.cancelled() => {
warn!("Server {}: shutting down, waiting for the active connections to close for 30s", self.addr);
warn!("Server {}: shutting down, waiting for the active connections to close for {}s", self.addr, self.grace_period.as_secs());
self.tracker.close();
select! {
() = self.tracker.wait() => {},
() = tokio::time::sleep(Duration::from_secs(30)) => {},
() = tokio::time::sleep(self.grace_period) => {},
}
warn!("Server {}: shut down", self.addr);
return Ok(());
Expand All @@ -120,7 +156,7 @@ impl Run for Server {
// Try to accept the connection
v = listener.accept() => {
let (stream, remote_addr) = match v {
Ok((a, b)) => (a, b),
Ok(v) => v,
Err(e) => {
warn!("Unable to accept connection: {e}");
continue;
Expand All @@ -133,14 +169,14 @@ impl Run for Server {
addr: self.addr,
remote_addr,
router: self.router.clone(),
token: token.child_token(),
tls_acceptor: self.tls_acceptor.clone(),
};

// Spawn a task to handle connection
self.tracker.spawn(async move {
match conn.handle(stream).await {
Ok(()) => {},
Err(e) => warn!("Server {}: {}: failed to handle connection: {e}", conn.addr, remote_addr),
if let Err(e) = conn.handle(stream).await {
warn!("Server {}: {}: failed to handle connection: {e}", conn.addr, remote_addr);
}
});
}
Expand Down
Loading

0 comments on commit 363666d

Please # to comment.