From d015c34de700e0d2532ff5b9bfe9b53a8940f31e Mon Sep 17 00:00:00 2001 From: Austin Henrie Date: Mon, 21 Oct 2024 14:40:43 -0600 Subject: [PATCH 1/2] fix aborting Streams --- lib/web/fetch/index.js | 11 ++++++----- test/fetch/issue-1711.js | 33 --------------------------------- 2 files changed, 6 insertions(+), 38 deletions(-) delete mode 100644 test/fetch/issue-1711.js diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index 00d34071f7f..b9b5369b57d 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -80,6 +80,12 @@ class Fetch extends EE { this.connection = null this.dump = false this.state = 'ongoing' + // 2 terminated listeners get added per request, + // but only 1 gets removed. If there are 20 redirects, + // 21 listeners will be added. + // See https://github.com/nodejs/undici/issues/1711 + // TODO (fix): Find and fix root cause for leaked listener. + this.setMaxListeners(21) } terminate (reason) { @@ -1943,7 +1949,6 @@ async function httpNetworkFetch ( // 19. Run these steps in parallel: // 1. Run these steps, but abort when fetchParams is canceled: - fetchParams.controller.onAborted = onAborted fetchParams.controller.on('terminated', onAborted) fetchParams.controller.resume = async () => { // 1. While true @@ -2205,10 +2210,6 @@ async function httpNetworkFetch ( fetchParams.controller.off('terminated', this.abort) } - if (fetchParams.controller.onAborted) { - fetchParams.controller.off('terminated', fetchParams.controller.onAborted) - } - fetchParams.controller.ended = true this.body.push(null) diff --git a/test/fetch/issue-1711.js b/test/fetch/issue-1711.js deleted file mode 100644 index b024e411195..00000000000 --- a/test/fetch/issue-1711.js +++ /dev/null @@ -1,33 +0,0 @@ -'use strict' - -const assert = require('node:assert') -const { once } = require('node:events') -const { createServer } = require('node:http') -const { test } = require('node:test') -const { fetch } = require('../..') - -test('Redirecting a bunch does not cause a MaxListenersExceededWarning', async (t) => { - let redirects = 0 - - const server = createServer((req, res) => { - if (redirects === 15) { - res.end('Okay goodbye') - return - } - - res.writeHead(302, { - Location: `/${redirects++}` - }) - res.end() - }).listen(0) - - t.after(server.close.bind(server)) - await once(server, 'listening') - - process.emitWarning = assert.bind(null, false) - - const url = `http://localhost:${server.address().port}` - const response = await fetch(url, { redirect: 'follow' }) - - assert.deepStrictEqual(response.url, `${url}/${redirects - 1}`) -}) From 4a0a7cdc7f57109fa28d567cf79e1d9f7542edef Mon Sep 17 00:00:00 2001 From: Austin Henrie Date: Tue, 29 Oct 2024 13:28:52 -0600 Subject: [PATCH 2/2] stop memory leak --- lib/web/fetch/index.js | 11 +++----- test/fetch/issue-1711.js | 60 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 7 deletions(-) create mode 100644 test/fetch/issue-1711.js diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index b9b5369b57d..38bcfd3dfc2 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -80,12 +80,6 @@ class Fetch extends EE { this.connection = null this.dump = false this.state = 'ongoing' - // 2 terminated listeners get added per request, - // but only 1 gets removed. If there are 20 redirects, - // 21 listeners will be added. - // See https://github.com/nodejs/undici/issues/1711 - // TODO (fix): Find and fix root cause for leaked listener. - this.setMaxListeners(21) } terminate (reason) { @@ -1949,7 +1943,10 @@ async function httpNetworkFetch ( // 19. Run these steps in parallel: // 1. Run these steps, but abort when fetchParams is canceled: - fetchParams.controller.on('terminated', onAborted) + if (!fetchParams.controller.resume) { + fetchParams.controller.on('terminated', onAborted) + } + fetchParams.controller.resume = async () => { // 1. While true while (true) { diff --git a/test/fetch/issue-1711.js b/test/fetch/issue-1711.js new file mode 100644 index 00000000000..be48d160c20 --- /dev/null +++ b/test/fetch/issue-1711.js @@ -0,0 +1,60 @@ +'use strict' + +const assert = require('node:assert') +const { once } = require('node:events') +const { createServer } = require('node:http') +const { test } = require('node:test') +const { fetch } = require('../..') + +test('Redirecting a bunch does not cause a MaxListenersExceededWarning', async (t) => { + let redirects = 0 + + const server = createServer((req, res) => { + if (redirects === 15) { + res.end('Okay goodbye') + return + } + + res.writeHead(302, { + Location: `/${redirects++}` + }) + res.end() + }).listen(0) + + t.after(server.close.bind(server)) + await once(server, 'listening') + + process.emitWarning = assert.bind(null, false) + + const url = `http://localhost:${server.address().port}` + const response = await fetch(url, { redirect: 'follow' }) + + assert.deepStrictEqual(response.url, `${url}/${redirects - 1}`) +}) + +test( + 'aborting a Stream throws', + () => { + return new Promise((resolve, reject) => { + const httpServer = createServer((request, response) => { + response.end(new Uint8Array(20000)) + }).listen(async () => { + const serverAddress = httpServer.address() + + if (typeof serverAddress === 'object') { + const abortController = new AbortController() + const readStream = (await fetch(`http://localhost:${serverAddress?.port}`, { signal: abortController.signal })).arrayBuffer() + abortController.abort() + setTimeout(reject) + + try { + await readStream + } catch { + httpServer.close() + resolve() + } + } + }) + }) + } +)