refactor: use a separate queue for inbound disco packets from relays #3309
Conversation
Documentation for this PR has been generated and is available at: https://n0-computer.github.io/iroh/pr/3309/docs/iroh/ Last updated: 2025-05-16T07:57:13Z
iroh/src/magicsock/relay_actor.rs
Outdated
    relay_remote_node_id: datagram.src,
};
if let Err(err) = self.relay_disco_recv.try_send(message) {
    warn!("Dropping received relay disco packet: {err:#}");
I would be tempted to push this relay_disco_recv all the way into ActiveRelayActor, and then it can queue this message and stop reading from the relay channel instead of having to drop it.
This is already in the ActiveRelayActor! So all it would take is making ActiveRelayActor::handle_relay_msg an async fn, and using send(message).await here, I think? It is already only called from async fns, and that should then add backpressure.
I.e., is this what you meant?
apply backpressure on relay disco recv
diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs
index 46b6e4d4c..a3479da42 100644
--- a/iroh/src/magicsock/relay_actor.rs
+++ b/iroh/src/magicsock/relay_actor.rs
@@ -619,5 +619,5 @@ impl ActiveRelayActor {
                 match msg {
                     Ok(msg) => {
-                        self.handle_relay_msg(msg, &mut state);
+                        self.handle_relay_msg(msg, &mut state).await;
                         // reset the ping timer, we have just received a message
                         ping_interval.reset();
@@ -642,5 +642,5 @@ impl ActiveRelayActor {
     }
-    fn handle_relay_msg(&mut self, msg: ReceivedMessage, state: &mut ConnectedRelayState) {
+    async fn handle_relay_msg(&mut self, msg: ReceivedMessage, state: &mut ConnectedRelayState) {
         match msg {
             ReceivedMessage::ReceivedPacket {
@@ -677,5 +677,5 @@ impl ActiveRelayActor {
                     relay_remote_node_id: datagram.src,
                 };
-                if let Err(err) = self.relay_disco_recv.try_send(message) {
+                if let Err(err) = self.relay_disco_recv.send(message).await {
                     warn!("Dropping received relay disco packet: {err:#}");
                 }
@@ -775,5 +775,5 @@ impl ActiveRelayActor {
         };
         match msg {
-            Ok(msg) => self.handle_relay_msg(msg, state),
+            Ok(msg) => self.handle_relay_msg(msg, state).await,
             Err(err) => break Err(anyhow!("Client stream read error: {err:#}")),
         }
Pushed that change. Tests seem fine with it!
Ah, GitHub was giving me the wrong context and I thought this was the RelayActor...
Conceptually, "queue the message" is the same as my "we can't do async stuff" comment above. If the self.relay_disco_recv.try_send fails, the message needs to be stored on the actor somehow, and the main loop should then stop polling for further messages from the relay until it has finally been able to send it. The RelayActor and ActiveRelayActor already go to some length to do this and it has worked very well. Exactly where the state should be stored, and how to keep yet another case of this readable and maintainable, is another question. E.g. the RelayActor stores a datagram_send_fut "sending future" (the MaybeFuture), and while that is set it doesn't poll the corresponding source. The ActiveRelayActor has various versions of the run loops; that may not work so well if it also has to do this.
iroh/src/magicsock/relay_actor.rs
Outdated
@@ -605,7 +618,7 @@ impl ActiveRelayActor {
                 };
                 match msg {
                     Ok(msg) => {
-                        self.handle_relay_msg(msg, &mut state);
+                        self.handle_relay_msg(msg, &mut state).await;
We cannot make this async. Actors cannot do async work in their main loop. We violate this in plenty of places, but this actor has finally been cleaned up; we should not regress it.
Right. But how would you apply backpressure then?
Removed that commit again.
Right. But how would you apply backpressure then?
For completeness answering that here as well (it is mentioned above): by keeping the blocked message around in the actor state somehow and not polling for new messages from the relay while you have this pending message.
Of course in this case it means you're backpressuring the entire relay TCP stream while you only really want to backpressure disco messages. But to really do something about that you'd have to open two TCP streams to the relay and... lots of things we don't want to do.
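A minimal sketch of that pattern, with made-up types and field names (RelayMsg, DiscoMsg, pending_disco and so on are stand-ins, not the actual iroh identifiers): the loop only reads from the relay while nothing is parked, and otherwise waits for capacity on the disco queue, so the handler itself never awaits.

use tokio::sync::mpsc;

// Stand-in message types for the sketch.
struct DiscoMsg;
enum RelayMsg {
    Disco(DiscoMsg),
    Data(Vec<u8>),
}

struct ActorSketch {
    relay_rx: mpsc::Receiver<RelayMsg>,
    disco_tx: mpsc::Sender<DiscoMsg>,
    /// A disco message that was received but could not yet be forwarded.
    pending_disco: Option<DiscoMsg>,
}

impl ActorSketch {
    async fn run(mut self) {
        loop {
            tokio::select! {
                // Only read from the relay while nothing is parked; this is
                // what backpressures the whole relay stream.
                msg = self.relay_rx.recv(), if self.pending_disco.is_none() => {
                    let Some(msg) = msg else { break };
                    match msg {
                        RelayMsg::Disco(d) => match self.disco_tx.try_send(d) {
                            Ok(()) => {}
                            // Queue full: park the message instead of dropping it.
                            Err(mpsc::error::TrySendError::Full(d)) => {
                                self.pending_disco = Some(d);
                            }
                            Err(mpsc::error::TrySendError::Closed(_)) => break,
                        },
                        RelayMsg::Data(_bytes) => { /* forward data packets */ }
                    }
                }
                // While a message is parked, wait for queue capacity instead
                // of polling the relay.
                permit = self.disco_tx.reserve(), if self.pending_disco.is_some() => {
                    match permit {
                        Ok(permit) => {
                            let msg = self.pending_disco.take().expect("checked by select condition");
                            permit.send(msg);
                        }
                        // The disco receiver was dropped; shut down.
                        Err(_) => break,
                    }
                }
            }
        }
    }
}

The key property is that the main loop never awaits inside a handler; the waiting is expressed as select branches gated on whether a message is parked, which is roughly the same trick as the MaybeFuture "sending future" mentioned above.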
},
);
}
debug!("relay-disco-recv actor closed");
nit: "actor closed" is sufficient since you already have an info span with the actor name. But I don't mind if you do this either.
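As an illustration of that convention (not the actual iroh code; the names are invented): when the task is instrumented with a named span, the actor name is already attached to every log line, so the message itself can stay short.

use tracing::{debug, info_span, Instrument};

// The span name shows up on every event emitted inside the task,
// so the shutdown message does not need to repeat it.
async fn relay_disco_recv_loop() {
    // ... drain the disco queue here ...
    debug!("actor closed");
}

fn spawn_disco_recv() -> tokio::task::JoinHandle<()> {
    tokio::spawn(relay_disco_recv_loop().instrument(info_span!("relay-disco-recv")))
}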
@flub I pushed a commit that should apply backpressure onto the TCP stream to the relay, without ever ...
iroh/src/magicsock/relay_actor.rs
Outdated
@@ -159,6 +162,20 @@ struct ActiveRelayActor {
     /// Token indicating the [`ActiveRelayActor`] should stop.
     stop_token: CancellationToken,
     metrics: Arc<MagicsockMetrics>,
+    /// Received relay packets that could not yet be forwarded to the magicsocket.
+    pending_received: Option<PendingRecv>,
Shouldn't this live on ConnectedRelayState?
I moved this into a new struct together with the queue senders in the latest commit, but still kept it on the ActiveRelayActor. My reasoning: if the connection to a relay breaks but we still have pending received items queued, we can still try pushing those to the magicsock.
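Roughly, the layout being described might look like this; PendingRecv and pending_received come from the diff above, while the wrapper struct name, its other fields, and the message types are guesses for illustration.

use tokio::sync::mpsc;

// Placeholder message types; the real ones live in iroh's magicsock module.
struct RelayDiscoMessage;
struct RelayRecvDatagram;

/// A received relay packet that could not yet be forwarded to the magicsocket.
/// The PR only shows the `Option<PendingRecv>` field, so the variants are a guess.
enum PendingRecv {
    Disco(RelayDiscoMessage),
    Datagram(RelayRecvDatagram),
}

/// Hypothetical grouping of the queue senders with the parked packet. Keeping it
/// on the actor rather than on ConnectedRelayState means a pending item can still
/// be pushed to the magicsock even after the relay connection breaks.
struct RelayRecvQueues {
    disco_tx: mpsc::Sender<RelayDiscoMessage>,
    datagram_tx: mpsc::Sender<RelayRecvDatagram>,
    pending_received: Option<PendingRecv>,
}

struct ActiveRelayActorSketch {
    // ...other fields of the real ActiveRelayActor elided...
    recv_queues: RelayRecvQueues,
}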
iroh/src/magicsock/relay_actor.rs
Outdated
// no further `await`s occur after here.
// The unwrap is guaranteed to be safe because we checked above that it is not none.
#[allow(clippy::unwrap_used, reason = "checked above")]
let pending = maybe_pending.take().unwrap();
We use .expect() for cases like this, I thought .unwrap() was even denied by some linter?
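For reference, a minimal sketch of that style (a generic helper, not the actual PR code): the invariant goes into the .expect() message instead of an #[allow] on .unwrap().

/// Sketch only: `.expect()` documents why the `Option` cannot be `None` here,
/// and does not need the `clippy::unwrap_used` allow attribute.
fn take_pending<T>(maybe_pending: &mut Option<T>) -> T {
    maybe_pending
        .take()
        .expect("caller checked above that maybe_pending is not None")
}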
Oh yeah, that's complex. Though I agree that it looks about right. I also suspect your suggestion that this could be simplified a bit might be right. I'm still on the fence whether this change is worth it, though. If we do this, the dropping becomes less of an issue in the receiving magicsock; we'd then be dropping these in the relay server instead, which hides the problem entirely from the users.

To make this work better we would need to make the relay server behave more like a router. It can't backpressure this up to the sender, because the sender can be sending to multiple destinations from its single TCP stream. What the relay server can do is start marking ECN codepoints on packets when the queues they travel through start filling up. Quinn will then notice those and start to back off before there is full congestion. Given that disco really involves relatively few packets, the fact that they're not congestion controlled is then probably not an issue. So only if we start doing this will we get any benefit from this change.

The point is, this is a bigger project. If we do it, we need to write it up in a tracking issue and make an informed decision that we'll do it and get to the finish line. Technically it'd be great to have the relay server mark ECN codepoints though. I'd love it and think it'll make a real impact.
I pushed two commits:
Description
We have reports that the queue of inbound packets from the relay becomes full (i.e. a node is not polling the endpoint fast enough to process all incoming packets from a relay). When this happens, we currently lose not only data packets but also disco packets. This is bad because it can break (and not only stall) connectivity.
This PR adds a separate queue for disco packets coming from relays: whether a packet is a disco packet is now determined in the ActiveRelayActor rather than in the magicsocket. Disco packets are put on their own queue and processed in the relay actor.
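A rough sketch of the shape of that routing (types, field names, and the magic constant are illustrative stand-ins, not the actual iroh identifiers):

use tokio::sync::mpsc;

// Illustrative types; the real ones live in iroh's magicsock module.
struct Datagram {
    payload: Vec<u8>,
    // src node id, etc., elided
}
struct RelayDiscoMessage(Datagram);

// Placeholder for iroh's disco magic prefix; the actual constant differs.
const DISCO_MAGIC: &[u8] = b"DISCO-MAGIC-PLACEHOLDER";

/// Stand-in for the check that a payload looks like a disco packet.
fn looks_like_disco(payload: &[u8]) -> bool {
    payload.starts_with(DISCO_MAGIC)
}

struct RelayPacketRouter {
    /// Queue of ordinary data packets towards the magicsocket.
    datagram_tx: mpsc::Sender<Datagram>,
    /// Dedicated queue for disco packets.
    disco_tx: mpsc::Sender<RelayDiscoMessage>,
}

impl RelayPacketRouter {
    fn route(&self, datagram: Datagram) {
        if looks_like_disco(&datagram.payload) {
            // Handled on the relay-actor side; the backpressure handling
            // discussed in the review above is elided here.
            let _ = self.disco_tx.try_send(RelayDiscoMessage(datagram));
        } else {
            let _ = self.datagram_tx.try_send(datagram);
        }
    }
}

The point of the split is that a full data queue no longer causes disco packets to be dropped; how the disco queue itself is backpressured is the subject of the review discussion above.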
Breaking Changes
Notes & open questions
Change checklist
quic-rpc
iroh-gossip
iroh-blobs
dumbpipe
sendme