Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feat: always box transport upgrades #3436

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
Multiaddr, ProtocolName,
};
use either::Either;
use futures::future::BoxFuture;
use futures::prelude::*;
use pin_project::pin_project;
use std::{pin::Pin, task::Context, task::Poll};
Expand Down Expand Up @@ -137,27 +138,23 @@ where
{
type Output = future::Either<A::Output, B::Output>;
type Error = Either<A::Error, B::Error>;
type ListenerUpgrade = EitherFuture<A::ListenerUpgrade, B::ListenerUpgrade>;
type Dial = EitherFuture<A::Dial, B::Dial>;

fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
) -> Poll<TransportEvent<Self::Output, Self::Error>> {
match self.as_pin_mut() {
Either::Left(a) => match a.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(event) => {
Poll::Ready(event.map_upgrade(EitherFuture::First).map_err(Either::Left))
Poll::Ready(event.map_out(future::Either::Left).map_err(Either::Left))
}
},
Either::Right(b) => match b.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(event) => Poll::Ready(
event
.map_upgrade(EitherFuture::Second)
.map_err(Either::Right),
),
Poll::Ready(event) => {
Poll::Ready(event.map_out(future::Either::Right).map_err(Either::Right))
}
},
}
}
Expand All @@ -183,16 +180,26 @@ where
}
}

fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
fn dial(
&mut self,
addr: Multiaddr,
) -> Result<BoxFuture<'static, Result<Self::Output, Self::Error>>, TransportError<Self::Error>>
{
use TransportError::*;
match self {
Either::Left(a) => match a.dial(addr) {
Ok(connec) => Ok(EitherFuture::First(connec)),
Ok(connec) => Ok(connec
.map_ok(future::Either::Left)
.map_err(Either::Left)
.boxed()),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(Either::Left(err))),
},
Either::Right(b) => match b.dial(addr) {
Ok(connec) => Ok(EitherFuture::Second(connec)),
Ok(connec) => Ok(connec
.map_ok(future::Either::Right)
.map_err(Either::Right)
.boxed()),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(Either::Right(err))),
},
Expand All @@ -202,19 +209,25 @@ where
fn dial_as_listener(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>>
) -> Result<BoxFuture<'static, Result<Self::Output, Self::Error>>, TransportError<Self::Error>>
where
Self: Sized,
{
use TransportError::*;
match self {
Either::Left(a) => match a.dial_as_listener(addr) {
Ok(connec) => Ok(EitherFuture::First(connec)),
Ok(connec) => Ok(connec
.map_ok(future::Either::Left)
.map_err(Either::Left)
.boxed()),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(Either::Left(err))),
},
Either::Right(b) => match b.dial_as_listener(addr) {
Ok(connec) => Ok(EitherFuture::Second(connec)),
Ok(connec) => Ok(connec
.map_ok(future::Either::Right)
.map_err(Either::Right)
.boxed()),
Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)),
Err(Other(err)) => Err(Other(Either::Right(err))),
},
Expand Down
51 changes: 20 additions & 31 deletions core/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
//! any desired protocols. The rest of the module defines combinators for
//! modifying a transport through composition with other transports or protocol upgrades.

