Skip to content

Commit

Permalink
Merge pull request #14 from Nuhvi/feat/no-more-recent-values
Browse files Browse the repository at this point in the history
feat: servers return NoMoreRecentValue response
  • Loading branch information
Nuhvi authored Apr 16, 2024
2 parents 68e9515 + 765d1dc commit 4580812
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 34 deletions.
2 changes: 1 addition & 1 deletion examples/get_mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fn lookup(dht: &Dht, public_key: VerifyingKey) {
let mut count = 0;

println!("Streaming mutable items:");
for item in dht.get_mutable(public_key.as_bytes(), None).unwrap() {
for item in dht.get_mutable(public_key.as_bytes(), None, None).unwrap() {
count += 1;

if !first {
Expand Down
50 changes: 43 additions & 7 deletions src/async_dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ impl AsyncDht {
// === Public Methods ===

pub fn shutdown(&self) -> Result<()> {
self.0.shutdown()
self.0.shutdown();
self.0.handle.join()
}

// === Peers ===
Expand Down Expand Up @@ -168,16 +169,13 @@ impl AsyncDht {
&self,
public_key: &[u8; 32],
salt: Option<Bytes>,
seq: Option<i64>,
) -> Result<flume::r#async::RecvStream<MutableItem>> {
let target = target_from_key(public_key, &salt);

let (sender, receiver) = flume::unbounded::<MutableItem>();

let request = RequestTypeSpecific::GetValue(GetValueRequestArguments {
target,
seq: None,
salt,
});
let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { target, seq, salt });

let _ = self.0.sender.send(ActorMessage::Get(
target,
Expand Down Expand Up @@ -344,7 +342,7 @@ mod test {
a.put_mutable(item.clone()).await.unwrap();

let response = b
.get_mutable(signer.verifying_key().as_bytes(), None)
.get_mutable(signer.verifying_key().as_bytes(), None, None)
.unwrap()
.next()
.await
Expand All @@ -355,4 +353,42 @@ mod test {

futures::executor::block_on(test());
}

#[test]
fn put_get_mutable_no_more_recent_value() {
async fn test() {
let testnet = Testnet::new(10);

let a = Dht::builder()
.bootstrap(&testnet.bootstrap)
.build()
.as_async();
let b = Dht::builder()
.bootstrap(&testnet.bootstrap)
.build()
.as_async();

let signer = SigningKey::from_bytes(&[
56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
]);

let seq = 1000;
let value: Bytes = "Hello World!".into();

let item = MutableItem::new(signer.clone(), value, seq, None);

a.put_mutable(item.clone()).await.unwrap();

let response = b
.get_mutable(signer.verifying_key().as_bytes(), None, Some(seq))
.unwrap()
.next()
.await;

assert!(&response.is_none());
}

futures::executor::block_on(test());
}
}
36 changes: 30 additions & 6 deletions src/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,17 +282,14 @@ impl Dht {
pub fn get_mutable(
&self,
public_key: &[u8; 32],
seq: Option<i64>,
salt: Option<Bytes>,
) -> Result<flume::IntoIter<MutableItem>> {
let target = target_from_key(public_key, &salt);

let (sender, receiver) = flume::unbounded::<MutableItem>();

let request = RequestTypeSpecific::GetValue(GetValueRequestArguments {
target,
seq: None,
salt,
});
let request = RequestTypeSpecific::GetValue(GetValueRequestArguments { target, seq, salt });

let _ = self.sender.send(ActorMessage::Get(
target,
Expand Down Expand Up @@ -502,11 +499,38 @@ mod test {
a.put_mutable(item.clone()).unwrap();

let response = b
.get_mutable(signer.verifying_key().as_bytes(), None)
.get_mutable(signer.verifying_key().as_bytes(), None, None)
.unwrap()
.next()
.expect("No mutable values");

assert_eq!(&response, &item);
}

#[test]
fn put_get_mutable_no_more_recent_value() {
let testnet = Testnet::new(10);

let a = Dht::builder().bootstrap(&testnet.bootstrap).build();
let b = Dht::builder().bootstrap(&testnet.bootstrap).build();

let signer = SigningKey::from_bytes(&[
56, 171, 62, 85, 105, 58, 155, 209, 189, 8, 59, 109, 137, 84, 84, 201, 221, 115, 7,
228, 127, 70, 4, 204, 182, 64, 77, 98, 92, 215, 27, 103,
]);

let seq = 1000;
let value: Bytes = "Hello World!".into();

let item = MutableItem::new(signer.clone(), value, seq, None);

a.put_mutable(item.clone()).unwrap();

let response = b
.get_mutable(signer.verifying_key().as_bytes(), Some(seq), None)
.unwrap()
.next();

assert!(&response.is_none());
}
}
20 changes: 20 additions & 0 deletions src/messages/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,11 @@ pub enum DHTResponseSpecific {
arguments: DHTGetMutableResponseArguments,
},

NoMoreRecentValue {
#[serde(rename = "r")]
arguments: DHTNoMoreRecentValueResponseArguments,
},

GetImmutable {
#[serde(rename = "r")]
arguments: DHTGetImmutableResponseArguments,
Expand Down Expand Up @@ -259,6 +264,21 @@ pub struct DHTGetImmutableResponseArguments {
pub v: Vec<u8>,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct DHTNoMoreRecentValueResponseArguments {
#[serde(with = "serde_bytes")]
pub id: Vec<u8>,

#[serde(with = "serde_bytes")]
pub token: Vec<u8>,

#[serde(with = "serde_bytes")]
#[serde(default)]
pub nodes: Option<Vec<u8>>,

pub seq: i64,
}

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq)]
pub struct DHTGetMutableResponseArguments {
#[serde(with = "serde_bytes")]
Expand Down
37 changes: 37 additions & 0 deletions src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub enum ResponseSpecific {
GetImmutable(GetImmutableResponseArguments),
GetMutable(GetMutableResponseArguments),
NoValues(NoValuesResponseArguments),
NoMoreRecentValue(NoMoreRecentValueResponseArguments),
}

// === PING ===
Expand Down Expand Up @@ -168,6 +169,14 @@ pub struct GetMutableResponseArguments {
pub sig: Vec<u8>,
}

#[derive(Debug, PartialEq, Clone)]
pub struct NoMoreRecentValueResponseArguments {
pub responder_id: Id,
pub token: Vec<u8>,
pub nodes: Option<Vec<Node>>,
pub seq: i64,
}

// === Put Immutable ===

#[derive(Debug, PartialEq, Clone)]
Expand Down Expand Up @@ -356,6 +365,16 @@ impl Message {
},
}
}
ResponseSpecific::NoMoreRecentValue(args) => {
internal::DHTResponseSpecific::NoMoreRecentValue {
arguments: internal::DHTNoMoreRecentValueResponseArguments {
id: args.responder_id.to_vec(),
token: args.token.clone(),
nodes: args.nodes.as_ref().map(|nodes| nodes4_to_bytes(nodes)),
seq: args.seq,
},
}
}
}),

MessageType::Error(err) => {
Expand Down Expand Up @@ -531,6 +550,19 @@ impl Message {
sig: arguments.sig,
})
}
internal::DHTResponseSpecific::NoMoreRecentValue { arguments } => {
ResponseSpecific::NoMoreRecentValue(
NoMoreRecentValueResponseArguments {
responder_id: Id::from_bytes(arguments.id)?,
token: arguments.token,
nodes: match arguments.nodes {
Some(nodes) => Some(bytes_to_nodes4(nodes)?),
None => None,
},
seq: arguments.seq,
},
)
}
})
}

Expand Down Expand Up @@ -594,6 +626,7 @@ impl Message {
ResponseSpecific::GetImmutable(arguments) => arguments.responder_id,
ResponseSpecific::GetMutable(arguments) => arguments.responder_id,
ResponseSpecific::NoValues(arguments) => arguments.responder_id,
ResponseSpecific::NoMoreRecentValue(arguments) => arguments.responder_id,
},
MessageType::Error(_) => {
return None;
Expand All @@ -613,6 +646,7 @@ impl Message {
ResponseSpecific::GetMutable(arguments) => arguments.nodes.as_ref().cloned(),
ResponseSpecific::GetImmutable(arguments) => arguments.nodes.as_ref().cloned(),
ResponseSpecific::NoValues(arguments) => arguments.nodes.as_ref().cloned(),
ResponseSpecific::NoMoreRecentValue(arguments) => arguments.nodes.as_ref().cloned(),
},
_ => None,
}
Expand All @@ -635,6 +669,9 @@ impl Message {
ResponseSpecific::NoValues(arguments) => {
Some((arguments.responder_id, arguments.token.clone()))
}
ResponseSpecific::NoMoreRecentValue(arguments) => {
Some((arguments.responder_id, arguments.token.clone()))
}
},
_ => None,
}
Expand Down
22 changes: 20 additions & 2 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use crate::common::{
};
use crate::messages::{
FindNodeRequestArguments, GetImmutableResponseArguments, GetMutableResponseArguments,
GetPeersResponseArguments, GetValueRequestArguments, Message, MessageType, PutRequestSpecific,
RequestSpecific, RequestTypeSpecific, ResponseSpecific,
GetPeersResponseArguments, GetValueRequestArguments, Message, MessageType,
NoMoreRecentValueResponseArguments, PutRequestSpecific, RequestSpecific, RequestTypeSpecific,
ResponseSpecific,
};

use crate::Result;
Expand Down Expand Up @@ -423,6 +424,23 @@ impl Rpc {
);
}
}
MessageType::Response(ResponseSpecific::NoMoreRecentValue(
NoMoreRecentValueResponseArguments {
seq, responder_id, ..
},
)) => {
debug!(
target= ?query.target,
salt= ?match query.request.request_type.clone() {
RequestTypeSpecific::GetValue(args) => args.salt,
_ => None,
},
?seq,
?from,
?responder_id,
"No more recent mutable value"
);
}
// Ping response is already handled in add_node()
// FindNode response is already handled in query.add_candidate()
_ => {}
Expand Down
50 changes: 32 additions & 18 deletions src/rpc/server/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use crate::messages::{
AnnouncePeerRequestArguments, ErrorSpecific, FindNodeRequestArguments,
FindNodeResponseArguments, GetImmutableResponseArguments, GetMutableResponseArguments,
GetPeersRequestArguments, GetPeersResponseArguments, GetValueRequestArguments,
NoValuesResponseArguments, PingResponseArguments, PutImmutableRequestArguments,
PutMutableRequestArguments, PutRequest, PutRequestSpecific, RequestSpecific,
RequestTypeSpecific, ResponseSpecific,
NoMoreRecentValueResponseArguments, NoValuesResponseArguments, PingResponseArguments,
PutImmutableRequestArguments, PutMutableRequestArguments, PutRequest, PutRequestSpecific,
RequestSpecific, RequestTypeSpecific, ResponseSpecific,
};

