From 6d836d32097fb5843bf6564dbc9c74e8d79708ab Mon Sep 17 00:00:00 2001 From: Khafra Date: Wed, 14 Aug 2024 15:26:18 -0400 Subject: [PATCH 1/2] use FinalizationRegistry for cloned response body --- lib/web/fetch/response.js | 4 ++++ test/fetch/fire-and-forget.js | 3 ++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/web/fetch/response.js b/lib/web/fetch/response.js index 603410a4a63..cd47700467e 100644 --- a/lib/web/fetch/response.js +++ b/lib/web/fetch/response.js @@ -254,6 +254,10 @@ class Response { // 2. Let clonedResponse be the result of cloning this’s response. const clonedResponse = cloneResponse(this[kState]) + if (hasFinalizationRegistry && this[kState].body?.stream) { + registry.register(this, new WeakRef(this[kState].body.stream)) + } + // 3. Return the result of creating a Response object, given // clonedResponse, this’s headers’s guard, and this’s relevant Realm. return fromInnerResponse(clonedResponse, getHeadersGuard(this[kHeaders])) diff --git a/test/fetch/fire-and-forget.js b/test/fetch/fire-and-forget.js index d8090885eb8..d356729b14d 100644 --- a/test/fetch/fire-and-forget.js +++ b/test/fetch/fire-and-forget.js @@ -37,8 +37,9 @@ test('does not need the body to be consumed to continue', { timeout: 180_000, sk // eslint-disable-next-line no-undef gc(true) const array = new Array(batch) - for (let i = 0; i < batch; i++) { + for (let i = 0; i < batch; i += 2) { array[i] = fetch(url).catch(() => {}) + array[i + 1] = fetch(url).then(r => r.clone()).catch(() => {}) } await Promise.all(array) await sleep(delay) From 6c73698446c6e18f6a1c26bb6765c80adbdb11e0 Mon Sep 17 00:00:00 2001 From: Khafra Date: Wed, 14 Aug 2024 16:17:15 -0400 Subject: [PATCH 2/2] fixup --- lib/web/fetch/body.js | 25 ++++++++++++++++++++++--- lib/web/fetch/request.js | 2 +- lib/web/fetch/response.js | 25 +++---------------------- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/lib/web/fetch/body.js b/lib/web/fetch/body.js index 55718ac7c81..a3f9fe69656 100644 --- a/lib/web/fetch/body.js +++ b/lib/web/fetch/body.js @@ -16,12 +16,25 @@ const { kState } = require('./symbols') const { webidl } = require('./webidl') const { Blob } = require('node:buffer') const assert = require('node:assert') -const { isErrored } = require('../../core/util') const { isArrayBuffer } = require('node:util/types') const { serializeAMimeType } = require('./data-url') const { multipartFormDataParser } = require('./formdata-parser') +const { isDisturbed, isErrored } = require('node:stream') const textEncoder = new TextEncoder() +function noop () {} + +const hasFinalizationRegistry = globalThis.FinalizationRegistry && process.version.indexOf('v18') !== 0 +let streamRegistry + +if (hasFinalizationRegistry) { + streamRegistry = new FinalizationRegistry((weakRef) => { + const stream = weakRef.deref() + if (stream && !stream.locked && !isDisturbed(stream) && !isErrored(stream)) { + stream.cancel('Response object has been garbage collected').catch(noop) + } + }) +} // https://fetch.spec.whatwg.org/#concept-bodyinit-extract function extractBody (object, keepalive = false) { @@ -264,7 +277,7 @@ function safelyExtractBody (object, keepalive = false) { return extractBody(object, keepalive) } -function cloneBody (body) { +function cloneBody (instance, body) { // To clone a body body, run these steps: // https://fetch.spec.whatwg.org/#concept-body-clone @@ -272,6 +285,10 @@ function cloneBody (body) { // 1. Let « out1, out2 » be the result of teeing body’s stream. const [out1, out2] = body.stream.tee() + if (hasFinalizationRegistry) { + streamRegistry.register(instance, new WeakRef(out1)) + } + // 2. Set body’s stream to out1. body.stream = out1 @@ -496,5 +513,7 @@ module.exports = { extractBody, safelyExtractBody, cloneBody, - mixinBody + mixinBody, + streamRegistry, + hasFinalizationRegistry } diff --git a/lib/web/fetch/request.js b/lib/web/fetch/request.js index 51bb2b3ab14..c4da68430b6 100644 --- a/lib/web/fetch/request.js +++ b/lib/web/fetch/request.js @@ -875,7 +875,7 @@ function cloneRequest (request) { // 2. If request’s body is non-null, set newRequest’s body to the // result of cloning request’s body. if (request.body != null) { - newRequest.body = cloneBody(request.body) + newRequest.body = cloneBody(newRequest, request.body) } // 3. Return newRequest. diff --git a/lib/web/fetch/response.js b/lib/web/fetch/response.js index cd47700467e..fc5240ca0ae 100644 --- a/lib/web/fetch/response.js +++ b/lib/web/fetch/response.js @@ -1,7 +1,7 @@ 'use strict' const { Headers, HeadersList, fill, getHeadersGuard, setHeadersGuard, setHeadersList } = require('./headers') -const { extractBody, cloneBody, mixinBody } = require('./body') +const { extractBody, cloneBody, mixinBody, hasFinalizationRegistry, streamRegistry } = require('./body') const util = require('../../core/util') const nodeUtil = require('node:util') const { kEnumerableProperty } = util @@ -26,24 +26,9 @@ const { URLSerializer } = require('./data-url') const { kConstruct } = require('../../core/symbols') const assert = require('node:assert') const { types } = require('node:util') -const { isDisturbed, isErrored } = require('node:stream') const textEncoder = new TextEncoder('utf-8') -const hasFinalizationRegistry = globalThis.FinalizationRegistry && process.version.indexOf('v18') !== 0 -let registry - -if (hasFinalizationRegistry) { - registry = new FinalizationRegistry((weakRef) => { - const stream = weakRef.deref() - if (stream && !stream.locked && !isDisturbed(stream) && !isErrored(stream)) { - stream.cancel('Response object has been garbage collected').catch(noop) - } - }) -} - -function noop () {} - // https://fetch.spec.whatwg.org/#response-class class Response { // Creates network error Response. @@ -254,10 +239,6 @@ class Response { // 2. Let clonedResponse be the result of cloning this’s response. const clonedResponse = cloneResponse(this[kState]) - if (hasFinalizationRegistry && this[kState].body?.stream) { - registry.register(this, new WeakRef(this[kState].body.stream)) - } - // 3. Return the result of creating a Response object, given // clonedResponse, this’s headers’s guard, and this’s relevant Realm. return fromInnerResponse(clonedResponse, getHeadersGuard(this[kHeaders])) @@ -331,7 +312,7 @@ function cloneResponse (response) { // 3. If response’s body is non-null, then set newResponse’s body to the // result of cloning response’s body. if (response.body != null) { - newResponse.body = cloneBody(response.body) + newResponse.body = cloneBody(newResponse, response.body) } // 4. Return newResponse. @@ -536,7 +517,7 @@ function fromInnerResponse (innerResponse, guard) { // a primitive or an object, even undefined. If the held value is an object, the registry keeps // a strong reference to it (so it can pass it to the cleanup callback later). Reworded from // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/FinalizationRegistry - registry.register(response, new WeakRef(innerResponse.body.stream)) + streamRegistry.register(response, new WeakRef(innerResponse.body.stream)) } return response