From 537792b95477b467b3ffd02500430fbb455992e5 Mon Sep 17 00:00:00 2001 From: Harald Gutmann Date: Tue, 4 Feb 2025 13:16:22 +0100 Subject: [PATCH] register rx & tx notify directly after signals --- pingora-core/src/connectors/mod.rs | 2 +- pingora-core/src/protocols/http/v3/client.rs | 23 ++++++++++------- pingora-core/src/protocols/http/v3/mod.rs | 27 ++++++++++++-------- pingora-core/src/protocols/http/v3/server.rs | 23 ++++++++++------- pingora-core/src/protocols/l4/quic/mod.rs | 17 +++++++----- 5 files changed, 56 insertions(+), 36 deletions(-) diff --git a/pingora-core/src/connectors/mod.rs b/pingora-core/src/connectors/mod.rs index 364ed4744..5db875d48 100644 --- a/pingora-core/src/connectors/mod.rs +++ b/pingora-core/src/connectors/mod.rs @@ -628,7 +628,7 @@ pub(crate) mod quic_tests { use async_trait::async_trait; use bytes::{BufMut, BytesMut}; use http::{Response, StatusCode}; - use log::{error, info}; + use log::info; use pingora_error::Result; use pingora_timeout::timeout; use std::thread; diff --git a/pingora-core/src/protocols/http/v3/client.rs b/pingora-core/src/protocols/http/v3/client.rs index 34a5bddbf..034b32e38 100644 --- a/pingora-core/src/protocols/http/v3/client.rs +++ b/pingora-core/src/protocols/http/v3/client.rs @@ -460,6 +460,8 @@ pub(crate) struct Http3Poll { impl Http3Poll { pub(crate) async fn start(mut self) -> Result<()> { + let mut notified = self.conn_io.rx_notify.notified(); + 'poll: loop { let poll = { let mut qconn = self.conn_io.quic.lock(); @@ -471,13 +473,22 @@ impl Http3Poll { let (stream_id, ev) = match poll { Ok((stream, ev)) => (stream, ev), Err(e) => { + housekeeping_drop_sessions( + &self.conn_id().clone(), + &mut self.sessions, + &self.drop_sessions, + ); + let conn_alive = self .conn_io - .error_or_timeout_data_race(e, &self.sessions) + .error_or_timeout_data_race(e, notified, &self.sessions) .await?; match conn_alive { - true => continue 'poll, - false => { + Some(updated_notified) => { + notified = updated_notified; + continue 'poll; + } + None => { self.idle_close.send_replace(true); break 'poll Ok(()); } @@ -507,12 +518,6 @@ impl Http3Poll { .send(ev) .await .explain_err(H3Error, |_| "failed to forward h3 event to session")?; - - housekeeping_drop_sessions( - &self.conn_id().clone(), - &mut self.sessions, - &self.drop_sessions, - ); } } diff --git a/pingora-core/src/protocols/http/v3/mod.rs b/pingora-core/src/protocols/http/v3/mod.rs index 6f5cd30ec..704b7dcce 100644 --- a/pingora-core/src/protocols/http/v3/mod.rs +++ b/pingora-core/src/protocols/http/v3/mod.rs @@ -35,6 +35,7 @@ use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; +use tokio::sync::futures::Notified; use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::Notify; @@ -276,11 +277,9 @@ impl ConnectionIo { async fn error_or_timeout_data_race( &self, error: h3::Error, + data_race: Notified<'_>, sessions: &StreamIdHashMap>, - ) -> Result { - // register before housekeeping to avoid notify misses in high-load scenarios - let data_future = self.rx_notify.notified(); - + ) -> Result> { match error { h3::Error::Done => { debug!("H3 connection {:?} no events available", self.conn_id()); @@ -310,7 +309,7 @@ impl ConnectionIo { qconn.local_error(), qconn.peer_error(), ) { - Ok(()) => Ok(false), // signal connection close + Ok(()) => Ok(None), // signal connection close Err(e) => Err(e), }; } @@ -318,10 +317,14 @@ impl ConnectionIo { } // race for new data on connection or timeout - tokio::select! { /* biased, poll data first */ + let notified = tokio::select! { /* biased, poll data first */ // to avoid timeout race wins in high load scenarios when data could be available biased; - _data = data_future => { /* continue */ } + _data = data_race => { + /* continue */ + // register notify right away to cover all notify signals + self.rx_notify.notified() + } _timedout = async { if let Some(timeout) = timeout { debug!("connection {:?} timeout {:?}", self.conn_id(), timeout); @@ -331,6 +334,9 @@ impl ConnectionIo { tokio::time::sleep(Duration::MAX).await } } => { + // register notify right away to cover all notify signals + let notified = self.rx_notify.notified(); + if !sessions.is_empty() { debug!("connection {:?} timed out with {} open sessions", self.conn_id(), sessions.len()); @@ -342,10 +348,11 @@ impl ConnectionIo { if let Some(timeout) = timeout { trace!("connection {:?} timed out {:?}", self.conn_id(), timeout); - } + }; + notified } - } - Ok(true) // signal continue + }; + Ok(Some(notified)) // signal continue } _ => { // if an error occurs while processing data, the connection is closed with diff --git a/pingora-core/src/protocols/http/v3/server.rs b/pingora-core/src/protocols/http/v3/server.rs index b064f440b..6247d37a3 100644 --- a/pingora-core/src/protocols/http/v3/server.rs +++ b/pingora-core/src/protocols/http/v3/server.rs @@ -248,6 +248,8 @@ impl Http3Session { conn: &mut Http3Connection, digest: Arc, ) -> Result> { + let mut notified = conn.conn_io.rx_notify.notified(); + 'poll: loop { let poll = { let mut qconn = conn.conn_io.quic.lock(); @@ -352,22 +354,25 @@ impl Http3Session { } } Err(e) => { + housekeeping_drop_sessions( + &conn.conn_id().clone(), + &mut conn.sessions, + &conn.drop_sessions, + ); + let conn_active = conn .conn_io - .error_or_timeout_data_race(e, &conn.sessions) + .error_or_timeout_data_race(e, notified, &conn.sessions) .await?; match conn_active { - true => continue 'poll, - false => return Ok(None), + Some(updated_notified) => { + notified = updated_notified; + continue 'poll; + } + None => return Ok(None), } } } - - housekeeping_drop_sessions( - &conn.conn_id().clone(), - &mut conn.sessions, - &conn.drop_sessions, - ); } } diff --git a/pingora-core/src/protocols/l4/quic/mod.rs b/pingora-core/src/protocols/l4/quic/mod.rs index dfbd64daa..5fae1878b 100644 --- a/pingora-core/src/protocols/l4/quic/mod.rs +++ b/pingora-core/src/protocols/l4/quic/mod.rs @@ -141,10 +141,11 @@ impl ConnectionTx { let id = self.connection_id; let mut out = [0u8; MAX_IPV6_BUF_SIZE]; - let mut finished_sending = None; + let mut notified = self.tx_notify.notified(); debug!("connection {:?} tx write", id); 'write: loop { let mut continue_write = false; + let mut finished_sending = false; // update tx stats & get current details let (max_dgram_size, max_send_burst) = self.tx_stats.max_send_burst(&self.connection); @@ -178,8 +179,7 @@ impl ConnectionTx { Err(e) => { if e == quiche::Error::Done { trace!("connection {:?} send finished", id); - // register notify before socket send to avoid misses under high load - finished_sending = Some(self.tx_notify.notified()); + finished_sending = true; break 'fill; } error!("connection {:?} send error: {:?}", id, e); @@ -206,7 +206,9 @@ impl ConnectionTx { if total_write == 0 || dst_info.is_none() { trace!("connection {:?} nothing to send", id); - self.tx_notify.notified().await; + notified.await; + // register notify right away to cover all notify signals + notified = self.tx_notify.notified(); continue 'write; } let dst_info = dst_info.unwrap(); @@ -242,10 +244,11 @@ impl ConnectionTx { continue 'write; } - if let Some(tx_notified) = finished_sending { + if finished_sending { trace!("connection {:?} finished sending", id); - tx_notified.await; - finished_sending = None; + notified.await; + // register notify right away to cover all notify signals + notified = self.tx_notify.notified(); continue 'write; } }