Skip to content

Commit

Permalink
Integrate with paritytech/litep2p#96
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Gheorghe <alexandru.gheorghe@parity.io>
  • Loading branch information
alexggh committed Apr 30, 2024
1 parent 9979f8f commit fd5dc46
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 47 deletions.
46 changes: 23 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -646,3 +646,6 @@ wasmi = { opt-level = 3 }
x25519-dalek = { opt-level = 3 }
yamux = { opt-level = 3 }
zeroize = { opt-level = 3 }

[patch."https://github.com/paritytech/litep2p"]
litep2p = { git = "https://github.com/paritytech//litep2p", branch = "lenxv/expose-peer-records" }
19 changes: 12 additions & 7 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use litep2p::{
identify::{Config as IdentifyConfig, IdentifyEvent},
kademlia::{
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, KademliaEvent,
KademliaHandle, PeerRecord, QueryId, Quorum, Record, RecordKey,
KademliaHandle, QueryId, Quorum, Record, RecordKey, RecordsType,
},
ping::{Config as PingConfig, PingEvent},
},
Expand Down Expand Up @@ -128,8 +128,8 @@ pub enum DiscoveryEvent {
/// Query ID.
query_id: QueryId,

/// Record.
record: PeerRecord,
/// Records.
records: RecordsType,
},

/// Record was successfully stored on the DHT.
Expand Down Expand Up @@ -348,9 +348,14 @@ impl Discovery {
&mut self,
record: Record,
peers: Vec<sc_network_types::PeerId>,
update_local_storage: bool,
) -> QueryId {
self.kademlia_handle
.put_record_to_peers(record, peers.into_iter().map(|peer| peer.into()).collect())
.put_record_to_peers(
record,
peers.into_iter().map(|peer| peer.into()).collect(),
update_local_storage,
)
.await
}

Expand Down Expand Up @@ -474,13 +479,13 @@ impl Stream for Discovery {
peers: peers.into_iter().collect(),
}))
},
Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id, record })) => {
Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id, records })) => {
log::trace!(
target: LOG_TARGET,
"`GET_RECORD` succeeded for {query_id:?}: {record:?}",
"`GET_RECORD` succeeded for {query_id:?}: {records:?}",
);

return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id, record }));
return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id, records }));
},
Poll::Ready(Some(KademliaEvent::PutRecordSucess { query_id, key: _ })) =>
return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
Expand Down
56 changes: 39 additions & 17 deletions substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,9 +697,9 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
self.pending_put_values.insert(query_id, (key, Instant::now()));
}

NetworkServiceCommand::PutValueTo { record, peers, update_local_storage: _ } => {
NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => {
let kademlia_key = record.key.to_vec().into();
let query_id = self.discovery.put_value_to_peers(record, peers).await;
let query_id = self.discovery.put_value_to_peers(record, peers, update_local_storage).await;
self.pending_put_values.insert(query_id, (kademlia_key, Instant::now()));
}

Expand Down Expand Up @@ -802,40 +802,62 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
self.peerstore_handle.add_known_peer(peer.into());
}
}
Some(DiscoveryEvent::GetRecordSuccess { query_id, record }) => {
Some(DiscoveryEvent::GetRecordSuccess { query_id, records }) => {
match self.pending_get_values.remove(&query_id) {
None => log::warn!(
target: LOG_TARGET,
"`GET_VALUE` succeeded for a non-existent query",
),
Some((_key, started)) => {
Some((key, started)) => {
log::trace!(
target: LOG_TARGET,
"`GET_VALUE` for {:?} ({query_id:?}) succeeded",
record.record.key,
key,
);

self.event_streams.send(
Event::Dht(
DhtEvent::ValueFound(
let received_records = match records {
litep2p::protocol::libp2p::kademlia::RecordsType::LocalStore(record) => {
vec![
PeerRecord {
record: Record {
key: record.record.key.to_vec().into(),
value: record.record.value,
publisher: record.record.publisher.map(|peer_id| {
key: record.key.to_vec().into(),
value: record.value,
publisher: record.publisher.map(|peer_id| {
let peer_id: sc_network_types::PeerId = peer_id.into();
peer_id.into()
}),
expires: record.record.expires,
expires: record.expires,
},
peer: record.peer.map(|peer_id| {
peer: None,
}
]
},
litep2p::protocol::libp2p::kademlia::RecordsType::Network(records) => records.into_iter().map(|record| {
let peer_id: sc_network_types::PeerId = record.peer.into();

PeerRecord {
record: Record {
key: record.record.key.to_vec().into(),
value: record.record.value,
publisher: record.record.publisher.map(|peer_id| {
let peer_id: sc_network_types::PeerId = peer_id.into();
peer_id.into()
}),
}
expires: record.record.expires,
},
peer: Some(peer_id.into()),
}
}).collect::<Vec<_>>(),
};

for record in received_records {
self.event_streams.send(
Event::Dht(
DhtEvent::ValueFound(
record
)
)
)
);
);
}

if let Some(ref metrics) = self.metrics {
metrics
Expand Down

0 comments on commit fd5dc46

Please # to comment.