From fa4cc21fadc6272d300a7e8bf6f2e6a29f363b1a Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Tue, 18 Oct 2016 21:07:52 +0200 Subject: [PATCH 1/4] Implement buffer combinator --- src/stream/buffer.rs | 62 ++++++++++++++++++++++++++++++++++++++++++++ src/stream/mod.rs | 17 ++++++++++++ tests/stream.rs | 7 +++++ 3 files changed, 86 insertions(+) create mode 100644 src/stream/buffer.rs diff --git a/src/stream/buffer.rs b/src/stream/buffer.rs new file mode 100644 index 00000000000..24a577975f3 --- /dev/null +++ b/src/stream/buffer.rs @@ -0,0 +1,62 @@ +use std::mem; +use std::prelude::v1::*; + +use {Async, Poll}; +use stream::{Stream, Fuse}; + +/// An adaptor that buffers elements in a vector before passing them on as one item. +/// +/// This adaptor will buffer up a list of items in a stream and pass on the +/// vector used for buffering when a specified capacity has been reached. This is +/// created by the `Stream::buffer` method. +#[must_use = "streams do nothing unless polled"] +pub struct Buffer + where S: Stream +{ + capacity: usize, // TODO: Do we need this? Doesn't Vec::capacity() suffice? + items: Vec<::Item>, + stream: Fuse +} + +pub fn new(s: S, capacity: usize) -> Buffer + where S: Stream +{ + Buffer { + capacity: capacity, + items: Vec::with_capacity(capacity), + stream: super::fuse::new(s), + } +} + +impl Stream for Buffer + where S: Stream +{ + type Item = Vec<::Item>; + type Error = ::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + let maybe_next = try_ready!(self.stream.poll()); + + if let Some(item) = maybe_next { + // Push the item into the buffer and check whether it is + // full. If so, replace our buffer with a new and empty one + // and return the full one. + self.items.push(item); + if self.items.len() < self.capacity { + Ok(Async::NotReady) + } else { + let full_buf = mem::replace(&mut self.items, Vec::with_capacity(self.capacity)); + Ok(Async::Ready(Some(full_buf))) + } + } else { + // Since the underlying stream ran out of values, return + // what we have buffered, if we have anything. + if self.items.len() > 0 { + let full_buf = mem::replace(&mut self.items, Vec::new()); + Ok(Async::Ready(Some(full_buf))) + } else { + Ok(Async::Ready(None)) + } + } + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 7cc61f0ac0a..30d179409d9 100755 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -60,12 +60,14 @@ pub use self::peek::Peekable; if_std! { use std; + mod buffer; mod buffered; mod buffer_unordered; mod catch_unwind; mod channel; mod collect; mod wait; + pub use self::buffer::Buffer; pub use self::buffered::Buffered; pub use self::buffer_unordered::BufferUnordered; pub use self::catch_unwind::CatchUnwind; @@ -673,6 +675,21 @@ pub trait Stream { catch_unwind::new(self) } + /// An adaptor for buffering up items of the stream before passing them on + /// as one vector. + /// + /// The vector will contain at most `capacity` elements, though can contain + /// less if the underlying stream ended and did not produce a multiple of + /// `capacity` elements. + /// + /// Errors are passed through. + #[cfg(feature = "use_std")] + fn buffer(self, capacity: usize) -> Buffer + where Self: Sized + { + buffer::new(self, capacity) + } + /// An adaptor for creating a buffered list of pending futures. /// /// If this stream's item can be converted into a future, then this adaptor diff --git a/tests/stream.rs b/tests/stream.rs index d2186206a32..512ef7d9234 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -265,3 +265,10 @@ fn wait() { assert_eq!(list().wait().collect::, _>>(), Ok(vec![1, 2, 3])); } + +#[test] +fn buffer() { + assert_done(|| list().buffer(3).collect(), Ok(vec![vec![1, 2, 3]])); + assert_done(|| list().buffer(1).collect(), Ok(vec![vec![1], vec![2], vec![3]])); + assert_done(|| list().buffer(2).collect(), Ok(vec![vec![1, 2], vec![3]])); +} \ No newline at end of file From 2fd8f799c4106b46e3886fde2f6159c498d9b538 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Wed, 19 Oct 2016 01:47:27 +0200 Subject: [PATCH 2/4] Repeatedly call poll until Async::NotReady --- src/stream/buffer.rs | 38 ++++++++++++++++++-------------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/src/stream/buffer.rs b/src/stream/buffer.rs index 24a577975f3..1127bfab8a5 100644 --- a/src/stream/buffer.rs +++ b/src/stream/buffer.rs @@ -35,27 +35,25 @@ impl Stream for Buffer type Error = ::Error; fn poll(&mut self) -> Poll, Self::Error> { - let maybe_next = try_ready!(self.stream.poll()); - - if let Some(item) = maybe_next { - // Push the item into the buffer and check whether it is - // full. If so, replace our buffer with a new and empty one - // and return the full one. - self.items.push(item); - if self.items.len() < self.capacity { - Ok(Async::NotReady) - } else { - let full_buf = mem::replace(&mut self.items, Vec::with_capacity(self.capacity)); - Ok(Async::Ready(Some(full_buf))) - } - } else { - // Since the underlying stream ran out of values, return - // what we have buffered, if we have anything. - if self.items.len() > 0 { - let full_buf = mem::replace(&mut self.items, Vec::new()); - Ok(Async::Ready(Some(full_buf))) + loop { + if let Some(item) = try_ready!(self.stream.poll()) { + // Push the item into the buffer and check whether it is + // full. If so, replace our buffer with a new and empty one + // and return the full one. + self.items.push(item); + if self.items.len() >= self.capacity { + let full_buf = mem::replace(&mut self.items, Vec::with_capacity(self.capacity)); + return Ok(Async::Ready(Some(full_buf))) + } } else { - Ok(Async::Ready(None)) + // Since the underlying stream ran out of values, return + // what we have buffered, if we have anything. + return if self.items.len() > 0 { + let full_buf = mem::replace(&mut self.items, Vec::new()); + Ok(Async::Ready(Some(full_buf))) + } else { + Ok(Async::Ready(None)) + } } } } From 76c67505c3553825fc3483002e5f3b94dadebde0 Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Wed, 19 Oct 2016 20:16:43 +0200 Subject: [PATCH 3/4] Ensure that capacity is greater than zero --- src/stream/buffer.rs | 2 ++ src/stream/mod.rs | 2 +- tests/stream.rs | 6 ++++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/stream/buffer.rs b/src/stream/buffer.rs index 1127bfab8a5..c7568717322 100644 --- a/src/stream/buffer.rs +++ b/src/stream/buffer.rs @@ -21,6 +21,8 @@ pub struct Buffer pub fn new(s: S, capacity: usize) -> Buffer where S: Stream { + assert!(capacity > 0); + Buffer { capacity: capacity, items: Vec::with_capacity(capacity), diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 30d179409d9..9df650b6396 100755 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -680,7 +680,7 @@ pub trait Stream { /// /// The vector will contain at most `capacity` elements, though can contain /// less if the underlying stream ended and did not produce a multiple of - /// `capacity` elements. + /// `capacity` elements. `capacity` must be greater than zero. /// /// Errors are passed through. #[cfg(feature = "use_std")] diff --git a/tests/stream.rs b/tests/stream.rs index 512ef7d9234..03b12a5c00c 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -271,4 +271,10 @@ fn buffer() { assert_done(|| list().buffer(3).collect(), Ok(vec![vec![1, 2, 3]])); assert_done(|| list().buffer(1).collect(), Ok(vec![vec![1], vec![2], vec![3]])); assert_done(|| list().buffer(2).collect(), Ok(vec![vec![1, 2], vec![3]])); +} + +#[test] +#[should_panic] +fn buffer_panic_on_cap_zero() { + let _ = list().buffer(0); } \ No newline at end of file From 11f1d5eb431721f6b3706fc2c957787e4633aa4c Mon Sep 17 00:00:00 2001 From: Moritz Gunz Date: Wed, 19 Oct 2016 20:23:45 +0200 Subject: [PATCH 4/4] Rename to chunks --- src/stream/{buffer.rs => chunks.rs} | 16 +++++++------- src/stream/mod.rs | 33 ++++++++++++++--------------- tests/stream.rs | 12 +++++------ 3 files changed, 30 insertions(+), 31 deletions(-) rename src/stream/{buffer.rs => chunks.rs} (84%) diff --git a/src/stream/buffer.rs b/src/stream/chunks.rs similarity index 84% rename from src/stream/buffer.rs rename to src/stream/chunks.rs index c7568717322..56bcce427b1 100644 --- a/src/stream/buffer.rs +++ b/src/stream/chunks.rs @@ -4,13 +4,13 @@ use std::prelude::v1::*; use {Async, Poll}; use stream::{Stream, Fuse}; -/// An adaptor that buffers elements in a vector before passing them on as one item. +/// An adaptor that chunks up elements in a vector. /// -/// This adaptor will buffer up a list of items in a stream and pass on the +/// This adaptor will buffer up a list of items in the stream and pass on the /// vector used for buffering when a specified capacity has been reached. This is -/// created by the `Stream::buffer` method. +/// created by the `Stream::chunks` method. #[must_use = "streams do nothing unless polled"] -pub struct Buffer +pub struct Chunks where S: Stream { capacity: usize, // TODO: Do we need this? Doesn't Vec::capacity() suffice? @@ -18,19 +18,19 @@ pub struct Buffer stream: Fuse } -pub fn new(s: S, capacity: usize) -> Buffer +pub fn new(s: S, capacity: usize) -> Chunks where S: Stream { assert!(capacity > 0); - - Buffer { + + Chunks { capacity: capacity, items: Vec::with_capacity(capacity), stream: super::fuse::new(s), } } -impl Stream for Buffer +impl Stream for Chunks where S: Stream { type Item = Vec<::Item>; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 9df650b6396..e9d80f13ed6 100755 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -60,18 +60,18 @@ pub use self::peek::Peekable; if_std! { use std; - mod buffer; mod buffered; mod buffer_unordered; mod catch_unwind; mod channel; + mod chunks; mod collect; mod wait; - pub use self::buffer::Buffer; pub use self::buffered::Buffered; pub use self::buffer_unordered::BufferUnordered; pub use self::catch_unwind::CatchUnwind; pub use self::channel::{channel, Sender, Receiver, FutureSender, SendError}; + pub use self::chunks::Chunks; pub use self::collect::Collect; pub use self::wait::Wait; @@ -675,21 +675,6 @@ pub trait Stream { catch_unwind::new(self) } - /// An adaptor for buffering up items of the stream before passing them on - /// as one vector. - /// - /// The vector will contain at most `capacity` elements, though can contain - /// less if the underlying stream ended and did not produce a multiple of - /// `capacity` elements. `capacity` must be greater than zero. - /// - /// Errors are passed through. - #[cfg(feature = "use_std")] - fn buffer(self, capacity: usize) -> Buffer - where Self: Sized - { - buffer::new(self, capacity) - } - /// An adaptor for creating a buffered list of pending futures. /// /// If this stream's item can be converted into a future, then this adaptor @@ -758,6 +743,20 @@ pub trait Stream { { peek::new(self) } + + /// An adaptor for chunking up items of the stream inside a vector. + /// + /// The vector will contain at most `capacity` elements, though can contain + /// less if the underlying stream ended and did not produce a multiple of + /// `capacity` elements. `capacity` must be greater than zero. + /// + /// Errors are passed through. + #[cfg(feature = "use_std")] + fn chunks(self, capacity: usize) -> Chunks + where Self: Sized + { + chunks::new(self, capacity) + } } impl<'a, S: ?Sized + Stream> Stream for &'a mut S { diff --git a/tests/stream.rs b/tests/stream.rs index 03b12a5c00c..6475aa545cc 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -267,14 +267,14 @@ fn wait() { } #[test] -fn buffer() { - assert_done(|| list().buffer(3).collect(), Ok(vec![vec![1, 2, 3]])); - assert_done(|| list().buffer(1).collect(), Ok(vec![vec![1], vec![2], vec![3]])); - assert_done(|| list().buffer(2).collect(), Ok(vec![vec![1, 2], vec![3]])); +fn chunks() { + assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]])); + assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]])); + assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]])); } #[test] #[should_panic] -fn buffer_panic_on_cap_zero() { - let _ = list().buffer(0); +fn chunks_panic_on_cap_zero() { + let _ = list().chunks(0); } \ No newline at end of file