diff --git a/Stream.js b/Stream.js index 6231ece4..edae5177 100644 --- a/Stream.js +++ b/Stream.js @@ -11,10 +11,9 @@ var iterable = require('./lib/iterable'); module.exports = Stream; -/** @typedef {Yield|Skip|End} Step */ +/** @typedef {Yield|End} Step */ var Yield = Stream.Yield = step.Yield; -var Skip = Stream.Skip = step.Skip; var End = Stream.End = step.End; var iterableFrom = iterable.from; @@ -124,16 +123,16 @@ Stream.periodic = function(period, scheduler) { * and the stream has ended. */ Stream.prototype.forEach = Stream.prototype.observe = function(f) { - return runStream(f, this.step, this.state); + return immediate(runStream, f, this.step, this.state); }; function runStream(f, stepper, state) { - return next(stepper, state).then(function(s) { + return when(next(stepper, state), function(s) { if (s.done) { return s.value; } - return Promise.resolve(f(s.value)).then(function (x) { + return when(f(s.value), function (x) { return x instanceof End ? x.value : runStream(f, stepper, s.state); }); @@ -150,7 +149,7 @@ Stream.prototype.delay = function(delayTime, scheduler) { var stepper = this.step; return new Stream(function(s) { - return next(stepper, s.state).then(function(i) { + return when(next(stepper, s.state), function(i) { return i.done ? i : delay(s.value, yieldPair(i, s.value), scheduler); }); @@ -168,18 +167,23 @@ Stream.prototype.debounce = function(period, scheduler) { var stepper = this.step; return new Stream(function(s) { - return next(stepper, s.state).then(function(i) { - if(i.done) { - return i; - } - - var now = scheduler.now(); - var end = s.value; - return now > end ? yieldPair(i, now + period) : skipPair(i, end); - }); + return debounceNext(stepper, s, period, scheduler); }, new Pair(scheduler.now(), this.state)); }; +function debounceNext(stepper, s, period, scheduler) { + return when(next(stepper, s.state), function(i) { + if(i.done) { + return i; + } + + var now = scheduler.now(); + var end = s.value; + return now > end ? yieldPair(i, now + period) + : debounceNext(stepper, new Pair(end, i.state), period, scheduler); + }); +} + /** * Transform each value in the stream by applying f to each * @param {function(*):*} f mapping function @@ -188,7 +192,7 @@ Stream.prototype.debounce = function(period, scheduler) { Stream.prototype.map = function(f) { var stepper = this.step; return new Stream(function (state) { - return next(stepper, state).then(function(i) { + return when(next(stepper, state), function(i) { return i.done ? i : new Yield(f(i.value), i.state); }); @@ -236,14 +240,14 @@ Stream.prototype.flatMap = Stream.prototype.chain = function(f) { }; function stepOuter(stepChain, f, outer) { - return streamNext(outer).then(function(i) { + return when(Promise.resolve(streamNext(outer)), function(i) { return i.done ? i : stepInner(stepChain, f, new Stream(outer.step, i.state), f(i.value)); }); } function stepInner(stepChain, f, outer, inner) { - return streamNext(inner).then(function(ii) { + return when(Promise.resolve(streamNext(inner)), function(ii) { return ii.done ? stepChain(new Outer(f, outer)) : new Yield(ii.value, new Inner(f, outer, new Stream(inner.step, ii.state))); }); @@ -257,13 +261,17 @@ function stepInner(stepChain, f, outer, inner) { Stream.prototype.filter = function(p) { var stepper = this.step; return new Stream(function(state) { - return next(stepper, state).then(function(i) { - return i.done || p(i.value) ? i - : new Skip(i.state); - }); + return filterNext(p, stepper, state); }, this.state); }; +function filterNext(p, stepper, state) { + return when(next(stepper, state), function(i) { + return i.done || p(i.value) ? i + : filterNext(p, stepper, i.state); + }); +} + /** * Remove adjacent duplicates: [a,b,b,c,b] -> [a,b,c,b] * @param {?function(a:*, b:*):boolean} equals optional function to compare items. @@ -277,28 +285,33 @@ Stream.prototype.distinct = function(equals) { var stepper = this.step; return new Stream(function(s) { - return next(stepper, s.state).then(function(i) { - if(i.done) { - return i; - } - return equals(s.value, i.value) ? skipPair(i, s.value) - : yieldPair(i, i.value); - }); + return distinctNext(equals, stepper, s); }, new Pair({}, this.state)); }; +function distinctNext(equals, stepper, s) { + return when(next(stepper, s.state), function(i) { + if(i.done) { + return i; + } + return equals(s.value, i.value) + ? distinctNext(equals, stepper, new Pair(s.value, i.state)) + : yieldPair(i, i.value); + }); +} + /** * @returns {Promise} a promise for the first item in the stream */ Stream.prototype.head = function() { - return streamNext(this).then(getValueOrFail); + return when(Promise.resolve(streamNext(this)), getValueOrFail); }; /** * @returns {Stream} a stream containing all items in this stream except the first */ Stream.prototype.tail = function() { - return new Stream(this.step, streamNext(this).then(getState)); + return new Stream(this.step, when(streamNext(this), getState)); }; /** @@ -309,7 +322,7 @@ Stream.prototype.tail = function() { Stream.prototype.takeWhile = function(p) { var stepper = this.step; return new Stream(function(s) { - return next(stepper, s).then(function(i) { + return when(next(stepper, s), function(i) { return i.done || p(i.value) ? i : new End(); }); @@ -323,7 +336,7 @@ Stream.prototype.takeWhile = function(p) { Stream.prototype.take = function(n) { var stepper = this.step; return new Stream(function(s) { - return next(stepper, s.state).then(function(i) { + return when(next(stepper, s.state), function(i) { var remaining = s.value - 1; return i.done ? i : s.value === 0 ? new End(i.value) @@ -367,7 +380,7 @@ Stream.prototype.concat = function(s) { Stream.prototype.scan = function(f, initial) { var stepper = this.step; return new Stream(function(s) { - return next(stepper, s.state).then(function(i) { + return when(next(stepper, s.state), function(i) { if(i.done) { return i; } @@ -387,23 +400,20 @@ Stream.prototype.scan = function(f, initial) { * @returns {Promise} promise for the file result of the reduce */ Stream.prototype.reduce = function(f, initial) { - return reduce(f, initial, this.step, this.state); + return immediate(reduce, f, initial, this.step, this.state); }; function reduce(f, z, stepper, state) { - return next(stepper, state).then(function(i) { - return i.done ? z - : reduce(f, f(z, i.value), stepper, i.state); - } - ); + return when(next(stepper, state), function(i) { + return i.done ? z + : reduce(f, f(z, i.value), stepper, i.state); + }); } // Helpers function next(stepper, state) { - return Promise.resolve(state).then(stepper).then(function(i) { - return i.skip ? next(stepper, i.state) : i; - }); + return when(state, stepper); } function streamNext(s) { @@ -441,10 +451,6 @@ function yieldPair(step, x) { return new Yield(step.value, new Pair(x, step.state)); } -function skipPair(step, x) { - return new Skip(new Pair(x, step.state)); -} - function Outer(f, outer) { this.f = f; this.outer = outer; this.inner = void 0; } @@ -467,3 +473,18 @@ function ensureScheduler(scheduler) { } return scheduler; } + +function when(x, f) { + return isPromise(x) ? x.then(f) : f(x); +} + +function isPromise(x) { + return x !== null && (typeof x === 'object' || typeof x === 'function' ) && typeof x.then === 'function'; +} + +var slice = Array.prototype.slice; +function immediate(f) { + return Promise.resolve(slice.call(arguments, 1)).then(function(args) { + return f.apply(void 0, args); + }); +} diff --git a/lib/iterable.js b/lib/iterable.js index 59442be2..d452ea2d 100644 --- a/lib/iterable.js +++ b/lib/iterable.js @@ -6,6 +6,9 @@ var Promise = require('./Promise'); var step = require('./step'); +var Yield = step.Yield; +var End = step.End; + exports.from = from; exports.head = head; @@ -37,7 +40,7 @@ function head(iterable) { var iteration = iterator.next(); return Promise.resolve(iteration).then(function(iteration) { return iteration.done ? iteration - : new step.Yield(iteration.value, new IterableWrapper(iterator)); + : new Yield(iteration.value, new IterableWrapper(iterator)); }); } @@ -63,6 +66,7 @@ function ArrayIterator(array) { } ArrayIterator.prototype.next = function() { - return this.index < this.array.length ? new step.Yield(this.array[this.index++]) - : new step.End(); + return this.index < this.array.length + ? new Yield(this.array[this.index++]) + : new End(); }; diff --git a/lib/step.js b/lib/step.js index 0c99a7c0..d949577b 100644 --- a/lib/step.js +++ b/lib/step.js @@ -4,17 +4,12 @@ /** @module */ exports.Yield = Yield; -exports.Skip = Skip; exports.End = End; function Yield(x, s) { - this.done = false; this.skip = false; this.value = x; this.state = s; -} - -function Skip(s) { - this.done = false; this.skip = true; this.value = void 0; this.state = s; + this.done = false; this.value = x; this.state = s; } function End(x) { - this.done = true; this.skip = false; this.value = x; this.state = this; + this.done = true; this.value = x; this.state = this; } diff --git a/test/Stream-test.js b/test/Stream-test.js index b2b582ff..bec105ab 100644 --- a/test/Stream-test.js +++ b/test/Stream-test.js @@ -72,7 +72,7 @@ describe('Stream', function() { }); it('should end if consumer returns End', function() { - var spy = this.spy(function(x) { + var spy = this.spy(function() { return new Stream.End(); });