use futures::future::BoxFuture;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{
Expand Down Expand Up @@ -93,21 +94,6 @@ pub trait Transport {
/// An error that occurred during connection setup.
type Error: Error;

/// A pending [`Output`](Transport::Output) for an inbound connection,
/// obtained from the [`Transport`] stream.
///
/// After a connection has been accepted by the transport, it may need to go through
/// asynchronous post-processing (i.e. protocol upgrade negotiations). Such
/// post-processing should not block the `Listener` from producing the next
/// connection, hence further connection setup proceeds asynchronously.
/// Once a `ListenerUpgrade` future resolves it yields the [`Output`](Transport::Output)
/// of the connection setup process.
type ListenerUpgrade: Future<Output = Result<Self::Output, Self::Error>>;

/// A pending [`Output`](Transport::Output) for an outbound connection,
/// obtained from [dialing](Transport::dial).
type Dial: Future<Output = Result<Self::Output, Self::Error>>;

/// Listens on the given [`Multiaddr`] for inbound connections.
fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>>;

Expand All @@ -121,7 +107,10 @@ pub trait Transport {
///
/// If [`TransportError::MultiaddrNotSupported`] is returned, it may be desirable to
/// try an alternative [`Transport`], if available.
fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>>;
fn dial(
&mut self,
addr: Multiaddr,
) -> Result<BoxFuture<'static, Result<Self::Output, Self::Error>>, TransportError<Self::Error>>;

/// As [`Transport::dial`] but has the local node act as a listener on the outgoing connection.
///
Expand All @@ -131,7 +120,7 @@ pub trait Transport {
fn dial_as_listener(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>>;
) -> Result<BoxFuture<'static, Result<Self::Output, Self::Error>>, TransportError<Self::Error>>;

/// Poll for [`TransportEvent`]s.
///
Expand All @@ -147,7 +136,7 @@ pub trait Transport {
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>>;
) -> Poll<TransportEvent<Self::Output, Self::Error>>;

/// Performs a transport-specific mapping of an address `observed` by a remote onto a
/// local `listen` address to yield an address for the local node that may be reachable
Expand All @@ -171,8 +160,6 @@ pub trait Transport {
fn boxed(self) -> boxed::Boxed<Self::Output>
where
Self: Sized + Send + Unpin + 'static,
Self::Dial: Send + 'static,
Self::ListenerUpgrade: Send + 'static,
Self::Error: Send + Sync,
{
boxed::boxed(self)
Expand Down Expand Up @@ -220,7 +207,7 @@ pub trait Transport {
fn and_then<C, F, O>(self, f: C) -> and_then::AndThen<Self, C>
where
Self: Sized,
C: FnOnce(Self::Output, ConnectedPoint) -> F,
C: FnOnce(Self::Output, ConnectedPoint) -> F + Send + 'static,
F: TryFuture<Ok = O>,
<F as TryFuture>::Error: Error + 'static,
{
Expand Down Expand Up @@ -256,7 +243,7 @@ impl Default for ListenerId {
}

/// Event produced by [`Transport`]s.
pub enum TransportEvent<TUpgr, TErr> {
pub enum TransportEvent<TOut, TErr> {
/// A new address is being listened on.
NewAddress {
/// The listener that is listening on the new address.
Expand All @@ -276,7 +263,7 @@ pub enum TransportEvent<TUpgr, TErr> {
/// The listener that produced the upgrade.
listener_id: ListenerId,
/// The produced upgrade.
upgrade: TUpgr,
upgrade: BoxFuture<'static, Result<TOut, TErr>>,
/// Local connection address.
local_addr: Multiaddr,
/// Address used to send back data to the incoming client.
Expand All @@ -302,10 +289,9 @@ pub enum TransportEvent<TUpgr, TErr> {
},
}

impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
/// In case this [`TransportEvent`] is an upgrade, apply the given function
/// to the upgrade and produce another transport event based the the function's result.
pub fn map_upgrade<U>(self, map: impl FnOnce(TUpgr) -> U) -> TransportEvent<U, TErr> {
impl<TOut, TErr> TransportEvent<TOut, TErr> {
/// TODO
pub fn map_out<U>(self, map: impl Fn(TOut) -> U + Send + 'static) -> TransportEvent<U, TErr> {
match self {
TransportEvent::Incoming {
listener_id,
Expand All @@ -314,7 +300,7 @@ impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
send_back_addr,
} => TransportEvent::Incoming {
listener_id,
upgrade: map(upgrade),
upgrade: upgrade.map_ok(map).boxed(),
local_addr,
send_back_addr,
},
Expand Down Expand Up @@ -348,7 +334,10 @@ impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
/// In case this [`TransportEvent`] is an [`ListenerError`](TransportEvent::ListenerError),
/// or [`ListenerClosed`](TransportEvent::ListenerClosed) apply the given function to the
/// error and produce another transport event based on the function's result.
pub fn map_err<E>(self, map_err: impl FnOnce(TErr) -> E) -> TransportEvent<TUpgr, E> {
pub fn map_err<E>(
self,
map_err: impl FnOnce(TErr) -> E + Send + 'static,
) -> TransportEvent<TOut, E> {
match self {
TransportEvent::Incoming {
listener_id,
Expand All @@ -357,7 +346,7 @@ impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
send_back_addr,
} => TransportEvent::Incoming {
listener_id,
upgrade,
upgrade: upgrade.map_err(map_err).boxed(),
local_addr,
send_back_addr,
},
Expand Down Expand Up @@ -399,7 +388,7 @@ impl<TUpgr, TErr> TransportEvent<TUpgr, TErr> {
///
/// Returns `None` if the event is not actually an incoming connection,
/// otherwise the upgrade and the remote address.
pub fn into_incoming(self) -> Option<(TUpgr, Multiaddr)> {
pub fn into_incoming(self) -> Option<(BoxFuture<'static, Result<TOut, TErr>>, Multiaddr)> {
if let TransportEvent::Incoming {
upgrade,
send_back_addr,
Expand Down
27 changes: 16 additions & 11 deletions core/src/transport/and_then.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
transport::{ListenerId, Transport, TransportError, TransportEvent},
};
use either::Either;
use futures::future::BoxFuture;
use futures::prelude::*;
use multiaddr::Multiaddr;
use std::{error, marker::PhantomPinned, pin::Pin, task::Context, task::Poll};
Expand All @@ -45,14 +46,12 @@ impl<T, C> AndThen<T, C> {
impl<T, C, F, O> Transport for AndThen<T, C>
where
T: Transport,
C: FnOnce(T::Output, ConnectedPoint) -> F + Clone,
F: TryFuture<Ok = O>,
C: FnOnce(T::Output, ConnectedPoint) -> F + Clone + Send + 'static,
F: TryFuture<Ok = O> + Send + 'static,
F::Error: error::Error,
{
type Output = O;
type Error = Either<T::Error, F::Error>;
type ListenerUpgrade = AndThenFuture<T::ListenerUpgrade, C, F>;
type Dial = AndThenFuture<T::Dial, C, F>;

fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<Self::Error>> {
self.transport
Expand All @@ -64,7 +63,11 @@ where
self.transport.remove_listener(id)
}

fn dial(&mut self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
fn dial(
&mut self,
addr: Multiaddr,
) -> Result<BoxFuture<'static, Result<Self::Output, Self::Error>>, TransportError<Self::Error>>
{
let dialed_fut = self
.transport
.dial(addr.clone())
Expand All @@ -80,13 +83,14 @@ where
)),
_marker: PhantomPinned,
};
Ok(future)
Ok(future.boxed())
}

fn dial_as_listener(
&mut self,
addr: Multiaddr,
) -> Result<Self::Dial, TransportError<Self::Error>> {
) -> Result<BoxFuture<'static, Result<Self::Output, Self::Error>>, TransportError<Self::Error>>
{
let dialed_fut = self
.transport
.dial_as_listener(addr.clone())
Expand All @@ -102,7 +106,7 @@ where
)),
_marker: PhantomPinned,
};
Ok(future)
Ok(future.boxed())
}

fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
Expand All @@ -112,7 +116,7 @@ where
fn poll(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
) -> Poll<TransportEvent<Self::Output, Self::Error>> {
let this = self.project();
match this.transport.poll(cx) {
Poll::Ready(TransportEvent::Incoming {
Expand All @@ -131,14 +135,15 @@ where
inner: Either::Left(Box::pin(upgrade)),
args: Some((this.fun.clone(), point)),
_marker: PhantomPinned,
},
}
.boxed(),
local_addr,
send_back_addr,
})
}
Poll::Ready(other) => {
let mapped = other
.map_upgrade(|_upgrade| unreachable!("case already matched"))
.map_out(|_| unreachable!("case already matched"))
.map_err(Either::Left);
Poll::Ready(mapped)
}
Expand Down
Loading