Skip to content

Commit

Permalink
Emit global uTP listener events when uTP stream is closed or reset
Browse files Browse the repository at this point in the history
  • Loading branch information
ogenev committed Jun 7, 2022
1 parent 357a5eb commit 88ca107
Show file tree
Hide file tree
Showing 11 changed files with 543 additions and 276 deletions.
48 changes: 48 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion newsfragments/325.added.md
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
Process all closed uTP streams in UtpListener and pass the payload to overlay service.
- Rename `UtpSocket` to `UtpStream`.
- Refactor the way we are storing the received payload (DATA packets) in the uTP stream.
- Add a new AddActiveConnection UtpListener request and move the initialization of a uTP stream inside UtpListener.
- Add UtpStream -> UtpListener event channel and emit event inside UtpStream when stream state changes to Closed or Reset.
- Emit a global uTP listener event containing a uTP payload when a stream is closed.
- Remove redundant and dead code.
1 change: 1 addition & 0 deletions trin-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,4 @@ features = ["bundled"]

[dev-dependencies]
quickcheck = "1.0.3"
ntest = "0.8.0"
13 changes: 13 additions & 0 deletions trin-core/src/portalnet/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use rand::seq::SliceRandom;
use serde_json::{json, Value};
use std::{
convert::TryFrom,
fmt,
net::{IpAddr, SocketAddr},
sync::Arc,
time::Duration,
Expand Down Expand Up @@ -54,6 +55,18 @@ pub struct Discovery {
pub listen_socket: SocketAddr,
}

impl fmt::Debug for Discovery {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Discovery: ( enr: {}, started: {}, listen_socket: {} )",
self.discv5.local_enr(),
self.started,
self.listen_socket
)
}
}

