Skip to content

Commit

Permalink
register rx & tx notify directly after signals
Browse files Browse the repository at this point in the history
  • Loading branch information
hargut committed Feb 4, 2025
1 parent 5501789 commit 537792b
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pingora-core/src/connectors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ pub(crate) mod quic_tests {
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use http::{Response, StatusCode};
use log::{error, info};
use log::info;
use pingora_error::Result;
use pingora_timeout::timeout;
use std::thread;
Expand Down
23 changes: 14 additions & 9 deletions pingora-core/src/protocols/http/v3/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ pub(crate) struct Http3Poll {

impl Http3Poll {
pub(crate) async fn start(mut self) -> Result<()> {
let mut notified = self.conn_io.rx_notify.notified();

'poll: loop {
let poll = {
let mut qconn = self.conn_io.quic.lock();
Expand All @@ -471,13 +473,22 @@ impl Http3Poll {
let (stream_id, ev) = match poll {
Ok((stream, ev)) => (stream, ev),
Err(e) => {
housekeeping_drop_sessions(
&self.conn_id().clone(),
&mut self.sessions,
&self.drop_sessions,
);

let conn_alive = self
.conn_io
.error_or_timeout_data_race(e, &self.sessions)
.error_or_timeout_data_race(e, notified, &self.sessions)
.await?;
match conn_alive {
true => continue 'poll,
false => {
Some(updated_notified) => {
notified = updated_notified;
continue 'poll;
}
None => {
self.idle_close.send_replace(true);
break 'poll Ok(());
}
Expand Down Expand Up @@ -507,12 +518,6 @@ impl Http3Poll {
.send(ev)
.await
.explain_err(H3Error, |_| "failed to forward h3 event to session")?;

housekeeping_drop_sessions(
&self.conn_id().clone(),
&mut self.sessions,
&self.drop_sessions,
);
}
}

Expand Down
27 changes: 17 additions & 10 deletions pingora-core/src/protocols/http/v3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::futures::Notified;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::Notify;

Expand Down Expand Up @@ -276,11 +277,9 @@ impl ConnectionIo {
async fn error_or_timeout_data_race(
&self,
error: h3::Error,
data_race: Notified<'_>,
sessions: &StreamIdHashMap<Sender<Event>>,
) -> Result<bool> {
// register before housekeeping to avoid notify misses in high-load scenarios
let data_future = self.rx_notify.notified();

) -> Result<Option<Notified>> {
match error {
h3::Error::Done => {
debug!("H3 connection {:?} no events available", self.conn_id());
Expand Down Expand Up @@ -310,18 +309,22 @@ impl ConnectionIo {
qconn.local_error(),
qconn.peer_error(),
) {
Ok(()) => Ok(false), // signal connection close
Ok(()) => Ok(None), // signal connection close
Err(e) => Err(e),
};
}
timeout = qconn.timeout();
}

// race for new data on connection or timeout
tokio::select! { /* biased, poll data first */
let notified = tokio::select! { /* biased, poll data first */
// to avoid timeout race wins in high load scenarios when data could be available
biased;
_data = data_future => { /* continue */ }
_data = data_race => {
/* continue */
// register notify right away to cover all notify signals
self.rx_notify.notified()
}
_timedout = async {
if let Some(timeout) = timeout {
debug!("connection {:?} timeout {:?}", self.conn_id(), timeout);
Expand All @@ -331,6 +334,9 @@ impl ConnectionIo {
tokio::time::sleep(Duration::MAX).await
}
} => {
// register notify right away to cover all notify signals
let notified = self.rx_notify.notified();

if !sessions.is_empty() {
debug!("connection {:?} timed out with {} open sessions",
self.conn_id(), sessions.len());
Expand All @@ -342,10 +348,11 @@ impl ConnectionIo {

if let Some(timeout) = timeout {
trace!("connection {:?} timed out {:?}", self.conn_id(), timeout);
}
};
notified
}
}
Ok(true) // signal continue
};
Ok(Some(notified)) // signal continue
}
_ => {
// if an error occurs while processing data, the connection is closed with
Expand Down
23 changes: 14 additions & 9 deletions pingora-core/src/protocols/http/v3/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ impl Http3Session {
conn: &mut Http3Connection,
digest: Arc<Digest>,
) -> Result<Option<Self>> {
let mut notified = conn.conn_io.rx_notify.notified();

'poll: loop {
let poll = {
let mut qconn = conn.conn_io.quic.lock();
Expand Down Expand Up @@ -352,22 +354,25 @@ impl Http3Session {
}
}
Err(e) => {
housekeeping_drop_sessions(
&conn.conn_id().clone(),
&mut conn.sessions,
&conn.drop_sessions,
);

let conn_active = conn
.conn_io
.error_or_timeout_data_race(e, &conn.sessions)
.error_or_timeout_data_race(e, notified, &conn.sessions)
.await?;
match conn_active {
true => continue 'poll,
false => return Ok(None),
Some(updated_notified) => {
notified = updated_notified;
continue 'poll;
}
None => return Ok(None),
}
}
}

housekeeping_drop_sessions(
&conn.conn_id().clone(),
&mut conn.sessions,
&conn.drop_sessions,
);
}
}

Expand Down
17 changes: 10 additions & 7 deletions pingora-core/src/protocols/l4/quic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,11 @@ impl ConnectionTx {
let id = self.connection_id;
let mut out = [0u8; MAX_IPV6_BUF_SIZE];

let mut finished_sending = None;
let mut notified = self.tx_notify.notified();
debug!("connection {:?} tx write", id);
'write: loop {
let mut continue_write = false;
let mut finished_sending = false;

// update tx stats & get current details
let (max_dgram_size, max_send_burst) = self.tx_stats.max_send_burst(&self.connection);
Expand Down Expand Up @@ -178,8 +179,7 @@ impl ConnectionTx {
Err(e) => {
if e == quiche::Error::Done {
trace!("connection {:?} send finished", id);
// register notify before socket send to avoid misses under high load
finished_sending = Some(self.tx_notify.notified());
finished_sending = true;
break 'fill;
}
error!("connection {:?} send error: {:?}", id, e);
Expand All @@ -206,7 +206,9 @@ impl ConnectionTx {

if total_write == 0 || dst_info.is_none() {
trace!("connection {:?} nothing to send", id);
self.tx_notify.notified().await;
notified.await;
// register notify right away to cover all notify signals
notified = self.tx_notify.notified();
continue 'write;
}
let dst_info = dst_info.unwrap();
Expand Down Expand Up @@ -242,10 +244,11 @@ impl ConnectionTx {
continue 'write;
}

if let Some(tx_notified) = finished_sending {
if finished_sending {
trace!("connection {:?} finished sending", id);
tx_notified.await;
finished_sending = None;
notified.await;
// register notify right away to cover all notify signals
notified = self.tx_notify.notified();
continue 'write;
}
}
Expand Down

0 comments on commit 537792b

Please # to comment.