Stream not destroyed when piped in Duplex.from() writable #55077

Open
matthieusieben opened this issue Sep 23, 2024 · 7 comments
Labels
confirmed-bug Issues with confirmed bugs. stream Issues and PRs related to the stream subsystem.

Comments

@matthieusieben
Contributor

Version

v22.9.0

Platform

Darwin MBPM.local 24.0.0 Darwin Kernel Version 24.0.0: Mon Aug 12 20:52:31 PDT 2024; root:xnu-11215.1.10~2/RELEASE_ARM64_T6030 arm64

Subsystem

stream

What steps will reproduce the bug?

const { Duplex, Readable, pipeline } = require('node:stream')

pipeline(
  Readable.from(
    (async function* () {
      try {
        console.log('Readable stream started')
        let i = 32
        while (i-- > 0) {
          yield Buffer.from('Hello, world!')
          console.log('Readable stream resumed')
        }
        console.log('Readable stream ended')
      } catch (err) {
        console.log('Readable stream errored', err)
      } finally {
        console.log('Readable stream destroyed')
      }
    })(),
    { objectMode: false, autoDestroy: true, highWaterMark: 1 }
  ),
  Duplex.from(async function (asyncGenerator) {
    asyncGenerator.return()
  }),
  (err) => {
    if (err) {
      console.error('Pipeline failed', err)
    } else {
      console.log('Pipeline succeeded')
    }
  }
)

How often does it reproduce? Is there a required condition?

Any function passed as Duplex.from(async function () {}) that does not fully consume its input prevents the input stream from being properly disposed of. Each of the following triggers it:

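  // Variant 1: stop iterating after the first chunk
  Duplex.from(async function (asyncGenerator) {
    for await (const chunk of asyncGenerator) {
      console.log('Writable stream received', chunk)
      break
    }
  })
  // Variant 2: close the input explicitly without reading it
  Duplex.from(async function (asyncGenerator) {
    asyncGenerator.return()
  })
  // Variant 3: ignore the input entirely
  Duplex.from(async function (asyncGenerator) {
    // do nothing
  })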

What is the expected behavior? Why is that the expected behavior?

The readable's destroy steps should be called.

What do you see instead?

The readable is not destroyed.

Additional information

No response

avivkeller added the stream and web streams labels Sep 23, 2024
@avivkeller
Member

avivkeller commented Sep 23, 2024

Related issue: #55010
CC @nodejs/streams

@ronag
Member

ronag commented Sep 23, 2024

I don't understand what the problem is.

@matthieusieben
Contributor Author

Calling asyncGenerator.return() should cause the readable to be destroyed and "Readable stream destroyed" to be printed. Neither happens, and the pipeline callback function is never called either.

@mcollina
Member

Sounds like a bug.

mcollina added the confirmed-bug label Sep 23, 2024
@mcollina
Member

@matthieusieben would you like to send a PR to fix it?

@matthieusieben
Contributor Author

I gave it a try, but I'm struggling a bit.

@ronag
Member

ronag commented Nov 23, 2024

Is this also a problem if you do:

  Duplex.from(async function (asyncGenerator) {
    for await (const _ of asyncGenerator) { return }
  }),

instead of:

  Duplex.from(async function (asyncGenerator) {
    asyncGenerator.return()
  }),
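
One way to compare the two variants side by side is a small probe that runs the reproduction with a given consumer and reports what it observed after a grace period. This is only a sketch: probe, its state fields, and the 200 ms default are hypothetical and not part of Node.js or the original report, and the results may differ between Node.js versions, so no expected output is given.

```javascript
const { Duplex, Readable, pipeline } = require('node:stream')
const { setTimeout: delay } = require('node:timers/promises')

// Hypothetical helper: run the reproduction with `consumer` as the
// Duplex.from() function, wait `graceMs`, then report what was observed.
async function probe(consumer, graceMs = 200) {
  const state = {
    generatorFinalized: false, // did the source generator's finally block run?
    readableDestroyed: false, // is readable.destroyed true after the wait?
    callbackCalled: false, // was the pipeline callback invoked?
    error: null, // error passed to the pipeline callback, if any
  }

  const readable = Readable.from(
    (async function* () {
      try {
        let i = 32
        while (i-- > 0) {
          yield Buffer.from('Hello, world!')
        }
      } finally {
        state.generatorFinalized = true
      }
    })(),
    { objectMode: false, highWaterMark: 1 }
  )

  pipeline(readable, Duplex.from(consumer), (err) => {
    state.callbackCalled = true
    state.error = err ?? null
  })

  // Assumption: the grace period is long enough for cleanup to finish if it is going to
  await delay(graceMs)
  state.readableDestroyed = readable.destroyed
  return state
}

async function main() {
  // Variant from the question: return from inside the for await loop
  const earlyReturn = await probe(async function (asyncGenerator) {
    for await (const _ of asyncGenerator) { return }
  })
  // Variant from the original report: call return() without iterating
  const explicitReturn = await probe(async function (asyncGenerator) {
    asyncGenerator.return()
  })
  console.log({ earlyReturn, explicitReturn })
}

main()
```

If the bug is present for a variant, callbackCalled and generatorFinalized would be expected to stay false for it; a fixed build should report true for both.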