impl Discovery {
pub fn new(portal_config: PortalnetConfig) -> Result<Self, String> {
let listen_all_ips = SocketAddr::new("0.0.0.0".parse().unwrap(), portal_config.listen_port);
Expand Down
136 changes: 62 additions & 74 deletions trin-core/src/portalnet/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::portalnet::{
use crate::{
portalnet::types::content_key::RawContentKey,
utp::{
stream::{UtpListenerRequest, UtpSocket, BUF_SIZE},
stream::{UtpListenerRequest, UtpStream, BUF_SIZE},
trin_helpers::{UtpAccept, UtpMessage, UtpStreamId},
},
};
Expand Down Expand Up @@ -281,49 +281,41 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
conn_id: u16,
) -> Result<Content, OverlayRequestError> {
// initiate the connection to the acceptor
let (tx, rx) = tokio::sync::oneshot::channel::<anyhow::Result<UtpSocket>>();
self.utp_listener_tx
.send(UtpListenerRequest::Connect(
conn_id,
enr.node_id(),
self.protocol.clone(),
UtpStreamId::FindContentStream,
tx,
let (tx, rx) = tokio::sync::oneshot::channel::<UtpStream>();
let utp_request = UtpListenerRequest::Connect(
conn_id,
enr,
self.protocol.clone(),
UtpStreamId::FindContentStream,
tx,
);

self.utp_listener_tx.send(utp_request).map_err(|err| {
OverlayRequestError::UtpError(format!(
"Unable to send Connect request with FindContent stream to UtpListener: {err}"
))
.map_err(|err| {
OverlayRequestError::UtpError(format!(
"Unable to send Connect request with FindContent stream to UtpListener: {err}"
))
})?;
})?;

match rx.await {
Ok(conn) => {
match conn {
Ok(mut conn) => {
let mut result = Vec::new();
// Loop and receive all DATA packets, similar to `read_to_end`
loop {
let mut buf = [0; BUF_SIZE];
match conn.recv_from(&mut buf).await {
Ok((0, _)) => {
break;
}
Ok((bytes, _)) => {
result.extend_from_slice(&mut buf[..bytes]);
}
Err(err) => {
warn!("Unable to receive content via uTP: {err}");
return Err(OverlayRequestError::UtpError(err.to_string()));
}
}
Ok(mut conn) => {
let mut result = Vec::new();
// Loop and receive all DATA packets, similar to `read_to_end`
loop {
let mut buf = [0; BUF_SIZE];
match conn.recv_from(&mut buf).await {
Ok((0, _)) => {
break;
}
Ok((bytes, _)) => {
result.extend_from_slice(&mut buf[..bytes]);
}
Err(err) => {
warn!("Unable to receive content via uTP: {err}");
return Err(OverlayRequestError::UtpError(err.to_string()));
}
Ok(Content::Content(VariableList::from(result)))
}
Err(err) => {
warn!("Unable to initiate uTP stream with remote node. Error initializing uTP socket: {err}");
Err(OverlayRequestError::UtpError(err.to_string()))
}
}
Ok(Content::Content(VariableList::from(result)))
}
Err(err) => {
warn!("Unable to receive from uTP listener channel: {err}");
Expand Down Expand Up @@ -390,47 +382,43 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
}

// initiate the connection to the acceptor
let (tx, rx) = tokio::sync::oneshot::channel::<anyhow::Result<UtpSocket>>();

self.utp_listener_tx.send(UtpListenerRequest::Connect(
let (tx, rx) = tokio::sync::oneshot::channel::<UtpStream>();
let utp_request = UtpListenerRequest::Connect(
conn_id,
enr.node_id(),
enr,
self.protocol.clone(),
UtpStreamId::OfferStream,
tx,
)).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?;
);

match rx.await? {
Ok(mut conn) => {
// Handle STATE packet for SYN
let mut buf = [0; BUF_SIZE];
conn.recv(&mut buf).await?;

let content_items = self.provide_requested_content(&response, content_keys_offered);

let content_message = UtpAccept {
message: content_items,
};

tokio::spawn(async move {
// send the content to the acceptor over a uTP stream
if let Err(err) = conn
.send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..])
.await
{
warn!("Error sending content {err}");
};
// Close uTP connection
if let Err(err) = conn.close().await {
warn!("Unable to close uTP connection!: {err}")
};
});
Ok(response)
}
Err(err) => Err(anyhow!(
"Unable to initialize Offer uTP stream with remote node: {err}"
)),
}
self.utp_listener_tx
.send(utp_request).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?;

let mut conn = rx.await?;
// Handle STATE packet for SYN
let mut buf = [0; BUF_SIZE];
conn.recv(&mut buf).await?;

let content_items = self.provide_requested_content(&response, content_keys_offered);

let content_message = UtpAccept {
message: content_items,
};

tokio::spawn(async move {
// send the content to the acceptor over a uTP stream
if let Err(err) = conn
.send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..])
.await
{
warn!("Error sending content {err}");
};
// Close uTP connection
if let Err(err) = conn.close().await {
warn!("Unable to close uTP connection!: {err}")
};
});
Ok(response)
}

/// Provide the requested content key and content value for the acceptor
Expand Down
30 changes: 22 additions & 8 deletions trin-core/src/portalnet/overlay_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub const FIND_CONTENT_MAX_NODES: usize = 32;
/// which is more than 10x the ethereum mainnet node count) into a unique bucket by the 17th bucket index.
const EXPECTED_NON_EMPTY_BUCKETS: usize = 17;
/// Bucket refresh lookup interval in seconds
const BUCKET_REFRESH_INTERVAL: u64 = 60;
const BUCKET_REFRESH_INTERVAL_SECS: u64 = 60;

/// An overlay request error.
#[derive(Clone, Error, Debug)]
Expand Down Expand Up @@ -381,7 +381,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
async fn start(&mut self) {
// Construct bucket refresh interval
let mut bucket_refresh_interval =
tokio::time::interval(Duration::from_secs(BUCKET_REFRESH_INTERVAL));
tokio::time::interval(Duration::from_secs(BUCKET_REFRESH_INTERVAL_SECS));

loop {
tokio::select! {
Expand Down Expand Up @@ -421,16 +421,16 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
}
}

/// Send request to UtpLister to add an uTP stream to the active connections
/// Send request to UtpListener to add a uTP stream to the active connections
fn add_utp_connection(
&self,
source: &NodeId,
conn_id_recv: u16,
stream_id: UtpStreamId,
) -> Result<(), OverlayRequestError> {
if let Some(enr) = self.discovery.discv5.find_enr(source) {
// Initialize active uTP stream with requested note
let utp_request = UtpListenerRequest::AddActiveConnection(
if let Some(enr) = self.find_enr(source) {
// Initialize active uTP stream with requesting node
let utp_request = UtpListenerRequest::InitiateConnection(
enr,
self.protocol.clone(),
stream_id,
Expand All @@ -441,8 +441,12 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
"Unable to send uTP AddActiveConnection request: {err}"
)));
}
Ok(())
} else {
Err(OverlayRequestError::UtpError(
"Can't find ENR in overlay routing table matching remote NodeId".to_string(),
))
}
Ok(())
}

/// Main bucket refresh lookup logic
Expand Down Expand Up @@ -507,6 +511,16 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
.await
}

/// Returns an ENR if one is known for the given NodeId in overlay routing table
pub fn find_enr(&self, node_id: &NodeId) -> Option<Enr> {
// check if we know this node id in our routing table
let key = kbucket::Key::from(*node_id);
if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&key) {
return Some(entry.value().enr.clone());
}
None
}

/// Processes an overlay request.
fn process_request(&mut self, request: OverlayRequest) {
// For incoming requests, handle the request, possibly send the response over the channel,
Expand Down Expand Up @@ -650,7 +664,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
self.add_utp_connection(
source,
conn_id_recv,
UtpStreamId::FindContentData(content),
UtpStreamId::ContentStream(content),
)?;

// Connection id is send as BE because uTP header values are stored also as BE
Expand Down
2 changes: 1 addition & 1 deletion trin-core/src/portalnet/types/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ pub enum ProtocolIdError {
}

/// Protocol identifiers
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq)]
pub enum ProtocolId {
State,
History,
Expand Down
Loading

0 comments on commit 88ca107

Please # to comment.