Skip to content

Commit

Permalink
A0-4817: Improve memory consumption in clique network (Cardinal-Crypt…
Browse files Browse the repository at this point in the history
…ography#1953)

# Description

Attempts at fixing the memory consumption in clique network. Current
approach:
Limit number of messages to send in clique network

## Type of change

- Bug fix (non-breaking change which fixes an issue

# Checklist:
  • Loading branch information
timorleph committed Mar 5, 2025
1 parent 22ea4bd commit 0caac44
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 40 deletions.
2 changes: 2 additions & 0 deletions clique/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub use crypto::{PublicKey, SecretKey};
pub use rate_limiting::{RateLimitingDialer, RateLimitingListener};
pub use service::{Service, SpawnHandleExt, SpawnHandleT};

const SEND_DATA_BUFFER: usize = 10;

const LOG_TARGET: &str = "network-clique";
/// A basic alias for properties we expect basic data to satisfy.
pub trait Data: Clone + Codec + Send + Sync + 'static {}
Expand Down
27 changes: 15 additions & 12 deletions clique/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ use direction::DirectedPeers;
pub enum SendError {
/// Outgoing network connection closed
ConnectionClosed,
/// The underlying network is not keeping up with sending.
FullChannel,
/// Peer not added to the manager
PeerNotFound,
}
Expand All @@ -24,6 +26,7 @@ impl Display for SendError {
use SendError::*;
match self {
ConnectionClosed => write!(f, "worker dead"),
FullChannel => write!(f, "too many messages"),
PeerNotFound => write!(f, "peer not found"),
}
}
Expand Down Expand Up @@ -158,7 +161,7 @@ pub struct Manager<PK: PublicKey + PeerId, A: Data, D: Data> {
// Which peers we want to be connected with, and which way.
wanted: DirectedPeers<PK, A>,
// This peers we are connected with. We ensure that this is always a subset of what we want.
have: HashMap<PK, mpsc::UnboundedSender<D>>,
have: HashMap<PK, mpsc::Sender<D>>,
}

impl<PK: PublicKey + PeerId, A: Data, D: Data> Manager<PK, A, D> {
Expand Down Expand Up @@ -192,11 +195,7 @@ impl<PK: PublicKey + PeerId, A: Data, D: Data> Manager<PK, A, D> {
}

/// Add an established connection with a known peer, but only if the peer is among the peers we want to be connected to.
pub fn add_connection(
&mut self,
peer_id: PK,
data_for_network: mpsc::UnboundedSender<D>,
) -> AddResult {
pub fn add_connection(&mut self, peer_id: PK, data_for_network: mpsc::Sender<D>) -> AddResult {
use AddResult::*;
if !self.wanted.interested(&peer_id) {
return Uninterested;
Expand All @@ -219,10 +218,13 @@ impl<PK: PublicKey + PeerId, A: Data, D: Data> Manager<PK, A, D> {
/// or if the connection is dead.
pub fn send_to(&mut self, peer_id: &PK, data: D) -> Result<(), SendError> {
self.have
.get(peer_id)
.get_mut(peer_id)
.ok_or(SendError::PeerNotFound)?
.unbounded_send(data)
.map_err(|_| SendError::ConnectionClosed)
.try_send(data)
.map_err(|e| match e.is_full() {
true => SendError::FullChannel,
false => SendError::ConnectionClosed,
})
}

/// A status of the manager, to be displayed somewhere.
Expand All @@ -243,6 +245,7 @@ mod tests {
use crate::{
metrics::Metrics,
mock::{key, MockPublicKey},
SEND_DATA_BUFFER,
};

type Data = String;
Expand Down Expand Up @@ -286,7 +289,7 @@ mod tests {
Manager::<MockPublicKey, Address, Data>::new(listening_id.clone(), Metrics::noop());
let data = String::from("DATA");
let address = String::from("43.43.43.43:43000");
let (tx, _rx) = mpsc::unbounded();
let (tx, _rx) = mpsc::channel(SEND_DATA_BUFFER);
// try add unknown peer
assert_eq!(
connecting_manager.add_connection(listening_id.clone(), tx),
Expand All @@ -308,7 +311,7 @@ mod tests {
assert!(connecting_manager.add_peer(listening_id.clone(), address.clone()));
}
// add outgoing to connecting
let (tx, mut rx) = mpsc::unbounded();
let (tx, mut rx) = mpsc::channel(SEND_DATA_BUFFER);
assert_eq!(
connecting_manager.add_connection(listening_id.clone(), tx),
Added
Expand All @@ -319,7 +322,7 @@ mod tests {
.is_ok());
assert_eq!(data, rx.next().await.expect("should receive"));
// add incoming to listening
let (tx, mut rx) = mpsc::unbounded();
let (tx, mut rx) = mpsc::channel(SEND_DATA_BUFFER);
assert_eq!(
listening_manager.add_connection(connecting_id.clone(), tx),
Added
Expand Down
2 changes: 1 addition & 1 deletion clique/src/protocols/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub type Version = u32;
/// What connections send back to the service after they become established. Starts with a public
/// key of the remote node, followed by a channel for sending data to that node, with None if the
/// connection was unsuccessful and should be reestablished.
pub type ResultForService<PK, D> = (PK, Option<mpsc::UnboundedSender<D>>);
pub type ResultForService<PK, D> = (PK, Option<mpsc::Sender<D>>);

/// Defines the protocol for communication. Currently single variant, but left in case of protocol change.
#[derive(Debug, PartialEq, Eq)]
Expand Down
26 changes: 13 additions & 13 deletions clique/src/protocols/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
handshake::{v0_handshake_incoming, v0_handshake_outgoing},
ProtocolError, ResultForService,
},
Data, PublicKey, SecretKey, Splittable, LOG_TARGET,
Data, PublicKey, SecretKey, Splittable, LOG_TARGET, SEND_DATA_BUFFER,
};

const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
Expand All @@ -43,7 +43,7 @@ async fn check_authorization<SK: SecretKey>(

async fn sending<PK: PublicKey, D: Data, S: AsyncWrite + Unpin + Send>(
mut sender: S,
mut data_from_user: mpsc::UnboundedReceiver<D>,
mut data_from_user: mpsc::Receiver<D>,
) -> Result<(), ProtocolError<PK>> {
use Message::*;
loop {
Expand Down Expand Up @@ -94,7 +94,7 @@ async fn manage_connection<
>(
sender: S,
receiver: R,
data_from_user: mpsc::UnboundedReceiver<D>,
data_from_user: mpsc::Receiver<D>,
data_for_user: mpsc::UnboundedSender<D>,
) -> Result<(), ProtocolError<PK>> {
let sending = sending(sender, data_from_user);
Expand Down Expand Up @@ -122,7 +122,7 @@ pub async fn outgoing<SK: SecretKey, D: Data, S: Splittable>(
target: LOG_TARGET,
"Outgoing handshake with {} finished successfully.", public_key
);
let (data_for_network, data_from_user) = mpsc::unbounded();
let (data_for_network, data_from_user) = mpsc::channel(SEND_DATA_BUFFER);
result_for_parent
.unbounded_send((public_key.clone(), Some(data_for_network)))
.map_err(|_| ProtocolError::NoParentConnection)?;
Expand Down Expand Up @@ -160,7 +160,7 @@ pub async fn incoming<SK: SecretKey, D: Data, S: Splittable>(
return Err(ProtocolError::NotAuthorized);
}

let (data_for_network, data_from_user) = mpsc::unbounded();
let (data_for_network, data_from_user) = mpsc::channel(SEND_DATA_BUFFER);
result_for_parent
.unbounded_send((public_key.clone(), Some(data_for_network)))
.map_err(|_| ProtocolError::NoParentConnection)?;
Expand Down Expand Up @@ -288,12 +288,12 @@ mod tests {
_ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"),
result = result_from_outgoing.next() => {
let (_, maybe_data_for_outgoing) = result.expect("the channel shouldn't be dropped");
let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected");
let mut data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected");
data_for_outgoing
.unbounded_send(vec![4, 3, 43])
.try_send(vec![4, 3, 43])
.expect("should send");
data_for_outgoing
.unbounded_send(vec![2, 1, 3, 7])
.try_send(vec![2, 1, 3, 7])
.expect("should send");
data_for_outgoing
},
Expand All @@ -303,12 +303,12 @@ mod tests {
_ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"),
result = result_from_incoming.next() => {
let (_, maybe_data_for_incoming) = result.expect("the channel shouldn't be dropped");
let data_for_incoming = maybe_data_for_incoming.expect("successfully connected");
let mut data_for_incoming = maybe_data_for_incoming.expect("successfully connected");
data_for_incoming
.unbounded_send(vec![5, 4, 44])
.try_send(vec![5, 4, 44])
.expect("should send");
data_for_incoming
.unbounded_send(vec![3, 2, 4, 8])
.try_send(vec![3, 2, 4, 8])
.expect("should send");
data_for_incoming
},
Expand Down Expand Up @@ -426,9 +426,9 @@ mod tests {
_ = &mut outgoing_handle => panic!("outgoing process unexpectedly finished"),
result = result_from_outgoing.next() => {
let (_, maybe_data_for_outgoing) = result.expect("the channel shouldn't be dropped");
let data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected");
let mut data_for_outgoing = maybe_data_for_outgoing.expect("successfully connected");
data_for_outgoing
.unbounded_send(vec![2, 1, 3, 7])
.try_send(vec![2, 1, 3, 7])
.expect("should send");
data_for_outgoing
},
Expand Down
19 changes: 5 additions & 14 deletions clique/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use std::{
};

use futures::{
channel::{
mpsc::{self, UnboundedSender},
oneshot,
},
channel::{mpsc, oneshot},
Future, StreamExt, TryFutureExt,
};
use log::{info, trace, warn};
Expand Down Expand Up @@ -250,18 +247,15 @@ where
fn add_connection(
&mut self,
public_key: SK::PublicKey,
data_for_network: mpsc::UnboundedSender<D>,
data_for_network: mpsc::Sender<D>,
) -> AddResult {
self.manager.add_connection(public_key, data_for_network)
}

fn handle_command(
&mut self,
command: ServiceCommand<<SK as SecretKey>::PublicKey, D, A>,
result_for_parent: &UnboundedSender<(
<SK as SecretKey>::PublicKey,
Option<UnboundedSender<D>>,
)>,
result_for_parent: &mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
) {
use ServiceCommand::*;
match command {
Expand Down Expand Up @@ -306,11 +300,8 @@ where
fn handle_data_for_network(
&mut self,
public_key: <SK as SecretKey>::PublicKey,
maybe_data_for_network: Option<UnboundedSender<D>>,
result_for_parent: &UnboundedSender<(
<SK as SecretKey>::PublicKey,
Option<UnboundedSender<D>>,
)>,
maybe_data_for_network: Option<mpsc::Sender<D>>,
result_for_parent: &mpsc::UnboundedSender<ResultForService<SK::PublicKey, D>>,
) {
use AddResult::*;
match maybe_data_for_network {
Expand Down

0 comments on commit 0caac44

Please # to comment.