diff --git a/src/multiplex/client.rs b/src/multiplex/client.rs index c3fd6ea..175dacd 100644 --- a/src/multiplex/client.rs +++ b/src/multiplex/client.rs @@ -1,5 +1,4 @@ use crate::MakeTransport; -use futures::future; use futures::sync::oneshot; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use std::collections::VecDeque; @@ -36,7 +35,6 @@ pub trait Transport: Sink + Stream + TagStore pub struct Maker { t_maker: NT, _req: PhantomData, - in_flight: Option, } impl Maker { @@ -45,15 +43,8 @@ impl Maker { Maker { t_maker: t, _req: PhantomData, - in_flight: None, } } - - /// Limit each new `Client` instance to `in_flight` pending requests. - pub fn with_limit(mut self, in_flight: usize) -> Self { - self.in_flight = Some(in_flight); - self - } } /// A `Future` that will resolve into a `Buffer>`. @@ -62,7 +53,6 @@ where NT: MakeTransport, { maker: Option, - in_flight: Option, } /// A failure to spawn a new `Client`. @@ -96,11 +86,7 @@ where None => unreachable!("poll called after future resolved"), Some(mut fut) => match fut.poll().map_err(SpawnError::Inner)? { Async::Ready(t) => { - let c = if let Some(f) = self.in_flight { - Client::with_limit(t, f) - } else { - Client::new(t) - }; + let c = Client::new(t); Ok(Async::Ready( Buffer::new_direct(c, 0, &DefaultExecutor::current()) @@ -133,7 +119,6 @@ where fn call(&mut self, target: Target) -> Self::Future { NewSpawnedClientFuture { maker: Some(self.t_maker.make_transport(target)), - in_flight: self.in_flight.clone(), } } @@ -155,7 +140,6 @@ where responses: VecDeque<(T::Tag, oneshot::Sender)>, transport: T, - max_in_flight: Option, in_flight: usize, finish: bool, @@ -269,24 +253,6 @@ where requests: VecDeque::default(), responses: VecDeque::default(), transport, - max_in_flight: None, - in_flight: 0, - error: PhantomData::, - finish: false, - } - } - - /// Construct a new [`Client`] over the given `transport` with a maxmimum limit on the number - /// of in-flight requests. - /// - /// Note that setting the limit to 1 implies that for each `Request`, the `Response` must be - /// received before another request is sent on the same transport. - pub fn with_limit(transport: T, max_in_flight: usize) -> Self { - Client { - requests: VecDeque::with_capacity(max_in_flight), - responses: VecDeque::with_capacity(max_in_flight), - transport, - max_in_flight: Some(max_in_flight), in_flight: 0, error: PhantomData::, finish: false, @@ -307,17 +273,7 @@ where type Future = Box + Send>; fn poll_ready(&mut self) -> Result, Self::Error> { - if let Some(mif) = self.max_in_flight { - if self.in_flight + self.requests.len() >= mif { - // not enough request slots -- need to handle some outstanding - self.poll_service()?; - - if self.in_flight + self.requests.len() >= mif { - // that didn't help -- wait to be awoken again - return Ok(Async::NotReady); - } - } - } + // NOTE: it'd be great if we could poll_ready the Sink, but alas.. return Ok(Async::Ready(())); } @@ -409,18 +365,14 @@ where } fn call(&mut self, mut req: T::SinkItem) -> Self::Future { - if let Some(mif) = self.max_in_flight { - if self.in_flight + self.requests.len() >= mif { - return Box::new(future::err(E::from(Error::TransportFull))); - } - } - assert!(!self.finish, "invoked call() after poll_close()"); let (tx, rx) = oneshot::channel(); let id = self.transport.assign_tag(&mut req); self.requests.push_back(req); self.responses.push_back((id, tx)); + + // TODO: one day, we'll use existentials here Box::new(rx.map_err(|_| E::from(Error::ClientDropped))) } } diff --git a/src/multiplex/server.rs b/src/multiplex/server.rs index ee00a46..accd1bc 100644 --- a/src/multiplex/server.rs +++ b/src/multiplex/server.rs @@ -24,7 +24,6 @@ where transport: T, service: S, - max_in_flight: Option, in_flight: usize, finish: bool, } @@ -133,17 +132,12 @@ where /// Requests are passed to `Service::call` as they arrive, and responses are written back to /// the underlying `transport` in the order that they complete. If a later request completes /// before an earlier request, its response is still sent immediately. - /// - /// If `limit` is `Some(n)`, at most `n` requests are allowed to be pending at any given point - /// in time. - pub fn multiplexed(transport: T, service: S, limit: Option) -> Self { - let cap = limit.unwrap_or(16); + pub fn new(transport: T, service: S) -> Self { Server { - responses: VecDeque::with_capacity(cap), + responses: VecDeque::new(), pending: FuturesUnordered::new(), transport, service, - max_in_flight: limit, in_flight: 0, finish: false, } @@ -248,15 +242,6 @@ where return Ok(Async::NotReady); } - // we can't send any more, so see if there are more requests for us - if let Some(max) = self.max_in_flight { - if self.in_flight >= max { - // we can't accept any more requests until we finish some responses - return Ok(Async::NotReady); - } - } - - // we are allowed to receive another request // is the service ready? try_ready!(self.service.poll_ready().map_err(Error::from_service_error)); diff --git a/src/pipeline/client.rs b/src/pipeline/client.rs index db75d87..27d27f4 100644 --- a/src/pipeline/client.rs +++ b/src/pipeline/client.rs @@ -1,5 +1,4 @@ use crate::MakeTransport; -use futures::future; use futures::sync::oneshot; use futures::{Async, AsyncSink, Future, Poll, Sink, Stream}; use std::collections::VecDeque; @@ -23,7 +22,6 @@ where responses: VecDeque>, transport: T, - max_in_flight: Option, in_flight: usize, finish: bool, @@ -62,7 +60,6 @@ where NT: MakeTransport, { maker: Option, - in_flight: Option, } /// A failure to spawn a new `Client`. @@ -92,11 +89,7 @@ where None => unreachable!("poll called after future resolved"), Some(mut fut) => match fut.poll().map_err(SpawnError::Inner)? { Async::Ready(t) => { - let c = if let Some(f) = self.in_flight { - Client::with_limit(t, f) - } else { - Client::new(t) - }; + let c = Client::new(t); Ok(Async::Ready( Buffer::new_direct(c, 0, &DefaultExecutor::current()) @@ -128,7 +121,6 @@ where fn call(&mut self, target: Target) -> Self::Future { NewSpawnedClientFuture { maker: Some(self.t_maker.make_transport(target)), - in_flight: self.in_flight.clone(), } } @@ -243,24 +235,6 @@ where requests: VecDeque::default(), responses: VecDeque::default(), transport, - max_in_flight: None, - in_flight: 0, - error: PhantomData::, - finish: false, - } - } - - /// Construct a new [`Client`] over the given `transport` with a maxmimum limit on the number - /// of in-flight requests. - /// - /// Note that setting the limit to 1 implies that for each `Request`, the `Response` must be - /// received before another request is sent on the same transport. - pub fn with_limit(transport: T, max_in_flight: usize) -> Self { - Client { - requests: VecDeque::with_capacity(max_in_flight), - responses: VecDeque::with_capacity(max_in_flight), - transport, - max_in_flight: Some(max_in_flight), in_flight: 0, error: PhantomData::, finish: false, @@ -283,17 +257,7 @@ where type Future = Box + Send>; fn poll_ready(&mut self) -> Result, Self::Error> { - if let Some(mif) = self.max_in_flight { - if self.in_flight + self.requests.len() >= mif { - // not enough request slots -- need to handle some outstanding - self.poll_service()?; - - if self.in_flight + self.requests.len() >= mif { - // that didn't help -- wait to be awoken again - return Ok(Async::NotReady); - } - } - } + // NOTE: it'd be great if we could poll_ready the Sink, but alas.. return Ok(Async::Ready(())); } @@ -373,12 +337,6 @@ where } fn call(&mut self, req: T::SinkItem) -> Self::Future { - if let Some(mif) = self.max_in_flight { - if self.in_flight + self.requests.len() >= mif { - return Box::new(future::err(E::from(Error::TransportFull))); - } - } - assert!(!self.finish, "invoked call() after poll_close()"); let (tx, rx) = oneshot::channel(); diff --git a/src/pipeline/server.rs b/src/pipeline/server.rs index e8cd6e0..f1b540e 100644 --- a/src/pipeline/server.rs +++ b/src/pipeline/server.rs @@ -18,7 +18,6 @@ where transport: T, service: S, - max_in_flight: Option, in_flight: usize, finish: bool, } @@ -126,23 +125,6 @@ where T: Sink + Stream, S: DirectService, { - /// Construct a new [`Server`] over the given `transport` that services requests using the - /// given `service`. - /// - /// With this constructor, all requests are handled one at a time, and the next request is not - /// sent to the `Service` until the previous request has been fully completed. To allow - /// pipelined requests, use [`Server::pipelined`]. - pub fn new(transport: T, service: S) -> Self { - Server { - responses: VecDeque::default(), - transport, - service, - max_in_flight: Some(1), - in_flight: 0, - finish: false, - } - } - /// Construct a new [`Server`] over the given `transport` that services requests using the /// given `service`. /// @@ -150,16 +132,11 @@ where /// the underlying `transport` in the order that the requests arrive. If a later request /// completes before an earlier request, its result will be buffered until all preceeding /// requests have been sent. - /// - /// If `limit` is `Some(n)`, at most `n` requests are allowed to be pending at any given point - /// in time. - pub fn pipelined(transport: T, service: S, limit: Option) -> Self { - let cap = limit.unwrap_or(16); + pub fn new(transport: T, service: S) -> Self { Server { - responses: VecDeque::with_capacity(cap), + responses: VecDeque::new(), transport, service, - max_in_flight: limit, in_flight: 0, finish: false, } @@ -270,15 +247,6 @@ where return Ok(Async::NotReady); } - // we can't send any more, so see if there are more requests for us - if let Some(max) = self.max_in_flight { - if self.in_flight >= max { - // we can't accept any more requests until we finish some responses - return Ok(Async::NotReady); - } - } - - // we are allowed to receive another request // is the service ready? try_ready!(self.service.poll_ready().map_err(Error::from_service_error)); diff --git a/tests/multiplex/mod.rs b/tests/multiplex/mod.rs index a76d65c..d3c0a34 100644 --- a/tests/multiplex/mod.rs +++ b/tests/multiplex/mod.rs @@ -1,5 +1,5 @@ -use async_bincode::*; use crate::{EchoService, PanicError, Request, Response}; +use async_bincode::*; use slab::Slab; use tokio; use tokio::prelude::*; @@ -42,7 +42,7 @@ fn integration() { .map(AsyncBincodeStream::from) .map(AsyncBincodeStream::for_async) .map_err(PanicError::from) - .map(|stream| Server::multiplexed(stream, EchoService, None)); + .map(|stream| Server::new(stream, EchoService)); let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.spawn( diff --git a/tests/pipeline/client.rs b/tests/pipeline/client.rs index 14f9c8a..83a8713 100644 --- a/tests/pipeline/client.rs +++ b/tests/pipeline/client.rs @@ -1,5 +1,5 @@ -use async_bincode::*; use crate::{PanicError, Request, Response}; +use async_bincode::*; use tokio; use tokio::prelude::*; use tokio_tower::pipeline::Client; @@ -15,10 +15,7 @@ fn it_works() { .map(AsyncBincodeStream::from) .map(AsyncBincodeStream::for_async) .map_err(PanicError::from) - .map(|s| { - // need to limit to one-in-flight for poll_ready to be sufficient to drive Service - Client::with_limit(s, 1) - }); + .map(Client::new); let mut rt = tokio::runtime::Runtime::new().unwrap(); rt.spawn(