diff --git a/rustfmt.toml b/rustfmt.toml index 034cf00..c9225d8 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,3 +1,4 @@ fn_brace_style = "SameLineWhere" item_brace_style = "SameLineWhere" -use_try_shorthand = true \ No newline at end of file +use_try_shorthand = true +max_width = 80 \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 2304419..b282953 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,15 +27,22 @@ impl Pipeline // given another pipeline entry, send the results of the previous entry into // the next one #[must_use] - pub fn then(self, next: Entry, buffsize: usize) -> Pipeline + pub fn then(self, + next: Entry, + buffsize: usize) + -> Pipeline where Entry: PipelineEntry + Send + 'static, EntryOut: Send { self.pipe(move |tx, rx| next.process(tx, rx), buffsize) } - pub fn pipe(self, func: Func, buffsize: usize) -> Pipeline - where Func: FnOnce(mpsc::Receiver, mpsc::SyncSender) -> (), + pub fn pipe(self, + func: Func, + buffsize: usize) + -> Pipeline + where Func: FnOnce(mpsc::Receiver, mpsc::SyncSender) + -> (), Func: Send + 'static, EntryOut: Send { @@ -45,7 +52,10 @@ impl Pipeline Pipeline { rx } } - pub fn map(self, func: Func, buffsize: usize) -> Pipeline + pub fn map(self, + func: Func, + buffsize: usize) + -> Pipeline where Func: Fn(Output) -> EntryOut + Send + 'static, EntryOut: Send { @@ -76,7 +86,10 @@ impl IntoIterator for Pipeline } pub trait PipelineEntry { - fn process>(self, rx: I, tx: mpsc::SyncSender) -> (); + fn process>(self, + rx: I, + tx: mpsc::SyncSender) + -> (); } @@ -112,7 +125,9 @@ pub mod map { impl PipelineEntry for Mapper where Func: Fn(In) -> Out { - fn process>(self, rx: I, tx: mpsc::SyncSender) { + fn process>(self, + rx: I, + tx: mpsc::SyncSender) { for item in rx { let mapped = (self.func)(item); tx.send(mapped).expect("failed to send"); @@ -151,7 +166,9 @@ pub mod filter { impl PipelineEntry for Filter where Func: Fn(&In) -> bool { - fn process>(self, rx: I, tx: mpsc::SyncSender) { + fn process>(self, + rx: I, + tx: mpsc::SyncSender) { for item in rx { if (self.func)(&item) { tx.send(item).expect("failed to send") @@ -200,7 +217,9 @@ pub mod multiplex { In: Send + 'static, Out: Send + 'static { - fn process>(self, rx: I, tx: mpsc::SyncSender) { + fn process>(self, + rx: I, + tx: mpsc::SyncSender) { // workers will read their work out of this channel but send their // results directly into the regular tx channel let (master_tx, chan_rx) = mpsc::sync_channel(self.buffsize); @@ -283,7 +302,8 @@ mod tests { let source: Vec = (1..1000).collect(); let expect: Vec = source.iter().map(|x| x * 2).collect(); - let pbb: Pipeline = Pipeline::new(source, buffsize).map(|i| i * 2, buffsize); + let pbb: Pipeline = + Pipeline::new(source, buffsize).map(|i| i * 2, buffsize); let produced: Vec = pbb.into_iter().collect(); assert_eq!(produced, expect); @@ -293,7 +313,8 @@ mod tests { fn multiple_map() { let buffsize: usize = 10; let source: Vec = vec![1, 2, 3]; - let expect: Vec = source.iter().map(|x| (x * 2) * (x * 2)).collect(); + let expect: Vec = + source.iter().map(|x| (x * 2) * (x * 2)).collect(); let pbb: Pipeline = Pipeline::new(source, buffsize) .map(|i| i * 2, buffsize) @@ -354,13 +375,14 @@ mod tests { .filter(|x| x % 2 == 0) .collect(); - let pbb: Pipeline = Pipeline::new(source, buffsize).pipe(|in_, out| for item in in_ { - let item = item + 1; - if item % 2 == 0 { - out.send(item).expect("failed to send") - } - }, - 10); + let pbb: Pipeline = Pipeline::new(source, buffsize) + .pipe(|in_, out| for item in in_ { + let item = item + 1; + if item % 2 == 0 { + out.send(item).expect("failed to send") + } + }, + 10); let produced: Vec = pbb.into_iter().collect(); assert_eq!(produced, expect);