diff --git a/examples/get_mutable.rs b/examples/get_mutable.rs index c7ca5bc0..2b8155c7 100644 --- a/examples/get_mutable.rs +++ b/examples/get_mutable.rs @@ -42,7 +42,7 @@ fn lookup(dht: &Dht, public_key: VerifyingKey) { let mut count = 0; println!("Streaming mutable items:"); - for item in dht.get_mutable(public_key.as_bytes(), None).unwrap() { + for item in dht.get_mutable(public_key.as_bytes(), None, None).unwrap() { count += 1; if !first { diff --git a/src/async_dht.rs b/src/async_dht.rs index 1fc364c8..f0206120 100644 --- a/src/async_dht.rs +++ b/src/async_dht.rs @@ -63,7 +63,8 @@ impl AsyncDht { // === Public Methods === pub fn shutdown(&self) -> Result<()> { - self.0.shutdown() + self.0.shutdown(); + self.0.handle.join() } // === Peers === @@ -168,16 +169,13 @@ impl AsyncDht { &self, public_key: &[u8; 32], salt: Option, + seq: Option, ) -> Result> { let target = target_from_key(public_key, &salt); let (sender, receiver) = flume::unbounded::(); - let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { - target, - seq: None, - salt, - }); + let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { target, seq, salt }); let _ = self.0.sender.send(ActorMessage::Get( target, @@ -344,7 +342,7 @@ mod test { a.put_mutable(item.clone()).await.unwrap(); let response = b - .get_mutable(signer.verifying_key().as_bytes(), None) + .get_mutable(signer.verifying_key().as_bytes(), None, None) .unwrap() .next() .await @@ -355,4 +353,42 @@ mod test { futures::executor::block_on(test()); } + + #[test] + fn put_get_mutable_no_more_recent_value() { + async fn test() { + let testnet = Testnet::new(10); + + let a = Dht::builder() + .bootstrap(&testnet.bootstrap) + .build() + .as_async(); + let b = Dht::builder() + .bootstrap(&testnet.bootstrap) + .build() + .as_async(); + + let signer = SigningKey::from_bytes(&[ + 56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7, + 228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103, + ]); + + let seq = 1000; + let value: Bytes = "Hello World!".into(); + + let item = MutableItem::new(signer.clone(), value, seq, None); + + a.put_mutable(item.clone()).await.unwrap(); + + let response = b + .get_mutable(signer.verifying_key().as_bytes(), None, Some(seq)) + .unwrap() + .next() + .await; + + assert!(&response.is_none()); + } + + futures::executor::block_on(test()); + } } diff --git a/src/dht.rs b/src/dht.rs index 01faa8ba..29428691 100644 --- a/src/dht.rs +++ b/src/dht.rs @@ -282,17 +282,14 @@ impl Dht { pub fn get_mutable( &self, public_key: &[u8; 32], + seq: Option, salt: Option, ) -> Result> { let target = target_from_key(public_key, &salt); let (sender, receiver) = flume::unbounded::(); - let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { - target, - seq: None, - salt, - }); + let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { target, seq, salt }); let _ = self.sender.send(ActorMessage::Get( target, @@ -502,11 +499,38 @@ mod test { a.put_mutable(item.clone()).unwrap(); let response = b - .get_mutable(signer.verifying_key().as_bytes(), None) + .get_mutable(signer.verifying_key().as_bytes(), None, None) .unwrap() .next() .expect("No mutable values"); assert_eq!(&response, &item); } + + #[test] + fn put_get_mutable_no_more_recent_value() { + let testnet = Testnet::new(10); + + let a = Dht::builder().bootstrap(&testnet.bootstrap).build(); + let b = Dht::builder().bootstrap(&testnet.bootstrap).build(); + + let signer = SigningKey::from_bytes(&[ + 56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7, + 228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103, + ]); + + let seq = 1000; + let value: Bytes = "Hello World!".into(); + + let item = MutableItem::new(signer.clone(), value, seq, None); + + a.put_mutable(item.clone()).unwrap(); + + let response = b + .get_mutable(signer.verifying_key().as_bytes(), Some(seq), None) + .unwrap() + .next(); + + assert!(&response.is_none()); + } } diff --git a/src/messages/internal.rs b/src/messages/internal.rs index 61ebe2dd..b0f76a2b 100644 --- a/src/messages/internal.rs +++ b/src/messages/internal.rs @@ -96,6 +96,11 @@ pub enum DHTResponseSpecific { arguments: DHTGetMutableResponseArguments, }, + NoMoreRecentValue { + #[serde(rename = "r")] + arguments: DHTNoMoreRecentValueResponseArguments, + }, + GetImmutable { #[serde(rename = "r")] arguments: DHTGetImmutableResponseArguments, @@ -259,6 +264,21 @@ pub struct DHTGetImmutableResponseArguments { pub v: Vec, } +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] +pub struct DHTNoMoreRecentValueResponseArguments { + #[serde(with = "serde_bytes")] + pub id: Vec, + + #[serde(with = "serde_bytes")] + pub token: Vec, + + #[serde(with = "serde_bytes")] + #[serde(default)] + pub nodes: Option>, + + pub seq: i64, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct DHTGetMutableResponseArguments { #[serde(with = "serde_bytes")] diff --git a/src/messages/mod.rs b/src/messages/mod.rs index 457033e7..24d14d7b 100644 --- a/src/messages/mod.rs +++ b/src/messages/mod.rs @@ -82,6 +82,7 @@ pub enum ResponseSpecific { GetImmutable(GetImmutableResponseArguments), GetMutable(GetMutableResponseArguments), NoValues(NoValuesResponseArguments), + NoMoreRecentValue(NoMoreRecentValueResponseArguments), } // === PING === @@ -168,6 +169,14 @@ pub struct GetMutableResponseArguments { pub sig: Vec, } +#[derive(Debug, PartialEq, Clone)] +pub struct NoMoreRecentValueResponseArguments { + pub responder_id: Id, + pub token: Vec, + pub nodes: Option>, + pub seq: i64, +} + // === Put Immutable === #[derive(Debug, PartialEq, Clone)] @@ -356,6 +365,16 @@ impl Message { }, } } + ResponseSpecific::NoMoreRecentValue(args) => { + internal::DHTResponseSpecific::NoMoreRecentValue { + arguments: internal::DHTNoMoreRecentValueResponseArguments { + id: args.responder_id.to_vec(), + token: args.token.clone(), + nodes: args.nodes.as_ref().map(|nodes| nodes4_to_bytes(nodes)), + seq: args.seq, + }, + } + } }), MessageType::Error(err) => { @@ -531,6 +550,19 @@ impl Message { sig: arguments.sig, }) } + internal::DHTResponseSpecific::NoMoreRecentValue { arguments } => { + ResponseSpecific::NoMoreRecentValue( + NoMoreRecentValueResponseArguments { + responder_id: Id::from_bytes(arguments.id)?, + token: arguments.token, + nodes: match arguments.nodes { + Some(nodes) => Some(bytes_to_nodes4(nodes)?), + None => None, + }, + seq: arguments.seq, + }, + ) + } }) } @@ -594,6 +626,7 @@ impl Message { ResponseSpecific::GetImmutable(arguments) => arguments.responder_id, ResponseSpecific::GetMutable(arguments) => arguments.responder_id, ResponseSpecific::NoValues(arguments) => arguments.responder_id, + ResponseSpecific::NoMoreRecentValue(arguments) => arguments.responder_id, }, MessageType::Error(_) => { return None; @@ -613,6 +646,7 @@ impl Message { ResponseSpecific::GetMutable(arguments) => arguments.nodes.as_ref().cloned(), ResponseSpecific::GetImmutable(arguments) => arguments.nodes.as_ref().cloned(), ResponseSpecific::NoValues(arguments) => arguments.nodes.as_ref().cloned(), + ResponseSpecific::NoMoreRecentValue(arguments) => arguments.nodes.as_ref().cloned(), }, _ => None, } @@ -635,6 +669,9 @@ impl Message { ResponseSpecific::NoValues(arguments) => { Some((arguments.responder_id, arguments.token.clone())) } + ResponseSpecific::NoMoreRecentValue(arguments) => { + Some((arguments.responder_id, arguments.token.clone())) + } }, _ => None, } diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 82332c2b..f639d6c8 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -19,8 +19,9 @@ use crate::common::{ }; use crate::messages::{ FindNodeRequestArguments, GetImmutableResponseArguments, GetMutableResponseArguments, - GetPeersResponseArguments, GetValueRequestArguments, Message, MessageType, PutRequestSpecific, - RequestSpecific, RequestTypeSpecific, ResponseSpecific, + GetPeersResponseArguments, GetValueRequestArguments, Message, MessageType, + NoMoreRecentValueResponseArguments, PutRequestSpecific, RequestSpecific, RequestTypeSpecific, + ResponseSpecific, }; use crate::Result; @@ -423,6 +424,23 @@ impl Rpc { ); } } + MessageType::Response(ResponseSpecific::NoMoreRecentValue( + NoMoreRecentValueResponseArguments { + seq, responder_id, .. + }, + )) => { + debug!( + target= ?query.target, + salt= ?match query.request.request_type.clone() { + RequestTypeSpecific::GetValue(args) => args.salt, + _ => None, + }, + ?seq, + ?from, + ?responder_id, + "No more recent mutable value" + ); + } // Ping response is already handled in add_node() // FindNode response is already handled in query.add_candidate() _ => {} diff --git a/src/rpc/server/request.rs b/src/rpc/server/request.rs index 77eb1f63..40293bf7 100644 --- a/src/rpc/server/request.rs +++ b/src/rpc/server/request.rs @@ -9,9 +9,9 @@ use crate::messages::{ AnnouncePeerRequestArguments, ErrorSpecific, FindNodeRequestArguments, FindNodeResponseArguments, GetImmutableResponseArguments, GetMutableResponseArguments, GetPeersRequestArguments, GetPeersResponseArguments, GetValueRequestArguments, - NoValuesResponseArguments, PingResponseArguments, PutImmutableRequestArguments, - PutMutableRequestArguments, PutRequest, PutRequestSpecific, RequestSpecific, - RequestTypeSpecific, ResponseSpecific, + NoMoreRecentValueResponseArguments, NoValuesResponseArguments, PingResponseArguments, + PutImmutableRequestArguments, PutMutableRequestArguments, PutRequest, PutRequestSpecific, + RequestSpecific, RequestTypeSpecific, ResponseSpecific, }; use super::super::Rpc; @@ -65,7 +65,7 @@ pub fn handle_request( } RequestTypeSpecific::GetValue(GetValueRequestArguments { target, seq, .. }) => { if seq.is_some() { - return handle_get_mutable(rpc, from, transaction_id, target); + return handle_get_mutable(rpc, from, transaction_id, target, seq); } if let Some(v) = rpc.immutable_values.get(target) { @@ -80,7 +80,7 @@ pub fn handle_request( }), ) } else { - handle_get_mutable(rpc, from, transaction_id, target); + handle_get_mutable(rpc, from, transaction_id, target, seq); }; } RequestTypeSpecific::Put(PutRequest { @@ -322,25 +322,39 @@ pub fn handle_request( } } -fn handle_get_mutable(rpc: &mut Rpc, from: SocketAddr, transaction_id: u16, target: &Id) { +fn handle_get_mutable( + rpc: &mut Rpc, + from: SocketAddr, + transaction_id: u16, + target: &Id, + seq: &Option, +) { rpc.socket.response( from, transaction_id, match rpc.mutable_values.get(target) { Some(item) => { - // TODO: support seq (NoMoreRecentValue) - // if let Some(seq) = seq { - // } + let no_more_recent_values = seq.map(|request_seq| item.seq() <= &request_seq); - ResponseSpecific::GetMutable(GetMutableResponseArguments { - responder_id: rpc.id, - token: rpc.tokens.generate_token(from).into(), - nodes: Some(rpc.routing_table.closest(target)), - v: item.value().to_vec(), - k: item.key().to_vec(), - seq: *item.seq(), - sig: item.signature().to_vec(), - }) + match no_more_recent_values { + Some(true) => { + ResponseSpecific::NoMoreRecentValue(NoMoreRecentValueResponseArguments { + responder_id: rpc.id, + token: rpc.tokens.generate_token(from).into(), + nodes: Some(rpc.routing_table.closest(target)), + seq: *item.seq(), + }) + } + _ => ResponseSpecific::GetMutable(GetMutableResponseArguments { + responder_id: rpc.id, + token: rpc.tokens.generate_token(from).into(), + nodes: Some(rpc.routing_table.closest(target)), + v: item.value().to_vec(), + k: item.key().to_vec(), + seq: *item.seq(), + sig: item.signature().to_vec(), + }), + } } None => ResponseSpecific::NoValues(NoValuesResponseArguments { responder_id: rpc.id,