Skip to content

Commit

Permalink
Remove Skip in favor of tail recursion (get it for 'free' with promis…
Browse files Browse the repository at this point in the history
…es). Avoid promise trampoline in a large number of cases. Overall this commit is a significant performance increase and resource usage decrease. In *simple/limited* testing, most is about 25% faster than Bacon.js and RxJS now
  • Loading branch information
briancavalier committed Jul 22, 2014
1 parent 4648cc3 commit 28a90ca
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 58 deletions.
115 changes: 68 additions & 47 deletions Stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ var iterable = require('./lib/iterable');

module.exports = Stream;

/** @typedef {Yield|Skip|End} Step */
/** @typedef {Yield|End} Step */

var Yield = Stream.Yield = step.Yield;
var Skip = Stream.Skip = step.Skip;
var End = Stream.End = step.End;

var iterableFrom = iterable.from;
Expand Down Expand Up @@ -124,16 +123,16 @@ Stream.periodic = function(period, scheduler) {
* and the stream has ended.
*/
Stream.prototype.forEach = Stream.prototype.observe = function(f) {
return runStream(f, this.step, this.state);
return immediate(runStream, f, this.step, this.state);
};

function runStream(f, stepper, state) {
return next(stepper, state).then(function(s) {
return when(next(stepper, state), function(s) {
if (s.done) {
return s.value;
}

return Promise.resolve(f(s.value)).then(function (x) {
return when(f(s.value), function (x) {
return x instanceof End ? x.value
: runStream(f, stepper, s.state);
});
Expand All @@ -150,7 +149,7 @@ Stream.prototype.delay = function(delayTime, scheduler) {

var stepper = this.step;
return new Stream(function(s) {
return next(stepper, s.state).then(function(i) {
return when(next(stepper, s.state), function(i) {
return i.done ? i
: delay(s.value, yieldPair(i, s.value), scheduler);
});
Expand All @@ -168,18 +167,23 @@ Stream.prototype.debounce = function(period, scheduler) {

var stepper = this.step;
return new Stream(function(s) {
return next(stepper, s.state).then(function(i) {
if(i.done) {
return i;
}

var now = scheduler.now();
var end = s.value;
return now > end ? yieldPair(i, now + period) : skipPair(i, end);
});
return debounceNext(stepper, s, period, scheduler);
}, new Pair(scheduler.now(), this.state));
};

function debounceNext(stepper, s, period, scheduler) {
return when(next(stepper, s.state), function(i) {
if(i.done) {
return i;
}

var now = scheduler.now();
var end = s.value;
return now > end ? yieldPair(i, now + period)
: debounceNext(stepper, new Pair(end, i.state), period, scheduler);
});
}

/**
* Transform each value in the stream by applying f to each
* @param {function(*):*} f mapping function
Expand All @@ -188,7 +192,7 @@ Stream.prototype.debounce = function(period, scheduler) {
Stream.prototype.map = function(f) {
var stepper = this.step;
return new Stream(function (state) {
return next(stepper, state).then(function(i) {
return when(next(stepper, state), function(i) {
return i.done ? i
: new Yield(f(i.value), i.state);
});
Expand Down Expand Up @@ -236,14 +240,14 @@ Stream.prototype.flatMap = Stream.prototype.chain = function(f) {
};

function stepOuter(stepChain, f, outer) {
return streamNext(outer).then(function(i) {
return when(Promise.resolve(streamNext(outer)), function(i) {
return i.done ? i
: stepInner(stepChain, f, new Stream(outer.step, i.state), f(i.value));
});
}

function stepInner(stepChain, f, outer, inner) {
return streamNext(inner).then(function(ii) {
return when(Promise.resolve(streamNext(inner)), function(ii) {
return ii.done ? stepChain(new Outer(f, outer))
: new Yield(ii.value, new Inner(f, outer, new Stream(inner.step, ii.state)));
});
Expand All @@ -257,13 +261,17 @@ function stepInner(stepChain, f, outer, inner) {
Stream.prototype.filter = function(p) {
var stepper = this.step;
return new Stream(function(state) {
return next(stepper, state).then(function(i) {
return i.done || p(i.value) ? i
: new Skip(i.state);
});
return filterNext(p, stepper, state);
}, this.state);
};

function filterNext(p, stepper, state) {
return when(next(stepper, state), function(i) {
return i.done || p(i.value) ? i
: filterNext(p, stepper, i.state);
});
}

/**
* Remove adjacent duplicates: [a,b,b,c,b] -> [a,b,c,b]
* @param {?function(a:*, b:*):boolean} equals optional function to compare items.
Expand All @@ -277,28 +285,33 @@ Stream.prototype.distinct = function(equals) {

var stepper = this.step;
return new Stream(function(s) {
return next(stepper, s.state).then(function(i) {
if(i.done) {
return i;
}
return equals(s.value, i.value) ? skipPair(i, s.value)
: yieldPair(i, i.value);
});
return distinctNext(equals, stepper, s);
}, new Pair({}, this.state));
};

function distinctNext(equals, stepper, s) {
return when(next(stepper, s.state), function(i) {
if(i.done) {
return i;
}
return equals(s.value, i.value)
? distinctNext(equals, stepper, new Pair(s.value, i.state))
: yieldPair(i, i.value);
});
}

/**
* @returns {Promise} a promise for the first item in the stream
*/
Stream.prototype.head = function() {
return streamNext(this).then(getValueOrFail);
return when(Promise.resolve(streamNext(this)), getValueOrFail);
};

/**
* @returns {Stream} a stream containing all items in this stream except the first
*/
Stream.prototype.tail = function() {
return new Stream(this.step, streamNext(this).then(getState));
return new Stream(this.step, when(streamNext(this), getState));
};

/**
Expand All @@ -309,7 +322,7 @@ Stream.prototype.tail = function() {
Stream.prototype.takeWhile = function(p) {
var stepper = this.step;
return new Stream(function(s) {
return next(stepper, s).then(function(i) {
return when(next(stepper, s), function(i) {
return i.done || p(i.value) ? i
: new End();
});
Expand All @@ -323,7 +336,7 @@ Stream.prototype.takeWhile = function(p) {
Stream.prototype.take = function(n) {
var stepper = this.step;
return new Stream(function(s) {
return next(stepper, s.state).then(function(i) {
return when(next(stepper, s.state), function(i) {
var remaining = s.value - 1;
return i.done ? i
: s.value === 0 ? new End(i.value)
Expand Down Expand Up @@ -367,7 +380,7 @@ Stream.prototype.concat = function(s) {
Stream.prototype.scan = function(f, initial) {
var stepper = this.step;
return new Stream(function(s) {
return next(stepper, s.state).then(function(i) {
return when(next(stepper, s.state), function(i) {
if(i.done) {
return i;
}
Expand All @@ -387,23 +400,20 @@ Stream.prototype.scan = function(f, initial) {
* @returns {Promise} promise for the file result of the reduce
*/
Stream.prototype.reduce = function(f, initial) {
return reduce(f, initial, this.step, this.state);
return immediate(reduce, f, initial, this.step, this.state);
};

function reduce(f, z, stepper, state) {
return next(stepper, state).then(function(i) {
return i.done ? z
: reduce(f, f(z, i.value), stepper, i.state);
}
);
return when(next(stepper, state), function(i) {
return i.done ? z
: reduce(f, f(z, i.value), stepper, i.state);
});
}

// Helpers

function next(stepper, state) {
return Promise.resolve(state).then(stepper).then(function(i) {
return i.skip ? next(stepper, i.state) : i;
});
return when(state, stepper);
}

function streamNext(s) {
Expand Down Expand Up @@ -441,10 +451,6 @@ function yieldPair(step, x) {
return new Yield(step.value, new Pair(x, step.state));
}

function skipPair(step, x) {
return new Skip(new Pair(x, step.state));
}

function Outer(f, outer) {
this.f = f; this.outer = outer; this.inner = void 0;
}
Expand All @@ -467,3 +473,18 @@ function ensureScheduler(scheduler) {
}
return scheduler;
}

function when(x, f) {
return isPromise(x) ? x.then(f) : f(x);
}

function isPromise(x) {
return x !== null && (typeof x === 'object' || typeof x === 'function' ) && typeof x.then === 'function';
}

var slice = Array.prototype.slice;
function immediate(f) {
return Promise.resolve(slice.call(arguments, 1)).then(function(args) {
return f.apply(void 0, args);
});
}
10 changes: 7 additions & 3 deletions lib/iterable.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
var Promise = require('./Promise');
var step = require('./step');

var Yield = step.Yield;
var End = step.End;

exports.from = from;
exports.head = head;

Expand Down Expand Up @@ -37,7 +40,7 @@ function head(iterable) {
var iteration = iterator.next();
return Promise.resolve(iteration).then(function(iteration) {
return iteration.done ? iteration
: new step.Yield(iteration.value, new IterableWrapper(iterator));
: new Yield(iteration.value, new IterableWrapper(iterator));
});
}

Expand All @@ -63,6 +66,7 @@ function ArrayIterator(array) {
}

ArrayIterator.prototype.next = function() {
return this.index < this.array.length ? new step.Yield(this.array[this.index++])
: new step.End();
return this.index < this.array.length
? new Yield(this.array[this.index++])
: new End();
};
9 changes: 2 additions & 7 deletions lib/step.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,12 @@
/** @module */

exports.Yield = Yield;
exports.Skip = Skip;
exports.End = End;

function Yield(x, s) {
this.done = false; this.skip = false; this.value = x; this.state = s;
}

function Skip(s) {
this.done = false; this.skip = true; this.value = void 0; this.state = s;
this.done = false; this.value = x; this.state = s;
}

function End(x) {
this.done = true; this.skip = false; this.value = x; this.state = this;
this.done = true; this.value = x; this.state = this;
}
2 changes: 1 addition & 1 deletion test/Stream-test.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ describe('Stream', function() {
});

it('should end if consumer returns End', function() {
var spy = this.spy(function(x) {
var spy = this.spy(function() {
return new Stream.End();
});

Expand Down

0 comments on commit 28a90ca

Please # to comment.