diff --git a/.cursor/rules/p2p.mdc b/.cursor/rules/p2p.mdc new file mode 100644 index 000000000..f1302339c --- /dev/null +++ b/.cursor/rules/p2p.mdc @@ -0,0 +1,46 @@ +--- +description: Peer to peer networking expert & Rust engineer +globs: **/*.rs +--- +# Peer to peer networking expert + +## P2P Development Cursor (Rust + libp2p) + +1. **Initial Analysis** + - Review input requirements and resources + - Identify minimal viable protocol components + - Review code that exists. + - Never build new when it already exists. Improve instead, remove, and optimize. + - Move tests to a `src/tests` directory, create it if it doesn't exist. + +2. **Implementation Flow** + - Start with concise implementations and build from there. + - Suggest improvements with sound reasoning. + - Leverage libp2p's modular design patterns + - Leverage existing code and improve it, optimize it. + - When a code snippet is provided, understand it, and adapt it with existing code. These are meant as resources not as copy/pastes. + +3. **Code Standards** + - Implement proper error handling with custom error types + - Prioritize concise and efficient code. + - Add relevant and detailed documentation + - Always put tests inside a `src/tests` directory + +4. **Efficiency Guidelines** + - Prefer bounded channels for peer message handling + - Implement connection pooling where appropriate + - Leverage existing libp2p protocols before custom ones + +5. **Review & Integration** + - Verify protocol compatibility + - Test network behaviour under various conditions + - Test in `src/tests` directory + - Ensure proper resource cleanup + - Document failure modes and recovery + +6. 
**Core Principles** + - Start minimal, expand as needed + - Test thoroughly between iterations + - Maintain clear protocol boundaries + - Document network assumptions and requirements + - \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 51ad6242a..30e3ecbc9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7147,6 +7147,7 @@ dependencies = [ name = "gadget-networking" version = "0.1.0" dependencies = [ + "anyhow", "async-trait", "auto_impl", "bincode", @@ -7168,6 +7169,7 @@ dependencies = [ "serde_json", "thiserror 2.0.11", "tokio", + "tokio-stream", "tracing", "tracing-subscriber 0.3.19", ] diff --git a/Cargo.toml b/Cargo.toml index 520139240..2aa2e85ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -104,9 +104,12 @@ gadget-runner-symbiotic = { version = "0.1.0", path = "./crates/runners/symbioti gadget-config = { version = "0.1.0", path = "./crates/config", default-features = false } gadget-keystore = { version = "0.1.0", path = "./crates/keystore", default-features = false } gadget-logging = { version = "0.1.0", path = "./crates/logging", default-features = false } -gadget-networking = { version = "0.1.0", path = "./crates/networking", default-features = false } gadget-std = { version = "0.1.0", path = "./crates/std", default-features = false } +# P2P +gadget-networking = { version = "0.1.0", path = "./crates/networking", default-features = false } +gadget-networking-behaviours = { version = "0.1.0", path = "./crates/networking/behaviours", default-features = false } + # Utilities gadget-utils = { version = "0.1.0", path = "./crates/utils", default-features = false } gadget-utils-evm = { version = "0.1.0", path = "./crates/utils/evm", default-features = false } @@ -145,6 +148,7 @@ futures-util = { version = "0.3.31", default-features = false } tokio = { version = "1.40", default-features = false } tokio-util = { version = "0.7.12", default-features = false } tokio-cron-scheduler = "0.13.0" +tokio-stream = { version = "0.1.17", default-features = 
false } # CLI & Configuration cargo-generate = { version = "0.22.1", default-features = false } diff --git a/crates/networking/Cargo.toml b/crates/networking/Cargo.toml index 1f17f1047..5e245f343 100644 --- a/crates/networking/Cargo.toml +++ b/crates/networking/Cargo.toml @@ -19,6 +19,7 @@ blake3 = { workspace = true } dashmap = { workspace = true } libp2p = { workspace = true } tokio = { workspace = true, features = ["macros"] } +tokio-stream = { workspace = true, features = ["time"] } futures = { workspace = true } tracing = { workspace = true } bincode = { workspace = true } @@ -30,6 +31,7 @@ hex = { workspace = true } itertools = { workspace = true, features = ["use_alloc"] } parking_lot = { workspace = true } thiserror = { workspace = true } +anyhow = { workspace = true } # Crypto dependencies gadget-crypto = { workspace = true, features = ["k256", "hashing"] } @@ -58,6 +60,7 @@ features = [ "ping", "dns", "autonat", + "upnp", ] [dev-dependencies] diff --git a/crates/networking/src/behaviours.rs b/crates/networking/src/behaviours.rs index 32257bd93..3c843d0e1 100644 --- a/crates/networking/src/behaviours.rs +++ b/crates/networking/src/behaviours.rs @@ -1,55 +1,119 @@ -use crate::key_types::{GossipMsgPublicKey, GossipSignedMsgSignature}; -use libp2p::{gossipsub, kad::store::MemoryStore, mdns, request_response, swarm::NetworkBehaviour}; -use serde::{Deserialize, Serialize}; - -#[non_exhaustive] -#[derive(Serialize, Deserialize, Debug)] -// TODO: Needs better name -pub enum GossipOrRequestResponse { - Gossip(GossipMessage), - Request(MyBehaviourRequest), - Response(MyBehaviourResponse), -} +use crate::key_types::{InstanceMsgKeyPair, InstanceMsgPublicKey}; +use crate::{ + blueprint_protocol::{BlueprintProtocolBehaviour, BlueprintProtocolEvent}, + discovery::{ + behaviour::{DiscoveryBehaviour, DiscoveryEvent}, + config::DiscoveryConfig, + PeerInfo, PeerManager, + }, +}; +use libp2p::{ + connection_limits::{self, ConnectionLimits}, + identity::Keypair, + 
kad::QueryId, + ping, + swarm::NetworkBehaviour, + Multiaddr, PeerId, +}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; -#[derive(Serialize, Deserialize, Debug)] -pub struct GossipMessage { - pub topic: String, - pub raw_payload: Vec, -} +const MAX_ESTABLISHED_PER_PEER: u32 = 4; -#[non_exhaustive] -#[derive(Serialize, Deserialize, Debug)] -pub enum MyBehaviourRequest { - Handshake { - public_key: GossipMsgPublicKey, - signature: GossipSignedMsgSignature, - }, - Message { - topic: String, - raw_payload: Vec, - }, +/// Events that can be emitted by the GadgetBehavior +#[derive(Debug)] +pub enum GadgetEvent { + /// Discovery-related events + Discovery(DiscoveryEvent), + /// Ping events for connection liveness + Ping(ping::Event), + /// Blueprint protocol events + Blueprint(BlueprintProtocolEvent), } -#[non_exhaustive] -#[derive(Serialize, Deserialize, Debug)] -pub enum MyBehaviourResponse { - Handshaked { - public_key: GossipMsgPublicKey, - signature: GossipSignedMsgSignature, - }, - MessageHandled, +#[derive(NetworkBehaviour)] +pub struct GadgetBehaviour { + /// Connection limits to prevent DoS + connection_limits: connection_limits::Behaviour, + /// Discovery mechanisms (Kademlia, mDNS, etc) + pub(super) discovery: DiscoveryBehaviour, + /// Direct P2P messaging and gossip + pub(super) blueprint_protocol: BlueprintProtocolBehaviour, + /// Connection liveness checks + ping: ping::Behaviour, } -// We create a custom network behaviour that combines Gossipsub and Mdns. 
-#[derive(NetworkBehaviour)] -pub struct MyBehaviour { - pub gossipsub: gossipsub::Behaviour, - pub mdns: mdns::tokio::Behaviour, - pub p2p: request_response::cbor::Behaviour, - pub identify: libp2p::identify::Behaviour, - pub kadmelia: libp2p::kad::Behaviour, - pub dcutr: libp2p::dcutr::Behaviour, - pub relay: libp2p::relay::Behaviour, - pub ping: libp2p::ping::Behaviour, - pub autonat: libp2p::autonat::Behaviour, +impl GadgetBehaviour { + pub fn new( + network_name: &str, + blueprint_protocol_name: &str, + local_key: &Keypair, + instance_secret_key: &InstanceMsgKeyPair, + instance_public_key: &InstanceMsgPublicKey, + target_peer_count: u64, + peer_manager: Arc, + ) -> Self { + let connection_limits = connection_limits::Behaviour::new( + ConnectionLimits::default() + .with_max_pending_incoming(Some( + target_peer_count as u32 * MAX_ESTABLISHED_PER_PEER, + )) + .with_max_pending_outgoing(Some( + target_peer_count as u32 * MAX_ESTABLISHED_PER_PEER, + )) + .with_max_established_incoming(Some( + target_peer_count as u32 * MAX_ESTABLISHED_PER_PEER, + )) + .with_max_established_outgoing(Some( + target_peer_count as u32 * MAX_ESTABLISHED_PER_PEER, + )) + .with_max_established_per_peer(Some(MAX_ESTABLISHED_PER_PEER)), + ); + + let ping = ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(30))); + + let discovery = DiscoveryConfig::new(local_key.public(), network_name) + .with_mdns(true) + .with_kademlia(true) + .with_target_peer_count(target_peer_count) + .build() + .unwrap(); + + let blueprint_protocol = BlueprintProtocolBehaviour::new( + local_key, + instance_secret_key, + instance_public_key, + peer_manager, + blueprint_protocol_name, + ); + + Self { + connection_limits, + discovery, + blueprint_protocol, + ping, + } + } + + /// Bootstrap Kademlia network + pub fn bootstrap(&mut self) -> Result { + self.discovery.bootstrap() + } + + /// Returns a set of peer ids + pub fn peers(&self) -> &HashSet { + self.discovery.get_peers() + } + + /// Returns 
a map of peer ids and their multi-addresses + pub fn peer_addresses(&self) -> HashMap> { + self.discovery.get_peer_addresses() + } + + pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { + self.discovery.get_peer_info(peer_id) + } } diff --git a/crates/networking/src/blueprint_protocol/behaviour.rs b/crates/networking/src/blueprint_protocol/behaviour.rs new file mode 100644 index 000000000..86603a6be --- /dev/null +++ b/crates/networking/src/blueprint_protocol/behaviour.rs @@ -0,0 +1,372 @@ +use crate::{ + Curve, InstanceMsgKeyPair, InstanceMsgPublicKey, InstanceSignedMsgSignature, KeySignExt, +}; +use dashmap::{DashMap, DashSet}; +use gadget_crypto::{hashing::blake3_256, KeyType}; +use gadget_logging::{debug, trace, warn}; +use libp2p::{ + core::transport::PortUse, + gossipsub::{self, IdentTopic, MessageAuthenticity, MessageId, Sha256Topic}, + identity::Keypair, + request_response::{self, OutboundRequestId, ResponseChannel}, + swarm::{ + ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, + THandlerOutEvent, ToSwarm, + }, + Multiaddr, PeerId, StreamProtocol, +}; +use std::{ + sync::Arc, + task::Poll, + time::{Duration, Instant}, +}; + +use crate::discovery::PeerManager; + +use super::{InstanceMessageRequest, InstanceMessageResponse}; + +#[derive(NetworkBehaviour)] +pub struct DerivedBlueprintProtocolBehaviour { + /// Request/response protocol for p2p messaging + request_response: + request_response::cbor::Behaviour, + /// Gossipsub for broadcast messaging + gossipsub: gossipsub::Behaviour, +} + +/// Events emitted by the BlueprintProtocolBehaviour +#[derive(Debug)] +pub enum BlueprintProtocolEvent { + /// Request received from a peer + Request { + peer: PeerId, + request: InstanceMessageRequest, + channel: ResponseChannel, + }, + /// Response received from a peer + Response { + peer: PeerId, + request_id: OutboundRequestId, + response: InstanceMessageResponse, + }, + /// Gossip message received + GossipMessage { + 
source: PeerId, + message: Vec, + topic: IdentTopic, + }, +} + +/// Behaviour that handles the blueprint protocol request/response and gossip +pub struct BlueprintProtocolBehaviour { + /// Request/response protocol for direct messaging + blueprint_protocol: DerivedBlueprintProtocolBehaviour, + /// Name of the blueprint protocol + pub(crate) blueprint_protocol_name: String, + /// Peer manager for tracking peer states + pub(crate) peer_manager: Arc, + /// Libp2p peer ID + pub(crate) local_peer_id: PeerId, + /// Instance public key for handshakes and blueprint_protocol + pub(crate) instance_public_key: InstanceMsgPublicKey, + /// Instance secret key for handshakes and blueprint_protocol + pub(crate) instance_secret_key: InstanceMsgKeyPair, + /// Peers with pending inbound handshakes + pub(crate) inbound_handshakes: DashMap, + /// Peers with pending outbound handshakes + pub(crate) outbound_handshakes: DashMap, + /// Active response channels + pub(crate) response_channels: + DashMap>, +} + +impl BlueprintProtocolBehaviour { + /// Create a new blueprint protocol behaviour + pub fn new( + local_key: &Keypair, + instance_secret_key: &InstanceMsgKeyPair, + instance_public_key: &InstanceMsgPublicKey, + peer_manager: Arc, + blueprint_protocol_name: &str, + ) -> Self { + let blueprint_protocol_name = blueprint_protocol_name.to_string(); + let protocols = vec![( + StreamProtocol::try_from_owned(blueprint_protocol_name.to_string()) + .unwrap_or_else(|_| StreamProtocol::new("/blueprint_protocol/1.0.0")), + request_response::ProtocolSupport::Full, + )]; + + // Initialize gossipsub with message signing + let gossipsub_config = gossipsub::ConfigBuilder::default() + .heartbeat_interval(Duration::from_secs(1)) + .validation_mode(gossipsub::ValidationMode::Strict) + .build() + .expect("Valid gossipsub config"); + + let gossipsub = gossipsub::Behaviour::new( + MessageAuthenticity::Signed(local_key.clone()), + gossipsub_config, + ) + .expect("Valid gossipsub behaviour"); + + let config 
= request_response::Config::default() + .with_request_timeout(Duration::from_secs(30)) + .with_max_concurrent_streams(50); + + let blueprint_protocol = DerivedBlueprintProtocolBehaviour { + request_response: request_response::cbor::Behaviour::new(protocols, config), + gossipsub, + }; + + let local_peer_id = local_key.public().to_peer_id(); + + Self { + blueprint_protocol, + blueprint_protocol_name, + peer_manager, + local_peer_id, + instance_public_key: instance_public_key.clone(), + instance_secret_key: instance_secret_key.clone(), + inbound_handshakes: DashMap::new(), + outbound_handshakes: DashMap::new(), + response_channels: DashMap::new(), + } + } + + /// Sign a handshake message for a peer + pub(crate) fn sign_handshake(&self, peer: &PeerId) -> InstanceSignedMsgSignature { + let msg = peer.to_bytes(); + let msg_hash = blake3_256(&msg); + self.instance_secret_key.sign_prehash(&msg_hash) + } + + /// Send a request to a peer + pub fn send_request( + &mut self, + peer: &PeerId, + request: InstanceMessageRequest, + ) -> OutboundRequestId { + debug!(%peer, ?request, "sending request"); + self.blueprint_protocol + .request_response + .send_request(peer, request) + } + + /// Send a response through a response channel + pub fn send_response( + &mut self, + channel: ResponseChannel, + response: InstanceMessageResponse, + ) -> Result<(), InstanceMessageResponse> { + debug!(?response, "sending response"); + self.blueprint_protocol + .request_response + .send_response(channel, response) + } + + /// Subscribe to a gossip topic + pub fn subscribe(&mut self, topic: &str) -> Result { + let topic = Sha256Topic::new(topic); + self.blueprint_protocol.gossipsub.subscribe(&topic) + } + + /// Publish a message to a gossip topic + pub fn publish( + &mut self, + topic: &str, + data: impl Into>, + ) -> Result { + let topic = Sha256Topic::new(topic); + self.blueprint_protocol.gossipsub.publish(topic, data) + } + + /// Verify and handle a handshake with a peer + pub fn verify_handshake( 
+ &self, + peer: &PeerId, + public_key: &InstanceMsgPublicKey, + signature: &InstanceSignedMsgSignature, + ) -> Result<(), InstanceMessageResponse> { + let msg = peer.to_bytes(); + let valid = ::verify(public_key, &msg, signature); + if !valid { + warn!("Invalid initial handshake signature from peer: {peer}"); + return Err(InstanceMessageResponse::Error { + code: 400, + message: "Invalid handshake signature".to_string(), + }); + } + + trace!("Received valid handshake from peer: {peer}"); + + Ok(()) + } + + pub fn handle_handshake( + &self, + peer: &PeerId, + public_key: &InstanceMsgPublicKey, + signature: &InstanceSignedMsgSignature, + ) -> Result<(), InstanceMessageResponse> { + self.verify_handshake(peer, public_key, signature)?; + self.peer_manager + .add_peer_id_to_public_key(peer, public_key); + + Ok(()) + } + /// Handle a failed handshake with a peer + pub fn handle_handshake_failure(&self, peer: &PeerId, reason: &str) { + // Update peer info and potentially ban peer + if let Some(mut peer_info) = self.peer_manager.get_peer_info(peer) { + peer_info.failures += 1; + self.peer_manager.update_peer(*peer, peer_info.clone()); + + // Ban peer if too many failures + if peer_info.failures >= 3 { + self.peer_manager + .ban_peer(*peer, reason, Some(Duration::from_secs(300))); + } + } + } +} + +impl NetworkBehaviour for BlueprintProtocolBehaviour { + type ConnectionHandler = + ::ConnectionHandler; + + type ToSwarm = BlueprintProtocolEvent; + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &libp2p::Multiaddr, + remote_addr: &libp2p::Multiaddr, + ) -> Result, ConnectionDenied> { + self.blueprint_protocol + .handle_established_inbound_connection(connection_id, peer, local_addr, remote_addr) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &Multiaddr, + role_override: libp2p::core::Endpoint, + port_use: PortUse, + ) -> Result, 
ConnectionDenied> { + self.blueprint_protocol + .handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + ) + } + + fn handle_pending_inbound_connection( + &mut self, + connection_id: ConnectionId, + local_addr: &libp2p::Multiaddr, + remote_addr: &libp2p::Multiaddr, + ) -> Result<(), ConnectionDenied> { + self.blueprint_protocol.handle_pending_inbound_connection( + connection_id, + local_addr, + remote_addr, + ) + } + + fn handle_pending_outbound_connection( + &mut self, + connection_id: ConnectionId, + maybe_peer: Option, + addresses: &[libp2p::Multiaddr], + effective_role: libp2p::core::Endpoint, + ) -> Result, ConnectionDenied> { + self.blueprint_protocol.handle_pending_outbound_connection( + connection_id, + maybe_peer, + addresses, + effective_role, + ) + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection_id: ConnectionId, + event: THandlerOutEvent, + ) { + self.blueprint_protocol + .on_connection_handler_event(peer_id, connection_id, event) + } + + fn on_swarm_event(&mut self, event: FromSwarm<'_>) { + if let FromSwarm::ConnectionEstablished(e) = &event { + if e.other_established == 0 { + self.inbound_handshakes.insert(e.peer_id, Instant::now()); + } + } + + self.blueprint_protocol.on_swarm_event(event) + } + + fn poll( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> Poll>> { + while let Poll::Ready(ev) = self.blueprint_protocol.poll(cx) { + match ev { + ToSwarm::GenerateEvent(ev) => match ev { + DerivedBlueprintProtocolBehaviourEvent::RequestResponse( + blueprint_protocol_event, + ) => self.handle_request_response_event(blueprint_protocol_event), + DerivedBlueprintProtocolBehaviourEvent::Gossipsub(gossip_event) => { + self.handle_gossipsub_event(gossip_event) + } + }, + ToSwarm::Dial { opts } => { + return Poll::Ready(ToSwarm::Dial { opts }); + } + ToSwarm::NotifyHandler { + peer_id, + handler, + event, + } => { + return Poll::Ready(ToSwarm::NotifyHandler { + peer_id, + 
handler, + event, + }) + } + ToSwarm::CloseConnection { + peer_id, + connection, + } => { + return Poll::Ready(ToSwarm::CloseConnection { + peer_id, + connection, + }) + } + ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }), + ToSwarm::RemoveListener { id } => { + return Poll::Ready(ToSwarm::RemoveListener { id }) + } + ToSwarm::NewExternalAddrCandidate(addr) => { + return Poll::Ready(ToSwarm::NewExternalAddrCandidate(addr)) + } + ToSwarm::ExternalAddrConfirmed(addr) => { + return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) + } + ToSwarm::ExternalAddrExpired(addr) => { + return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) + } + _ => {} + } + } + + Poll::Pending + } +} diff --git a/crates/networking/src/blueprint_protocol/handler.rs b/crates/networking/src/blueprint_protocol/handler.rs new file mode 100644 index 000000000..40ab5b2a0 --- /dev/null +++ b/crates/networking/src/blueprint_protocol/handler.rs @@ -0,0 +1,244 @@ +use std::time::{Duration, Instant}; + +use libp2p::{gossipsub, request_response, PeerId}; +use tracing::{debug, warn}; + +use crate::key_types::InstanceMsgPublicKey; + +use super::{BlueprintProtocolBehaviour, InstanceMessageRequest, InstanceMessageResponse}; + +const INBOUND_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30); +const OUTBOUND_HANDSHAKE_TIMEOUT: Duration = Duration::from_secs(30); + +impl BlueprintProtocolBehaviour { + pub fn handle_request_response_event( + &mut self, + event: request_response::Event, + ) { + match event { + request_response::Event::Message { + peer, + message: + request_response::Message::Request { + request: + InstanceMessageRequest::Handshake { + public_key, + signature, + }, + channel, + .. + }, + .. 
+ } => { + debug!(%peer, "Received handshake request"); + + // Check if we already have a pending outbound handshake + if let Some(outbound_time) = self.outbound_handshakes.get(&peer) { + // If we have an outbound handshake and their peer_id is less than ours, + // we should wait for their response instead of responding to their request + // TODO: Fix + // if peer < &self.local_peer_id { + // debug!(%peer, "Deferring inbound handshake - waiting for outbound response"); + // return; + // } + // If we have an outbound handshake and their peer_id is greater than ours, + // we should handle their request and cancel our outbound attempt + self.outbound_handshakes.remove(&peer); + } + + // Verify the handshake + match self.verify_handshake(&peer, &public_key, &signature) { + Ok(()) => { + // Store the handshake request + self.inbound_handshakes.insert(peer, Instant::now()); + + // Send handshake response + let response = InstanceMessageResponse::Handshake { + public_key: self.instance_public_key.clone(), + signature: self.sign_handshake(&peer), + }; + + if let Err(e) = self.send_response(channel, response) { + warn!(%peer, "Failed to send handshake response: {:?}", e); + self.handle_handshake_failure(&peer, "Failed to send response"); + return; + } + + // Mark handshake as completed + self.complete_handshake(&peer, &public_key); + } + Err(e) => { + warn!(%peer, "Invalid handshake request: {:?}", e); + let response = InstanceMessageResponse::Error { + code: 400, + message: format!("Invalid handshake: {:?}", e), + }; + if let Err(e) = self.send_response(channel, response) { + warn!(%peer, "Failed to send error response: {:?}", e); + } + self.handle_handshake_failure(&peer, "Invalid handshake"); + } + } + } + request_response::Event::Message { + peer, + message: + request_response::Message::Response { + response: + InstanceMessageResponse::Handshake { + public_key, + signature, + }, + .. + }, + .. 
+ } => { + debug!(%peer, "Received handshake response"); + + // Verify we have a pending outbound handshake + if !self.outbound_handshakes.contains_key(&peer) { + warn!(%peer, "Received unexpected handshake response"); + return; + } + + // Verify the handshake + match self.verify_handshake(&peer, &public_key, &signature) { + Ok(()) => { + // Mark handshake as completed + self.complete_handshake(&peer, &public_key); + } + Err(e) => { + warn!(%peer, "Invalid handshake response: {:?}", e); + self.handle_handshake_failure(&peer, "Invalid handshake response"); + } + } + + // Remove the outbound handshake + self.outbound_handshakes.remove(&peer); + } + request_response::Event::Message { + peer, + message: + request_response::Message::Response { + response: InstanceMessageResponse::Error { code, message }, + .. + }, + .. + } => { + warn!(%peer, code, %message, "Received error response"); + self.handle_handshake_failure(&peer, &message); + } + request_response::Event::Message { + peer, + message: + request_response::Message::Request { + request: + InstanceMessageRequest::Protocol { + protocol, + payload, + metadata, + }, + channel, + .. + }, + .. + } => { + // Only accept protocol messages from peers we've completed handshakes with + if !self.peer_manager.is_peer_verified(&peer) { + warn!(%peer, "Received protocol message from unverified peer"); + let response = InstanceMessageResponse::Error { + code: 403, + message: "Handshake required".to_string(), + }; + if let Err(e) = self.send_response(channel, response) { + warn!(%peer, "Failed to send error response: {:?}", e); + } + return; + } + + debug!(%peer, %protocol, "Received protocol request"); + // Handle protocol message... 
+ } + _ => {} + } + + // Check for expired handshakes + self.check_expired_handshakes(); + } + + /// Check for and remove expired handshakes + fn check_expired_handshakes(&mut self) { + let now = Instant::now(); + + // Check inbound handshakes + let expired_inbound: Vec<_> = self + .inbound_handshakes + .clone() + .into_read_only() + .iter() + .filter(|(_, &time)| now.duration_since(time) > INBOUND_HANDSHAKE_TIMEOUT) + .map(|(peer_id, _)| *peer_id) + .collect(); + + for peer_id in expired_inbound { + self.inbound_handshakes.remove(&peer_id); + self.handle_handshake_failure(&peer_id, "Inbound handshake timeout"); + } + + // Check outbound handshakes + let expired_outbound: Vec<_> = self + .outbound_handshakes + .clone() + .into_read_only() + .iter() + .filter(|(_, &time)| now.duration_since(time) > OUTBOUND_HANDSHAKE_TIMEOUT) + .map(|(peer_id, _)| *peer_id) + .collect(); + + for peer_id in expired_outbound { + self.outbound_handshakes.remove(&peer_id); + self.handle_handshake_failure(&peer_id, "Outbound handshake timeout"); + } + } + + /// Complete a successful handshake + fn complete_handshake(&mut self, peer: &PeerId, public_key: &InstanceMsgPublicKey) { + debug!(%peer, "Completed handshake"); + + // Remove from pending handshakes + self.inbound_handshakes.remove(peer); + self.outbound_handshakes.remove(peer); + + // Add to verified peers + self.peer_manager.verify_peer(peer); + + // Update peer manager + self.peer_manager + .add_peer_id_to_public_key(peer, public_key); + } + + pub fn handle_gossipsub_event(&mut self, event: gossipsub::Event) { + match event { + gossipsub::Event::Message { + propagation_source, + message_id, + message, + } => { + // Only accept gossip from verified peers + if !self.peer_manager.is_peer_verified(&propagation_source) { + warn!(%propagation_source, "Received gossip from unverified peer"); + return; + } + + debug!(%propagation_source, "Received gossip message"); + } + gossipsub::Event::Subscribed { peer_id, topic } => { + 
debug!(%peer_id, %topic, "Peer subscribed to topic"); + } + gossipsub::Event::Unsubscribed { peer_id, topic } => { + debug!(%peer_id, %topic, "Peer unsubscribed from topic"); + } + _ => {} + } + } +} diff --git a/crates/networking/src/blueprint_protocol/mod.rs b/crates/networking/src/blueprint_protocol/mod.rs new file mode 100644 index 000000000..9f4efe078 --- /dev/null +++ b/crates/networking/src/blueprint_protocol/mod.rs @@ -0,0 +1,57 @@ +mod behaviour; +mod handler; + +pub use behaviour::{BlueprintProtocolBehaviour, BlueprintProtocolEvent}; + +use crate::key_types::{InstanceMsgPublicKey, InstanceSignedMsgSignature}; +use serde::{Deserialize, Serialize}; + +/// A message sent to a specific instance or broadcast to all instances +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum InstanceMessageRequest { + /// Handshake request with authentication + Handshake { + /// Public key for authentication + public_key: InstanceMsgPublicKey, + /// Signature for verification + signature: InstanceSignedMsgSignature, + }, + /// Protocol-specific message with custom payload + Protocol { + /// Protocol identifier (e.g., "consensus/1.0.0", "sync/1.0.0") + protocol: String, + /// Protocol-specific message payload + payload: Vec, + /// Optional metadata for the protocol handler + metadata: Option>, + }, +} + +/// Response to an instance message +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum InstanceMessageResponse { + /// Handshake response with authentication + Handshake { + /// Public key for authentication + public_key: InstanceMsgPublicKey, + /// Signature for verification + signature: InstanceSignedMsgSignature, + }, + /// Success response with optional data + Success { + /// Response data specific to the protocol + data: Option>, + }, + /// Error response with details + Error { + /// Error code + code: u16, + /// Error message + message: String, + }, + /// Protocol-specific response + Protocol { + /// Protocol-specific response data + data: Vec, + }, +} 
diff --git a/crates/networking/src/discovery/behaviour.rs b/crates/networking/src/discovery/behaviour.rs new file mode 100644 index 000000000..cabb5cdc7 --- /dev/null +++ b/crates/networking/src/discovery/behaviour.rs @@ -0,0 +1,353 @@ +use std::{ + cmp, + collections::{HashMap, HashSet, VecDeque}, + task::{Context, Poll}, + time::Duration, +}; + +use gadget_logging::trace; +use libp2p::{ + autonat, + core::Multiaddr, + identify, + identity::PeerId, + kad::{self, store::MemoryStore}, + mdns::{tokio::Behaviour as Mdns, Event as MdnsEvent}, + relay, + swarm::{ + behaviour::toggle::Toggle, derive_prelude::*, dial_opts::DialOpts, NetworkBehaviour, + ToSwarm, + }, + upnp, +}; +use tokio::time::Interval; +use tracing::{debug, info}; + +use super::PeerInfo; + +#[derive(NetworkBehaviour)] +pub struct DerivedDiscoveryBehaviour { + /// Kademlia discovery + pub kademlia: Toggle>, + /// Local network discovery via mDNS + pub mdns: Toggle, + /// Identify protocol for peer information exchange + pub identify: identify::Behaviour, + /// NAT traversal + pub autonat: autonat::Behaviour, + /// UPnP port mapping + pub upnp: Toggle, + /// Circuit relay for NAT traversal + pub relay: Toggle, +} + +/// Event generated by the `DiscoveryBehaviour`. +#[derive(Debug)] +pub enum DiscoveryEvent { + /// Event that notifies that we connected to the node with the given peer + /// id. + PeerConnected(PeerId), + + /// Event that notifies that we disconnected with the node with the given + /// peer id. + PeerDisconnected(PeerId), + + /// Discovery event + Discovery(Box), +} + +pub struct DiscoveryBehaviour { + /// Discovery behaviour + pub discovery: DerivedDiscoveryBehaviour, + /// Stream that fires when we need to perform the next random Kademlia + /// query. + pub next_kad_random_query: Interval, + /// After `next_kad_random_query` triggers, the next one triggers after this + /// duration. + pub duration_to_next_kad: Duration, + /// Events to return in priority when polled. 
+ pub pending_events: VecDeque, + /// Number of nodes we're currently connected to. + pub n_node_connected: u64, + /// Peers + pub peers: HashSet, + /// Peer info + pub peer_info: HashMap, + /// Target peer count + pub target_peer_count: u64, + /// Options to configure dials to known peers. + pub pending_dial_opts: VecDeque, +} + +impl DiscoveryBehaviour { + /// Bootstrap Kademlia network + pub fn bootstrap(&mut self) -> Result { + if let Some(active_kad) = self.discovery.kademlia.as_mut() { + active_kad.bootstrap().map_err(|e| e.to_string()) + } else { + Err("Kademlia is not activated".to_string()) + } + } + + pub fn get_peers(&self) -> &HashSet { + &self.peers + } + + pub fn get_peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> { + self.peer_info.get(peer_id) + } + + pub fn nat_status(&self) -> autonat::NatStatus { + self.discovery.autonat.nat_status() + } + + pub fn get_peer_addresses(&self) -> HashMap> { + self.peer_info + .iter() + .map(|(peer_id, info)| (*peer_id, info.addresses.clone())) + .collect() + } +} + +impl NetworkBehaviour for DiscoveryBehaviour { + type ConnectionHandler = ::ConnectionHandler; + type ToSwarm = DiscoveryEvent; + + fn handle_established_inbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + local_addr: &Multiaddr, + remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + self.discovery.handle_established_inbound_connection( + connection_id, + peer, + local_addr, + remote_addr, + ) + } + + fn handle_established_outbound_connection( + &mut self, + connection_id: ConnectionId, + peer: PeerId, + addr: &libp2p::Multiaddr, + role_override: libp2p::core::Endpoint, + port_use: PortUse, + ) -> Result, ConnectionDenied> { + self.peer_info + .entry(peer) + .or_insert_with(|| PeerInfo { + addresses: HashSet::new(), + identify_info: None, + last_seen: std::time::SystemTime::now(), + ping_latency: None, + successes: 0, + failures: 0, + average_response_time: None, + }) + .addresses + .insert(addr.clone()); + 
self.discovery.handle_established_outbound_connection( + connection_id, + peer, + addr, + role_override, + port_use, + ) + } + + fn on_connection_handler_event( + &mut self, + peer_id: PeerId, + connection: ConnectionId, + event: THandlerOutEvent, + ) { + self.discovery + .on_connection_handler_event(peer_id, connection, event); + } + + fn on_swarm_event(&mut self, event: FromSwarm<'_>) { + match &event { + FromSwarm::ConnectionEstablished(e) => { + if e.other_established == 0 { + self.n_node_connected += 1; + self.peers.insert(e.peer_id); + self.pending_events + .push_back(DiscoveryEvent::PeerConnected(e.peer_id)); + } + } + FromSwarm::ConnectionClosed(e) => { + if e.remaining_established == 0 { + self.n_node_connected -= 1; + self.peers.remove(&e.peer_id); + self.peer_info.remove(&e.peer_id); + self.pending_events + .push_back(DiscoveryEvent::PeerDisconnected(e.peer_id)); + } + } + _ => {} + }; + self.discovery.on_swarm_event(event) + } + + #[allow(clippy::type_complexity)] + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll>> { + // Immediately process the content of `discovered`. + if let Some(ev) = self.pending_events.pop_front() { + return Poll::Ready(ToSwarm::GenerateEvent(ev)); + } + + // Dial to peers + if let Some(opts) = self.pending_dial_opts.pop_front() { + return Poll::Ready(ToSwarm::Dial { opts }); + } + + // Poll the stream that fires when we need to start a random Kademlia query. + while self.next_kad_random_query.poll_tick(cx).is_ready() { + if self.n_node_connected < self.target_peer_count { + // We still have not hit the discovery max, send random request for peers. + let random_peer_id = PeerId::random(); + debug!( + "Libp2p <= Starting random Kademlia request for {:?}", + random_peer_id + ); + if let Some(kademlia) = self.discovery.kademlia.as_mut() { + kademlia.get_closest_peers(random_peer_id); + } + } + + // Schedule the next random query with exponentially increasing delay, + // capped at 60 seconds. 
+ self.next_kad_random_query = tokio::time::interval(self.duration_to_next_kad); + // we need to reset the interval, otherwise the next tick completes immediately. + self.next_kad_random_query.reset(); + + self.duration_to_next_kad = + cmp::min(self.duration_to_next_kad * 2, Duration::from_secs(60)); + } + + // Poll discovery events. + while let Poll::Ready(ev) = self.discovery.poll(cx) { + match ev { + ToSwarm::GenerateEvent(ev) => { + match &ev { + DerivedDiscoveryBehaviourEvent::Identify(ev) => { + if let identify::Event::Received { peer_id, info, .. } = ev { + self.peer_info.entry(*peer_id).or_default().identify_info = + Some(info.clone()); + if let Some(kademlia) = self.discovery.kademlia.as_mut() { + for address in &info.listen_addrs { + kademlia.add_address(peer_id, address.clone()); + } + } + } + } + DerivedDiscoveryBehaviourEvent::Autonat(_) => {} + DerivedDiscoveryBehaviourEvent::Upnp(ev) => match ev { + upnp::Event::NewExternalAddr(addr) => { + info!("UPnP NewExternalAddr: {addr}"); + } + upnp::Event::ExpiredExternalAddr(addr) => { + info!("UPnP ExpiredExternalAddr: {addr}"); + } + upnp::Event::GatewayNotFound => { + info!("UPnP GatewayNotFound"); + } + upnp::Event::NonRoutableGateway => { + info!("UPnP NonRoutableGateway"); + } + }, + DerivedDiscoveryBehaviourEvent::Kademlia(ev) => match ev { + // Adding to Kademlia buckets is automatic with our config, + // no need to do manually. + kad::Event::RoutingUpdated { .. } => {} + kad::Event::RoutablePeer { .. } => {} + kad::Event::PendingRoutablePeer { .. } => { + // Intentionally ignore + } + other => { + trace!("Libp2p => Unhandled Kademlia event: {:?}", other) + } + }, + DerivedDiscoveryBehaviourEvent::Mdns(ev) => match ev { + MdnsEvent::Discovered(list) => { + if self.n_node_connected >= self.target_peer_count { + // Already over discovery max, don't add discovered peers. 
+ // We could potentially buffer these addresses to be added later, + // but mdns is not an important use case and may be removed in future. + continue; + } + + // Add any discovered peers to Kademlia + for (peer_id, multiaddr) in list { + if let Some(kad) = self.discovery.kademlia.as_mut() { + kad.add_address(peer_id, multiaddr.clone()); + } + } + } + MdnsEvent::Expired(_) => {} + }, + DerivedDiscoveryBehaviourEvent::Relay(relay_event) => match relay_event { + relay::Event::ReservationReqAccepted { src_peer_id, .. } => { + debug!("Relay accepted reservation request from: {src_peer_id:#?}"); + } + relay::Event::ReservationReqDenied { src_peer_id } => { + debug!("Reservation request was denied for: {src_peer_id:#?}"); + } + relay::Event::ReservationTimedOut { src_peer_id } => { + debug!("Reservation timed out for: {src_peer_id:#?}"); + } + _ => {} + }, + } + self.pending_events + .push_back(DiscoveryEvent::Discovery(Box::new(ev))); + } + ToSwarm::Dial { opts } => { + return Poll::Ready(ToSwarm::Dial { opts }); + } + ToSwarm::NotifyHandler { + peer_id, + handler, + event, + } => { + return Poll::Ready(ToSwarm::NotifyHandler { + peer_id, + handler, + event, + }) + } + ToSwarm::CloseConnection { + peer_id, + connection, + } => { + return Poll::Ready(ToSwarm::CloseConnection { + peer_id, + connection, + }) + } + ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }), + ToSwarm::RemoveListener { id } => { + return Poll::Ready(ToSwarm::RemoveListener { id }) + } + ToSwarm::NewExternalAddrCandidate(addr) => { + return Poll::Ready(ToSwarm::NewExternalAddrCandidate(addr)) + } + ToSwarm::ExternalAddrConfirmed(addr) => { + return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) + } + ToSwarm::ExternalAddrExpired(addr) => { + return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) + } + _ => {} + } + } + + Poll::Pending + } +} diff --git a/crates/networking/src/discovery/config.rs b/crates/networking/src/discovery/config.rs new file mode 100644 index 
000000000..7eb869135 --- /dev/null +++ b/crates/networking/src/discovery/config.rs @@ -0,0 +1,173 @@ +use super::{ + behaviour::{DerivedDiscoveryBehaviour, DiscoveryBehaviour}, + new_kademlia, +}; +use gadget_logging::warn; +use libp2p::{ + autonat, identify, identity::PublicKey, mdns, relay, upnp, Multiaddr, PeerId, StreamProtocol, +}; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + time::Duration, +}; + +pub struct DiscoveryConfig { + /// The local peer ID. + local_peer_id: PeerId, + /// The local public key. + local_public_key: PublicKey, + /// The bootstrap peers. + bootstrap_peers: Vec<(PeerId, Multiaddr)>, + /// The relay nodes. + relay_nodes: Vec<(PeerId, Multiaddr)>, + /// The number of peers to connect to. + target_peer_count: u64, + /// Enable mDNS discovery. + enable_mdns: bool, + /// Enable Kademlia discovery. + enable_kademlia: bool, + /// Enable UPnP discovery. + enable_upnp: bool, + /// Enable relay nodes. + enable_relay: bool, + /// The name of the network. + network_name: String, + /// Protocol version string that uniquely identifies your P2P service. + /// This should be unique to your application to avoid conflicts with other P2P networks. 
+ /// Format recommendation: "/" + /// Example: "my-blockchain/1.0.0" or "my-chat-app/0.1.0" + protocol_version: String, +} + +impl DiscoveryConfig { + pub fn new(local_public_key: PublicKey, network_name: impl Into) -> Self { + Self { + local_peer_id: local_public_key.to_peer_id(), + local_public_key, + bootstrap_peers: Vec::new(), + relay_nodes: Vec::new(), + target_peer_count: 25, // Reasonable default + enable_mdns: true, // Enable by default for local development + enable_kademlia: true, // Enable by default for production + enable_upnp: true, // Enable by default for better connectivity + enable_relay: true, // Enable by default for relay functionality + network_name: network_name.into(), + protocol_version: String::from("gadget/1.0.0"), // Default version + } + } + + /// Set the protocol version that uniquely identifies your P2P service. + /// This should be unique to your application to avoid conflicts with other P2P networks. + /// Format recommendation: "/" + pub fn with_protocol_version(mut self, version: impl Into) -> Self { + self.protocol_version = version.into(); + self + } + + pub fn with_bootstrap_peers(mut self, peers: Vec<(PeerId, Multiaddr)>) -> Self { + self.bootstrap_peers = peers; + self + } + + pub fn with_relay_nodes(mut self, nodes: Vec<(PeerId, Multiaddr)>) -> Self { + self.relay_nodes = nodes; + self + } + + pub fn with_target_peer_count(mut self, count: u64) -> Self { + self.target_peer_count = count; + self + } + + pub fn with_mdns(mut self, enable: bool) -> Self { + self.enable_mdns = enable; + self + } + + pub fn with_kademlia(mut self, enable: bool) -> Self { + self.enable_kademlia = enable; + self + } + + pub fn with_upnp(mut self, enable: bool) -> Self { + self.enable_upnp = enable; + self + } + + pub fn with_relay(mut self, enable: bool) -> Self { + self.enable_relay = enable; + self + } + + pub fn build(self) -> anyhow::Result { + let kademlia_opt = if self.enable_kademlia { + let protocol = 
StreamProtocol::try_from_owned(format!( + "/gadget/kad/{}/kad/1.0.0", + self.network_name + ))?; + + let mut kademlia = new_kademlia(self.local_peer_id, protocol); + + // Add bootstrap peers + for (peer_id, addr) in &self.bootstrap_peers { + kademlia.add_address(peer_id, addr.clone()); + } + + // Start bootstrap process + if let Err(e) = kademlia.bootstrap() { + warn!("Kademlia bootstrap failed: {}", e); + } + + Some(kademlia) + } else { + None + }; + + let mdns_opt = if self.enable_mdns { + Some(mdns::Behaviour::new( + Default::default(), + self.local_peer_id, + )?) + } else { + None + }; + + let upnp_opt = if self.enable_upnp { + Some(upnp::tokio::Behaviour::default()) + } else { + None + }; + + let relay_opt = if self.enable_relay { + let relay = relay::Behaviour::new(self.local_peer_id, Default::default()); + Some(relay) + } else { + None + }; + + let behaviour = DerivedDiscoveryBehaviour { + kademlia: kademlia_opt.into(), + mdns: mdns_opt.into(), + identify: identify::Behaviour::new( + identify::Config::new(self.protocol_version, self.local_public_key) + .with_agent_version(format!("gadget-{}", env!("CARGO_PKG_VERSION"))) + .with_push_listen_addr_updates(true), + ), + autonat: autonat::Behaviour::new(self.local_peer_id, Default::default()), + upnp: upnp_opt.into(), + relay: relay_opt.into(), + }; + + Ok(DiscoveryBehaviour { + discovery: behaviour, + peers: HashSet::new(), + peer_info: HashMap::new(), + target_peer_count: self.target_peer_count, + next_kad_random_query: tokio::time::interval(Duration::from_secs(1)), + duration_to_next_kad: Duration::from_secs(1), + pending_events: VecDeque::new(), + n_node_connected: 0, + pending_dial_opts: VecDeque::new(), + }) + } +} diff --git a/crates/networking/src/discovery/mod.rs b/crates/networking/src/discovery/mod.rs new file mode 100644 index 000000000..c9aa05957 --- /dev/null +++ b/crates/networking/src/discovery/mod.rs @@ -0,0 +1,32 @@ +use std::{num::NonZero, time::Duration}; + +use libp2p::{ + kad::{self, 
store::MemoryStore}, + PeerId, StreamProtocol, +}; + +pub mod behaviour; +pub mod config; +pub mod peers; + +pub use peers::{PeerEvent, PeerInfo, PeerManager}; + +const MAX_ESTABLISHED_PER_PEER: u32 = 4; + +pub fn new_kademlia(peer_id: PeerId, protocol: StreamProtocol) -> kad::Behaviour { + let store = kad::store::MemoryStore::new(peer_id); + let mut config = kad::Config::new(protocol); + + // Optimize Kademlia configuration + config + .set_query_timeout(Duration::from_secs(60)) + .set_replication_factor(NonZero::new(3).unwrap()) + .set_publication_interval(Some(Duration::from_secs(120))) + .set_provider_record_ttl(Some(Duration::from_secs(24 * 60 * 60))) + .set_record_ttl(Some(Duration::from_secs(24 * 60 * 60))) + .set_parallelism(NonZero::new(5).unwrap()); + + let mut kademlia = kad::Behaviour::with_config(peer_id, store, config); + kademlia.set_mode(Some(kad::Mode::Server)); + kademlia +} diff --git a/crates/networking/src/discovery/peers.rs b/crates/networking/src/discovery/peers.rs new file mode 100644 index 000000000..a3d7679cd --- /dev/null +++ b/crates/networking/src/discovery/peers.rs @@ -0,0 +1,252 @@ +use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, + time::{Duration, Instant, SystemTime}, +}; + +use crate::InstanceMsgPublicKey; +use dashmap::{DashMap, DashSet}; +use libp2p::{core::Multiaddr, identify, PeerId}; +use tokio::sync::{broadcast, RwLock}; +use tracing::debug; + +/// Information about a peer's connection and behavior +#[derive(Clone, Debug)] +pub struct PeerInfo { + /// Known addresses for the peer + pub addresses: HashSet, + /// Information from the identify protocol + pub identify_info: Option, + /// When the peer was last seen + pub last_seen: SystemTime, + /// Latest ping latency + pub ping_latency: Option, + /// Number of successful protocol interactions + pub successes: u32, + /// Number of failed protocol interactions + pub failures: u32, + /// Average response time for protocol requests + pub average_response_time: Option, +} 
+ +impl Default for PeerInfo { + fn default() -> Self { + Self { + addresses: HashSet::new(), + identify_info: None, + last_seen: SystemTime::now(), + ping_latency: None, + successes: 0, + failures: 0, + average_response_time: None, + } + } +} + +#[derive(Debug, Clone)] +pub enum PeerEvent { + /// A peer was added or updated + PeerUpdated { peer_id: PeerId, info: PeerInfo }, + /// A peer was removed + PeerRemoved { peer_id: PeerId, reason: String }, + /// A peer was banned + PeerBanned { + peer_id: PeerId, + reason: String, + expires_at: Option, + }, + /// A peer was unbanned + PeerUnbanned { peer_id: PeerId }, +} + +pub struct PeerManager { + /// Active peers and their information + peers: DashMap, + /// Verified peers from completed handshakes + verified_peers: DashSet, + /// Handshake keys to peer ids + public_keys_to_peer_ids: Arc>>, + /// Banned peers with optional expiration time + banned_peers: DashMap>, + /// Event sender for peer updates + event_tx: broadcast::Sender, +} + +impl Default for PeerManager { + fn default() -> Self { + let (event_tx, _) = broadcast::channel(100); + Self { + peers: Default::default(), + banned_peers: Default::default(), + verified_peers: Default::default(), + public_keys_to_peer_ids: Arc::new(RwLock::new(BTreeMap::new())), + event_tx, + } + } +} + +impl PeerManager { + /// Get a subscription to peer events + pub fn subscribe(&self) -> broadcast::Receiver { + self.event_tx.subscribe() + } + + /// Update or add peer information + pub fn update_peer(&self, peer_id: PeerId, mut info: PeerInfo) { + // Update last seen time + info.last_seen = SystemTime::now(); + + // Insert or update peer info + self.peers.insert(peer_id, info.clone()); + + // Emit event + let _ = self.event_tx.send(PeerEvent::PeerUpdated { peer_id, info }); + } + + /// Remove a peer + pub fn remove_peer(&self, peer_id: &PeerId, reason: impl Into) { + if self.peers.remove(peer_id).is_some() { + let reason = reason.into(); + debug!(%peer_id, %reason, "removed peer"); 
+ let _ = self.event_tx.send(PeerEvent::PeerRemoved { + peer_id: *peer_id, + reason, + }); + } + } + + /// Verify a peer + pub fn verify_peer(&self, peer_id: &PeerId) { + self.verified_peers.insert(*peer_id); + } + + /// Check if a peer is verified + pub fn is_peer_verified(&self, peer_id: &PeerId) -> bool { + self.verified_peers.contains(peer_id) + } + + /// Ban a peer with optional expiration + pub fn ban_peer(&self, peer_id: PeerId, reason: impl Into, duration: Option) { + let expires_at = duration.map(|d| Instant::now() + d); + + // Remove from active peers + self.remove_peer(&peer_id, "banned"); + + // Add to banned peers + self.banned_peers.insert(peer_id, expires_at); + + let reason = reason.into(); + debug!(%peer_id, %reason, "banned peer"); + let _ = self.event_tx.send(PeerEvent::PeerBanned { + peer_id, + reason, + expires_at, + }); + } + + /// Bans a peer with the default duration(`1h`) + pub async fn ban_peer_with_default_duration(&self, peer: PeerId, reason: impl Into) { + const BAN_PEER_DURATION: Duration = Duration::from_secs(60 * 60); //1h + self.ban_peer(peer, reason, Some(BAN_PEER_DURATION)) + } + + /// Unban a peer + pub fn unban_peer(&self, peer_id: &PeerId) { + if self.banned_peers.remove(peer_id).is_some() { + debug!(%peer_id, "unbanned peer"); + let _ = self + .event_tx + .send(PeerEvent::PeerUnbanned { peer_id: *peer_id }); + } + } + + /// Check if a peer is banned + pub fn is_banned(&self, peer_id: &PeerId) -> bool { + self.banned_peers.contains_key(peer_id) + } + + /// Log a successful interaction with a peer + pub fn log_success(&self, peer_id: &PeerId, duration: Duration) { + if let Some(mut info) = self.peers.get_mut(peer_id) { + info.successes += 1; + update_average_time(&mut info, duration); + self.update_peer(*peer_id, info.clone()); + } + } + + /// Log a failed interaction with a peer + pub fn log_failure(&self, peer_id: &PeerId, duration: Duration) { + if let Some(mut info) = self.peers.get_mut(peer_id) { + info.failures += 1; + 
update_average_time(&mut info, duration); + self.update_peer(*peer_id, info.clone()); + } + } + + /// Get peer information + pub fn get_peer_info(&self, peer_id: &PeerId) -> Option { + self.peers.get(peer_id).map(|info| info.value().clone()) + } + + /// Get all active peers + pub fn get_peers(&self) -> DashMap { + self.peers.clone() + } + + /// Get number of active peers + pub fn peer_count(&self) -> usize { + self.peers.len() + } + + /// Start the background task to clean up expired bans + pub async fn run_ban_cleanup(self: Arc) { + loop { + let now = Instant::now(); + let mut to_unban = Vec::new(); + + // Find expired bans + let banned_peers = self.banned_peers.clone().into_read_only(); + for (peer_id, expires_at) in banned_peers.iter() { + if let Some(expiry) = expires_at { + if now >= *expiry { + to_unban.push(*peer_id); + } + } + } + + // Unban expired peers + for peer_id in to_unban { + self.unban_peer(&peer_id); + } + + tokio::time::sleep(Duration::from_secs(60)).await; + } + } + + /// Add a peer id to the public key to peer id map after verifying handshake + pub async fn add_peer_id_to_public_key( + &self, + peer_id: &PeerId, + public_key: &InstanceMsgPublicKey, + ) { + self.public_keys_to_peer_ids + .write() + .await + .insert(public_key.clone(), *peer_id); + } +} + +/// Update the average response time for a peer +fn update_average_time(info: &mut PeerInfo, duration: Duration) { + const ALPHA: u32 = 5; // Smoothing factor for the moving average + + if info.average_response_time.is_none() { + info.average_response_time = Some(duration); + } else if duration < info.average_response_time.unwrap() { + let delta = (info.average_response_time.unwrap() - duration) / ALPHA; + info.average_response_time = Some(info.average_response_time.unwrap() - delta); + } else { + let delta = (duration - info.average_response_time.unwrap()) / ALPHA; + info.average_response_time = Some(info.average_response_time.unwrap() + delta); + } +} diff --git 
a/crates/networking/src/error.rs b/crates/networking/src/error.rs index 9c4d13ead..e2e4b056f 100644 --- a/crates/networking/src/error.rs +++ b/crates/networking/src/error.rs @@ -1,3 +1,7 @@ +use crate::NetworkEvent; + +pub type Result = core::result::Result; + #[derive(Debug, thiserror::Error)] pub enum Error { #[error("Network error: {0}")] @@ -42,14 +46,22 @@ pub enum Error { // libp2p compat #[error(transparent)] Dial(#[from] libp2p::swarm::DialError), + #[error(transparent)] Noise(#[from] libp2p::noise::Error), + #[error(transparent)] Behaviour(#[from] libp2p::BehaviourBuilderError), + #[error(transparent)] Subscription(#[from] libp2p::gossipsub::SubscriptionError), + #[error(transparent)] TransportIo(#[from] libp2p::TransportError), + #[error(transparent)] Multiaddr(#[from] libp2p::multiaddr::Error), + + #[error(transparent)] + TokioSendError(#[from] tokio::sync::mpsc::error::SendError), } diff --git a/crates/networking/src/gossip.rs b/crates/networking/src/gossip.rs deleted file mode 100644 index 2f97475cb..000000000 --- a/crates/networking/src/gossip.rs +++ /dev/null @@ -1,356 +0,0 @@ -#![allow( - missing_debug_implementations, - unused_results, - clippy::module_name_repetitions, - clippy::exhaustive_enums -)] - -use crate::behaviours::{ - GossipMessage, GossipOrRequestResponse, MyBehaviour, MyBehaviourEvent, MyBehaviourRequest, -}; -use crate::error::Error; -use crate::key_types::{GossipMsgKeyPair, GossipMsgPublicKey}; -use crate::types::{IntraNodePayload, MessageType, ParticipantInfo, ProtocolMessage}; -use async_trait::async_trait; -use gadget_crypto::hashing::blake3_256; -use gadget_std::collections::BTreeMap; -use gadget_std::string::ToString; -use gadget_std::sync::atomic::AtomicUsize; -use gadget_std::sync::Arc; -use libp2p::gossipsub::IdentTopic; -use libp2p::{swarm::SwarmEvent, PeerId}; -use lru_mem::LruCache; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::{Mutex, RwLock}; - -use crate::networking::Network; -use gadget_std::{boxed::Box, 
format, vec::Vec}; - -pub type InboundMapping = (IdentTopic, UnboundedSender>, Arc); - -pub struct NetworkServiceWithoutSwarm<'a> { - pub inbound_mapping: &'a [InboundMapping], - pub public_key_to_libp2p_id: Arc>>, - pub secret_key: &'a GossipMsgKeyPair, - pub connected_peers: Arc, - pub span: tracing::Span, - pub my_id: PeerId, -} - -impl<'a> NetworkServiceWithoutSwarm<'a> { - pub(crate) fn with_swarm( - &'a self, - swarm: &'a mut libp2p::Swarm, - ) -> NetworkService<'a> { - NetworkService { - swarm, - inbound_mapping: self.inbound_mapping, - public_key_to_libp2p_id: &self.public_key_to_libp2p_id, - secret_key: self.secret_key, - connected_peers: self.connected_peers.clone(), - span: &self.span, - my_id: self.my_id, - } - } -} - -pub struct NetworkService<'a> { - pub swarm: &'a mut libp2p::Swarm, - pub inbound_mapping: &'a [InboundMapping], - pub public_key_to_libp2p_id: &'a Arc>>, - pub connected_peers: Arc, - pub secret_key: &'a GossipMsgKeyPair, - pub span: &'a tracing::Span, - pub my_id: PeerId, -} - -impl NetworkService<'_> { - /// Handle local requests that are meant to be sent to the network. - pub(crate) fn handle_intra_node_payload(&mut self, msg: IntraNodePayload) { - let _enter = self.span.enter(); - match (msg.message_type, msg.payload) { - (MessageType::Broadcast, GossipOrRequestResponse::Gossip(payload)) => { - let gossip_message = bincode::serialize(&payload).expect("Should serialize"); - if let Err(e) = self - .swarm - .behaviour_mut() - .gossipsub - .publish(msg.topic, gossip_message) - { - gadget_logging::error!("Publish error: {e:?}"); - } - } - - (MessageType::P2P(peer_id), GossipOrRequestResponse::Request(req)) => { - // Send the outer payload in order to attach the topic to it - // "Requests are sent using Behaviour::send_request and the responses - // received as Message::Response via Event::Message." 
- self.swarm.behaviour_mut().p2p.send_request(&peer_id, req); - } - (MessageType::Broadcast, GossipOrRequestResponse::Request(_)) => { - gadget_logging::error!("Broadcasting a request is not supported"); - } - (MessageType::Broadcast, GossipOrRequestResponse::Response(_)) => { - gadget_logging::error!("Broadcasting a response is not supported"); - } - (MessageType::P2P(_), GossipOrRequestResponse::Gossip(_)) => { - gadget_logging::error!("P2P message should be a request or response"); - } - (MessageType::P2P(_), GossipOrRequestResponse::Response(_)) => { - // TODO: Send the response to the peer. - } - } - } - - /// Handle inbound events from the networking layer - #[allow(clippy::too_many_lines)] - pub(crate) async fn handle_swarm_event(&mut self, event: SwarmEvent) { - use MyBehaviourEvent::{Dcutr, Gossipsub, Identify, Kadmelia, Mdns, P2p, Ping, Relay}; - use SwarmEvent::{ - Behaviour, ConnectionClosed, ConnectionEstablished, Dialing, ExpiredListenAddr, - ExternalAddrConfirmed, ExternalAddrExpired, IncomingConnection, - IncomingConnectionError, ListenerClosed, ListenerError, NewExternalAddrCandidate, - NewExternalAddrOfPeer, NewListenAddr, OutgoingConnectionError, - }; - let _enter = self.span.enter(); - match event { - Behaviour(P2p(event)) => { - self.handle_p2p(event).await; - } - Behaviour(Gossipsub(event)) => { - self.handle_gossip(event).await; - } - Behaviour(Mdns(event)) => { - self.handle_mdns_event(event).await; - } - Behaviour(Identify(event)) => { - self.handle_identify_event(event).await; - } - Behaviour(Kadmelia(event)) => { - gadget_logging::trace!("Kadmelia event: {event:?}"); - } - Behaviour(Dcutr(event)) => { - self.handle_dcutr_event(event).await; - } - Behaviour(Relay(event)) => { - self.handle_relay_event(event).await; - } - Behaviour(Ping(event)) => { - self.handle_ping_event(event).await; - } - - NewListenAddr { - address, - listener_id, - } => { - gadget_logging::trace!("{listener_id} has a new address: {address}"); - } - 
ConnectionEstablished { - peer_id, - num_established, - .. - } => { - self.handle_connection_established(peer_id, num_established.get()) - .await; - } - ConnectionClosed { - peer_id, - num_established, - cause, - .. - } => { - self.handle_connection_closed(peer_id, num_established, cause) - .await; - } - IncomingConnection { - connection_id, - local_addr, - send_back_addr, - } => { - self.handle_incoming_connection(connection_id, local_addr, send_back_addr) - .await; - } - IncomingConnectionError { - connection_id, - local_addr, - send_back_addr, - error, - } => { - self.handle_incoming_connection_error( - connection_id, - local_addr, - send_back_addr, - error, - ) - .await; - } - OutgoingConnectionError { - connection_id, - peer_id, - error, - } => { - self.handle_outgoing_connection_error(connection_id, peer_id, error) - .await; - } - ExpiredListenAddr { - listener_id, - address, - } => { - gadget_logging::trace!("{listener_id} has an expired address: {address}"); - } - ListenerClosed { - listener_id, - addresses, - reason, - } => { - gadget_logging::trace!( - "{listener_id} on {addresses:?} has been closed: {reason:?}" - ); - } - ListenerError { listener_id, error } => { - gadget_logging::error!("{listener_id} has an error: {error}"); - } - Dialing { - peer_id, - connection_id, - } => { - gadget_logging::trace!( - "Dialing peer: {peer_id:?} with connection_id: {connection_id}" - ); - } - NewExternalAddrCandidate { address } => { - gadget_logging::trace!("New external address candidate: {address}"); - } - ExternalAddrConfirmed { address } => { - gadget_logging::trace!("External address confirmed: {address}"); - } - ExternalAddrExpired { address } => { - gadget_logging::trace!("External address expired: {address}"); - } - NewExternalAddrOfPeer { peer_id, address } => { - gadget_logging::trace!( - "New external address of peer: {peer_id} with address: {address}" - ); - } - unknown => { - gadget_logging::warn!("Unknown swarm event: {unknown:?}"); - } - } - } -} - 
-pub struct GossipHandle { - pub topic: IdentTopic, - pub tx_to_outbound: UnboundedSender, - pub rx_from_inbound: Arc>>>, - pub connected_peers: Arc, - pub public_key_to_libp2p_id: Arc>>, - pub recent_messages: parking_lot::Mutex>, - pub my_id: GossipMsgPublicKey, -} - -impl GossipHandle { - #[must_use] - pub fn connected_peers(&self) -> usize { - self.connected_peers - .load(gadget_std::sync::atomic::Ordering::Relaxed) - } - - #[must_use] - pub fn topic(&self) -> IdentTopic { - self.topic.clone() - } - - /// Returns an ordered vector of public keys of the peers that are connected to the gossipsub topic. - pub async fn peers(&self) -> Vec { - self.public_key_to_libp2p_id - .read() - .await - .keys() - .copied() - .collect() - } -} - -#[async_trait] -impl Network for GossipHandle { - async fn next_message(&self) -> Option { - loop { - let mut lock = self - .rx_from_inbound - .try_lock() - .expect("There should be only a single caller for `next_message`"); - - let message_bytes = lock.recv().await?; - drop(lock); - match bincode::deserialize::(&message_bytes) { - Ok(message) => { - let hash = blake3_256(&message_bytes); - let mut map = self.recent_messages.lock(); - if map - .insert(hash, ()) - .expect("Should not exceed memory limit (rx)") - .is_none() - { - return Some(message); - } - } - Err(e) => { - gadget_logging::error!("Failed to deserialize message (gossip): {e}"); - } - } - } - } - - async fn send_message(&self, mut message: ProtocolMessage) -> Result<(), Error> { - message.sender.public_key = Some(self.my_id); - let message_type = if let Some(ParticipantInfo { - public_key: Some(to), - .. - }) = message.recipient - { - let pub_key_to_libp2p_id = self.public_key_to_libp2p_id.read().await; - gadget_logging::trace!("Handshake count: {}", pub_key_to_libp2p_id.len()); - let libp2p_id = pub_key_to_libp2p_id - .get(&to) - .copied() - .ok_or_else(|| { - Error::NetworkError(format!( - "No libp2p ID found for crypto public key: {:?}. No handshake happened? 
Total handshakes: {}", - to, pub_key_to_libp2p_id.len(), - )) - })?; - - MessageType::P2P(libp2p_id) - } else { - MessageType::Broadcast - }; - - let raw_payload = - bincode::serialize(&message).map_err(|err| Error::MessagingError(err.to_string()))?; - let payload_inner = match message_type { - MessageType::Broadcast => GossipOrRequestResponse::Gossip(GossipMessage { - topic: self.topic.to_string(), - raw_payload, - }), - MessageType::P2P(_) => GossipOrRequestResponse::Request(MyBehaviourRequest::Message { - topic: self.topic.to_string(), - raw_payload, - }), - }; - - let payload = IntraNodePayload { - topic: self.topic.clone(), - payload: payload_inner, - message_type, - }; - - self.tx_to_outbound - .send(payload) - .map_err(|e| Error::NetworkError(format!("Failed to send intra-node payload: {e}"))) - } - - fn public_id(&self) -> GossipMsgPublicKey { - self.my_id - } -} diff --git a/crates/networking/src/handlers/blueprint_protocol.rs b/crates/networking/src/handlers/blueprint_protocol.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/crates/networking/src/handlers/blueprint_protocol.rs @@ -0,0 +1 @@ + diff --git a/crates/networking/src/handlers/connections.rs b/crates/networking/src/handlers/connections.rs deleted file mode 100644 index ea8b812bf..000000000 --- a/crates/networking/src/handlers/connections.rs +++ /dev/null @@ -1,141 +0,0 @@ -#![allow(unused_results, clippy::used_underscore_binding)] - -use crate::behaviours::MyBehaviourRequest; -use crate::gossip::NetworkService; -use crate::key_types::Curve; -use gadget_crypto::KeyType; -use gadget_std as std; -use itertools::Itertools; -use libp2p::PeerId; - -impl NetworkService<'_> { - #[tracing::instrument(skip(self))] - pub(crate) async fn handle_connection_established( - &mut self, - peer_id: PeerId, - _num_established: u32, - ) { - gadget_logging::debug!("Connection established"); - if !self - .public_key_to_libp2p_id - .read() - .await - .iter() - .any(|(_, id)| id == &peer_id) - { - 
let my_peer_id = *self.swarm.local_peer_id(); - let msg = my_peer_id.to_bytes(); - match ::sign_with_secret(&mut self.secret_key.clone(), &msg) { - Ok(signature) => { - let handshake = MyBehaviourRequest::Handshake { - public_key: self.secret_key.public(), - signature, - }; - self.swarm - .behaviour_mut() - .p2p - .send_request(&peer_id, handshake); - self.swarm - .behaviour_mut() - .gossipsub - .add_explicit_peer(&peer_id); - gadget_logging::info!("Sent handshake from {my_peer_id} to {peer_id}"); - } - Err(e) => { - gadget_logging::error!("Failed to sign handshake: {e}"); - } - } - } - } - - #[tracing::instrument(skip(self))] - pub(crate) async fn handle_connection_closed( - &mut self, - peer_id: PeerId, - num_established: u32, - _cause: Option, - ) { - gadget_logging::trace!("Connection closed"); - if num_established == 0 { - self.swarm - .behaviour_mut() - .gossipsub - .remove_explicit_peer(&peer_id); - let mut pub_key_to_libp2p_id = self.public_key_to_libp2p_id.write().await; - let len_initial = 0; - pub_key_to_libp2p_id.retain(|_, id| *id != peer_id); - if pub_key_to_libp2p_id.len() == len_initial + 1 { - self.connected_peers - .fetch_sub(1, std::sync::atomic::Ordering::Relaxed); - } - } - } - - #[tracing::instrument(skip(self))] - pub(crate) async fn handle_incoming_connection( - &mut self, - _connection_id: libp2p::swarm::ConnectionId, - _local_addr: libp2p::Multiaddr, - _send_back_addr: libp2p::Multiaddr, - ) { - gadget_logging::trace!("Incoming connection"); - } - - #[tracing::instrument(skip(self))] - async fn handle_outgoing_connection( - &mut self, - peer_id: PeerId, - _connection_id: libp2p::swarm::ConnectionId, - ) { - gadget_logging::trace!("Outgoing connection to peer: {peer_id}"); - } - - #[tracing::instrument(skip(self, error))] - pub(crate) async fn handle_incoming_connection_error( - &mut self, - _connection_id: libp2p::swarm::ConnectionId, - _local_addr: libp2p::Multiaddr, - _send_back_addr: libp2p::Multiaddr, - error: 
libp2p::swarm::ListenError, - ) { - gadget_logging::error!("Incoming connection error: {error}"); - } - - #[tracing::instrument(skip(self, error))] - pub(crate) async fn handle_outgoing_connection_error( - &mut self, - _connection_id: libp2p::swarm::ConnectionId, - _peer_id: Option, - error: libp2p::swarm::DialError, - ) { - if let libp2p::swarm::DialError::Transport(addrs) = error { - let read = self.public_key_to_libp2p_id.read().await; - for (addr, err) in addrs { - if let Some(peer_id) = get_peer_id_from_multiaddr(&addr) { - if !read.values().contains(&peer_id) { - gadget_logging::warn!( - "Outgoing connection error to peer: {peer_id} at {addr}: {err}", - peer_id = peer_id, - addr = addr, - err = err - ); - } - } - } - } else { - gadget_logging::error!("Outgoing connection error to peer: {error}"); - } - } -} - -fn get_peer_id_from_multiaddr(addr: &libp2p::Multiaddr) -> Option { - addr.iter() - .find_map(|proto| { - if let libp2p::multiaddr::Protocol::P2p(peer_id) = proto { - Some(Some(peer_id)) - } else { - None - } - }) - .flatten() -} diff --git a/crates/networking/src/handlers/dcutr.rs b/crates/networking/src/handlers/dcutr.rs deleted file mode 100644 index f0fd3497f..000000000 --- a/crates/networking/src/handlers/dcutr.rs +++ /dev/null @@ -1,8 +0,0 @@ -use crate::gossip::NetworkService; - -impl NetworkService<'_> { - #[tracing::instrument(skip(self, event))] - pub async fn handle_dcutr_event(&mut self, event: libp2p::dcutr::Event) { - gadget_logging::trace!("DCUTR event: {event:?}"); - } -} diff --git a/crates/networking/src/handlers/discovery.rs b/crates/networking/src/handlers/discovery.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/crates/networking/src/handlers/discovery.rs @@ -0,0 +1 @@ + diff --git a/crates/networking/src/handlers/gossip.rs b/crates/networking/src/handlers/gossip.rs deleted file mode 100644 index 67a68c913..000000000 --- a/crates/networking/src/handlers/gossip.rs +++ /dev/null @@ -1,118 +0,0 @@ 
-#![allow(unused_results)] - -use crate::behaviours::GossipMessage; -use crate::gossip::NetworkService; -use gadget_std::string::ToString; -use gadget_std::sync::atomic::AtomicUsize; -use gadget_std::sync::Arc; -use libp2p::gossipsub::{Event, TopicHash}; -use libp2p::{gossipsub, PeerId}; - -impl NetworkService<'_> { - #[tracing::instrument(skip(self, event))] - pub(crate) async fn handle_gossip(&mut self, event: gossipsub::Event) { - let with_connected_peers = |topic: &TopicHash, f: fn(&Arc)| { - let maybe_mapping = self - .inbound_mapping - .iter() - .find(|r| r.0.to_string() == topic.to_string()); - match maybe_mapping { - Some((_, _, connected_peers)) => { - f(connected_peers); - true - } - None => false, - } - }; - match event { - Event::Message { - propagation_source, - message_id, - message, - } => { - self.handle_gossip_message(propagation_source, message_id, message) - .await; - } - Event::Subscribed { peer_id, topic } => { - let added = with_connected_peers(&topic, |_connected_peers| { - // Code commented out because each peer needs to do a request-response - // direct P2P handshake, which is where the connected_peers counter is - // incremented. Adding here will just add twice, which is undesirable. - // connected_peers.fetch_add(1, gadget_std::sync::atomic::Ordering::Relaxed); - }); - if added { - gadget_logging::trace!("{peer_id} subscribed to {topic}"); - } else { - gadget_logging::error!("{peer_id} subscribed to unknown topic: {topic}"); - } - } - Event::Unsubscribed { peer_id, topic } => { - let removed = with_connected_peers(&topic, |_connected_peers| { - // Code commented out because each peer needs to do a request-response - // direct P2P handshake, which is where the connected_peers counter is - // decremented. Subbing here will just sub twice, which is undesirable. 
- // connected_peers.fetch_sub(1, gadget_std::sync::atomic::Ordering::Relaxed); - }); - if removed { - gadget_logging::trace!("{peer_id} unsubscribed from {topic}"); - } else { - gadget_logging::error!("{peer_id} unsubscribed from unknown topic: {topic}"); - } - } - Event::GossipsubNotSupported { peer_id } => { - gadget_logging::trace!("{peer_id} does not support gossipsub!"); - } - Event::SlowPeer { - peer_id, - failed_messages: _, - } => { - gadget_logging::error!("{peer_id} wasn't able to download messages in time!"); - } - } - } - - #[tracing::instrument( - skip(self, message), - fields( - %_message_id, - %_propagation_source, - source = ?message.source - ) - )] - async fn handle_gossip_message( - &mut self, - _propagation_source: PeerId, - _message_id: gossipsub::MessageId, - message: gossipsub::Message, - ) { - let Some(origin) = message.source else { - gadget_logging::error!("Got message from unknown peer"); - return; - }; - - // Reject messages from self - if origin == self.my_id { - return; - } - - gadget_logging::trace!("Got message from peer: {origin}"); - match bincode::deserialize::(&message.data) { - Ok(GossipMessage { topic, raw_payload }) => { - if let Some((_, tx, _)) = self - .inbound_mapping - .iter() - .find(|r| r.0.to_string() == topic) - { - if let Err(e) = tx.send(raw_payload) { - gadget_logging::warn!("Failed to send message to worker: {e}"); - } - } else { - gadget_logging::error!("No registered worker for topic: {topic}!"); - } - } - Err(e) => { - gadget_logging::error!("Failed to deserialize message (handlers/gossip): {e}"); - } - } - } -} diff --git a/crates/networking/src/handlers/identify.rs b/crates/networking/src/handlers/identify.rs deleted file mode 100644 index 017be564b..000000000 --- a/crates/networking/src/handlers/identify.rs +++ /dev/null @@ -1,41 +0,0 @@ -use crate::gossip::NetworkService; -use gadget_std::format; - -impl NetworkService<'_> { - #[tracing::instrument(skip(self, event))] - pub(crate) async fn 
handle_identify_event(&mut self, event: libp2p::identify::Event) { - use libp2p::identify::Event::{Error, Pushed, Received, Sent}; - match event { - Received { peer_id, info, .. } => { - // TODO: Verify the peer info, for example the protocol version, agent version, etc. - let info_lines = [ - format!("Protocol Version: {}", info.protocol_version), - format!("Agent Version: {}", info.agent_version), - format!("Supported Protocols: {:?}", info.protocols), - ]; - let info_lines = info_lines.join(", "); - gadget_logging::trace!( - "Received identify event from peer: {peer_id} with info: {info_lines}" - ); - self.swarm.add_external_address(info.observed_addr); - } - Sent { peer_id, .. } => { - gadget_logging::trace!("Sent identify event to peer: {peer_id}"); - } - Pushed { peer_id, info, .. } => { - let info_lines = [ - format!("Protocol Version: {}", info.protocol_version), - format!("Agent Version: {}", info.agent_version), - format!("Supported Protocols: {:?}", info.protocols), - ]; - let info_lines = info_lines.join(", "); - gadget_logging::trace!( - "Pushed identify event to peer: {peer_id} with info: {info_lines}" - ); - } - Error { peer_id, error, .. 
} => { - gadget_logging::error!("Identify error from peer: {peer_id} with error: {error}"); - } - } - } -} diff --git a/crates/networking/src/handlers/kadmelia.rs b/crates/networking/src/handlers/kadmelia.rs deleted file mode 100644 index fec6a8a39..000000000 --- a/crates/networking/src/handlers/kadmelia.rs +++ /dev/null @@ -1,9 +0,0 @@ -use crate::gossip::NetworkService; - -impl NetworkService<'_> { - #[tracing::instrument(skip(self, event))] - async fn handle_kadmelia_event(&mut self, event: libp2p::kad::Event) { - // TODO: Handle kadmelia events - gadget_logging::trace!("Kadmelia event: {event:?}"); - } -} diff --git a/crates/networking/src/handlers/mdns.rs b/crates/networking/src/handlers/mdns.rs deleted file mode 100644 index 9bdd1ac8d..000000000 --- a/crates/networking/src/handlers/mdns.rs +++ /dev/null @@ -1,32 +0,0 @@ -use crate::gossip::NetworkService; -use libp2p::mdns; - -impl NetworkService<'_> { - #[tracing::instrument(skip(self, event))] - pub(crate) async fn handle_mdns_event(&mut self, event: mdns::Event) { - use mdns::Event::{Discovered, Expired}; - match event { - Discovered(list) => { - for (peer_id, multiaddr) in list { - gadget_logging::trace!("discovered a new peer: {peer_id} on {multiaddr}"); - self.swarm - .behaviour_mut() - .gossipsub - .add_explicit_peer(&peer_id); - if let Err(err) = self.swarm.dial(multiaddr) { - gadget_logging::error!("Failed to dial peer: {err}"); - } - } - } - Expired(list) => { - for (peer_id, multiaddr) in list { - gadget_logging::trace!("discover peer has expired: {peer_id} with {multiaddr}"); - self.swarm - .behaviour_mut() - .gossipsub - .remove_explicit_peer(&peer_id); - } - } - } - } -} diff --git a/crates/networking/src/handlers/mod.rs b/crates/networking/src/handlers/mod.rs index 63b39123e..4e5668a5d 100644 --- a/crates/networking/src/handlers/mod.rs +++ b/crates/networking/src/handlers/mod.rs @@ -1,10 +1,3 @@ -#[cfg(not(target_family = "wasm"))] -pub mod connections; -pub mod dcutr; -pub mod gossip; -pub mod 
identify; -pub mod kadmelia; -pub mod mdns; -pub mod p2p; +pub mod blueprint_protocol; +pub mod discovery; pub mod ping; -pub mod relay; diff --git a/crates/networking/src/handlers/p2p.rs b/crates/networking/src/handlers/p2p.rs deleted file mode 100644 index be9b279ff..000000000 --- a/crates/networking/src/handlers/p2p.rs +++ /dev/null @@ -1,204 +0,0 @@ -#![allow(unused_results)] - -use crate::behaviours::{MyBehaviourRequest, MyBehaviourResponse}; -use crate::gossip::NetworkService; -use crate::key_types::Curve; -use gadget_crypto::KeyType; -use gadget_std::string::ToString; -use gadget_std::sync::atomic::Ordering; -use libp2p::gossipsub::IdentTopic; -use libp2p::{request_response, PeerId}; - -impl NetworkService<'_> { - #[tracing::instrument(skip(self, event))] - pub(crate) async fn handle_p2p( - &mut self, - event: request_response::Event, - ) { - use request_response::Event::{InboundFailure, Message, OutboundFailure, ResponseSent}; - match event { - Message { - peer, - message, - connection_id: _, - } => { - gadget_logging::trace!("Received P2P message from: {peer}"); - self.handle_p2p_message(peer, message).await; - } - OutboundFailure { - peer, - request_id, - error, - connection_id: _, - } => { - gadget_logging::error!("Failed to send message to peer: {peer} with request_id: {request_id} and error: {error}"); - } - InboundFailure { - peer, - request_id, - error, - connection_id: _, - } => { - gadget_logging::error!("Failed to receive message from peer: {peer} with request_id: {request_id} and error: {error}"); - } - ResponseSent { - peer, - request_id, - connection_id: _, - } => { - gadget_logging::debug!( - "Sent response to peer: {peer} with request_id: {request_id}" - ); - } - } - } - - #[tracing::instrument(skip(self, message))] - async fn handle_p2p_message( - &mut self, - peer: PeerId, - message: request_response::Message, - ) { - use request_response::Message::{Request, Response}; - match message { - Request { - request, - channel, - request_id, - } => 
{ - gadget_logging::trace!( - "Received request with request_id: {request_id} from peer: {peer}" - ); - self.handle_p2p_request(peer, request_id, request, channel) - .await; - } - Response { - response, - request_id, - } => { - gadget_logging::trace!( - "Received response from peer: {peer} with request_id: {request_id}" - ); - self.handle_p2p_response(peer, request_id, response).await; - } - } - } - - #[tracing::instrument(skip(self, req, channel))] - async fn handle_p2p_request( - &mut self, - peer: PeerId, - request_id: request_response::InboundRequestId, - req: MyBehaviourRequest, - channel: request_response::ResponseChannel, - ) { - let result = match req { - MyBehaviourRequest::Handshake { - public_key, - signature, - } => { - gadget_logging::trace!("Received handshake from peer: {peer}"); - // Verify the signature - let msg = peer.to_bytes(); - let valid = ::verify(&public_key, &msg, &signature); - if !valid { - gadget_logging::warn!("Invalid initial handshake signature from peer: {peer}"); - let _ = self.swarm.disconnect_peer_id(peer); - return; - } - if self - .public_key_to_libp2p_id - .write() - .await - .insert(public_key, peer) - .is_none() - { - let _ = self.connected_peers.fetch_add(1, Ordering::Relaxed); - } - // Send response with our public key - let my_peer_id = self.swarm.local_peer_id(); - let msg = my_peer_id.to_bytes(); - match ::sign_with_secret(&mut self.secret_key.clone(), &msg) { - Ok(signature) => self.swarm.behaviour_mut().p2p.send_response( - channel, - MyBehaviourResponse::Handshaked { - public_key: self.secret_key.public(), - signature, - }, - ), - Err(e) => { - gadget_logging::error!("Failed to sign message: {e}"); - return; - } - } - } - MyBehaviourRequest::Message { topic, raw_payload } => { - // Reject messages from self - if peer == self.my_id { - return; - } - - let topic = IdentTopic::new(topic); - if let Some((_, tx, _)) = self - .inbound_mapping - .iter() - .find(|r| r.0.to_string() == topic.to_string()) - { - if let Err(e) = 
tx.send(raw_payload) { - gadget_logging::warn!("Failed to send message to worker: {e}"); - } - } else { - gadget_logging::error!("No registered worker for topic: {topic}!"); - } - self.swarm - .behaviour_mut() - .p2p - .send_response(channel, MyBehaviourResponse::MessageHandled) - } - }; - if result.is_err() { - gadget_logging::error!("Failed to send response for {request_id}"); - } - } - - #[tracing::instrument(skip(self, message))] - async fn handle_p2p_response( - &mut self, - peer: PeerId, - request_id: request_response::OutboundRequestId, - message: MyBehaviourResponse, - ) { - match message { - MyBehaviourResponse::Handshaked { - public_key, - signature, - } => { - gadget_logging::trace!("Received handshake-ack message from peer: {peer}"); - let msg = peer.to_bytes(); - let valid = ::verify(&public_key, &msg, &signature); - if !valid { - gadget_logging::warn!( - "Invalid handshake-acknowledgement signature from peer: {peer}" - ); - // TODO: report this peer. - self.public_key_to_libp2p_id - .write() - .await - .remove(&public_key); - let _ = self.swarm.disconnect_peer_id(peer); - return; - } - if self - .public_key_to_libp2p_id - .write() - .await - .insert(public_key, peer) - .is_none() - { - let _ = self.connected_peers.fetch_add(1, Ordering::Relaxed); - } - } - MyBehaviourResponse::MessageHandled => {} - } - } -} diff --git a/crates/networking/src/handlers/ping.rs b/crates/networking/src/handlers/ping.rs index af93d89fd..8b1378917 100644 --- a/crates/networking/src/handlers/ping.rs +++ b/crates/networking/src/handlers/ping.rs @@ -1,8 +1 @@ -use crate::gossip::NetworkService; -impl NetworkService<'_> { - #[tracing::instrument(skip(self, _event))] - pub async fn handle_ping_event(&mut self, _event: libp2p::ping::Event) { - //gadget_logging::trace!("Ping event: {event:?}"); - } -} diff --git a/crates/networking/src/handlers/relay.rs b/crates/networking/src/handlers/relay.rs deleted file mode 100644 index 320132086..000000000 --- 
a/crates/networking/src/handlers/relay.rs +++ /dev/null @@ -1,8 +0,0 @@ -use crate::gossip::NetworkService; - -impl NetworkService<'_> { - #[tracing::instrument(skip(self, event))] - pub async fn handle_relay_event(&mut self, event: libp2p::relay::Event) { - gadget_logging::trace!("Relay event: {event:?}"); - } -} diff --git a/crates/networking/src/lib.rs b/crates/networking/src/lib.rs index 96de798ca..a085802ea 100644 --- a/crates/networking/src/lib.rs +++ b/crates/networking/src/lib.rs @@ -1,19 +1,18 @@ #![cfg_attr(docsrs, feature(doc_auto_cfg))] -pub mod gossip; -pub mod handlers; -pub mod networking; -#[cfg(feature = "round-based-compat")] -pub mod round_based_compat; -#[cfg(feature = "round-based-compat")] -pub use round_based; - pub mod behaviours; +pub mod blueprint_protocol; +pub mod discovery; pub mod error; -pub mod setup; +pub mod handlers; +pub mod service; pub mod types; +#[cfg(test)] +mod tests; + pub use key_types::*; +pub use service::{NetworkConfig, NetworkEvent, NetworkMessage, NetworkService}; #[cfg(all( feature = "sp-core-ecdsa", @@ -22,9 +21,15 @@ pub use key_types::*; ))] pub mod key_types { pub use gadget_crypto::sp_core::{ - SpEcdsa as Curve, SpEcdsaPair as GossipMsgKeyPair, SpEcdsaPublic as GossipMsgPublicKey, - SpEcdsaSignature as GossipSignedMsgSignature, + SpEcdsa as Curve, SpEcdsaPair as InstanceMsgKeyPair, SpEcdsaPublic as InstanceMsgPublicKey, + SpEcdsaSignature as InstanceSignedMsgSignature, }; + + impl super::KeySignExt for InstanceMsgKeyPair { + fn sign_prehash(&self, prehash: &[u8; 32]) -> InstanceSignedMsgSignature { + InstanceSignedMsgSignature(self.0.sign_prehashed(prehash)) + } + } } #[cfg(all( @@ -34,9 +39,15 @@ pub mod key_types { ))] pub mod key_types { pub use gadget_crypto::sp_core::{ - SpSr25519 as Curve, SpSr25519Pair as GossipMsgKeyPair, - SpSr25519Public as GossipMsgPublicKey, SpSr25519Signature as GossipSignedMsgSignature, + SpSr25519 as Curve, SpSr25519Pair as InstanceMsgKeyPair, + SpSr25519Public as 
InstanceMsgPublicKey, SpSr25519Signature as InstanceSignedMsgSignature, }; + + impl super::KeySignExt for InstanceMsgKeyPair { + fn sign_prehash(&self, prehash: &[u8; 32]) -> InstanceSignedMsgSignature { + InstanceSignedMsgSignature(self.0.sign_prehashed(prehash)) + } + } } #[cfg(all( @@ -46,9 +57,15 @@ pub mod key_types { ))] pub mod key_types { pub use gadget_crypto::sp_core::{ - SpEd25519 as Curve, SpEd25519Pair as GossipMsgKeyPair, - SpEd25519Public as GossipMsgPublicKey, SpEd25519Signature as GossipSignedMsgSignature, + SpEd25519 as Curve, SpEd25519Pair as InstanceMsgKeyPair, + SpEd25519Public as InstanceMsgPublicKey, SpEd25519Signature as InstanceSignedMsgSignature, }; + + impl super::KeySignExt for InstanceMsgKeyPair { + fn sign_prehash(&self, prehash: &[u8; 32]) -> InstanceSignedMsgSignature { + InstanceSignedMsgSignature(self.0.sign_prehashed(prehash)) + } + } } #[cfg(all( @@ -59,9 +76,15 @@ pub mod key_types { pub mod key_types { // Default to k256 ECDSA implementation pub use gadget_crypto::k256::{ - K256Ecdsa as Curve, K256Signature as GossipSignedMsgSignature, - K256SigningKey as GossipMsgKeyPair, K256VerifyingKey as GossipMsgPublicKey, + K256Ecdsa as Curve, K256Signature as InstanceSignedMsgSignature, + K256SigningKey as InstanceMsgKeyPair, K256VerifyingKey as InstanceMsgPublicKey, }; + + impl super::KeySignExt for InstanceMsgKeyPair { + fn sign_prehash(&self, prehash: &[u8; 32]) -> InstanceSignedMsgSignature { + self.sign_prehash(prehash) + } + } } // Compile-time assertion to ensure only one feature is enabled @@ -73,3 +96,7 @@ pub mod key_types { compile_error!( "Only one of 'sp-core-ecdsa', 'sp-core-sr25519', or 'sp-core-ed25519' features can be enabled at a time" ); + +pub(crate) trait KeySignExt { + fn sign_prehash(&self, prehash: &[u8; 32]) -> InstanceSignedMsgSignature; +} \ No newline at end of file diff --git a/crates/networking/src/networking.rs b/crates/networking/src/networking.rs deleted file mode 100644 index 877c93fd5..000000000 --- 
a/crates/networking/src/networking.rs +++ /dev/null @@ -1,552 +0,0 @@ -#[cfg(test)] -mod tests; - -use crate::error::Error; -use crate::key_types::GossipMsgPublicKey; -use crate::types::{IdentifierInfo, ParticipantInfo, ProtocolMessage, UserID}; -use async_trait::async_trait; -use dashmap::DashMap; -use futures::{Stream, StreamExt}; -use gadget_crypto::hashing::blake3_256; -use gadget_std as std; -use gadget_std::boxed::Box; -use gadget_std::cmp::Reverse; -use gadget_std::collections::{BinaryHeap, HashMap}; -use gadget_std::ops::{Deref, DerefMut}; -use gadget_std::pin::Pin; -use gadget_std::string::ToString; -use gadget_std::sync::Arc; -use gadget_std::task::{Context, Poll}; -use gadget_std::vec::Vec; -use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::Mutex; - -#[async_trait] -#[auto_impl::auto_impl(&, Box, Arc)] -pub trait Network: Send + Sync + 'static { - async fn next_message(&self) -> Option; - async fn send_message(&self, message: ProtocolMessage) -> Result<(), Error>; - - fn public_id(&self) -> GossipMsgPublicKey; - - fn build_protocol_message( - &self, - identifier_info: IdentifierInfo, - from: UserID, - to: Option, - payload: &Payload, - to_network_id: Option, - ) -> ProtocolMessage { - assert!( - (u8::from(to.is_none()) + u8::from(to_network_id.is_none()) != 1), - "Either `to` must be Some AND `to_network_id` is Some, or, both None" - ); - - let sender_participant_info = ParticipantInfo { - user_id: from, - public_key: Some(self.public_id()), - }; - let receiver_participant_info = to.map(|to| ParticipantInfo { - user_id: to, - public_key: to_network_id, - }); - ProtocolMessage { - identifier_info, - sender: sender_participant_info, - recipient: receiver_participant_info, - payload: bincode::serialize(payload).expect("Failed to serialize message"), - } - } -} - -#[derive(Debug, Serialize, Deserialize)] -struct SequencedMessage { - sequence_number: u64, - payload: Vec, -} - -#[derive(Debug)] -struct 
PendingMessage { - sequence_number: u64, - message: ProtocolMessage, -} - -impl PartialEq for PendingMessage { - fn eq(&self, other: &Self) -> bool { - self.sequence_number == other.sequence_number - } -} - -impl Eq for PendingMessage {} - -impl PartialOrd for PendingMessage { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for PendingMessage { - fn cmp(&self, other: &Self) -> gadget_std::cmp::Ordering { - self.sequence_number.cmp(&other.sequence_number) - } -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct MultiplexedMessage { - stream_id: StreamKey, - payload: SequencedMessage, -} - -pub struct NetworkMultiplexer { - to_receiving_streams: ActiveStreams, - unclaimed_receiving_streams: Arc>, - tx_to_networking_layer: MultiplexedSender, - sequence_numbers: Arc>, - pub my_id: GossipMsgPublicKey, -} - -type ActiveStreams = Arc>>; - -#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash, Serialize, Deserialize, Default)] -pub struct StreamKey { - pub task_hash: [u8; 32], - pub round_id: i32, -} - -impl From for StreamKey { - fn from(identifier_info: IdentifierInfo) -> Self { - let str_repr = identifier_info.to_string(); - let task_hash = blake3_256(str_repr.as_bytes()); - Self { - task_hash, - round_id: -1, - } - } -} - -pub struct MultiplexedReceiver { - inner: tokio::sync::mpsc::UnboundedReceiver, - stream_id: StreamKey, - // For post-drop removal purposes - active_streams: ActiveStreams, -} - -#[derive(Clone)] -pub struct MultiplexedSender { - inner: tokio::sync::mpsc::UnboundedSender<(StreamKey, ProtocolMessage)>, - pub(crate) stream_id: StreamKey, -} - -impl MultiplexedSender { - /// Sends a protocol message through the multiplexed channel. 
- /// - /// # Arguments - /// * `message` - The protocol message to send - /// - /// # Returns - /// * `Ok(())` - If the message was successfully sent - /// * `Err(Error)` - If there was an error sending the message - /// - /// # Errors - /// Returns an error if the receiving end of the channel has been closed, - /// indicating that the network connection is no longer available. - pub fn send(&self, message: ProtocolMessage) -> Result<(), Error> { - self.inner - .send((self.stream_id, message)) - .map_err(|err| Error::Other(err.to_string())) - } -} - -impl Stream for MultiplexedReceiver { - type Item = ProtocolMessage; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.get_mut().inner).poll_recv(cx) - } -} - -impl Deref for MultiplexedReceiver { - type Target = tokio::sync::mpsc::UnboundedReceiver; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl DerefMut for MultiplexedReceiver { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - -impl Drop for MultiplexedReceiver { - fn drop(&mut self) { - let _ = self.active_streams.remove(&self.stream_id); - } -} - -// Since a single stream can be used for multiple users, and, multiple users assign seq's independently, -// we need to make a key that is unique for each (send->dest) pair and stream. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] -struct CompoundStreamKey { - stream_key: StreamKey, - send_user: UserID, - recv_user: Option, -} - -impl NetworkMultiplexer { - /// Creates a new `NetworkMultiplexer` instance. 
- /// - /// # Arguments - /// * `network` - The underlying network implementation that implements the Network trait - /// - /// # Type Parameters - /// * `N` - The network type that implements the Network trait - /// - /// # Returns - /// * `Self` - A new `NetworkMultiplexer` instance - /// - /// # Panics - /// This function will panic if the internal receiver has already been taken, which should not happen. - #[allow(clippy::too_many_lines)] - pub fn new(network: N) -> Self { - let (tx_to_networking_layer, mut rx_from_substreams) = - tokio::sync::mpsc::unbounded_channel(); - let my_id = network.public_id(); - let this = NetworkMultiplexer { - to_receiving_streams: Arc::new(DashMap::new()), - unclaimed_receiving_streams: Arc::new(DashMap::new()), - tx_to_networking_layer: MultiplexedSender { - inner: tx_to_networking_layer, - stream_id: StreamKey::default(), - }, - sequence_numbers: Arc::new(DashMap::new()), - my_id, - }; - - let active_streams = this.to_receiving_streams.clone(); - let unclaimed_streams = this.unclaimed_receiving_streams.clone(); - let tx_to_networking_layer = this.tx_to_networking_layer.clone(); - let sequence_numbers = this.sequence_numbers.clone(); - - drop(tokio::spawn(async move { - let network_clone = &network; - - let task1 = async move { - while let Some((stream_id, msg)) = rx_from_substreams.recv().await { - let compound_key = CompoundStreamKey { - stream_key: stream_id, - send_user: msg.sender.user_id, - recv_user: msg.recipient.as_ref().map(|p| p.user_id), - }; - - let mut seq = sequence_numbers.entry(compound_key).or_insert(0); - let current_seq = *seq; - *seq += 1; - - gadget_logging::trace!( - "SEND SEQ {current_seq} FROM {} | StreamKey: {:?}", - msg.sender.user_id, - hex::encode(bincode::serialize(&compound_key).unwrap()) - ); - - let multiplexed_message = MultiplexedMessage { - stream_id, - payload: SequencedMessage { - sequence_number: current_seq, - payload: msg.payload, - }, - }; - - let message = ProtocolMessage { - 
identifier_info: msg.identifier_info, - sender: msg.sender, - recipient: msg.recipient, - payload: bincode::serialize(&multiplexed_message) - .expect("Failed to serialize message"), - }; - - if let Err(err) = network_clone.send_message(message).await { - gadget_logging::error!("Failed to send message to network: {err:?}"); - break; - } - } - }; - - let task2 = async move { - let mut pending_messages: HashMap< - CompoundStreamKey, - BinaryHeap>, - > = HashMap::default(); - let mut expected_seqs: HashMap = HashMap::default(); - - while let Some(mut msg) = network_clone.next_message().await { - if let Some(recv) = msg.recipient.as_ref() { - if let Some(recv_pk) = &recv.public_key { - if recv_pk != &my_id { - gadget_logging::warn!( - "Received a message not intended for the local user" - ); - } - } - } - - let Ok(multiplexed_message) = - bincode::deserialize::(&msg.payload) - else { - gadget_logging::error!("Failed to deserialize message (networking)"); - continue; - }; - - let stream_id = multiplexed_message.stream_id; - let compound_key = CompoundStreamKey { - stream_key: stream_id, - send_user: msg.sender.user_id, - recv_user: msg.recipient.as_ref().map(|p| p.user_id), - }; - let seq = multiplexed_message.payload.sequence_number; - msg.payload = multiplexed_message.payload.payload; - - // Get or create the pending heap for this stream - let pending = pending_messages.entry(compound_key).or_default(); - let expected_seq = expected_seqs.entry(compound_key).or_default(); - - let send_user = msg.sender.user_id; - let recv_user = msg.recipient.as_ref().map(|p| p.user_id); - let compound_key_hex = hex::encode(bincode::serialize(&compound_key).unwrap()); - gadget_logging::trace!( - "RECV SEQ {seq} FROM {} as user {:?} | Expecting: {} | StreamKey: {:?}", - send_user, - recv_user, - *expected_seq, - compound_key_hex, - ); - - // Add the message to pending - pending.push(Reverse(PendingMessage { - sequence_number: seq, - message: msg, - })); - - // Try to deliver messages in 
order - if let Some(active_receiver) = active_streams.get(&stream_id) { - while let Some(Reverse(PendingMessage { - sequence_number, - message: _, - })) = pending.peek() - { - if *sequence_number != *expected_seq { - gadget_logging::error!( - "Sequence number mismatch, expected {} but got {}", - *expected_seq, - sequence_number - ); - break; - } - - gadget_logging::trace!("DELIVERING SEQ {seq} FROM {} as user {:?} | Expecting: {} | StreamKey: {:?}", send_user, recv_user, *expected_seq, compound_key_hex); - - *expected_seq += 1; - - let message = pending.pop().unwrap().0.message; - - if let Err(err) = active_receiver.send(message) { - gadget_logging::error!(%err, "Failed to send message to receiver"); - let _ = active_streams.remove(&stream_id); - break; - } - } - } else { - let (tx, rx) = Self::create_multiplexed_stream_inner( - tx_to_networking_layer.clone(), - &active_streams, - stream_id, - ); - - // Deliver any pending messages in order - while let Some(Reverse(PendingMessage { - sequence_number, - message: _, - })) = pending.peek() - { - if *sequence_number != *expected_seq { - gadget_logging::error!( - "Sequence number mismatch, expected {} but got {}", - *expected_seq, - sequence_number - ); - break; - } - - gadget_logging::warn!("EARLY DELIVERY SEQ {seq} FROM {} as user {:?} | Expecting: {} | StreamKey: {:?}", send_user, recv_user, *expected_seq, compound_key_hex); - - *expected_seq += 1; - - let message = pending.pop().unwrap().0.message; - - if let Err(err) = tx.send(message) { - gadget_logging::error!(%err, "Failed to send message to receiver"); - break; - } - } - - let _ = unclaimed_streams.insert(stream_id, rx); - } - } - }; - - tokio::select! { - () = task1 => { - gadget_logging::error!("Task 1 exited"); - }, - () = task2 => { - gadget_logging::error!("Task 2 exited"); - } - } - })); - - this - } - - /// Creates a new multiplexed stream. 
- /// - /// # Arguments - /// * `id` - The ID of the stream to create - /// - /// # Returns - /// * `Self` - A new multiplexed stream - pub fn multiplex(&self, id: impl Into) -> SubNetwork { - let id = id.into(); - let my_id = self.my_id; - let mut tx_to_networking_layer = self.tx_to_networking_layer.clone(); - if let Some(unclaimed) = self.unclaimed_receiving_streams.remove(&id) { - tx_to_networking_layer.stream_id = id; - return SubNetwork { - tx: tx_to_networking_layer, - rx: Some(unclaimed.1.into()), - my_id, - }; - } - - let (tx, rx) = Self::create_multiplexed_stream_inner( - tx_to_networking_layer, - &self.to_receiving_streams, - id, - ); - - SubNetwork { - tx, - rx: Some(rx.into()), - my_id, - } - } - - /// Creates a subnetwork, and also forwards all messages to the given channel. The network cannot be used to - /// receive messages since the messages will be forwarded to the provided channel. - /// - /// # Panics - /// - /// This function will panic if the internal receiver has already been taken, which should not happen - /// under normal circumstances. 
- pub fn multiplex_with_forwarding( - &self, - id: impl Into, - forward_tx: tokio::sync::mpsc::UnboundedSender, - ) -> SubNetwork { - let mut network = self.multiplex(id); - let rx = network.rx.take().expect("Rx from network should be Some"); - let forwarding_task = async move { - let mut rx = rx.into_inner(); - while let Some(msg) = rx.recv().await { - gadget_logging::info!( - "Round {}: Received message from {} to {:?} (id: {})", - msg.identifier_info.round_id, - msg.sender.user_id, - msg.recipient.as_ref().map(|p| p.user_id), - msg.identifier_info.message_id, - ); - if let Err(err) = forward_tx.send(msg) { - gadget_logging::error!(%err, "Failed to forward message to network"); - // TODO: Add AtomicBool to make sending stop - break; - } - } - }; - - drop(tokio::spawn(forwarding_task)); - - network - } - - fn create_multiplexed_stream_inner( - mut tx_to_networking_layer: MultiplexedSender, - active_streams: &ActiveStreams, - stream_id: StreamKey, - ) -> (MultiplexedSender, MultiplexedReceiver) { - let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); - if active_streams.insert(stream_id, tx).is_some() { - gadget_logging::warn!( - "Stream ID {stream_id:?} already exists! Existing stream will be replaced" - ); - } - tx_to_networking_layer.stream_id = stream_id; - - ( - tx_to_networking_layer, - MultiplexedReceiver { - inner: rx, - stream_id, - active_streams: active_streams.clone(), - }, - ) - } -} - -impl From for NetworkMultiplexer { - fn from(network: N) -> Self { - Self::new(network) - } -} - -pub struct SubNetwork { - tx: MultiplexedSender, - rx: Option>, - my_id: GossipMsgPublicKey, -} - -impl SubNetwork { - /// Sends a protocol message through the subnetwork. 
- /// - /// # Arguments - /// * `message` - The protocol message to send - /// - /// # Returns - /// * `Ok(())` - If the message was successfully sent - /// * `Err(Error)` - If there was an error sending the message - /// - /// # Errors - /// * Returns an error if the underlying network connection is closed or unavailable - pub fn send(&self, message: ProtocolMessage) -> Result<(), Error> { - self.tx.send(message) - } - - pub async fn recv(&self) -> Option { - self.rx.as_ref()?.lock().await.next().await - } -} - -#[async_trait] -impl Network for SubNetwork { - async fn next_message(&self) -> Option { - self.recv().await - } - - async fn send_message(&self, message: ProtocolMessage) -> Result<(), Error> { - self.send(message) - } - - fn public_id(&self) -> GossipMsgPublicKey { - self.my_id - } -} diff --git a/crates/networking/src/round_based_compat.rs b/crates/networking/src/round_based_compat.rs deleted file mode 100644 index ea9dd9e8a..000000000 --- a/crates/networking/src/round_based_compat.rs +++ /dev/null @@ -1,253 +0,0 @@ -use crate::key_types::GossipMsgPublicKey; -use crate::networking::{NetworkMultiplexer, StreamKey, SubNetwork}; -use crate::types::{IdentifierInfo, ParticipantInfo, ProtocolMessage}; -use core::pin::Pin; -use core::sync::atomic::AtomicU64; -use core::task::{ready, Context, Poll}; -use futures::prelude::*; -use gadget_std::collections::{BTreeMap, HashMap}; -use gadget_std::string::ToString; -use gadget_std::sync::Arc; -use round_based::{Delivery, Incoming, MessageType, Outgoing}; -use round_based::{MessageDestination, MsgId, PartyIndex}; -use stream::{SplitSink, SplitStream}; - -pub struct NetworkDeliveryWrapper { - /// The wrapped network implementation. - network: NetworkWrapper, -} - -impl NetworkDeliveryWrapper -where - M: Clone + Send + Unpin + 'static, - M: serde::Serialize + serde::de::DeserializeOwned, -{ - /// Create a new `NetworkDeliveryWrapper` over a network implementation with the given party index. 
- #[must_use] - pub fn new( - mux: Arc, - i: PartyIndex, - task_hash: [u8; 32], - parties: BTreeMap, - ) -> Self { - let (tx_forward, rx) = tokio::sync::mpsc::unbounded_channel(); - // By default, we create 10 substreams for each party. - let mut sub_streams = HashMap::new(); - for x in 0..N { - let key = StreamKey { - task_hash, - round_id: x as i32, - }; - // Creates a multiplexed subnetwork, and also forwards all messages to the given channel - let _ = sub_streams.insert(key, mux.multiplex_with_forwarding(key, tx_forward.clone())); - } - - let network = NetworkWrapper { - me: i, - mux, - message_hashes: HashMap::new(), - sub_streams, - participants: parties, - task_hash, - tx_forward, - rx, - next_msg_id: Arc::new(NextMessageId::default()), - _phantom: std::marker::PhantomData, - }; - - NetworkDeliveryWrapper { network } - } -} - -/// A `NetworkWrapper` wraps a network implementation -/// and implements [`Stream`] and [`Sink`] for it. -pub struct NetworkWrapper { - /// The current party index. - me: PartyIndex, - /// Our network Multiplexer. - mux: Arc, - /// A Map of substreams for each round. - sub_streams: HashMap, - /// A map of message hashes to their corresponding message id. - /// This is used to deduplicate messages. - message_hashes: HashMap, - /// Participants in the network with their corresponding public keys. - /// Note: This is a `BTreeMap` to ensure that the participants are sorted by their party index. - participants: BTreeMap, - /// The next message id to use. - next_msg_id: Arc, - /// A channel for forwarding messages to the network. - tx_forward: tokio::sync::mpsc::UnboundedSender, - /// A channel for receiving messages from the network. - rx: tokio::sync::mpsc::UnboundedReceiver, - /// The task hash of the current task. - task_hash: [u8; 32], - /// A phantom data type to ensure that the network wrapper is generic over the message type. 
- _phantom: std::marker::PhantomData, -} - -impl Delivery for NetworkDeliveryWrapper -where - M: Clone + Send + Unpin + 'static, - M: serde::Serialize + serde::de::DeserializeOwned, - M: round_based::ProtocolMessage, -{ - type Send = SplitSink, Outgoing>; - type Receive = SplitStream>; - type SendError = crate::error::Error; - type ReceiveError = crate::error::Error; - - fn split(self) -> (Self::Receive, Self::Send) { - let (sink, stream) = self.network.split(); - (stream, sink) - } -} - -impl Stream for NetworkWrapper -where - M: serde::de::DeserializeOwned + Unpin, - M: round_based::ProtocolMessage, -{ - type Item = Result, crate::error::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.get_mut(); - let res = ready!(this.rx.poll_recv(cx)); - if let Some(res) = res { - let msg_type = if res.recipient.is_some() { - MessageType::P2P - } else { - MessageType::Broadcast - }; - - let id = res.identifier_info.message_id; - - let msg: M = match serde_json::from_slice(&res.payload) { - Ok(msg) => msg, - Err(err) => { - gadget_logging::error!(%err, "Failed to deserialize message (round_based_compat)"); - return Poll::Ready(Some(Err(crate::error::Error::Other(err.to_string())))); - } - }; - - let message_hash = blake3::hash(&res.payload); - gadget_logging::debug!( - "Received message with hash {} from {} in round {}", - hex::encode(message_hash.as_bytes()), - res.sender.user_id, - res.identifier_info.round_id - ); - - if this.message_hashes.contains_key(&message_hash) { - gadget_logging::warn!( - "Received duplicate message with hash {} (id: {})", - hex::encode(message_hash.as_bytes()), - id - ); - return Poll::Ready(None); - } - - this.message_hashes.insert(message_hash, id); - - Poll::Ready(Some(Ok(Incoming { - msg, - sender: res.sender.user_id, - id, - msg_type, - }))) - } else { - Poll::Ready(None) - } - } -} - -impl Sink> for NetworkWrapper -where - M: Unpin + serde::Serialize, - M: round_based::ProtocolMessage, -{ - type 
Error = crate::error::Error; - - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn start_send(self: Pin<&mut Self>, out: Outgoing) -> Result<(), Self::Error> { - let this = self.get_mut(); - let id = this.next_msg_id.next(); - - let round_id = out.msg.round(); - - gadget_logging::info!( - "Round {}: Sending message from {} to {:?} (id: {})", - round_id, - this.me, - out.recipient, - id, - ); - - // Get the substream to send the message to. - let key = StreamKey { - task_hash: this.task_hash, - round_id: i32::from(round_id), - }; - let substream = this.sub_streams.entry(key).or_insert_with(|| { - this.mux - .multiplex_with_forwarding(key, this.tx_forward.clone()) - }); - - let identifier_info = IdentifierInfo { - message_id: id, - round_id, - }; - let (to, to_network_id) = match out.recipient { - MessageDestination::AllParties => (None, None), - MessageDestination::OneParty(p) => (Some(p), this.participants.get(&p).copied()), - }; - - if matches!(out.recipient, MessageDestination::OneParty(_)) && to_network_id.is_none() { - gadget_logging::warn!("Recipient not found when required for {:?}", out.recipient); - return Err(crate::error::Error::Other( - "Recipient not found".to_string(), - )); - } - - // Manually construct a `ProtocolMessage` since rounds-based - // does not work well with bincode - let protocol_message = ProtocolMessage { - identifier_info, - sender: ParticipantInfo { - user_id: this.me, - public_key: this.participants.get(&this.me).copied(), - }, - recipient: to.map(|user_id| ParticipantInfo { - user_id, - public_key: to_network_id, - }), - payload: serde_json::to_vec(&out.msg).expect("Should be able to serialize message"), - }; - - match substream.send(protocol_message) { - Ok(()) => Ok(()), - Err(e) => Err(e), - } - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - 
Poll::Ready(Ok(())) - } -} - -#[derive(Default)] -struct NextMessageId(AtomicU64); - -impl NextMessageId { - fn next(&self) -> MsgId { - self.0 - .fetch_add(1, gadget_std::sync::atomic::Ordering::Relaxed) - } -} diff --git a/crates/networking/src/service.rs b/crates/networking/src/service.rs new file mode 100644 index 000000000..27fafb4a5 --- /dev/null +++ b/crates/networking/src/service.rs @@ -0,0 +1,484 @@ +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::Duration, +}; + +use crate::{ + behaviours::{GadgetBehaviour, GadgetBehaviourEvent}, + blueprint_protocol::{BlueprintProtocolEvent, InstanceMessageRequest, InstanceMessageResponse}, + discovery::{ + behaviour::{DerivedDiscoveryBehaviour, DerivedDiscoveryBehaviourEvent, DiscoveryEvent}, + PeerInfo, PeerManager, + }, + error::Error, + key_types::{InstanceMsgKeyPair, InstanceMsgPublicKey, InstanceSignedMsgSignature}, +}; +use futures::{Stream, StreamExt}; +use gadget_logging::trace; +use libp2p::{ + core::{transport::Boxed, upgrade}, + gossipsub::{IdentTopic, Topic}, + identify, + identity::Keypair, + noise, ping, + swarm::{ConnectionId, SwarmEvent}, + tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, Transport, +}; +use tokio::sync::mpsc; +use tokio_stream::wrappers::{IntervalStream, UnboundedReceiverStream}; +use tracing::{debug, error, info, warn}; + +/// Events emitted by the network service +#[derive(Debug)] +pub enum NetworkEvent { + /// New request received from a peer + InstanceRequestInbound { + peer: PeerId, + request: InstanceMessageRequest, + }, + /// New response received from a peer + InstanceResponseInbound { + peer: PeerId, + response: InstanceMessageResponse, + }, + /// New request sent to a peer + InstanceRequestOutbound { + peer: PeerId, + request: InstanceMessageRequest, + }, + /// Response sent to a peer + InstanceResponseOutbound { + peer: PeerId, + response: InstanceMessageResponse, + }, + /// New gossip message received + GossipReceived { + source: PeerId, + topic: 
String, + message: Vec, + }, + /// New gossip message sent + GossipSent { topic: String, message: Vec }, + /// Peer connected + PeerConnected(PeerId), + /// Peer disconnected + PeerDisconnected(PeerId), + /// Handshake completed successfully + HandshakeCompleted { peer: PeerId }, + /// Handshake failed + HandshakeFailed { peer: PeerId, reason: String }, +} + +#[derive(Debug)] +pub enum NetworkMessage { + InstanceRequest { + peer: PeerId, + request: InstanceMessageRequest, + }, + InstanceResponse { + peer: PeerId, + response: InstanceMessageResponse, + }, + GossipMessage { + source: PeerId, + topic: String, + message: Vec, + }, + HandshakeRequest { + peer: PeerId, + public_key: InstanceMsgPublicKey, + signature: InstanceSignedMsgSignature, + }, + HandshakeResponse { + peer: PeerId, + public_key: InstanceMsgPublicKey, + signature: InstanceSignedMsgSignature, + }, +} + +/// Configuration for the network service +#[derive(Debug, Clone)] +pub struct NetworkConfig { + /// Network name/namespace + pub network_name: String, + /// Instance id for blueprint protocol + pub instance_id: String, + /// Instance secret key for blueprint protocol + pub instance_secret_key: InstanceMsgKeyPair, + /// Instance public key for blueprint protocol + pub instance_public_key: InstanceMsgPublicKey, + /// Local keypair for authentication + pub local_key: Keypair, + /// Address to listen on + pub listen_addr: Multiaddr, + /// Target number of peers to maintain + pub target_peer_count: u64, + /// Bootstrap peers to connect to + pub bootstrap_peers: Vec<(PeerId, Multiaddr)>, + /// Whether to enable mDNS discovery + pub enable_mdns: bool, + /// Whether to enable Kademlia DHT + pub enable_kademlia: bool, +} + +pub struct NetworkService { + /// The libp2p swarm + swarm: Swarm, + /// Peer manager for tracking peer states + peer_manager: Arc, + /// Channel for sending messages to the network service + network_sender: mpsc::UnboundedSender, + /// Channel for receiving messages from the network 
service + network_receiver: UnboundedReceiverStream, + /// Channel for sending events to the network service + event_sender: mpsc::UnboundedSender, + /// Channel for receiving events from the network service + event_receiver: UnboundedReceiverStream, + /// Network name/namespace + network_name: String, + /// Bootstrap peers + bootstrap_peers: HashMap, +} + +impl NetworkService { + /// Create a new network service + pub async fn new(config: NetworkConfig) -> Result { + let NetworkConfig { + network_name, + instance_id, + instance_secret_key, + instance_public_key, + local_key, + listen_addr, + target_peer_count, + bootstrap_peers, + enable_mdns, + enable_kademlia, + } = config; + + let peer_manager = Arc::new(PeerManager::default()); + + let blueprint_protocol_name = format!("/blueprint_protocol/{}/1.0.0", instance_id); + + // Create the swarm + let behaviour = GadgetBehaviour::new( + &network_name, + &blueprint_protocol_name, + &local_key, + &instance_secret_key, + &instance_public_key, + target_peer_count, + peer_manager.clone(), + ); + + let mut swarm = SwarmBuilder::with_existing_identity(local_key) + .with_tokio() + .with_tcp( + libp2p::tcp::Config::default().nodelay(true), + libp2p::noise::Config::new, + libp2p::yamux::Config::default, + )? + .with_quic_config(|mut config| { + config.handshake_timeout = Duration::from_secs(30); + config + }) + .with_dns()? 
+ .with_behaviour(|_| behaviour) + .unwrap() + .build(); + + // Start listening + swarm.listen_on(listen_addr)?; + + let (network_sender, network_receiver) = mpsc::unbounded_channel(); + let (event_sender, event_receiver) = mpsc::unbounded_channel(); + + let bootstrap_peers = bootstrap_peers.into_iter().collect(); + + let service = Self { + swarm, + peer_manager, + network_sender, + network_receiver: UnboundedReceiverStream::new(network_receiver), + event_sender: event_sender.clone(), + event_receiver: UnboundedReceiverStream::new(event_receiver), + network_name, + bootstrap_peers, + }; + + Ok(service) + } + + /// Get a sender to send messages to the network service + pub fn message_sender(&self) -> mpsc::UnboundedSender { + self.network_sender.clone() + } + + /// Run the network service + async fn run(mut self, event_sender: mpsc::UnboundedSender) { + info!("Starting network service"); + + // Bootstrap with Kademlia + if let Err(e) = self.swarm.behaviour_mut().bootstrap() { + warn!("Failed to bootstrap with Kademlia: {}", e); + } + + // Connect to bootstrap peers + for (peer_id, addr) in &self.bootstrap_peers { + debug!("Dialing bootstrap peer {} at {}", peer_id, addr); + if let Err(e) = self.swarm.dial(addr.clone()) { + warn!("Failed to dial bootstrap peer: {}", e); + } + } + + let mut swarm_stream = self.swarm.fuse(); + let mut network_stream = self.network_receiver.fuse(); + loop { + tokio::select! 
{ + message = network_stream.next() => { + match message { + Some(msg) => match handle_network_message( + swarm_stream.get_mut(), + msg, + &self.peer_manager, + &self.event_sender, + ) + .await + { + Ok(_) => {} + Err(e) => { + warn!("Failed to handle network message: {}", e); + } + }, + None => break, + } + } + swarm_event = swarm_stream.next() => match swarm_event { + // outbound events + Some(SwarmEvent::Behaviour(event)) => { + handle_behaviour_event( + swarm_stream.get_mut(), + &self.peer_manager, + event, + &self.event_sender, + &self.network_sender, + ) + .await; + }, + None => { break; }, + _ => { }, + }, + } + } + + info!("Network service stopped"); + } +} + +/// Handle a swarm event +async fn handle_swarm_event( + swarm: &mut Swarm, + peer_manager: &Arc, + event: SwarmEvent, + event_sender: &mpsc::UnboundedSender, + network_sender: &mpsc::UnboundedSender, +) -> Result<(), Error> { + match event { + SwarmEvent::Behaviour(behaviour_event) => { + handle_behaviour_event( + swarm, + peer_manager, + behaviour_event, + event_sender, + network_sender, + ) + .await? + } + _ => {} + } + + Ok(()) +} + +/// Handle a behaviour event +async fn handle_behaviour_event( + swarm: &mut Swarm, + peer_manager: &Arc, + event: GadgetBehaviourEvent, + event_sender: &mpsc::UnboundedSender, + network_sender: &mpsc::UnboundedSender, +) -> Result<(), Error> { + match event { + GadgetBehaviourEvent::ConnectionLimits(_) => {} + GadgetBehaviourEvent::Discovery(discovery_event) => { + handle_discovery_event( + &swarm.behaviour().discovery.peer_info, + peer_manager, + discovery_event, + event_sender, + network_sender, + &swarm.behaviour().blueprint_protocol.blueprint_protocol_name, + ) + .await? + } + GadgetBehaviourEvent::BlueprintProtocol(blueprint_event) => { + handle_blueprint_protocol_event( + swarm, + peer_manager, + blueprint_event, + event_sender, + network_sender, + ) + .await? 
+ } + GadgetBehaviourEvent::Ping(ping_event) => { + handle_ping_event( + swarm, + peer_manager, + ping_event, + event_sender, + network_sender, + ) + .await? + } + } + + Ok(()) +} + +/// Handle a discovery event +async fn handle_discovery_event( + peer_info_map: &HashMap, + peer_manager: &Arc, + event: DiscoveryEvent, + event_sender: &mpsc::UnboundedSender, + network_sender: &mpsc::UnboundedSender, + blueprint_protocol_name: &str, +) -> Result<(), Error> { + match event { + DiscoveryEvent::PeerConnected(peer_id) => { + trace!("Peer connected, {peer_id}"); + event_sender.send(NetworkEvent::PeerConnected(peer_id))?; + } + DiscoveryEvent::PeerDisconnected(peer_id) => { + trace!("Peer disconnected, {peer_id}"); + event_sender.send(NetworkEvent::PeerDisconnected(peer_id))?; + } + DiscoveryEvent::Discovery(discovery_event) => match &*discovery_event { + DerivedDiscoveryBehaviourEvent::Identify(identify::Event::Received { + peer_id, + info, + .. + }) => { + let protocols: HashSet = + HashSet::from_iter(info.protocols.iter().map(|p| p.to_string())); + if !protocols.contains(blueprint_protocol_name) { + peer_manager + .ban_peer_with_default_duration(*peer_id, "hello protocol unsupported"); + } + } + DerivedDiscoveryBehaviourEvent::Identify(_) => {} + _ => {} + }, + }; + + Ok(()) +} + +/// Handle a blueprint event +async fn handle_blueprint_protocol_event( + swarm: &mut Swarm, + peer_manager: &Arc, + event: BlueprintProtocolEvent, + event_sender: &mpsc::UnboundedSender, + network_sender: &mpsc::UnboundedSender, +) -> Result<(), Error> { + match event { + BlueprintProtocolEvent::Request { + peer, + request, + channel, + } => event_sender.send(NetworkEvent::InstanceRequestInbound { peer, request })?, + BlueprintProtocolEvent::Response { + peer, + response, + request_id, + } => event_sender.send(NetworkEvent::InstanceResponseInbound { peer, response })?, + BlueprintProtocolEvent::GossipMessage { + source, + topic, + message, + } => event_sender.send(NetworkEvent::GossipReceived 
{ + source, + topic: topic.to_string(), + message, + })?, + } + + Ok(()) +} + +/// Handle a ping event +async fn handle_ping_event( + swarm: &mut Swarm, + peer_manager: &Arc, + event: ping::Event, + event_sender: &mpsc::UnboundedSender, + network_sender: &mpsc::UnboundedSender, +) -> Result<(), Error> { + match event.result { + Ok(rtt) => { + trace!( + "PingSuccess::Ping rtt to {} is {} ms", + event.peer, + rtt.as_millis() + ); + } + Err(ping::Failure::Unsupported) => { + debug!(peer=%event.peer, "Ping protocol unsupported"); + } + Err(ping::Failure::Timeout) => { + debug!("Ping timeout: {}", event.peer); + } + Err(ping::Failure::Other { error }) => { + debug!("Ping failure: {error}"); + } + } + + Ok(()) +} + +/// Handle a network message +async fn handle_network_message( + swarm: &mut Swarm, + msg: NetworkMessage, + peer_manager: &Arc, + event_sender: &mpsc::UnboundedSender, +) -> Result<(), Error> { + match msg { + NetworkMessage::InstanceRequest { peer, request } => { + event_sender.send(NetworkEvent::InstanceRequestOutbound { peer, request })? + } + NetworkMessage::InstanceResponse { peer, response } => { + event_sender.send(NetworkEvent::InstanceResponseOutbound { peer, response })? 
+ } + NetworkMessage::GossipMessage { + source, + topic, + message, + } => event_sender.send(NetworkEvent::GossipSent { topic, message })?, + NetworkMessage::HandshakeRequest { + peer, + public_key, + signature, + } => event_sender.send(NetworkEvent::HandshakeCompleted { peer })?, + NetworkMessage::HandshakeResponse { + peer, + public_key, + signature, + } => event_sender.send(NetworkEvent::HandshakeCompleted { peer })?, + } + + Ok(()) +} diff --git a/crates/networking/src/setup.rs b/crates/networking/src/setup.rs index 0a3ee5595..e69de29bb 100644 --- a/crates/networking/src/setup.rs +++ b/crates/networking/src/setup.rs @@ -1,341 +0,0 @@ -#![allow(unused_results, missing_docs)] - -use ::std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; -use ::std::str::FromStr; - -use crate::behaviours::MyBehaviour; -use crate::error::Error; -use crate::gossip::{GossipHandle, NetworkServiceWithoutSwarm}; -pub use crate::key_types::GossipMsgKeyPair; -use crate::types::{IntraNodePayload, MAX_MESSAGE_SIZE}; -use futures::StreamExt; -use gadget_std as std; -use gadget_std::collections::BTreeMap; -use gadget_std::io; -use gadget_std::string::String; -use gadget_std::sync::atomic::AtomicUsize; -use gadget_std::sync::Arc; -use gadget_std::time::Duration; -use gadget_std::vec; -use gadget_std::vec::Vec; -use libp2p::multiaddr::Protocol; -use libp2p::Multiaddr; -use libp2p::{ - gossipsub, gossipsub::IdentTopic, kad::store::MemoryStore, mdns, request_response, - swarm::dial_opts::DialOpts, StreamProtocol, -}; -use lru_mem::LruCache; -use tokio::select; -use tokio::sync::{Mutex, RwLock}; -use tokio::task::{spawn, JoinHandle}; - -/// The version of the gadget sdk -pub const AGENT_VERSION: &str = "tangle/gadget-sdk/1.0.0"; -/// The version of the client -pub const CLIENT_VERSION: &str = "1.0.0"; - -/// The base network configuration for a blueprint's `libp2p` network. -/// -/// This configuration is used to setup the `libp2p` network for a blueprint. 
-/// Construct using [`NetworkConfig::new`] for advanced users or -/// [`NetworkConfig::new_service_network`] ordinarily. -pub struct NetworkConfig { - pub identity: libp2p::identity::Keypair, - pub secret_key: GossipMsgKeyPair, - pub bootnodes: Vec, - pub bind_port: u16, - pub topics: Vec, -} - -impl gadget_std::fmt::Debug for NetworkConfig { - fn fmt(&self, f: &mut gadget_std::fmt::Formatter<'_>) -> gadget_std::fmt::Result { - f.debug_struct("NetworkConfig") - .field("identity", &self.identity) - .field("bootnodes", &self.bootnodes) - .field("bind_port", &self.bind_port) - .field("topics", &self.topics) - .finish_non_exhaustive() - } -} - -impl NetworkConfig { - /// For advanced use only. Use `NetworkConfig::new_service_network` for ordinary use. - /// This function allows for the creation of a network with multiple topics. - #[must_use] - pub fn new( - identity: libp2p::identity::Keypair, - secret_key: GossipMsgKeyPair, - bootnodes: Vec, - bind_port: u16, - topics: Vec, - ) -> Self { - Self { - identity, - secret_key, - bootnodes, - bind_port, - topics, - } - } - - /// When constructing a network for a single service, the service name is used as the network name. - /// Each service within a blueprint must have a unique network name. - pub fn new_service_network>( - identity: libp2p::identity::Keypair, - secret_key: GossipMsgKeyPair, - bootnodes: Vec, - bind_port: u16, - service_name: T, - ) -> Self { - Self::new( - identity, - secret_key, - bootnodes, - bind_port, - vec![service_name.into()], - ) - } -} - -/// Start a P2P network with the given configuration. -/// -/// Each service will only have one network. It is necessary that each service calling this function -/// uses a distinct network name, otherwise, the network will not be able to distinguish between -/// the different services. -/// -/// # Arguments -/// -/// * `config` - The network configuration. -/// -/// # Errors -/// -/// Returns an error if the network setup fails. 
-pub fn start_p2p_network(config: NetworkConfig) -> Result { - if config.topics.len() != 1 { - return Err(Error::TooManyTopics(config.topics.len())); - } - - let (networks, _) = multiplexed_libp2p_network(config)?; - let network = networks.into_iter().next().ok_or(Error::NoNetworkFound)?.1; - Ok(network) -} - -pub type NetworkResult = Result<(BTreeMap, JoinHandle<()>), Error>; - -#[allow(clippy::collapsible_else_if, clippy::too_many_lines)] -/// Starts the multiplexed libp2p network with the given configuration. -/// -/// # Arguments -/// -/// * `config` - The network configuration. -/// -/// # Errors -/// -/// Returns an error if the network setup fails. -/// -/// # Panics -/// -/// Panics if the network name is invalid. -pub fn multiplexed_libp2p_network(config: NetworkConfig) -> NetworkResult { - // Setup both QUIC (UDP) and TCP transports the increase the chances of NAT traversal - - use gadget_std::collections::BTreeMap; - gadget_logging::trace!("Building P2P Network with config: {config:?}"); - let NetworkConfig { - identity, - bootnodes, - bind_port, - topics, - secret_key, - } = config; - - // Ensure all topics are unique - let topics_unique = topics - .iter() - .cloned() - .collect::>() - .into_iter() - .collect::>(); - - if topics_unique.len() != topics.len() { - return Err(Error::DuplicateTopics); - } - - let networks = topics; - - let my_pk = secret_key.public(); - let my_id = identity.public().to_peer_id(); - - let mut swarm = libp2p::SwarmBuilder::with_existing_identity(identity) - .with_tokio() - .with_tcp( - libp2p::tcp::Config::default().nodelay(true), // Allow port reuse for TCP-hole punching - libp2p::noise::Config::new, - libp2p::yamux::Config::default, - )? - .with_quic_config(|mut config| { - config.handshake_timeout = Duration::from_secs(30); - config - }) - .with_dns()? 
- .with_behaviour(|key| { - // Set a custom gossipsub configuration - let gossipsub_config = gossipsub::ConfigBuilder::default() - .protocol_id_prefix("/tangle/gadget-binary-sdk/meshsub") - .max_transmit_size(MAX_MESSAGE_SIZE) - .validate_messages() - .validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing) - .build() - .map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?; // Temporary hack because `build` does not return a proper `std::error::Error`. - - // Setup gossipsub network behaviour for broadcasting - let gossipsub = gossipsub::Behaviour::new( - gossipsub::MessageAuthenticity::Signed(key.clone()), - gossipsub_config, - )?; - - // Setup mDNS for peer discovery - let mdns = - mdns::tokio::Behaviour::new(mdns::Config::default(), key.public().to_peer_id())?; - - // Setup request-response for direct messaging - let p2p_config = request_response::Config::default(); - // StreamProtocols MUST begin with a forward slash - let protocols = networks - .iter() - .map(|n| { - ( - StreamProtocol::try_from_owned(n.clone()).expect("Invalid network name"), - request_response::ProtocolSupport::Full, - ) - }) - .collect::>(); - - let p2p = request_response::Behaviour::new(protocols, p2p_config); - - // Setup the identify protocol for peers to exchange information about each other, a requirement for kadmelia DHT - let identify = libp2p::identify::Behaviour::new( - libp2p::identify::Config::new(CLIENT_VERSION.into(), key.public()) - .with_agent_version(AGENT_VERSION.into()), - ); - - // Setup kadmelia for DHT for peer discovery over a larger network - let memory_db = MemoryStore::new(key.public().to_peer_id()); - let kadmelia = libp2p::kad::Behaviour::new(key.public().to_peer_id(), memory_db); - - // Setup dcutr for upgrading existing connections to use relay against the bootnodes when necessary - // This also provided hole-punching capabilities to attempt to seek a direct connection, 
and fallback to relaying - // otherwise. - // dcutr = direct connection upgrade through relay - let dcutr = libp2p::dcutr::Behaviour::new(key.public().to_peer_id()); - - // Setup relay for using the dcutr-upgraded connections to relay messages for other peers when required - let relay_config = libp2p::relay::Config::default(); - let relay = libp2p::relay::Behaviour::new(key.public().to_peer_id(), relay_config); - - // Setup ping for liveness checks between connections - let ping = libp2p::ping::Behaviour::new(libp2p::ping::Config::default()); - - // Setup autonat for NAT traversal - let autonat = libp2p::autonat::Behaviour::new( - key.public().to_peer_id(), - libp2p::autonat::Config::default(), - ); - - Ok(MyBehaviour { - gossipsub, - mdns, - p2p, - identify, - kadmelia, - dcutr, - relay, - ping, - autonat, - }) - })? - .with_swarm_config(|c| c.with_idle_connection_timeout(Duration::from_secs(60))) - .build(); - - gadget_logging::trace!("~~~ Starting P2P Network Setup Phase 1 ~~~"); - - // Subscribe to all networks - let mut inbound_mapping = Vec::new(); - let (tx_to_outbound, mut rx_to_outbound) = - tokio::sync::mpsc::unbounded_channel::(); - let public_key_to_libp2p_id = Arc::new(RwLock::new(BTreeMap::new())); - let mut handles_ret = BTreeMap::new(); - let connected_peers = Arc::new(AtomicUsize::new(0)); - for network in networks { - let topic = IdentTopic::new(network.clone()); - swarm.behaviour_mut().gossipsub.subscribe(&topic)?; - let (inbound_tx, inbound_rx) = tokio::sync::mpsc::unbounded_channel(); - inbound_mapping.push((topic.clone(), inbound_tx, connected_peers.clone())); - - handles_ret.insert( - network, - GossipHandle { - connected_peers: connected_peers.clone(), - topic, - tx_to_outbound: tx_to_outbound.clone(), - rx_from_inbound: Arc::new(Mutex::new(inbound_rx)), - public_key_to_libp2p_id: public_key_to_libp2p_id.clone(), - // Each key is 32 bytes, therefore 512 messages hashes can be stored in the set - recent_messages: LruCache::new(16 * 
1024).into(), - my_id: my_pk, - }, - ); - } - - gadget_logging::trace!("~~~ Starting P2P Network Setup Phase 2 ~~~"); - - let ips_to_bind_to = [ - IpAddr::from_str("::").unwrap(), // IN_ADDR_ANY_V6 - IpAddr::from_str("0.0.0.0").unwrap(), // IN_ADDR_ANY_V4 - ]; - - for addr in ips_to_bind_to { - let ip_label = if addr.is_ipv4() { "ip4" } else { "ip6" }; - // Bind to both UDP and TCP to increase probability of successful NAT traversal. - // Use QUIC over UDP to have reliable ordered transport like TCP. - swarm.listen_on(format!("/{ip_label}/{addr}/udp/{bind_port}/quic-v1").parse()?)?; - swarm.listen_on(format!("/{ip_label}/{addr}/tcp/{bind_port}").parse()?)?; - } - - // Dial all bootnodes - for bootnode in &bootnodes { - swarm.dial( - DialOpts::unknown_peer_id() - .address(bootnode.clone()) - .build(), - )?; - } - - let worker = async move { - let span = tracing::debug_span!("network_worker"); - let _enter = span.enter(); - let service = NetworkServiceWithoutSwarm { - inbound_mapping: &inbound_mapping, - connected_peers, - public_key_to_libp2p_id, - secret_key: &secret_key, - span: tracing::debug_span!(parent: &span, "network_service"), - my_id, - }; - - loop { - select! 
{ - // Setup outbound channel - Some(msg) = rx_to_outbound.recv() => { - service.with_swarm(&mut swarm).handle_intra_node_payload(msg); - } - event = swarm.select_next_some() => { - service.with_swarm(&mut swarm).handle_swarm_event(event).await; - } - } - } - }; - - let spawn_handle = spawn(worker); - Ok((handles_ret, spawn_handle)) -} diff --git a/crates/networking/src/tests/mod.rs b/crates/networking/src/tests/mod.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/crates/networking/src/tests/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/networking/src/types.rs b/crates/networking/src/types.rs index d60ccc70b..63b7e5e07 100644 --- a/crates/networking/src/types.rs +++ b/crates/networking/src/types.rs @@ -1,78 +1,112 @@ -use crate::{behaviours::GossipOrRequestResponse, key_types::GossipMsgPublicKey}; +use crate::key_types::InstanceMsgPublicKey; use libp2p::{gossipsub::IdentTopic, PeerId}; use serde::{Deserialize, Serialize}; use std::fmt::Display; -/// Maximum allowed size for a Signed Message. 
+/// Maximum allowed size for a message payload
 pub const MAX_MESSAGE_SIZE: usize = 16 * 1024 * 1024;
 
-/// Unique identifier for a party
-pub type UserID = u16;
+/// Unique identifier for a participant in the network
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
+pub struct ParticipantId(pub u16);
 
-/// Type of message to be sent
-pub enum MessageType {
-    Broadcast,
-    P2P(PeerId),
+impl From<ParticipantId> for u16 {
+    fn from(id: ParticipantId) -> Self {
+        id.0
+    }
+}
+
+/// Type of message delivery mechanism
+#[derive(Debug, Clone)]
+pub enum MessageDelivery {
+    /// Broadcast to all peers via gossipsub
+    Broadcast {
+        /// The topic to broadcast on
+        topic: IdentTopic,
+    },
+    /// Direct P2P message to a specific peer
+    Direct {
+        /// The target peer ID
+        peer_id: PeerId,
+    },
 }
 
-#[derive(Debug, Serialize, Deserialize, Clone, Copy, Default)]
-pub struct IdentifierInfo {
+/// Message routing information
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MessageRouting {
+    /// Unique identifier for this message
     pub message_id: u64,
+    /// The round/sequence number this message belongs to
     pub round_id: u16,
+    /// The sender's information
+    pub sender: ParticipantInfo,
+    /// Optional recipient information for direct messages
+    pub recipient: Option<ParticipantInfo>,
 }
 
-impl Display for IdentifierInfo {
-    fn fmt(&self, f: &mut gadget_std::fmt::Formatter<'_>) -> gadget_std::fmt::Result {
-        let message_id = format!("message_id: {}", self.message_id);
-        let round_id = format!("round_id: {}", self.round_id);
-        write!(f, "{} {}", message_id, round_id)
-    }
+/// Information about a participant in the network
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ParticipantInfo {
+    /// The participant's unique ID
+    pub id: ParticipantId,
+    /// The participant's public key (if known)
+    pub public_key: Option<InstanceMsgPublicKey>,
 }
 
-#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
-pub struct ParticipantInfo {
-    pub user_id: u16,
-    pub public_key: Option<GossipMsgPublicKey>,
+/// A protocol message that can be sent over the network
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ProtocolMessage {
+    /// Routing information for the message
+    pub routing: MessageRouting,
+    /// The actual message payload
+    pub payload: Vec<u8>,
 }
 
-impl Display for ParticipantInfo {
-    fn fmt(&self, f: &mut gadget_std::fmt::Formatter<'_>) -> gadget_std::fmt::Result {
-        let public_key = self
-            .public_key
-            .map(|key| format!("public_key: {:?}", key))
-            .unwrap_or_default();
-        write!(f, "user_id: {}, {}", self.user_id, public_key)
-    }
+/// Internal representation of a message to be sent
+#[derive(Debug)]
+pub struct OutboundMessage {
+    /// The message to be sent
+    pub message: ProtocolMessage,
+    /// How the message should be delivered
+    pub delivery: MessageDelivery,
 }
 
-#[derive(Debug, Serialize, Deserialize, Clone)]
-pub struct ProtocolMessage {
-    pub identifier_info: IdentifierInfo,
-    pub sender: ParticipantInfo,
-    pub recipient: Option<ParticipantInfo>,
-    pub payload: Vec<u8>,
+impl Display for ParticipantId {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "P{}", self.0)
+    }
 }
 
-impl Display for ProtocolMessage {
-    fn fmt(&self, f: &mut gadget_std::fmt::Formatter<'_>) -> gadget_std::fmt::Result {
+impl Display for MessageRouting {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         write!(
             f,
-            "identifier_info: {}, sender: {}, recipient: {:?}, payload: {:?}",
-            self.identifier_info, self.sender, self.recipient, self.payload
+            "msg={} round={} from={} to={:?}",
+            self.message_id,
+            self.round_id,
+            self.sender.id,
+            self.recipient.as_ref().map(|r| r.id)
         )
     }
 }
 
-pub struct IntraNodePayload {
-    pub topic: IdentTopic,
-    pub payload: GossipOrRequestResponse,
-    pub message_type: MessageType,
+impl Display for ParticipantInfo {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{} key={}",
+            self.id,
+            if self.public_key.is_some() {
+                "yes"
+            } else {
+                "no"
+            }
+        )
+    }
 }
 
-impl gadget_std::fmt::Debug for IntraNodePayload {
-    fn fmt(&self, f: &mut gadget_std::fmt::Formatter<'_>) -> gadget_std::fmt::Result {
-        f.debug_struct("IntraNodePayload")
-            .field("topic", &self.topic)
-            .finish_non_exhaustive()
+impl Display for ProtocolMessage {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{} payload_size={}", self.routing, self.payload.len())
     }
 }