Skip to content

Commit

Permalink
Start new networking (#665)
Browse files Browse the repository at this point in the history
Co-authored-by: Serial <69764315+Serial-ATA@users.noreply.github.com>
  • Loading branch information
drewstone and Serial-ATA authored Feb 20, 2025
1 parent 195032a commit 3341094
Show file tree
Hide file tree
Showing 33 changed files with 2,277 additions and 2,192 deletions.
46 changes: 46 additions & 0 deletions .cursor/rules/p2p.mdc
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
---
description: Peer to peer networking expert & Rust engineer
globs: **/*.rs
---
# Peer to peer networking expert

## P2P Development Cursor (Rust + libp2p)

1. **Initial Analysis**
- Review input requirements and resources
- Identify minimal viable protocol components
- Review code that exists.
- Never build new when it already exists. Improve instead, remove, and optimize.
- Move tests to a `src/tests` directory, create it if it doesn't exist.

2. **Implementation Flow**
- Start with concise implementations and build from there.
- Suggest improvements with sound reasoning.
- Leverage libp2p's modular design patterns
- Leverage existing code and improve it, optimize it.
- When a code snippet is provided, understand it, and adapt it with existing code. These are meant as resources not as copy/pastes.

3. **Code Standards**
- Implement proper error handling with custom error types
- Prioritize concise and efficient code.
- Add relevant and detailed documentation
- Always put tests inside a `src/tests` directory

4. **Efficiency Guidelines**
- Prefer bounded channels for peer message handling
- Implement connection pooling where appropriate
- Leverage existing libp2p protocols before custom ones

5. **Review & Integration**
- Verify protocol compatibility
- Test network behaviour under various conditions
- Test in `src/tests` directory
- Ensure proper resource cleanup
- Document failure modes and recovery

6. **Core Principles**
- Start minimal, expand as needed
- Test thoroughly between iterations
- Maintain clear protocol boundaries
- Document network assumptions and requirements
-
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,12 @@ gadget-runner-symbiotic = { version = "0.1.0", path = "./crates/runners/symbioti
gadget-config = { version = "0.1.0", path = "./crates/config", default-features = false }
gadget-keystore = { version = "0.1.0", path = "./crates/keystore", default-features = false }
gadget-logging = { version = "0.1.0", path = "./crates/logging", default-features = false }
gadget-networking = { version = "0.1.0", path = "./crates/networking", default-features = false }
gadget-std = { version = "0.1.0", path = "./crates/std", default-features = false }

# P2P
gadget-networking = { version = "0.1.0", path = "./crates/networking", default-features = false }
gadget-networking-behaviours = { version = "0.1.0", path = "./crates/networking/behaviours", default-features = false }

# Utilities
gadget-utils = { version = "0.1.0", path = "./crates/utils", default-features = false }
gadget-utils-evm = { version = "0.1.0", path = "./crates/utils/evm", default-features = false }
Expand Down Expand Up @@ -145,6 +148,7 @@ futures-util = { version = "0.3.31", default-features = false }
tokio = { version = "1.40", default-features = false }
tokio-util = { version = "0.7.12", default-features = false }
tokio-cron-scheduler = "0.13.0"
tokio-stream = { version = "0.1.17", default-features = false }

# CLI & Configuration
cargo-generate = { version = "0.22.1", default-features = false }
Expand Down
3 changes: 3 additions & 0 deletions crates/networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ blake3 = { workspace = true }
dashmap = { workspace = true }
libp2p = { workspace = true }
tokio = { workspace = true, features = ["macros"] }
tokio-stream = { workspace = true, features = ["time"] }
futures = { workspace = true }
tracing = { workspace = true }
bincode = { workspace = true }
Expand All @@ -30,6 +31,7 @@ hex = { workspace = true }
itertools = { workspace = true, features = ["use_alloc"] }
parking_lot = { workspace = true }
thiserror = { workspace = true }
anyhow = { workspace = true }

# Crypto dependencies
gadget-crypto = { workspace = true, features = ["k256", "hashing"] }
Expand Down Expand Up @@ -58,6 +60,7 @@ features = [
"ping",
"dns",
"autonat",
"upnp",
]

[dev-dependencies]
Expand Down
160 changes: 112 additions & 48 deletions crates/networking/src/behaviours.rs
Original file line number Diff line number Diff line change
@@ -1,55 +1,119 @@
use crate::key_types::{GossipMsgPublicKey, GossipSignedMsgSignature};
use libp2p::{gossipsub, kad::store::MemoryStore, mdns, request_response, swarm::NetworkBehaviour};
use serde::{Deserialize, Serialize};

#[non_exhaustive]
#[derive(Serialize, Deserialize, Debug)]
// TODO: Needs better name
pub enum GossipOrRequestResponse {
Gossip(GossipMessage),
Request(MyBehaviourRequest),
Response(MyBehaviourResponse),
}
use crate::key_types::{InstanceMsgKeyPair, InstanceMsgPublicKey};
use crate::{
blueprint_protocol::{BlueprintProtocolBehaviour, BlueprintProtocolEvent},
discovery::{
behaviour::{DiscoveryBehaviour, DiscoveryEvent},
config::DiscoveryConfig,
PeerInfo, PeerManager,
},
};
use libp2p::{
connection_limits::{self, ConnectionLimits},
identity::Keypair,
kad::QueryId,
ping,
swarm::NetworkBehaviour,
Multiaddr, PeerId,
};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
time::Duration,
};

#[derive(Serialize, Deserialize, Debug)]
pub struct GossipMessage {
pub topic: String,
pub raw_payload: Vec<u8>,
}
const MAX_ESTABLISHED_PER_PEER: u32 = 4;

#[non_exhaustive]
#[derive(Serialize, Deserialize, Debug)]
pub enum MyBehaviourRequest {
Handshake {
public_key: GossipMsgPublicKey,
signature: GossipSignedMsgSignature,
},
Message {
topic: String,
raw_payload: Vec<u8>,
},
/// Events that can be emitted by the GadgetBehavior
#[derive(Debug)]
pub enum GadgetEvent {
/// Discovery-related events
Discovery(DiscoveryEvent),
/// Ping events for connection liveness
Ping(ping::Event),
/// Blueprint protocol events
Blueprint(BlueprintProtocolEvent),
}

#[non_exhaustive]
#[derive(Serialize, Deserialize, Debug)]
pub enum MyBehaviourResponse {
Handshaked {
public_key: GossipMsgPublicKey,
signature: GossipSignedMsgSignature,
},
MessageHandled,
#[derive(NetworkBehaviour)]
pub struct GadgetBehaviour {
/// Connection limits to prevent DoS
connection_limits: connection_limits::Behaviour,
/// Discovery mechanisms (Kademlia, mDNS, etc)
pub(super) discovery: DiscoveryBehaviour,
/// Direct P2P messaging and gossip
pub(super) blueprint_protocol: BlueprintProtocolBehaviour,
/// Connection liveness checks
ping: ping::Behaviour,
}

// We create a custom network behaviour that combines Gossipsub and Mdns.
#[derive(NetworkBehaviour)]
pub struct MyBehaviour {
pub gossipsub: gossipsub::Behaviour,
pub mdns: mdns::tokio::Behaviour,
pub p2p: request_response::cbor::Behaviour<MyBehaviourRequest, MyBehaviourResponse>,
pub identify: libp2p::identify::Behaviour,
pub kadmelia: libp2p::kad::Behaviour<MemoryStore>,
pub dcutr: libp2p::dcutr::Behaviour,
pub relay: libp2p::relay::Behaviour,
pub ping: libp2p::ping::Behaviour,
pub autonat: libp2p::autonat::Behaviour,
impl GadgetBehaviour {
pub fn new(
network_name: &str,
blueprint_protocol_name: &str,
local_key: &Keypair,
instance_secret_key: &InstanceMsgKeyPair,
instance_public_key: &InstanceMsgPublicKey,
target_peer_count: u64,
peer_manager: Arc<PeerManager>,
) -> Self {
let connection_limits = connection_limits::Behaviour::new(
ConnectionLimits::default()
.with_max_pending_incoming(Some(
target_peer_count as u32 * MAX_ESTABLISHED_PER_PEER,
))
.with_max_pending_outgoing(Some(
target_peer_count as u32 * MAX_ESTABLISHED_PER_PEER,
))
.with_max_established_incoming(Some(
target_peer_count as u32 * MAX_ESTABLISHED_PER_PEER,
))
.with_max_established_outgoing(Some(
target_peer_count as u32 * MAX_ESTABLISHED_PER_PEER,
))
.with_max_established_per_peer(Some(MAX_ESTABLISHED_PER_PEER)),
);

let ping = ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(30)));

let discovery = DiscoveryConfig::new(local_key.public(), network_name)
.with_mdns(true)
.with_kademlia(true)
.with_target_peer_count(target_peer_count)
.build()
.unwrap();

let blueprint_protocol = BlueprintProtocolBehaviour::new(
local_key,
instance_secret_key,
instance_public_key,
peer_manager,
blueprint_protocol_name,
);

Self {
connection_limits,
discovery,
blueprint_protocol,
ping,
}
}

/// Bootstrap Kademlia network
pub fn bootstrap(&mut self) -> Result<QueryId, String> {
self.discovery.bootstrap()
}

/// Returns a set of peer ids
pub fn peers(&self) -> &HashSet<PeerId> {
self.discovery.get_peers()
}

/// Returns a map of peer ids and their multi-addresses
pub fn peer_addresses(&self) -> HashMap<PeerId, HashSet<Multiaddr>> {
self.discovery.get_peer_addresses()
}

pub fn peer_info(&self, peer_id: &PeerId) -> Option<&PeerInfo> {
self.discovery.get_peer_info(peer_id)
}
}
Loading

0 comments on commit 3341094

Please # to comment.