Skip to content

Commit 9c2504a

Browse files
benjamingrruyadorno
authored andcommitted
stream: add reduce
PR-URL: #41669 Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent ca1cd42 commit 9c2504a

File tree

5 files changed

+246
-16
lines changed

5 files changed

+246
-16
lines changed

doc/api/stream.md

+44
Original file line numberDiff line numberDiff line change
@@ -2088,6 +2088,49 @@ const pairs = await Readable.from(['a', 'b', 'c']).asIndexedPairs().toArray();
20882088
console.log(pairs); // [[0, 'a'], [1, 'b'], [2, 'c']]
20892089
```
20902090

2091+
### `readable.reduce(fn[, initial[, options]])`
2092+
2093+
<!-- YAML
2094+
added: REPLACEME
2095+
-->
2096+
2097+
> Stability: 1 - Experimental
2098+
2099+
* `fn` {Function|AsyncFunction} a reducer function to call over every chunk
2100+
in the stream.
2101+
* `previous` {any} the value obtained from the last call to `fn` or the
2102+
`initial` value if specified or the first chunk of the stream otherwise.
2103+
* `data` {any} a chunk of data from the stream.
2104+
* `options` {Object}
2105+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
2106+
abort the `fn` call early.
2107+
* `initial` {any} the initial value to use in the reduction.
2108+
* `options` {Object}
2109+
* `signal` {AbortSignal} allows destroying the stream if the signal is
2110+
aborted.
2111+
* Returns: {Promise} a promise for the final value of the reduction.
2112+
2113+
This method calls `fn` on each chunk of the stream in order, passing it the
2114+
result from the calculation on the previous element. It returns a promise for
2115+
the final value of the reduction.
2116+
2117+
The reducer function iterates the stream element-by-element which means that
2118+
there is no `concurrency` parameter or parallism. To perform a `reduce`
2119+
concurrently, it can be chained to the [`readable.map`][] method.
2120+
2121+
If no `initial` value is supplied the first chunk of the stream is used as the
2122+
initial value. If the stream is empty, the promise is rejected with a
2123+
`TypeError` with the `ERR_INVALID_ARGS` code property.
2124+
2125+
```mjs
2126+
import { Readable } from 'stream';
2127+
2128+
const ten = await Readable.from([1, 2, 3, 4]).reduce((previous, data) => {
2129+
return previous + data;
2130+
});
2131+
console.log(ten); // 10
2132+
```
2133+
20912134
### Duplex and transform streams
20922135

20932136
#### Class: `stream.Duplex`
@@ -4163,6 +4206,7 @@ contain multi-byte characters.
41634206
[`process.stdin`]: process.md#processstdin
41644207
[`process.stdout`]: process.md#processstdout
41654208
[`readable._read()`]: #readable_readsize
4209+
[`readable.map`]: #readablemapfn-options
41664210
[`readable.push('')`]: #readablepush
41674211
[`readable.setEncoding()`]: #readablesetencodingencoding
41684212
[`stream.Readable.from()`]: #streamreadablefromiterable-options

lib/internal/streams/end-of-stream.js

+15
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ const {
1717
validateObject,
1818
} = require('internal/validators');
1919

20+
const { Promise } = primordials;
21+
2022
const {
2123
isClosed,
2224
isReadable,
@@ -234,4 +236,17 @@ function eos(stream, options, callback) {
234236
return cleanup;
235237
}
236238

239+
function finished(stream, opts) {
240+
return new Promise((resolve, reject) => {
241+
eos(stream, opts, (err) => {
242+
if (err) {
243+
reject(err);
244+
} else {
245+
resolve();
246+
}
247+
});
248+
});
249+
}
250+
237251
module.exports = eos;
252+
module.exports.finished = finished;

lib/internal/streams/operators.js

+56-3
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,14 @@ const { AbortController } = require('internal/abort_controller');
55
const {
66
codes: {
77
ERR_INVALID_ARG_TYPE,
8+
ERR_MISSING_ARGS,
89
ERR_OUT_OF_RANGE,
910
},
1011
AbortError,
1112
} = require('internal/errors');
1213
const { validateInteger } = require('internal/validators');
1314
const { kWeakHandler } = require('internal/event_target');
15+
const { finished } = require('internal/streams/end-of-stream');
1416

1517
const {
1618
ArrayPrototypePush,
@@ -198,8 +200,8 @@ async function every(fn, options) {
198200
'fn', ['Function', 'AsyncFunction'], fn);
199201
}
200202
// https://en.wikipedia.org/wiki/De_Morgan%27s_laws
201-
return !(await some.call(this, async (x) => {
202-
return !(await fn(x));
203+
return !(await some.call(this, async (...args) => {
204+
return !(await fn(...args));
203205
}, options));
204206
}
205207

@@ -230,11 +232,61 @@ function filter(fn, options) {
230232
return this.map(filterFn, options);
231233
}
232234

235+
// Specific to provide better error to reduce since the argument is only
236+
// missing if the stream has no items in it - but the code is still appropriate
237+
class ReduceAwareErrMissingArgs extends ERR_MISSING_ARGS {
238+
constructor() {
239+
super('reduce');
240+
this.message = 'Reduce of an empty stream requires an initial value';
241+
}
242+
}
243+
244+
async function reduce(reducer, initialValue, options) {
245+
if (typeof reducer !== 'function') {
246+
throw new ERR_INVALID_ARG_TYPE(
247+
'reducer', ['Function', 'AsyncFunction'], reducer);
248+
}
249+
let hasInitialValue = arguments.length > 1;
250+
if (options?.signal?.aborted) {
251+
const err = new AbortError(undefined, { cause: options.signal.reason });
252+
this.once('error', () => {}); // The error is already propagated
253+
await finished(this.destroy(err));
254+
throw err;
255+
}
256+
const ac = new AbortController();
257+
const signal = ac.signal;
258+
if (options?.signal) {
259+
const opts = { once: true, [kWeakHandler]: this };
260+
options.signal.addEventListener('abort', () => ac.abort(), opts);
261+
}
262+
let gotAnyItemFromStream = false;
263+
try {
264+
for await (const value of this) {
265+
gotAnyItemFromStream = true;
266+
if (options?.signal?.aborted) {
267+
throw new AbortError();
268+
}
269+
if (!hasInitialValue) {
270+
initialValue = value;
271+
hasInitialValue = true;
272+
} else {
273+
initialValue = await reducer(initialValue, value, { signal });
274+
}
275+
}
276+
if (!gotAnyItemFromStream && !hasInitialValue) {
277+
throw new ReduceAwareErrMissingArgs();
278+
}
279+
} finally {
280+
ac.abort();
281+
}
282+
return initialValue;
283+
}
284+
233285
async function toArray(options) {
234286
const result = [];
235287
for await (const val of this) {
236288
if (options?.signal?.aborted) {
237-
throw new AbortError({ cause: options.signal.reason });
289+
throw new AbortError(undefined, { cause: options.signal.reason });
238290
}
239291
ArrayPrototypePush(result, val);
240292
}
@@ -312,6 +364,7 @@ module.exports.streamReturningOperators = {
312364
module.exports.promiseReturningOperators = {
313365
every,
314366
forEach,
367+
reduce,
315368
toArray,
316369
some,
317370
};

lib/stream/promises.js

+1-13
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ const {
1111
} = require('internal/streams/utils');
1212

1313
const { pipelineImpl: pl } = require('internal/streams/pipeline');
14-
const eos = require('internal/streams/end-of-stream');
14+
const { finished } = require('internal/streams/end-of-stream');
1515

1616
function pipeline(...streams) {
1717
return new Promise((resolve, reject) => {
@@ -35,18 +35,6 @@ function pipeline(...streams) {
3535
});
3636
}
3737

38-
function finished(stream, opts) {
39-
return new Promise((resolve, reject) => {
40-
eos(stream, opts, (err) => {
41-
if (err) {
42-
reject(err);
43-
} else {
44-
resolve();
45-
}
46-
});
47-
});
48-
}
49-
5038
module.exports = {
5139
finished,
5240
pipeline,

test/parallel/test-stream-reduce.js

+130
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
9+
function sum(p, c) {
10+
return p + c;
11+
}
12+
13+
{
14+
// Does the same thing as `(await stream.toArray()).reduce(...)`
15+
(async () => {
16+
const tests = [
17+
[[], sum, 0],
18+
[[1], sum, 0],
19+
[[1, 2, 3, 4, 5], sum, 0],
20+
[[...Array(100).keys()], sum, 0],
21+
[['a', 'b', 'c'], sum, ''],
22+
[[1, 2], sum],
23+
[[1, 2, 3], (x, y) => y],
24+
];
25+
for (const [values, fn, initial] of tests) {
26+
const streamReduce = await Readable.from(values)
27+
.reduce(fn, initial);
28+
const arrayReduce = values.reduce(fn, initial);
29+
assert.deepStrictEqual(streamReduce, arrayReduce);
30+
}
31+
// Does the same thing as `(await stream.toArray()).reduce(...)` with an
32+
// asynchronous reducer
33+
for (const [values, fn, initial] of tests) {
34+
const streamReduce = await Readable.from(values)
35+
.map(async (x) => x)
36+
.reduce(fn, initial);
37+
const arrayReduce = values.reduce(fn, initial);
38+
assert.deepStrictEqual(streamReduce, arrayReduce);
39+
}
40+
})().then(common.mustCall());
41+
}
42+
{
43+
// Works with an async reducer, with or without initial value
44+
(async () => {
45+
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c, 0);
46+
assert.strictEqual(six, 6);
47+
})().then(common.mustCall());
48+
(async () => {
49+
const six = await Readable.from([1, 2, 3]).reduce(async (p, c) => p + c);
50+
assert.strictEqual(six, 6);
51+
})().then(common.mustCall());
52+
}
53+
{
54+
// Works lazily
55+
assert.rejects(Readable.from([1, 2, 3, 4, 5, 6])
56+
.map(common.mustCall((x) => {
57+
return x;
58+
}, 3)) // Two consumed and one buffered by `map` due to default concurrency
59+
.reduce(async (p, c) => {
60+
if (p === 1) {
61+
throw new Error('boom');
62+
}
63+
return c;
64+
}, 0)
65+
, /boom/).then(common.mustCall());
66+
}
67+
68+
{
69+
// Support for AbortSignal
70+
const ac = new AbortController();
71+
assert.rejects(async () => {
72+
await Readable.from([1, 2, 3]).reduce(async (p, c) => {
73+
if (c === 3) {
74+
await new Promise(() => {}); // Explicitly do not pass signal here
75+
}
76+
return Promise.resolve();
77+
}, 0, { signal: ac.signal });
78+
}, {
79+
name: 'AbortError',
80+
}).then(common.mustCall());
81+
ac.abort();
82+
}
83+
84+
85+
{
86+
// Support for AbortSignal - pre aborted
87+
const stream = Readable.from([1, 2, 3]);
88+
assert.rejects(async () => {
89+
await stream.reduce(async (p, c) => {
90+
if (c === 3) {
91+
await new Promise(() => {}); // Explicitly do not pass signal here
92+
}
93+
return Promise.resolve();
94+
}, 0, { signal: AbortSignal.abort() });
95+
}, {
96+
name: 'AbortError',
97+
}).then(common.mustCall(() => {
98+
assert.strictEqual(stream.destroyed, true);
99+
}));
100+
}
101+
102+
{
103+
// Support for AbortSignal - deep
104+
const stream = Readable.from([1, 2, 3]);
105+
assert.rejects(async () => {
106+
await stream.reduce(async (p, c, { signal }) => {
107+
signal.addEventListener('abort', common.mustCall(), { once: true });
108+
if (c === 3) {
109+
await new Promise(() => {}); // Explicitly do not pass signal here
110+
}
111+
return Promise.resolve();
112+
}, 0, { signal: AbortSignal.abort() });
113+
}, {
114+
name: 'AbortError',
115+
}).then(common.mustCall(() => {
116+
assert.strictEqual(stream.destroyed, true);
117+
}));
118+
}
119+
120+
{
121+
// Error cases
122+
assert.rejects(() => Readable.from([]).reduce(1), /TypeError/);
123+
assert.rejects(() => Readable.from([]).reduce('5'), /TypeError/);
124+
}
125+
126+
{
127+
// Test result is a Promise
128+
const result = Readable.from([1, 2, 3, 4, 5]).reduce(sum, 0);
129+
assert.ok(result instanceof Promise);
130+
}

0 commit comments

Comments
 (0)