From cde714f7f5b8469c5b598bb9d52455891a84386b Mon Sep 17 00:00:00 2001 From: Khafra Date: Thu, 15 Aug 2024 13:27:28 -0400 Subject: [PATCH] use FinalizationRegistry for cloned response body (#3458) Signed-off-by: Matteo Collina --- lib/web/fetch/body.js | 23 +++++++++++++++++++++-- lib/web/fetch/request.js | 2 +- lib/web/fetch/response.js | 21 +++------------------ test/fetch/fire-and-forget.js | 3 ++- 4 files changed, 27 insertions(+), 22 deletions(-) diff --git a/lib/web/fetch/body.js b/lib/web/fetch/body.js index a8c996b50eb..464e7b50e5c 100644 --- a/lib/web/fetch/body.js +++ b/lib/web/fetch/body.js @@ -16,12 +16,25 @@ const { kState } = require('./symbols') const { webidl } = require('./webidl') const { Blob } = require('node:buffer') const assert = require('node:assert') -const { isErrored } = require('../../core/util') +const { isErrored, isDisturbed } = require('node:stream') const { isArrayBuffer } = require('node:util/types') const { serializeAMimeType } = require('./data-url') const { multipartFormDataParser } = require('./formdata-parser') const textEncoder = new TextEncoder() +function noop () {} + +const hasFinalizationRegistry = globalThis.FinalizationRegistry && process.version.indexOf('v18') !== 0 +let streamRegistry + +if (hasFinalizationRegistry) { + streamRegistry = new FinalizationRegistry((weakRef) => { + const stream = weakRef.deref() + if (stream && !stream.locked && !isDisturbed(stream) && !isErrored(stream)) { + stream.cancel('Response object has been garbage collected').catch(noop) + } + }) +} // https://fetch.spec.whatwg.org/#concept-bodyinit-extract function extractBody (object, keepalive = false) { @@ -264,7 +277,7 @@ function safelyExtractBody (object, keepalive = false) { return extractBody(object, keepalive) } -function cloneBody (body) { +function cloneBody (instance, body) { // To clone a body body, run these steps: // https://fetch.spec.whatwg.org/#concept-body-clone @@ -272,6 +285,10 @@ function cloneBody (body) { // 1. Let « out1, out2 » be the result of teeing body’s stream. const [out1, out2] = body.stream.tee() + if (hasFinalizationRegistry) { + streamRegistry.register(instance, new WeakRef(out1)) + } + // 2. Set body’s stream to out1. body.stream = out1 @@ -499,5 +516,7 @@ module.exports = { safelyExtractBody, cloneBody, mixinBody, + streamRegistry, + hasFinalizationRegistry, bodyUnusable } diff --git a/lib/web/fetch/request.js b/lib/web/fetch/request.js index b63d12efd38..542ea7fb28a 100644 --- a/lib/web/fetch/request.js +++ b/lib/web/fetch/request.js @@ -877,7 +877,7 @@ function cloneRequest (request) { // 2. If request’s body is non-null, set newRequest’s body to the // result of cloning request’s body. if (request.body != null) { - newRequest.body = cloneBody(request.body) + newRequest.body = cloneBody(newRequest, request.body) } // 3. Return newRequest. diff --git a/lib/web/fetch/response.js b/lib/web/fetch/response.js index 9909066fd38..155dbadd1ad 100644 --- a/lib/web/fetch/response.js +++ b/lib/web/fetch/response.js @@ -1,7 +1,7 @@ 'use strict' const { Headers, HeadersList, fill, getHeadersGuard, setHeadersGuard, setHeadersList } = require('./headers') -const { extractBody, cloneBody, mixinBody, bodyUnusable } = require('./body') +const { extractBody, cloneBody, mixinBody, hasFinalizationRegistry, streamRegistry, bodyUnusable } = require('./body') const util = require('../../core/util') const nodeUtil = require('node:util') const { kEnumerableProperty } = util @@ -26,24 +26,9 @@ const { URLSerializer } = require('./data-url') const { kConstruct } = require('../../core/symbols') const assert = require('node:assert') const { types } = require('node:util') -const { isDisturbed, isErrored } = require('node:stream') const textEncoder = new TextEncoder('utf-8') -const hasFinalizationRegistry = globalThis.FinalizationRegistry && process.version.indexOf('v18') !== 0 -let registry - -if (hasFinalizationRegistry) { - registry = new FinalizationRegistry((weakRef) => { - const stream = weakRef.deref() - if (stream && !stream.locked && !isDisturbed(stream) && !isErrored(stream)) { - stream.cancel('Response object has been garbage collected').catch(noop) - } - }) -} - -function noop () {} - // https://fetch.spec.whatwg.org/#response-class class Response { // Creates network error Response. @@ -327,7 +312,7 @@ function cloneResponse (response) { // 3. If response’s body is non-null, then set newResponse’s body to the // result of cloning response’s body. if (response.body != null) { - newResponse.body = cloneBody(response.body) + newResponse.body = cloneBody(newResponse, response.body) } // 4. Return newResponse. @@ -532,7 +517,7 @@ function fromInnerResponse (innerResponse, guard) { // a primitive or an object, even undefined. If the held value is an object, the registry keeps // a strong reference to it (so it can pass it to the cleanup callback later). Reworded from // https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/FinalizationRegistry - registry.register(response, new WeakRef(innerResponse.body.stream)) + streamRegistry.register(response, new WeakRef(innerResponse.body.stream)) } return response diff --git a/test/fetch/fire-and-forget.js b/test/fetch/fire-and-forget.js index d8090885eb8..d356729b14d 100644 --- a/test/fetch/fire-and-forget.js +++ b/test/fetch/fire-and-forget.js @@ -37,8 +37,9 @@ test('does not need the body to be consumed to continue', { timeout: 180_000, sk // eslint-disable-next-line no-undef gc(true) const array = new Array(batch) - for (let i = 0; i < batch; i++) { + for (let i = 0; i < batch; i += 2) { array[i] = fetch(url).catch(() => {}) + array[i + 1] = fetch(url).then(r => r.clone()).catch(() => {}) } await Promise.all(array) await sleep(delay)