From 030eacb7991f25ab4b4193578bc6f823f6af8788 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Tue, 25 Apr 2023 12:59:03 +0530 Subject: [PATCH 1/2] stream: prevent pipeline hang with generator functions Fixes: https://github.com/nodejs/node/issues/47708 --- lib/internal/streams/pipeline.js | 3 +-- test/parallel/test-stream-pipeline.js | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js index 95737d95e48e41..062bdc192d1310 100644 --- a/lib/internal/streams/pipeline.js +++ b/lib/internal/streams/pipeline.js @@ -138,10 +138,9 @@ async function pumpToNode(iterable, writable, finish, { end }) { if (end) { writable.end(); + await wait(); } - await wait(); - finish(); } catch (err) { finish(error !== err ? aggregateTwoErrors(error, err) : err); diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index d37ca275f1dddf..f6ec97425987f0 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1616,3 +1616,20 @@ const tsp = require('timers/promises'); dup.push(null); dup.read(); } + +{ + let res = ''; + const writable = new Writable({ + write(chunk, enc, cb) { + res += chunk; + cb(); + } + }); + pipelinep(async function*() { + yield 'hello'; + yield 'world'; + }, writable, { end: false }).then(common.mustCall(() => { + assert.strictEqual(res, 'helloworld'); + assert.strictEqual(writable.closed, false); + })); +} From c3ab70d95e9360f30ae51a239071e5797c8ce8c1 Mon Sep 17 00:00:00 2001 From: Debadree Chatterjee Date: Tue, 25 Apr 2023 18:53:45 +0530 Subject: [PATCH 2/2] fixup! add a promise.resolve in test --- test/parallel/test-stream-pipeline.js | 1 + 1 file changed, 1 insertion(+) diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js index f6ec97425987f0..e9f6a2fdf711d3 100644 --- a/test/parallel/test-stream-pipeline.js +++ b/test/parallel/test-stream-pipeline.js @@ -1627,6 +1627,7 @@ const tsp = require('timers/promises'); }); pipelinep(async function*() { yield 'hello'; + await Promise.resolve(); yield 'world'; }, writable, { end: false }).then(common.mustCall(() => { assert.strictEqual(res, 'helloworld');