use super::super::Rpc;
Expand Down Expand Up @@ -65,7 +65,7 @@ pub fn handle_request(
}
RequestTypeSpecific::GetValue(GetValueRequestArguments { target, seq, .. }) => {
if seq.is_some() {
return handle_get_mutable(rpc, from, transaction_id, target);
return handle_get_mutable(rpc, from, transaction_id, target, seq);
}

if let Some(v) = rpc.immutable_values.get(target) {
Expand All @@ -80,7 +80,7 @@ pub fn handle_request(
}),
)
} else {
handle_get_mutable(rpc, from, transaction_id, target);
handle_get_mutable(rpc, from, transaction_id, target, seq);
};
}
RequestTypeSpecific::Put(PutRequest {
Expand Down Expand Up @@ -322,25 +322,39 @@ pub fn handle_request(
}
}

fn handle_get_mutable(rpc: &mut Rpc, from: SocketAddr, transaction_id: u16, target: &Id) {
fn handle_get_mutable(
rpc: &mut Rpc,
from: SocketAddr,
transaction_id: u16,
target: &Id,
seq: &Option<i64>,
) {
rpc.socket.response(
from,
transaction_id,
match rpc.mutable_values.get(target) {
Some(item) => {
// TODO: support seq (NoMoreRecentValue)
// if let Some(seq) = seq {
// }
let no_more_recent_values = seq.map(|request_seq| item.seq() <= &request_seq);

ResponseSpecific::GetMutable(GetMutableResponseArguments {
responder_id: rpc.id,
token: rpc.tokens.generate_token(from).into(),
nodes: Some(rpc.routing_table.closest(target)),
v: item.value().to_vec(),
k: item.key().to_vec(),
seq: *item.seq(),
sig: item.signature().to_vec(),
})
match no_more_recent_values {
Some(true) => {
ResponseSpecific::NoMoreRecentValue(NoMoreRecentValueResponseArguments {
responder_id: rpc.id,
token: rpc.tokens.generate_token(from).into(),
nodes: Some(rpc.routing_table.closest(target)),
seq: *item.seq(),
})
}
_ => ResponseSpecific::GetMutable(GetMutableResponseArguments {
responder_id: rpc.id,
token: rpc.tokens.generate_token(from).into(),
nodes: Some(rpc.routing_table.closest(target)),
v: item.value().to_vec(),
k: item.key().to_vec(),
seq: *item.seq(),
sig: item.signature().to_vec(),
}),
}
}
None => ResponseSpecific::NoValues(NoValuesResponseArguments {
responder_id: rpc.id,
Expand Down

0 comments on commit 4580812

Please # to comment.