diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js index 63d56b6880b..9406a0b722f 100644 --- a/lib/interceptor/cache.js +++ b/lib/interceptor/cache.js @@ -1,5 +1,6 @@ 'use strict' +const assert = require('node:assert') const util = require('../core/util') const CacheHandler = require('../handler/cache-handler') const MemoryCacheStore = require('../cache/memory-cache-store') @@ -39,41 +40,37 @@ module.exports = (opts = {}) => { return dispatch(opts, handler) } + // TODO (perf): For small entries support returning a Buffer instead of a stream. const stream = store.createReadStream(opts) if (!stream) { // Request isn't cached return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) } - let onErrorCalled = false - /** * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreReadable} stream * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreValue} value */ const respondWithCachedValue = (stream, value) => { - const ac = new AbortController() - const signal = ac.signal + let completed = false - signal.onabort = (_, err) => { - stream.destroy() - if (!onErrorCalled) { - handler.onError(err) - onErrorCalled = true - } - } + assert(!stream.destroyed, 'stream should not be destroyed') + assert(!stream.readableDidRead, 'stream should not be readableDidRead') stream.on('error', (err) => { - if (!onErrorCalled) { + if (!completed && typeof handler.onError === 'function') { handler.onError(err) - onErrorCalled = true } }) try { if (typeof handler.onConnect === 'function') { - handler.onConnect(ac.abort) - signal.throwIfAborted() + handler.onConnect((err) => { + stream.destroy(err) + }) + if (stream.destroyed) { + return + } } if (typeof handler.onHeaders === 'function') { @@ -83,36 +80,33 @@ module.exports = (opts = {}) => { value.rawHeaders.push(AGE_HEADER, Buffer.from(`${age}`)) - handler.onHeaders(value.statusCode, value.rawHeaders, stream.resume, value.statusMessage) - signal.throwIfAborted() + handler.onHeaders(value.statusCode, value.rawHeaders, () => stream.resume(), value.statusMessage) + if (stream.destroyed) { + return + } } if (opts.method === 'HEAD') { if (typeof handler.onComplete === 'function') { + completed = true handler.onComplete(null) - stream.destroy() } + stream.destroy() } else { - if (typeof handler.onData === 'function') { - stream.on('data', chunk => { - if (!handler.onData(chunk)) { - stream.pause() - } - }) - } - - if (typeof handler.onComplete === 'function') { - stream.on('end', () => { + stream.on('data', chunk => { + if (typeof handler.onData === 'function' && !handler.onData(chunk)) { + stream.pause() + } + }) + + stream.on('end', () => { + if (typeof handler.onComplete === 'function') { handler.onComplete(value.rawTrailers ?? []) - }) - } + } + }) } } catch (err) { stream.destroy(err) - if (!onErrorCalled && typeof handler.onError === 'function') { - handler.onError(err) - onErrorCalled = true - } } } @@ -125,9 +119,11 @@ module.exports = (opts = {}) => { return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) } + // TODO (fix): It's weird that "value" lives on stream. const { value } = stream - // Dump body on error + // Dump body if cached... + // TODO (fix): This is a bit suspect. if (util.isStream(opts.body)) { opts.body?.on('error', () => {}).resume() } @@ -135,11 +131,13 @@ module.exports = (opts = {}) => { // Check if the response is stale const now = Date.now() if (now >= value.staleAt) { + // TODO (fix): This whole bit is a bit suspect. In particular given that + // we dumped the body above. + if (now >= value.deleteAt) { // Safety check in case the store gave us a response that should've been - // deleted already - dispatch(opts, new CacheHandler(globalOpts, opts, handler)) - return + // deleted already + return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) } if (!opts.headers) { @@ -149,21 +147,23 @@ module.exports = (opts = {}) => { opts.headers['if-modified-since'] = new Date(value.cachedAt).toUTCString() // Need to revalidate the response - dispatch( + return dispatch( opts, new CacheRevalidationHandler( () => respondWithCachedValue(stream, value), new CacheHandler(globalOpts, opts, handler) ) ) - - return } respondWithCachedValue(stream, value) } - Promise.resolve(stream).then(handleStream).catch(handler.onError) + Promise.resolve(stream).then(handleStream, err => { + if (typeof handler.onError === 'function') { + handler.onError(err) + } + }) return true } diff --git a/types/cache-interceptor.d.ts b/types/cache-interceptor.d.ts index 7fa1aeab0ed..b05e0c871cb 100644 --- a/types/cache-interceptor.d.ts +++ b/types/cache-interceptor.d.ts @@ -49,7 +49,7 @@ declare namespace CacheHandler { export interface CacheStoreValue { statusCode: number; statusMessage: string; - rawHeaders: (Buffer | Buffer[])[]; + rawHeaders: Buffer[]; rawTrailers?: string[]; /** * Headers defined by the Vary header and their respective values for