diff --git a/core/src/either.rs b/core/src/either.rs index 8049717a878..caa19837b57 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -25,6 +25,7 @@ use crate::{ Multiaddr, ProtocolName, }; use either::Either; +use futures::future::BoxFuture; use futures::prelude::*; use pin_project::pin_project; use std::{pin::Pin, task::Context, task::Poll}; @@ -137,27 +138,23 @@ where { type Output = future::Either; type Error = Either; - type ListenerUpgrade = EitherFuture; - type Dial = EitherFuture; fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { match self.as_pin_mut() { Either::Left(a) => match a.poll(cx) { Poll::Pending => Poll::Pending, Poll::Ready(event) => { - Poll::Ready(event.map_upgrade(EitherFuture::First).map_err(Either::Left)) + Poll::Ready(event.map_out(future::Either::Left).map_err(Either::Left)) } }, Either::Right(b) => match b.poll(cx) { Poll::Pending => Poll::Pending, - Poll::Ready(event) => Poll::Ready( - event - .map_upgrade(EitherFuture::Second) - .map_err(Either::Right), - ), + Poll::Ready(event) => { + Poll::Ready(event.map_out(future::Either::Right).map_err(Either::Right)) + } }, } } @@ -183,16 +180,26 @@ where } } - fn dial(&mut self, addr: Multiaddr) -> Result> { + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> + { use TransportError::*; match self { Either::Left(a) => match a.dial(addr) { - Ok(connec) => Ok(EitherFuture::First(connec)), + Ok(connec) => Ok(connec + .map_ok(future::Either::Left) + .map_err(Either::Left) + .boxed()), Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), Err(Other(err)) => Err(Other(Either::Left(err))), }, Either::Right(b) => match b.dial(addr) { - Ok(connec) => Ok(EitherFuture::Second(connec)), + Ok(connec) => Ok(connec + .map_ok(future::Either::Right) + .map_err(Either::Right) + .boxed()), Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), Err(Other(err)) => Err(Other(Either::Right(err))), }, @@ -202,19 +209,25 @@ where fn dial_as_listener( &mut self, addr: Multiaddr, - ) -> Result> + ) -> Result>, TransportError> where Self: Sized, { use TransportError::*; match self { Either::Left(a) => match a.dial_as_listener(addr) { - Ok(connec) => Ok(EitherFuture::First(connec)), + Ok(connec) => Ok(connec + .map_ok(future::Either::Left) + .map_err(Either::Left) + .boxed()), Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), Err(Other(err)) => Err(Other(Either::Left(err))), }, Either::Right(b) => match b.dial_as_listener(addr) { - Ok(connec) => Ok(EitherFuture::Second(connec)), + Ok(connec) => Ok(connec + .map_ok(future::Either::Right) + .map_err(Either::Right) + .boxed()), Err(MultiaddrNotSupported(addr)) => Err(MultiaddrNotSupported(addr)), Err(Other(err)) => Err(Other(Either::Right(err))), }, diff --git a/core/src/transport.rs b/core/src/transport.rs index 04196efca13..93cca7e2c6c 100644 --- a/core/src/transport.rs +++ b/core/src/transport.rs @@ -25,6 +25,7 @@ //! any desired protocols. The rest of the module defines combinators for //! modifying a transport through composition with other transports or protocol upgrades. +use futures::future::BoxFuture; use futures::prelude::*; use multiaddr::Multiaddr; use std::{ @@ -93,21 +94,6 @@ pub trait Transport { /// An error that occurred during connection setup. type Error: Error; - /// A pending [`Output`](Transport::Output) for an inbound connection, - /// obtained from the [`Transport`] stream. - /// - /// After a connection has been accepted by the transport, it may need to go through - /// asynchronous post-processing (i.e. protocol upgrade negotiations). Such - /// post-processing should not block the `Listener` from producing the next - /// connection, hence further connection setup proceeds asynchronously. - /// Once a `ListenerUpgrade` future resolves it yields the [`Output`](Transport::Output) - /// of the connection setup process. - type ListenerUpgrade: Future>; - - /// A pending [`Output`](Transport::Output) for an outbound connection, - /// obtained from [dialing](Transport::dial). - type Dial: Future>; - /// Listens on the given [`Multiaddr`] for inbound connections. fn listen_on(&mut self, addr: Multiaddr) -> Result>; @@ -121,7 +107,10 @@ pub trait Transport { /// /// If [`TransportError::MultiaddrNotSupported`] is returned, it may be desirable to /// try an alternative [`Transport`], if available. - fn dial(&mut self, addr: Multiaddr) -> Result>; + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError>; /// As [`Transport::dial`] but has the local node act as a listener on the outgoing connection. /// @@ -131,7 +120,7 @@ pub trait Transport { fn dial_as_listener( &mut self, addr: Multiaddr, - ) -> Result>; + ) -> Result>, TransportError>; /// Poll for [`TransportEvent`]s. /// @@ -147,7 +136,7 @@ pub trait Transport { fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll>; + ) -> Poll>; /// Performs a transport-specific mapping of an address `observed` by a remote onto a /// local `listen` address to yield an address for the local node that may be reachable @@ -171,8 +160,6 @@ pub trait Transport { fn boxed(self) -> boxed::Boxed where Self: Sized + Send + Unpin + 'static, - Self::Dial: Send + 'static, - Self::ListenerUpgrade: Send + 'static, Self::Error: Send + Sync, { boxed::boxed(self) @@ -220,7 +207,7 @@ pub trait Transport { fn and_then(self, f: C) -> and_then::AndThen where Self: Sized, - C: FnOnce(Self::Output, ConnectedPoint) -> F, + C: FnOnce(Self::Output, ConnectedPoint) -> F + Send + 'static, F: TryFuture, ::Error: Error + 'static, { @@ -256,7 +243,7 @@ impl Default for ListenerId { } /// Event produced by [`Transport`]s. -pub enum TransportEvent { +pub enum TransportEvent { /// A new address is being listened on. NewAddress { /// The listener that is listening on the new address. @@ -276,7 +263,7 @@ pub enum TransportEvent { /// The listener that produced the upgrade. listener_id: ListenerId, /// The produced upgrade. - upgrade: TUpgr, + upgrade: BoxFuture<'static, Result>, /// Local connection address. local_addr: Multiaddr, /// Address used to send back data to the incoming client. @@ -302,10 +289,9 @@ pub enum TransportEvent { }, } -impl TransportEvent { - /// In case this [`TransportEvent`] is an upgrade, apply the given function - /// to the upgrade and produce another transport event based the the function's result. - pub fn map_upgrade(self, map: impl FnOnce(TUpgr) -> U) -> TransportEvent { +impl TransportEvent { + /// TODO + pub fn map_out(self, map: impl Fn(TOut) -> U + Send + 'static) -> TransportEvent { match self { TransportEvent::Incoming { listener_id, @@ -314,7 +300,7 @@ impl TransportEvent { send_back_addr, } => TransportEvent::Incoming { listener_id, - upgrade: map(upgrade), + upgrade: upgrade.map_ok(map).boxed(), local_addr, send_back_addr, }, @@ -348,7 +334,10 @@ impl TransportEvent { /// In case this [`TransportEvent`] is an [`ListenerError`](TransportEvent::ListenerError), /// or [`ListenerClosed`](TransportEvent::ListenerClosed) apply the given function to the /// error and produce another transport event based on the function's result. - pub fn map_err(self, map_err: impl FnOnce(TErr) -> E) -> TransportEvent { + pub fn map_err( + self, + map_err: impl FnOnce(TErr) -> E + Send + 'static, + ) -> TransportEvent { match self { TransportEvent::Incoming { listener_id, @@ -357,7 +346,7 @@ impl TransportEvent { send_back_addr, } => TransportEvent::Incoming { listener_id, - upgrade, + upgrade: upgrade.map_err(map_err).boxed(), local_addr, send_back_addr, }, @@ -399,7 +388,7 @@ impl TransportEvent { /// /// Returns `None` if the event is not actually an incoming connection, /// otherwise the upgrade and the remote address. - pub fn into_incoming(self) -> Option<(TUpgr, Multiaddr)> { + pub fn into_incoming(self) -> Option<(BoxFuture<'static, Result>, Multiaddr)> { if let TransportEvent::Incoming { upgrade, send_back_addr, diff --git a/core/src/transport/and_then.rs b/core/src/transport/and_then.rs index fb5280568ea..8cd412bc00e 100644 --- a/core/src/transport/and_then.rs +++ b/core/src/transport/and_then.rs @@ -23,6 +23,7 @@ use crate::{ transport::{ListenerId, Transport, TransportError, TransportEvent}, }; use either::Either; +use futures::future::BoxFuture; use futures::prelude::*; use multiaddr::Multiaddr; use std::{error, marker::PhantomPinned, pin::Pin, task::Context, task::Poll}; @@ -45,14 +46,12 @@ impl AndThen { impl Transport for AndThen where T: Transport, - C: FnOnce(T::Output, ConnectedPoint) -> F + Clone, - F: TryFuture, + C: FnOnce(T::Output, ConnectedPoint) -> F + Clone + Send + 'static, + F: TryFuture + Send + 'static, F::Error: error::Error, { type Output = O; type Error = Either; - type ListenerUpgrade = AndThenFuture; - type Dial = AndThenFuture; fn listen_on(&mut self, addr: Multiaddr) -> Result> { self.transport @@ -64,7 +63,11 @@ where self.transport.remove_listener(id) } - fn dial(&mut self, addr: Multiaddr) -> Result> { + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> + { let dialed_fut = self .transport .dial(addr.clone()) @@ -80,13 +83,14 @@ where )), _marker: PhantomPinned, }; - Ok(future) + Ok(future.boxed()) } fn dial_as_listener( &mut self, addr: Multiaddr, - ) -> Result> { + ) -> Result>, TransportError> + { let dialed_fut = self .transport .dial_as_listener(addr.clone()) @@ -102,7 +106,7 @@ where )), _marker: PhantomPinned, }; - Ok(future) + Ok(future.boxed()) } fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { @@ -112,7 +116,7 @@ where fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let this = self.project(); match this.transport.poll(cx) { Poll::Ready(TransportEvent::Incoming { @@ -131,14 +135,15 @@ where inner: Either::Left(Box::pin(upgrade)), args: Some((this.fun.clone(), point)), _marker: PhantomPinned, - }, + } + .boxed(), local_addr, send_back_addr, }) } Poll::Ready(other) => { let mapped = other - .map_upgrade(|_upgrade| unreachable!("case already matched")) + .map_out(|_| unreachable!("case already matched")) .map_err(Either::Left); Poll::Ready(mapped) } diff --git a/core/src/transport/boxed.rs b/core/src/transport/boxed.rs index b2560c4a662..93e15ecd8fd 100644 --- a/core/src/transport/boxed.rs +++ b/core/src/transport/boxed.rs @@ -19,6 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::transport::{ListenerId, Transport, TransportError, TransportEvent}; +use futures::future::BoxFuture; use futures::{prelude::*, stream::FusedStream}; use multiaddr::Multiaddr; use std::{ @@ -33,8 +34,6 @@ pub fn boxed(transport: T) -> Boxed where T: Transport + Send + Unpin + 'static, T::Error: Send + Sync, - T::Dial: Send + 'static, - T::ListenerUpgrade: Send + 'static, { Boxed { inner: Box::new(transport) as Box<_>, @@ -48,27 +47,25 @@ pub struct Boxed { inner: Box + Send + Unpin>, } -type Dial = Pin> + Send>>; -type ListenerUpgrade = Pin> + Send>>; - trait Abstract { fn listen_on(&mut self, addr: Multiaddr) -> Result>; fn remove_listener(&mut self, id: ListenerId) -> bool; - fn dial(&mut self, addr: Multiaddr) -> Result, TransportError>; - fn dial_as_listener(&mut self, addr: Multiaddr) -> Result, TransportError>; + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError>; + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError>; fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, io::Error>>; + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; } impl Abstract for T where T: Transport + 'static, T::Error: Send + Sync, - T::Dial: Send + 'static, - T::ListenerUpgrade: Send + 'static, { fn listen_on(&mut self, addr: Multiaddr) -> Result> { Transport::listen_on(self, addr).map_err(|e| e.map(box_err)) @@ -78,36 +75,32 @@ where Transport::remove_listener(self, id) } - fn dial(&mut self, addr: Multiaddr) -> Result, TransportError> { + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> { let fut = Transport::dial(self, addr) .map(|r| r.map_err(box_err)) .map_err(|e| e.map(box_err))?; - Ok(Box::pin(fut) as Dial<_>) + Ok(fut.boxed()) } - fn dial_as_listener(&mut self, addr: Multiaddr) -> Result, TransportError> { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> { let fut = Transport::dial_as_listener(self, addr) .map(|r| r.map_err(box_err)) .map_err(|e| e.map(box_err))?; - Ok(Box::pin(fut) as Dial<_>) + Ok(Box::pin(fut) as BoxFuture<'static, io::Result>) } fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { Transport::address_translation(self, server, observed) } - fn poll( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll, io::Error>> { - self.poll(cx).map(|event| { - event - .map_upgrade(|upgrade| { - let up = upgrade.map_err(box_err); - Box::pin(up) as ListenerUpgrade - }) - .map_err(box_err) - }) + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll(cx).map(|event| event.map_err(box_err)) } } @@ -120,8 +113,6 @@ impl fmt::Debug for Boxed { impl Transport for Boxed { type Output = O; type Error = io::Error; - type ListenerUpgrade = ListenerUpgrade; - type Dial = Dial; fn listen_on(&mut self, addr: Multiaddr) -> Result> { self.inner.listen_on(addr) @@ -131,14 +122,19 @@ impl Transport for Boxed { self.inner.remove_listener(id) } - fn dial(&mut self, addr: Multiaddr) -> Result> { + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> + { self.inner.dial(addr) } fn dial_as_listener( &mut self, addr: Multiaddr, - ) -> Result> { + ) -> Result>, TransportError> + { self.inner.dial_as_listener(addr) } @@ -149,13 +145,13 @@ impl Transport for Boxed { fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Pin::new(self.inner.as_mut()).poll(cx) } } impl Stream for Boxed { - type Item = TransportEvent, io::Error>; + type Item = TransportEvent; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Transport::poll(self, cx).map(Some) diff --git a/core/src/transport/choice.rs b/core/src/transport/choice.rs index bb7d542d292..49e970e69ab 100644 --- a/core/src/transport/choice.rs +++ b/core/src/transport/choice.rs @@ -18,10 +18,11 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::either::EitherFuture; use crate::transport::{ListenerId, Transport, TransportError, TransportEvent}; use either::Either; use futures::future; +use futures::future::BoxFuture; +use futures::{FutureExt, TryFutureExt}; use multiaddr::Multiaddr; use std::{pin::Pin, task::Context, task::Poll}; @@ -43,8 +44,6 @@ where { type Output = future::Either; type Error = Either; - type ListenerUpgrade = EitherFuture; - type Dial = EitherFuture; fn listen_on(&mut self, addr: Multiaddr) -> Result> { let addr = match self.0.listen_on(addr) { @@ -64,9 +63,18 @@ where self.0.remove_listener(id) || self.1.remove_listener(id) } - fn dial(&mut self, addr: Multiaddr) -> Result> { + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> + { let addr = match self.0.dial(addr) { - Ok(connec) => return Ok(EitherFuture::First(connec)), + Ok(connec) => { + return Ok(connec + .map_ok(future::Either::Left) + .map_err(Either::Left) + .boxed()) + } Err(TransportError::MultiaddrNotSupported(addr)) => addr, Err(TransportError::Other(err)) => { return Err(TransportError::Other(Either::Left(err))) @@ -74,7 +82,12 @@ where }; let addr = match self.1.dial(addr) { - Ok(connec) => return Ok(EitherFuture::Second(connec)), + Ok(connec) => { + return Ok(connec + .map_ok(future::Either::Right) + .map_err(Either::Right) + .boxed()) + } Err(TransportError::MultiaddrNotSupported(addr)) => addr, Err(TransportError::Other(err)) => { return Err(TransportError::Other(Either::Right(err))) @@ -87,9 +100,15 @@ where fn dial_as_listener( &mut self, addr: Multiaddr, - ) -> Result> { + ) -> Result>, TransportError> + { let addr = match self.0.dial_as_listener(addr) { - Ok(connec) => return Ok(EitherFuture::First(connec)), + Ok(connec) => { + return Ok(connec + .map_ok(future::Either::Left) + .map_err(Either::Left) + .boxed()) + } Err(TransportError::MultiaddrNotSupported(addr)) => addr, Err(TransportError::Other(err)) => { return Err(TransportError::Other(Either::Left(err))) @@ -97,7 +116,12 @@ where }; let addr = match self.1.dial_as_listener(addr) { - Ok(connec) => return Ok(EitherFuture::Second(connec)), + Ok(connec) => { + return Ok(connec + .map_ok(future::Either::Right) + .map_err(Either::Right) + .boxed()) + } Err(TransportError::MultiaddrNotSupported(addr)) => addr, Err(TransportError::Other(err)) => { return Err(TransportError::Other(Either::Right(err))) @@ -118,17 +142,17 @@ where fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let this = self.project(); match this.0.poll(cx) { Poll::Ready(ev) => { - return Poll::Ready(ev.map_upgrade(EitherFuture::First).map_err(Either::Left)) + return Poll::Ready(ev.map_out(future::Either::Left).map_err(Either::Left)) } Poll::Pending => {} } match this.1.poll(cx) { Poll::Ready(ev) => { - return Poll::Ready(ev.map_upgrade(EitherFuture::Second).map_err(Either::Right)) + return Poll::Ready(ev.map_out(future::Either::Right).map_err(Either::Right)) } Poll::Pending => {} } diff --git a/core/src/transport/dummy.rs b/core/src/transport/dummy.rs index a7d1cab9089..1eaa428a076 100644 --- a/core/src/transport/dummy.rs +++ b/core/src/transport/dummy.rs @@ -20,6 +20,7 @@ use crate::transport::{ListenerId, Transport, TransportError, TransportEvent}; use crate::Multiaddr; +use futures::future::BoxFuture; use futures::{prelude::*, task::Context, task::Poll}; use std::{fmt, io, marker::PhantomData, pin::Pin}; @@ -56,8 +57,6 @@ impl Clone for DummyTransport { impl Transport for DummyTransport { type Output = TOut; type Error = io::Error; - type ListenerUpgrade = futures::future::Pending>; - type Dial = futures::future::Pending>; fn listen_on(&mut self, addr: Multiaddr) -> Result> { Err(TransportError::MultiaddrNotSupported(addr)) @@ -67,14 +66,19 @@ impl Transport for DummyTransport { false } - fn dial(&mut self, addr: Multiaddr) -> Result> { + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> + { Err(TransportError::MultiaddrNotSupported(addr)) } fn dial_as_listener( &mut self, addr: Multiaddr, - ) -> Result> { + ) -> Result>, TransportError> + { Err(TransportError::MultiaddrNotSupported(addr)) } @@ -85,7 +89,7 @@ impl Transport for DummyTransport { fn poll( self: Pin<&mut Self>, _: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { Poll::Pending } } diff --git a/core/src/transport/map.rs b/core/src/transport/map.rs index 50f7b826d36..de8eef820f2 100644 --- a/core/src/transport/map.rs +++ b/core/src/transport/map.rs @@ -22,6 +22,7 @@ use crate::{ connection::{ConnectedPoint, Endpoint}, transport::{Transport, TransportError, TransportEvent}, }; +use futures::future::BoxFuture; use futures::prelude::*; use multiaddr::Multiaddr; use std::{pin::Pin, task::Context, task::Poll}; @@ -54,12 +55,10 @@ impl Map { impl Transport for Map where T: Transport, - F: FnOnce(T::Output, ConnectedPoint) -> D + Clone, + F: FnOnce(T::Output, ConnectedPoint) -> D + Clone + Send + 'static, { type Output = D; type Error = T::Error; - type ListenerUpgrade = MapFuture; - type Dial = MapFuture; fn listen_on(&mut self, addr: Multiaddr) -> Result> { self.transport.listen_on(addr) @@ -69,31 +68,46 @@ where self.transport.remove_listener(id) } - fn dial(&mut self, addr: Multiaddr) -> Result> { + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> + { let future = self.transport.dial(addr.clone())?; - let p = ConnectedPoint::Dialer { - address: addr, - role_override: Endpoint::Dialer, - }; - Ok(MapFuture { - inner: future, - args: Some((self.fun.clone(), p)), - }) + let f = self.fun.clone(); + + Ok(future + .map_ok(|o| { + f( + o, + ConnectedPoint::Dialer { + address: addr, + role_override: Endpoint::Dialer, + }, + ) + }) + .boxed()) } fn dial_as_listener( &mut self, addr: Multiaddr, - ) -> Result> { - let future = self.transport.dial_as_listener(addr.clone())?; - let p = ConnectedPoint::Dialer { - address: addr, - role_override: Endpoint::Listener, - }; - Ok(MapFuture { - inner: future, - args: Some((self.fun.clone(), p)), - }) + ) -> Result>, TransportError> + { + let future = self.transport.dial(addr.clone())?; + let f = self.fun.clone(); + + Ok(future + .map_ok(|o| { + f( + o, + ConnectedPoint::Dialer { + address: addr, + role_override: Endpoint::Listener, + }, + ) + }) + .boxed()) } fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { @@ -103,7 +117,7 @@ where fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let this = self.project(); match this.transport.poll(cx) { Poll::Ready(TransportEvent::Incoming { @@ -121,15 +135,13 @@ where upgrade: MapFuture { inner: upgrade, args: Some((this.fun.clone(), point)), - }, + } + .boxed(), local_addr, send_back_addr, }) } - Poll::Ready(other) => { - let mapped = other.map_upgrade(|_upgrade| unreachable!("case already matched")); - Poll::Ready(mapped) - } + Poll::Ready(other) => Poll::Ready(todo!()), Poll::Pending => Poll::Pending, } } diff --git a/core/src/transport/map_err.rs b/core/src/transport/map_err.rs index 99f2912447f..ef76ea4adef 100644 --- a/core/src/transport/map_err.rs +++ b/core/src/transport/map_err.rs @@ -19,6 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::transport::{ListenerId, Transport, TransportError, TransportEvent}; +use futures::future::BoxFuture; use futures::prelude::*; use multiaddr::Multiaddr; use std::{error, pin::Pin, task::Context, task::Poll}; @@ -42,13 +43,11 @@ impl MapErr { impl Transport for MapErr where T: Transport, - F: FnOnce(T::Error) -> TErr + Clone, + F: FnOnce(T::Error) -> TErr + Clone + Send + 'static, TErr: error::Error, { type Output = T::Output; type Error = TErr; - type ListenerUpgrade = MapErrListenerUpgrade; - type Dial = MapErrDial; fn listen_on(&mut self, addr: Multiaddr) -> Result> { let map = self.map.clone(); @@ -59,13 +58,14 @@ where self.transport.remove_listener(id) } - fn dial(&mut self, addr: Multiaddr) -> Result> { + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> + { let map = self.map.clone(); match self.transport.dial(addr) { - Ok(future) => Ok(MapErrDial { - inner: future, - map: Some(map), - }), + Ok(future) => Ok(future.map_err(map).boxed()), Err(err) => Err(err.map(map)), } } @@ -73,13 +73,11 @@ where fn dial_as_listener( &mut self, addr: Multiaddr, - ) -> Result> { + ) -> Result>, TransportError> + { let map = self.map.clone(); - match self.transport.dial_as_listener(addr) { - Ok(future) => Ok(MapErrDial { - inner: future, - map: Some(map), - }), + match self.transport.dial(addr) { + Ok(future) => Ok(future.map_err(map).boxed()), Err(err) => Err(err.map(map)), } } @@ -91,71 +89,9 @@ where fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let this = self.project(); let map = &*this.map; - this.transport.poll(cx).map(|ev| { - ev.map_upgrade(move |value| MapErrListenerUpgrade { - inner: value, - map: Some(map.clone()), - }) - .map_err(map.clone()) - }) - } -} - -/// Listening upgrade future for `MapErr`. -#[pin_project::pin_project] -pub struct MapErrListenerUpgrade { - #[pin] - inner: T::ListenerUpgrade, - map: Option, -} - -impl Future for MapErrListenerUpgrade -where - T: Transport, - F: FnOnce(T::Error) -> TErr, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match Future::poll(this.inner, cx) { - Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)), - Poll::Pending => Poll::Pending, - Poll::Ready(Err(err)) => { - let map = this.map.take().expect("poll() called again after error"); - Poll::Ready(Err(map(err))) - } - } - } -} - -/// Dialing future for `MapErr`. -#[pin_project::pin_project] -pub struct MapErrDial { - #[pin] - inner: T::Dial, - map: Option, -} - -impl Future for MapErrDial -where - T: Transport, - F: FnOnce(T::Error) -> TErr, -{ - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match Future::poll(this.inner, cx) { - Poll::Ready(Ok(value)) => Poll::Ready(Ok(value)), - Poll::Pending => Poll::Pending, - Poll::Ready(Err(err)) => { - let map = this.map.take().expect("poll() called again after error"); - Poll::Ready(Err(map(err))) - } - } + this.transport.poll(cx).map(|ev| ev.map_err(*map)) } } diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index 65105b42b93..55bc42d143f 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -20,9 +20,10 @@ use crate::transport::{ListenerId, Transport, TransportError, TransportEvent}; use fnv::FnvHashMap; +use futures::future::BoxFuture; use futures::{ channel::mpsc, - future::{self, Ready}, + future::{self}, prelude::*, task::Context, task::Poll, @@ -176,8 +177,6 @@ impl Future for DialFuture { impl Transport for MemoryTransport { type Output = Channel>; type Error = MemoryTransportError; - type ListenerUpgrade = Ready>; - type Dial = DialFuture; fn listen_on(&mut self, addr: Multiaddr) -> Result> { let port = if let Ok(port) = parse_memory_addr(&addr) { @@ -216,7 +215,11 @@ impl Transport for MemoryTransport { } } - fn dial(&mut self, addr: Multiaddr) -> Result> { + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> + { let port = if let Ok(port) = parse_memory_addr(&addr) { if let Some(port) = NonZeroU64::new(port) { port @@ -227,14 +230,17 @@ impl Transport for MemoryTransport { return Err(TransportError::MultiaddrNotSupported(addr)); }; - DialFuture::new(port).ok_or(TransportError::Other(MemoryTransportError::Unreachable)) + Ok(DialFuture::new(port) + .ok_or(TransportError::Other(MemoryTransportError::Unreachable))? + .boxed()) } fn dial_as_listener( &mut self, addr: Multiaddr, - ) -> Result> { - self.dial(addr) + ) -> Result>, TransportError> + { + Ok(self.dial(addr)?.boxed()) } fn address_translation(&self, _server: &Multiaddr, _observed: &Multiaddr) -> Option { @@ -244,7 +250,7 @@ impl Transport for MemoryTransport { fn poll( mut self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> + ) -> Poll> where Self: Sized, { @@ -265,7 +271,7 @@ impl Transport for MemoryTransport { Poll::Pending => None, Poll::Ready(Some((channel, dial_port))) => Some(TransportEvent::Incoming { listener_id: listener.id, - upgrade: future::ready(Ok(channel)), + upgrade: future::ready(Ok(channel)).boxed(), local_addr: listener.addr.clone(), send_back_addr: Protocol::Memory(dial_port.get()).into(), }), diff --git a/core/src/transport/optional.rs b/core/src/transport/optional.rs index 2d93077659c..d12a9699480 100644 --- a/core/src/transport/optional.rs +++ b/core/src/transport/optional.rs @@ -19,6 +19,7 @@ // DEALINGS IN THE SOFTWARE. use crate::transport::{ListenerId, Transport, TransportError, TransportEvent}; +use futures::future::BoxFuture; use multiaddr::Multiaddr; use std::{pin::Pin, task::Context, task::Poll}; @@ -57,8 +58,6 @@ where { type Output = T::Output; type Error = T::Error; - type ListenerUpgrade = T::ListenerUpgrade; - type Dial = T::Dial; fn listen_on(&mut self, addr: Multiaddr) -> Result> { if let Some(inner) = self.0.as_mut() { @@ -76,7 +75,11 @@ where } } - fn dial(&mut self, addr: Multiaddr) -> Result> { + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> + { if let Some(inner) = self.0.as_mut() { inner.dial(addr) } else { @@ -87,7 +90,8 @@ where fn dial_as_listener( &mut self, addr: Multiaddr, - ) -> Result> { + ) -> Result>, TransportError> + { if let Some(inner) = self.0.as_mut() { inner.dial_as_listener(addr) } else { @@ -106,7 +110,7 @@ where fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { if let Some(inner) = self.project().0.as_pin_mut() { inner.poll(cx) } else { diff --git a/core/src/transport/timeout.rs b/core/src/transport/timeout.rs index c796e6f0775..24beae5437b 100644 --- a/core/src/transport/timeout.rs +++ b/core/src/transport/timeout.rs @@ -28,6 +28,7 @@ use crate::{ transport::{ListenerId, TransportError, TransportEvent}, Multiaddr, Transport, }; +use futures::future::BoxFuture; use futures::prelude::*; use futures_timer::Delay; use std::{error, fmt, io, pin::Pin, task::Context, task::Poll, time::Duration}; @@ -82,8 +83,6 @@ where { type Output = InnerTrans::Output; type Error = TransportTimeoutError; - type ListenerUpgrade = Timeout; - type Dial = Timeout; fn listen_on(&mut self, addr: Multiaddr) -> Result> { self.inner @@ -95,7 +94,11 @@ where self.inner.remove_listener(id) } - fn dial(&mut self, addr: Multiaddr) -> Result> { + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> + { let dial = self .inner .dial(addr) @@ -103,13 +106,15 @@ where Ok(Timeout { inner: dial, timer: Delay::new(self.outgoing_timeout), - }) + } + .boxed()) } fn dial_as_listener( &mut self, addr: Multiaddr, - ) -> Result> { + ) -> Result>, TransportError> + { let dial = self .inner .dial_as_listener(addr) @@ -117,7 +122,8 @@ where Ok(Timeout { inner: dial, timer: Delay::new(self.outgoing_timeout), - }) + } + .boxed()) } fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { @@ -127,17 +133,12 @@ where fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let this = self.project(); let timeout = *this.incoming_timeout; - this.inner.poll(cx).map(|event| { - event - .map_upgrade(move |inner_fut| Timeout { - inner: inner_fut, - timer: Delay::new(timeout), - }) - .map_err(TransportTimeoutError::Other) - }) + this.inner + .poll(cx) + .map(|event| event.map_err(TransportTimeoutError::Other)) } } diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 4f28296065b..aecd773341e 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -33,8 +33,9 @@ use crate::{ self, apply_inbound, apply_outbound, InboundUpgrade, InboundUpgradeApply, OutboundUpgrade, OutboundUpgradeApply, UpgradeError, }, - Negotiated, PeerId, + Negotiated, PeerId, UpgradeInfo, }; +use futures::future::BoxFuture; use futures::{prelude::*, ready}; use multiaddr::Multiaddr; use std::{ @@ -98,10 +99,14 @@ where ) -> Authenticated Authenticate + Clone>> where T: Transport, - C: AsyncRead + AsyncWrite + Unpin, + C: AsyncRead + AsyncWrite + Unpin + Send + 'static, D: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = (PeerId, D), Error = E>, - U: OutboundUpgrade, Output = (PeerId, D), Error = E> + Clone, + U: InboundUpgrade, Output = (PeerId, D), Error = E> + Send + 'static, + U: OutboundUpgrade, Output = (PeerId, D), Error = E> + Clone + Send + 'static, + ::Info: Send + 'static, + <::InfoIter as IntoIterator>::IntoIter: Send + 'static, + >>::Future: Send + 'static, + >>::Future: Send + 'static, E: Error + 'static, { let version = self.version; @@ -205,10 +210,14 @@ where pub fn apply(self, upgrade: U) -> Authenticated> where T: Transport, - C: AsyncRead + AsyncWrite + Unpin, + C: AsyncRead + AsyncWrite + Unpin + Send + 'static, D: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = D, Error = E>, - U: OutboundUpgrade, Output = D, Error = E> + Clone, + U: InboundUpgrade, Output = D, Error = E> + Send + 'static, + U: OutboundUpgrade, Output = D, Error = E> + Clone + Send + 'static, + ::Info: Send + 'static, + <::InfoIter as IntoIterator>::IntoIter: Send + 'static, + >>::Future: Send + 'static, + >>::Future: Send + 'static, E: Error + 'static, { Authenticated(Builder::new( @@ -235,8 +244,8 @@ where T: Transport, C: AsyncRead + AsyncWrite + Unpin, M: StreamMuxer, - U: InboundUpgrade, Output = M, Error = E>, - U: OutboundUpgrade, Output = M, Error = E> + Clone, + U: InboundUpgrade, Output = M, Error = E> + Send + 'static, + U: OutboundUpgrade, Output = M, Error = E> + Clone + Send + 'static, E: Error + 'static, { let version = self.0.version; @@ -271,7 +280,7 @@ where U: InboundUpgrade, Output = M, Error = E>, U: OutboundUpgrade, Output = M, Error = E> + Clone, E: Error + 'static, - F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone, + F: for<'a> FnOnce(&'a PeerId, &'a ConnectedPoint) -> U + Clone + Send + 'static, { let version = self.0.version; Multiplexed(self.0.inner.and_then(move |(peer_id, c), endpoint| { @@ -296,8 +305,6 @@ impl Multiplexed { pub fn boxed(self) -> super::Boxed<(PeerId, StreamMuxerBox)> where T: Transport + Sized + Send + Unpin + 'static, - T::Dial: Send + 'static, - T::ListenerUpgrade: Send + 'static, T::Error: Send + Sync, M: StreamMuxer + Send + 'static, M::Substream: Send + 'static, @@ -331,10 +338,12 @@ where { type Output = T::Output; type Error = T::Error; - type ListenerUpgrade = T::ListenerUpgrade; - type Dial = T::Dial; - fn dial(&mut self, addr: Multiaddr) -> Result> { + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> + { self.0.dial(addr) } @@ -345,7 +354,8 @@ where fn dial_as_listener( &mut self, addr: Multiaddr, - ) -> Result> { + ) -> Result>, TransportError> + { self.0.dial_as_listener(addr) } @@ -360,7 +370,7 @@ where fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { self.project().0.poll(cx) } } @@ -389,17 +399,23 @@ impl Transport for Upgrade where T: Transport, T::Error: 'static, - C: AsyncRead + AsyncWrite + Unpin, - U: InboundUpgrade, Output = D, Error = E>, - U: OutboundUpgrade, Output = D, Error = E> + Clone, + C: AsyncRead + AsyncWrite + Unpin + Send + 'static, + U: InboundUpgrade, Output = D, Error = E> + Send + 'static, + U: OutboundUpgrade, Output = D, Error = E> + Clone + Send + 'static, + ::Info: Send + 'static, + <::InfoIter as IntoIterator>::IntoIter: Send + 'static, + >>::Future: Send + 'static, + >>::Future: Send + 'static, E: Error + 'static, { type Output = (PeerId, D); type Error = TransportUpgradeError; - type ListenerUpgrade = ListenerUpgradeFuture; - type Dial = DialUpgradeFuture; - fn dial(&mut self, addr: Multiaddr) -> Result> { + fn dial( + &mut self, + addr: Multiaddr, + ) -> Result>, TransportError> + { let future = self .inner .dial(addr) @@ -407,7 +423,8 @@ where Ok(DialUpgradeFuture { future: Box::pin(future), upgrade: future::Either::Left(Some(self.upgrade.clone())), - }) + } + .boxed()) } fn remove_listener(&mut self, id: ListenerId) -> bool { @@ -417,7 +434,8 @@ where fn dial_as_listener( &mut self, addr: Multiaddr, - ) -> Result> { + ) -> Result>, TransportError> + { let future = self .inner .dial_as_listener(addr) @@ -425,7 +443,8 @@ where Ok(DialUpgradeFuture { future: Box::pin(future), upgrade: future::Either::Left(Some(self.upgrade.clone())), - }) + } + .boxed()) } fn listen_on(&mut self, addr: Multiaddr) -> Result> { @@ -441,16 +460,18 @@ where fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let this = self.project(); let upgrade = this.upgrade.clone(); this.inner.poll(cx).map(|event| { - event - .map_upgrade(move |future| ListenerUpgradeFuture { - future: Box::pin(future), - upgrade: future::Either::Left(Some(upgrade)), - }) - .map_err(TransportUpgradeError::Transport) + todo!("event.and_then(upgrade)") + // + // event + // .map_upgrade(move |future| ListenerUpgradeFuture { + // future: Box::pin(future), + // upgrade: future::Either::Left(Some(upgrade)), + // }) + // .map_err(TransportUpgradeError::Transport) }) } } diff --git a/interop-tests/README.md b/interop-tests/README.md index 5327ca05568..34b3fe730ce 100644 --- a/interop-tests/README.md +++ b/interop-tests/README.md @@ -14,7 +14,7 @@ can dial/listen for ourselves we can do the following: transport=quic-v1 security=quic muxer=quic is_dialer="true" cargo run --bin ping` 3. In another terminal, run the listener: `REDIS_ADDR=localhost:6379 ip="0.0.0.0" transport=quic-v1 security=quic muxer=quic is_dialer="false" cargo run --bin ping` - +\ To test the interop with other versions do something similar, except replace one of these nodes with the other version's interop test. diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index 8ea3967ccbc..1d80be745c3 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -231,7 +231,7 @@ where let mut inner = self.inner.lock(); Transport::poll(Pin::new(inner.deref_mut()), cx).map(|event| { event - .map_upgrade(|upgr| upgr.map_err::<_, fn(_) -> _>(DnsErr::Transport)) + .map_out(|upgr| upgr.map_err::<_, fn(_) -> _>(DnsErr::Transport)) .map_err(DnsErr::Transport) }) }