diff --git a/Cargo.toml b/Cargo.toml index 865cddaa..75cf77f3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ serde_bytes = "0.11.5" thiserror = "1.0.49" crc = "3.0.1" sha1_smol = "1.0.0" -flume = { version = "0.11.0", features = ["select", "eventual-fairness"], default-features = false } +flume = { version = "0.11.0", features = [], default-features = false} ed25519-dalek = "2.1.0" bytes = "1.5.0" tracing = "0.1" diff --git a/examples/announce_peer.rs b/examples/announce_peer.rs index e22ca694..f879d077 100644 --- a/examples/announce_peer.rs +++ b/examples/announce_peer.rs @@ -36,8 +36,6 @@ fn announce(dht: &Dht, info_hash: Id) { let start = Instant::now(); dht.announce_peer(info_hash, Some(6991)) - .recv() - .unwrap() .expect("announce_peer failed"); println!( diff --git a/examples/get_immutable.rs b/examples/get_immutable.rs index 824c8577..1438611b 100644 --- a/examples/get_immutable.rs +++ b/examples/get_immutable.rs @@ -35,29 +35,24 @@ fn main() { fn get_immutable(dht: &Dht, info_hash: Id) { let start = Instant::now(); - let receiever = &mut dht.get_immutable(info_hash); - // No need to stream responses, just print the first result, since // all immutable data items are guaranteed to be the same. - match receiever.recv() { - Ok(value) => { - let string = String::from_utf8(value.to_vec()) - .expect("expected immutable data to be valid utf-8 for this demo"); - - println!( - "Got result in {:?} milliseconds\n", - start.elapsed().as_millis() - ); - - println!("Got immutable data: {:?}", string); - - println!( - "\nQuery exhausted in {:?} milliseconds", - start.elapsed().as_millis(), - ); - } - Err(_) => { - println!("\nFailed to find the immutable value for the provided info_hash",); - } - } + let value = dht + .get_immutable(info_hash) + .expect("Failed to find the immutable value for the provided info_hash"); + + let string = String::from_utf8(value.to_vec()) + .expect("expected immutable data to be valid utf-8 for this demo"); + + println!( + "Got result in {:?} milliseconds\n", + start.elapsed().as_millis() + ); + + println!("Got immutable data: {:?}", string); + + println!( + "\nQuery exhausted in {:?} milliseconds", + start.elapsed().as_millis(), + ); } diff --git a/examples/get_mutable.rs b/examples/get_mutable.rs index ae000451..c7ca5bc0 100644 --- a/examples/get_mutable.rs +++ b/examples/get_mutable.rs @@ -41,10 +41,8 @@ fn lookup(dht: &Dht, public_key: VerifyingKey) { let mut first = false; let mut count = 0; - let receiver = dht.get_mutable(public_key.as_bytes(), None); - println!("Streaming mutable items:"); - while let Ok(item) = receiver.recv() { + for item in dht.get_mutable(public_key.as_bytes(), None).unwrap() { count += 1; if !first { diff --git a/examples/get_peers.rs b/examples/get_peers.rs index 8df68fae..dc1f39dd 100644 --- a/examples/get_peers.rs +++ b/examples/get_peers.rs @@ -41,9 +41,7 @@ fn get_peers(dht: &Dht, info_hash: &Id) { let mut count = 0; - let receiver = dht.get_peers(*info_hash); - - while let Ok(peer) = receiver.recv() { + for peer in dht.get_peers(*info_hash).unwrap() { if !first { first = true; println!( diff --git a/examples/put_immutable.rs b/examples/put_immutable.rs index 7b5568ac..2c5c8d78 100644 --- a/examples/put_immutable.rs +++ b/examples/put_immutable.rs @@ -36,8 +36,6 @@ fn put_immutable(dht: &Dht, value: &Bytes) { let info_hash = dht .put_immutable(value.to_owned()) - .recv() - .unwrap() .expect("put immutable failed"); println!( diff --git a/examples/put_mutable.rs b/examples/put_mutable.rs index dd2b28dc..6546c9f5 100644 --- a/examples/put_mutable.rs +++ b/examples/put_mutable.rs @@ -54,10 +54,7 @@ fn main() { fn put(dht: &Dht, item: &MutableItem) { let start = Instant::now(); - dht.put_mutable(item.clone()) - .recv() - .unwrap() - .expect("Put mutable failed"); + dht.put_mutable(item.clone()).expect("Put mutable failed"); println!( "Stored mutable data as {:?} in {:?} milliseconds", diff --git a/src/async_dht.rs b/src/async_dht.rs new file mode 100644 index 00000000..1fc364c8 --- /dev/null +++ b/src/async_dht.rs @@ -0,0 +1,358 @@ +//! AsyncDht node. + +use bytes::Bytes; +use std::{net::SocketAddr, thread::JoinHandle}; + +use crate::{ + common::{ + hash_immutable, target_from_key, Id, MutableItem, PutResult, ResponseSender, RoutingTable, + }, + dht::{ActorMessage, Dht}, + messages::{ + AnnouncePeerRequestArguments, GetPeersRequestArguments, GetValueRequestArguments, + PutImmutableRequestArguments, PutMutableRequestArguments, PutRequestSpecific, + RequestTypeSpecific, + }, + Result, +}; + +impl Dht { + pub fn as_async(self) -> AsyncDht { + AsyncDht(self) + } +} + +#[derive(Debug, Clone)] +pub struct AsyncDht(Dht); + +impl AsyncDht { + // === Getters === + + /// Returns the local address of the udp socket this node is listening on. + pub async fn local_addr(&self) -> Result { + let (sender, receiver) = flume::bounded::(1); + + self.0.sender.send(ActorMessage::LocalAddress(sender))?; + + Ok(receiver.recv_async().await?) + } + + /// Returns a clone of the [RoutingTable] table of this node. + pub async fn routing_table(&self) -> Result { + let (sender, receiver) = flume::bounded::(1); + + self.0.sender.send(ActorMessage::RoutingTable(sender))?; + + Ok(receiver.recv_async().await?) + } + + /// Returns the size of the [RoutingTable] without cloning the entire table. + pub async fn routing_table_size(&self) -> Result { + let (sender, receiver) = flume::bounded::(1); + + self.0.sender.send(ActorMessage::RoutingTableSize(sender))?; + + Ok(receiver.recv_async().await?) + } + + /// Returns the `JoinHandle` of the actor thread + pub fn handle(self) -> Option> { + self.0.handle() + } + + // === Public Methods === + + pub fn shutdown(&self) -> Result<()> { + self.0.shutdown() + } + + // === Peers === + + /// Get peers for a given infohash. + /// + /// Note: each node of the network will only return a _random_ subset (usually 20) + /// of the total peers it has for a given infohash, so if you are getting responses + /// from 20 nodes, you can expect up to 400 peers in total, but if there are more + /// announced peers on that infohash, you are likely to miss some, the logic here + /// for Bittorrent is that any peer will introduce you to more peers through "peer exchange" + /// so if you are implementing something different from Bittorrent, you might want + /// to implement your own logic for gossipping more peers after you discover the first ones. + pub fn get_peers(&self, info_hash: Id) -> Result> { + // Get requests use unbounded channels to avoid blocking in the run loop. + // Other requests like put_* and getters don't need that and is ok with + // bounded channel with 1 capacity since it only ever sends one message back. + // + // So, if it is a ResponseMessage<_>, it should be unbounded, otherwise bounded. + let (sender, receiver) = flume::unbounded::(); + + let request = RequestTypeSpecific::GetPeers(GetPeersRequestArguments { info_hash }); + + self.0.sender.send(ActorMessage::Get( + info_hash, + request, + ResponseSender::Peer(sender), + ))?; + + Ok(receiver.into_stream()) + } + + /// Announce a peer for a given infohash. + /// + /// The peer will be announced on this process IP. + /// If explicit port is passed, it will be used, otherwise the port will be implicitly + /// assumed by remote nodes to be the same ase port they recieved the request from. + pub async fn announce_peer(&self, info_hash: Id, port: Option) -> Result { + let (sender, receiver) = flume::bounded::(1); + + let (port, implied_port) = match port { + Some(port) => (port, None), + None => (0, Some(true)), + }; + + let request = PutRequestSpecific::AnnouncePeer(AnnouncePeerRequestArguments { + info_hash, + port, + implied_port, + }); + + self.0 + .sender + .send(ActorMessage::Put(info_hash, request, sender))?; + + receiver.recv_async().await? + } + + // === Immutable data === + + /// Get an Immutable data by its sha1 hash. + pub async fn get_immutable(&self, target: Id) -> Result { + let (sender, receiver) = flume::unbounded::(); + + let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { + target, + seq: None, + salt: None, + }); + + self.0.sender.send(ActorMessage::Get( + target, + request, + ResponseSender::Immutable(sender), + ))?; + + Ok(receiver.recv_async().await?) + } + + /// Put an immutable data to the DHT. + pub async fn put_immutable(&self, value: Bytes) -> Result { + let target = Id::from_bytes(hash_immutable(&value)).unwrap(); + + let (sender, receiver) = flume::bounded::(1); + + let request = PutRequestSpecific::PutImmutable(PutImmutableRequestArguments { + target, + v: value.clone().into(), + }); + + self.0 + .sender + .send(ActorMessage::Put(target, request, sender))?; + + receiver.recv_async().await? + } + + // === Mutable data === + + /// Get a mutable data by its public_key and optional salt. + pub fn get_mutable( + &self, + public_key: &[u8; 32], + salt: Option, + ) -> Result> { + let target = target_from_key(public_key, &salt); + + let (sender, receiver) = flume::unbounded::(); + + let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { + target, + seq: None, + salt, + }); + + let _ = self.0.sender.send(ActorMessage::Get( + target, + request, + ResponseSender::Mutable(sender), + )); + + Ok(receiver.into_stream()) + } + + /// Put a mutable data to the DHT. + pub async fn put_mutable(&self, item: MutableItem) -> Result { + let (sender, receiver) = flume::bounded::(1); + + let request = PutRequestSpecific::PutMutable(PutMutableRequestArguments { + target: *item.target(), + v: item.value().clone().into(), + k: item.key().to_vec(), + seq: *item.seq(), + sig: item.signature().to_vec(), + salt: item.salt().clone().map(|s| s.to_vec()), + cas: *item.cas(), + }); + + let _ = self + .0 + .sender + .send(ActorMessage::Put(*item.target(), request, sender)); + + receiver.recv_async().await? + } +} + +#[cfg(test)] +mod test { + use std::str::FromStr; + + use ed25519_dalek::SigningKey; + use futures::StreamExt; + + use crate::Testnet; + + use super::*; + + #[test] + fn shutdown() { + async fn test() { + let dht = Dht::default().as_async(); + + dht.local_addr().await.unwrap(); + + let a = dht.clone(); + + dht.shutdown().unwrap(); + dht.handle().map(|h| h.join()); + + let local_addr = a.local_addr().await; + assert!(local_addr.is_err()); + } + futures::executor::block_on(test()); + } + + #[test] + fn bind_twice() { + async fn test() { + let a = Dht::default().as_async(); + let b = Dht::builder() + .port(a.local_addr().await.unwrap().port()) + .as_server() + .build() + .as_async(); + + let result = b.handle().unwrap().join(); + assert!(result.is_err()); + } + + futures::executor::block_on(test()); + } + + #[test] + fn announce_get_peer() { + async fn test() { + let testnet = Testnet::new(10); + + let a = Dht::builder() + .bootstrap(&testnet.bootstrap) + .build() + .as_async(); + let b = Dht::builder() + .bootstrap(&testnet.bootstrap) + .build() + .as_async(); + + let info_hash = Id::random(); + + a.announce_peer(info_hash, Some(45555)) + .await + .expect("failed to announce"); + + let peer = b + .get_peers(info_hash) + .unwrap() + .next() + .await + .expect("No peers"); + + assert_eq!(peer.port(), 45555); + } + + futures::executor::block_on(test()); + } + + #[test] + fn put_get_immutable() { + async fn test() { + let testnet = Testnet::new(10); + + let a = Dht::builder() + .bootstrap(&testnet.bootstrap) + .build() + .as_async(); + let b = Dht::builder() + .bootstrap(&testnet.bootstrap) + .build() + .as_async(); + + let value: Bytes = "Hello World!".into(); + let expected_target = Id::from_str("e5f96f6f38320f0f33959cb4d3d656452117aadb").unwrap(); + + let target = a.put_immutable(value.clone()).await.unwrap(); + assert_eq!(target, expected_target); + + let response = b.get_immutable(target).await.unwrap(); + assert_eq!(response, value); + } + + futures::executor::block_on(test()); + } + + #[test] + fn put_get_mutable() { + async fn test() { + let testnet = Testnet::new(10); + + let a = Dht::builder() + .bootstrap(&testnet.bootstrap) + .build() + .as_async(); + let b = Dht::builder() + .bootstrap(&testnet.bootstrap) + .build() + .as_async(); + + let signer = SigningKey::from_bytes(&[ + 56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7, + 228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103, + ]); + + let seq = 1000; + let value: Bytes = "Hello World!".into(); + + let item = MutableItem::new(signer.clone(), value, seq, None); + + a.put_mutable(item.clone()).await.unwrap(); + + let response = b + .get_mutable(signer.verifying_key().as_bytes(), None) + .unwrap() + .next() + .await + .expect("No mutable values"); + + assert_eq!(&response, &item); + } + + futures::executor::block_on(test()); + } +} diff --git a/src/dht.rs b/src/dht.rs index 7eb76326..01faa8ba 100644 --- a/src/dht.rs +++ b/src/dht.rs @@ -1,4 +1,4 @@ -//! Dht node./ +//! Dht node. use std::{ net::SocketAddr, @@ -148,33 +148,39 @@ impl Dht { pub fn local_addr(&self) -> Result { let (sender, receiver) = flume::bounded::(1); - let _ = self.sender.send(ActorMessage::LocalAddress(sender)); + self.sender.send(ActorMessage::LocalAddress(sender))?; - receiver.recv().map_err(|e| e.into()) + Ok(receiver.recv()?) } /// Returns a clone of the [RoutingTable] table of this node. pub fn routing_table(&self) -> Result { let (sender, receiver) = flume::bounded::(1); - let _ = self.sender.send(ActorMessage::RoutingTable(sender)); + self.sender.send(ActorMessage::RoutingTable(sender))?; - receiver.recv().map_err(|e| e.into()) + Ok(receiver.recv()?) } /// Returns the size of the [RoutingTable] without cloning the entire table. pub fn routing_table_size(&self) -> Result { let (sender, receiver) = flume::bounded::(1); - let _ = self.sender.send(ActorMessage::RoutingTableSize(sender)); + self.sender.send(ActorMessage::RoutingTableSize(sender))?; - receiver.recv().map_err(|e| e.into()) + Ok(receiver.recv()?) + } + + /// Returns the `JoinHandle` of the actor thread + pub fn handle(self) -> Option> { + self.handle } // === Public Methods === - pub fn shutdown(&self) { - let _ = self.sender.send(ActorMessage::Shutdown).ok(); + pub fn shutdown(&self) -> Result<()> { + self.sender.send(ActorMessage::Shutdown)?; + Ok(()) } // === Peers === @@ -188,7 +194,7 @@ impl Dht { /// for Bittorrent is that any peer will introduce you to more peers through "peer exchange" /// so if you are implementing something different from Bittorrent, you might want /// to implement your own logic for gossipping more peers after you discover the first ones. - pub fn get_peers(&self, info_hash: Id) -> Receiver { + pub fn get_peers(&self, info_hash: Id) -> Result> { // Get requests use unbounded channels to avoid blocking in the run loop. // Other requests like put_* and getters don't need that and is ok with // bounded channel with 1 capacity since it only ever sends one message back. @@ -198,13 +204,13 @@ impl Dht { let request = RequestTypeSpecific::GetPeers(GetPeersRequestArguments { info_hash }); - let _ = self.sender.send(ActorMessage::Get( + self.sender.send(ActorMessage::Get( info_hash, request, ResponseSender::Peer(sender), - )); + ))?; - receiver + Ok(receiver.into_iter()) } /// Announce a peer for a given infohash. @@ -212,7 +218,7 @@ impl Dht { /// The peer will be announced on this process IP. /// If explicit port is passed, it will be used, otherwise the port will be implicitly /// assumed by remote nodes to be the same ase port they recieved the request from. - pub fn announce_peer(&self, info_hash: Id, port: Option) -> Receiver { + pub fn announce_peer(&self, info_hash: Id, port: Option) -> Result { let (sender, receiver) = flume::bounded::(1); let (port, implied_port) = match port { @@ -226,17 +232,16 @@ impl Dht { implied_port, }); - let _ = self - .sender - .send(ActorMessage::Put(info_hash, request, sender)); + self.sender + .send(ActorMessage::Put(info_hash, request, sender))?; - receiver + receiver.recv()? } // === Immutable data === /// Get an Immutable data by its sha1 hash. - pub fn get_immutable(&self, target: Id) -> Receiver { + pub fn get_immutable(&self, target: Id) -> Result { let (sender, receiver) = flume::unbounded::(); let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { @@ -245,17 +250,17 @@ impl Dht { salt: None, }); - let _ = self.sender.send(ActorMessage::Get( + self.sender.send(ActorMessage::Get( target, request, ResponseSender::Immutable(sender), - )); + ))?; - receiver + Ok(receiver.recv()?) } /// Put an immutable data to the DHT. - pub fn put_immutable(&self, value: Bytes) -> Receiver { + pub fn put_immutable(&self, value: Bytes) -> Result { let target = Id::from_bytes(hash_immutable(&value)).unwrap(); let (sender, receiver) = flume::bounded::(1); @@ -265,15 +270,20 @@ impl Dht { v: value.clone().into(), }); - let _ = self.sender.send(ActorMessage::Put(target, request, sender)); + self.sender + .send(ActorMessage::Put(target, request, sender))?; - receiver + receiver.recv()? } // === Mutable data === /// Get a mutable data by its public_key and optional salt. - pub fn get_mutable(&self, public_key: &[u8; 32], salt: Option) -> Receiver { + pub fn get_mutable( + &self, + public_key: &[u8; 32], + salt: Option, + ) -> Result> { let target = target_from_key(public_key, &salt); let (sender, receiver) = flume::unbounded::(); @@ -290,11 +300,11 @@ impl Dht { ResponseSender::Mutable(sender), )); - receiver + Ok(receiver.into_iter()) } /// Put a mutable data to the DHT. - pub fn put_mutable(&self, item: MutableItem) -> Receiver { + pub fn put_mutable(&self, item: MutableItem) -> Result { let (sender, receiver) = flume::bounded::(1); let request = PutRequestSpecific::PutMutable(PutMutableRequestArguments { @@ -311,18 +321,11 @@ impl Dht { .sender .send(ActorMessage::Put(*item.target(), request, sender)); - receiver + receiver.recv()? } // === Private Methods === - #[cfg(test)] - fn block_until_shutdown(self) { - if let Some(handle) = self.handle { - let _ = handle.join(); - } - } - fn run(&mut self, settings: DhtSettings, receiver: Receiver) { let mut rpc = Rpc::new().unwrap().with_read_only(settings.read_only); @@ -370,7 +373,7 @@ impl Default for Dht { } } -pub(crate) enum ActorMessage { +pub enum ActorMessage { LocalAddress(Sender), RoutingTable(Sender), RoutingTableSize(Sender), @@ -413,7 +416,6 @@ impl Testnet { #[cfg(test)] mod test { use std::str::FromStr; - use std::time::Duration; use ed25519_dalek::SigningKey; @@ -423,15 +425,15 @@ mod test { fn shutdown() { let dht = Dht::default(); - let clone = dht.clone(); - thread::spawn(move || { - thread::sleep(Duration::from_millis(50)); + dht.local_addr().unwrap(); - clone.shutdown(); - }); + let a = dht.clone(); - // TODO: verify correct error if we call anything after shutdown. - dht.block_until_shutdown(); + dht.shutdown().unwrap(); + dht.handle().map(|h| h.join()); + + let local_addr = a.local_addr(); + assert!(local_addr.is_err()); } #[test] @@ -455,14 +457,12 @@ mod test { let info_hash = Id::random(); - match a.announce_peer(info_hash, Some(45555)).recv() { - Ok(_) => { - let peer = b.get_peers(info_hash).recv().expect("No respnoses"); + a.announce_peer(info_hash, Some(45555)) + .expect("failed to announce"); - assert_eq!(peer.port(), 45555); - } - Err(_) => {} - }; + let peer = b.get_peers(info_hash).unwrap().next().expect("No peers"); + + assert_eq!(peer.port(), 45555); } #[test] @@ -475,10 +475,10 @@ mod test { let value: Bytes = "Hello World!".into(); let expected_target = Id::from_str("e5f96f6f38320f0f33959cb4d3d656452117aadb").unwrap(); - let target = a.put_immutable(value.clone()).recv().unwrap().unwrap(); + let target = a.put_immutable(value.clone()).unwrap(); assert_eq!(target, expected_target); - let response = b.get_immutable(target).recv().unwrap(); + let response = b.get_immutable(target).unwrap(); assert_eq!(response, value); } @@ -499,12 +499,13 @@ mod test { let item = MutableItem::new(signer.clone(), value, seq, None); - a.put_mutable(item.clone()).recv().unwrap().unwrap(); + a.put_mutable(item.clone()).unwrap(); let response = b .get_mutable(signer.verifying_key().as_bytes(), None) - .recv() - .expect("No respnoses"); + .unwrap() + .next() + .expect("No mutable values"); assert_eq!(&response, &item); } diff --git a/src/error.rs b/src/error.rs index 4637aaaa..f10986d0 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,6 +1,6 @@ //! Main Crate Error -use crate::messages::ErrorSpecific; +use crate::{dht::ActorMessage, messages::ErrorSpecific}; #[derive(thiserror::Error, Debug)] /// Mainline crate error enum. @@ -38,6 +38,10 @@ pub enum Error { /// Transparent [std::io::Error] Receive(#[from] flume::RecvError), + #[error(transparent)] + /// Transparent [std::io::Error] + DhtIsShutdown(#[from] flume::SendError), + #[error("Invalid mutable item signature")] InvalidMutableSignature, diff --git a/src/lib.rs b/src/lib.rs index dd6ccc2b..cdb8bfd1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,8 @@ //! Rust implementation of read-only BitTorrent Mainline DHT client. // Public modules +#[cfg(feature = "async")] +pub mod async_dht; pub mod common; pub mod dht; pub mod error; diff --git a/src/rpc/query.rs b/src/rpc/query.rs index 2400cac0..6fa440c9 100644 --- a/src/rpc/query.rs +++ b/src/rpc/query.rs @@ -195,7 +195,6 @@ pub struct PutQuery { error: Option, } -// TODO: can we make both queries the same thing? impl PutQuery { pub fn new(target: Id, request: PutRequestSpecific, sender: Option>) -> Self { Self {