Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Nov 25, 2024
1 parent d44958d commit 41c3b90
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 131 deletions.
18 changes: 9 additions & 9 deletions lib/cache/sqlite-cache-store.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ const { DatabaseSync } = require('node:sqlite')
const { Writable } = require('stream')
const { assertCacheKey, assertCacheValue } = require('../util/cache.js')

const VERSION = 1
const VERSION = 2

/**
* @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore
* @implements {CacheStore}
*
* @typedef {{
* id: Readonly<number>
* rawHeaders?: string
* headers?: Record<string, string | string[]>
* vary?: string | object
* body: string
* } & import('../../types/cache-interceptor.d.ts').default.CacheValue} SqliteStoreValue
Expand Down Expand Up @@ -107,7 +107,7 @@ class SqliteCacheStore {
deleteAt INTEGER NOT NULL,
statusCode INTEGER NOT NULL,
statusMessage TEXT NOT NULL,
rawHeaders TEXT NULL,
headers TEXT NULL,
etag TEXT NULL,
vary TEXT NULL,
cachedAt INTEGER NOT NULL,
Expand All @@ -126,7 +126,7 @@ class SqliteCacheStore {
deleteAt,
statusCode,
statusMessage,
rawHeaders,
headers,
etag,
vary,
cachedAt,
Expand All @@ -145,7 +145,7 @@ class SqliteCacheStore {
deleteAt = ?,
statusCode = ?,
statusMessage = ?,
rawHeaders = ?,
headers = ?,
etag = ?,
cachedAt = ?,
staleAt = ?,
Expand All @@ -162,7 +162,7 @@ class SqliteCacheStore {
deleteAt,
statusCode,
statusMessage,
rawHeaders,
headers,
etag,
vary,
cachedAt,
Expand Down Expand Up @@ -221,7 +221,7 @@ class SqliteCacheStore {
body: value.body ? parseBufferArray(JSON.parse(value.body)) : null,
statusCode: value.statusCode,
statusMessage: value.statusMessage,
rawHeaders: value.rawHeaders ? parseBufferArray(JSON.parse(value.rawHeaders)) : undefined,
headers: value.headers ? JSON.parse(value.headers) : undefined,
etag: value.etag ? value.etag : undefined,
cachedAt: value.cachedAt,
staleAt: value.staleAt,
Expand Down Expand Up @@ -275,7 +275,7 @@ class SqliteCacheStore {
value.deleteAt,
value.statusCode,
value.statusMessage,
value.rawHeaders ? JSON.stringify(stringifyBufferArray(value.rawHeaders)) : null,
value.headers ? JSON.stringify(value.headers) : null,
value.etag,
value.cachedAt,
value.staleAt,
Expand All @@ -291,7 +291,7 @@ class SqliteCacheStore {
value.deleteAt,
value.statusCode,
value.statusMessage,
value.rawHeaders ? JSON.stringify(stringifyBufferArray(value.rawHeaders)) : null,
value.headers ? JSON.stringify(value.headers) : null,
value.etag ? value.etag : null,
value.vary ? JSON.stringify(value.vary) : null,
value.cachedAt,
Expand Down
1 change: 1 addition & 0 deletions lib/dispatcher/dispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Dispatcher extends EventEmitter {

dispatch = wrapInterceptor(dispatch)
dispatch = interceptor(dispatch)
dispatch = wrapInterceptor(dispatch)

if (dispatch == null || typeof dispatch !== 'function' || dispatch.length !== 2) {
throw new TypeError('invalid interceptor')
Expand Down
154 changes: 37 additions & 117 deletions lib/handler/cache-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,44 +49,25 @@ class CacheHandler extends DecoratorHandler {
this.#handler = handler
}

onConnect (...args) {
if (this.#writeStream) {
this.#writeStream.destroy()
this.#writeStream = undefined
}

if (typeof this.#handler.onConnect === 'function') {
this.#handler.onConnect(...args)
}
onRequestStart (controller, context) {
this.#writeStream?.destroy()
this.#writeStream = undefined
this.#handler.onRequestStart?.(controller, context)
}

/**
* @see {DispatchHandlers.onHeaders}
*
* @param {number} statusCode
* @param {Buffer[]} rawHeaders
* @param {() => void} resume
* @param {string} statusMessage
* @returns {boolean}
*/
onHeaders (
onResponseStart (
controller,
statusCode,
rawHeaders,
resume,
statusMessage
statusMessage,
headers
) {
const downstreamOnHeaders = () => {
if (typeof this.#handler.onHeaders === 'function') {
return this.#handler.onHeaders(
statusCode,
rawHeaders,
resume,
statusMessage
)
} else {
return true
}
}
const downstreamOnHeaders = () =>
this.#handler.onResponseStart?.(
controller,
statusCode,
statusMessage,
headers
)

if (
!util.safeHTTPMethods.includes(this.#cacheKey.method) &&
Expand All @@ -102,9 +83,6 @@ class CacheHandler extends DecoratorHandler {
return downstreamOnHeaders()
}

const parsedRawHeaders = util.parseRawHeaders(rawHeaders)
const headers = util.parseHeaders(parsedRawHeaders)

const cacheControlHeader = headers['cache-control']
if (!cacheControlHeader) {
// Don't have the cache control header or the cache is full
Expand All @@ -124,19 +102,15 @@ class CacheHandler extends DecoratorHandler {
: undefined
const deleteAt = determineDeleteAt(now, cacheControlDirectives, staleAt)

const strippedHeaders = stripNecessaryHeaders(
rawHeaders,
parsedRawHeaders,
cacheControlDirectives
)
const strippedHeaders = stripNecessaryHeaders(headers, cacheControlDirectives)

/**
* @type {import('../../types/cache-interceptor.d.ts').default.CacheValue}
*/
const value = {
statusCode,
statusMessage,
rawHeaders: strippedHeaders,
headers: strippedHeaders,
vary: varyDirectives,
cachedAt: now,
staleAt,
Expand All @@ -152,7 +126,7 @@ class CacheHandler extends DecoratorHandler {
if (this.#writeStream) {
const handler = this
this.#writeStream
.on('drain', resume)
.on('drain', () => controller.resume())
.on('error', function () {
// TODO (fix): Make error somehow observable?
})
Expand All @@ -162,63 +136,36 @@ class CacheHandler extends DecoratorHandler {
}

// TODO (fix): Should we resume even if was paused downstream?
resume()
controller.resume()
})
}
}

return downstreamOnHeaders()
}

/**
* @see {DispatchHandlers.onData}
*
* @param {Buffer} chunk
* @returns {boolean}
*/
onData (chunk) {
let paused = false

if (this.#writeStream) {
paused ||= this.#writeStream.write(chunk) === false
}

if (typeof this.#handler.onData === 'function') {
paused ||= this.#handler.onData(chunk) === false
onResponseData (controller, chunk) {
if (this.#writeStream?.write(chunk) === false) {
controller.pause()
}

return !paused
this.#handler.onResponseData?.(controller, chunk)
}

/**
* @see {DispatchHandlers.onComplete}
*
* @param {string[] | null} rawTrailers
*/
onComplete (rawTrailers) {
if (this.#writeStream) {
this.#writeStream.end()
}

if (typeof this.#handler.onComplete === 'function') {
return this.#handler.onComplete(rawTrailers)
}
onResponseEnd (controller, trailers) {
this.#writeStream?.end()
this.#handler.onResponseEnd?.(controller, trailers)
}

/**
* @see {DispatchHandlers.onError}
*
* @param {Error} err
*/
onError (err) {
if (this.#writeStream) {
this.#writeStream.destroy(err)
this.#writeStream = undefined
}

if (typeof this.#handler.onError === 'function') {
this.#handler.onError(err)
}
onResponseError (err) {
this.#writeStream?.destroy(err)
this.#writeStream = undefined
this.#handler?.onResponseError(err)
}
}

Expand Down Expand Up @@ -323,12 +270,11 @@ function determineDeleteAt (now, cacheControlDirectives, staleAt) {

/**
* Strips headers required to be removed in cached responses
* @param {Buffer[]} rawHeaders
* @param {string[]} parsedRawHeaders
* @param {Record<string, string | string[]>} headers
* @param {import('../util/cache.js').CacheControlDirectives} cacheControlDirectives
* @returns {Buffer[]}
* @returns {Record<string, string | string []>}
*/
function stripNecessaryHeaders (rawHeaders, parsedRawHeaders, cacheControlDirectives) {
function stripNecessaryHeaders (headers, cacheControlDirectives) {
const headersToRemove = ['connection']

if (Array.isArray(cacheControlDirectives['no-cache'])) {
Expand All @@ -340,49 +286,23 @@ function stripNecessaryHeaders (rawHeaders, parsedRawHeaders, cacheControlDirect
}

let strippedHeaders

let offset = 0
for (let i = 0; i < parsedRawHeaders.length; i += 2) {
const headerName = parsedRawHeaders[i]

for (const headerName of Object.keys(headers)) {
if (headersToRemove.includes(headerName)) {
// We have at least one header we want to remove
if (!strippedHeaders) {
// This is the first header we want to remove, let's create the array
// Since we're stripping headers, this will over allocate. We'll trim
// it later.
strippedHeaders = new Array(parsedRawHeaders.length)

// Backfill the previous headers into it
for (let j = 0; j < i; j += 2) {
strippedHeaders[j] = parsedRawHeaders[j]
strippedHeaders[j + 1] = parsedRawHeaders[j + 1]
}
strippedHeaders = { ...headers }
}

// We can't map indices 1:1 from stripped headers to rawHeaders without
// creating holes (if we skip a header, we now have two holes where at
// element should be). So, let's keep an offset to keep strippedHeaders
// flattened. We can also use this at the end for trimming the empty
// elements off of strippedHeaders.
offset += 2
delete headers[headerName]

continue
}

// We want to keep this header. Let's add it to strippedHeaders if it exists
if (strippedHeaders) {
strippedHeaders[i - offset] = parsedRawHeaders[i]
strippedHeaders[i + 1 - offset] = parsedRawHeaders[i + 1]
}
}

if (strippedHeaders) {
// Trim off the empty values at the end
strippedHeaders.length -= offset
}

return strippedHeaders ? util.encodeRawHeaders(strippedHeaders) : rawHeaders
return strippedHeaders ?? headers
}

module.exports = CacheHandler
4 changes: 2 additions & 2 deletions lib/handler/unwrap-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ module.exports = class UnwrapHandler {

static unwrap (handler) {
// TODO (fix): More checks...
return handler.onConnect ? handler : new UnwrapHandler(handler)
return !handler.onRequestStart ? handler : new UnwrapHandler(handler)
}

onConnect (abort, context) {
Expand All @@ -73,7 +73,7 @@ module.exports = class UnwrapHandler {

onHeaders (statusCode, rawHeaders, resume, statusMessage) {
this.#controller[kResume] = resume
this.#handler.onResponseStart?.(this.#controller, statusCode, parseHeaders(rawHeaders))
this.#handler.onResponseStart?.(this.#controller, statusCode, statusMessage, parseHeaders(rawHeaders))
return !this.#controller.paused
}

Expand Down
2 changes: 1 addition & 1 deletion lib/handler/wrap-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ module.exports = class WrapHandler {
this.#handler.onUpgrade?.(statusCode, rawHeaders, socket)
}

onResponseHeaders (controller, statusCode, statusMessage, headers) {
onResponseStart (controller, statusCode, statusMessage, headers) {
const rawHeaders = []
for (const [key, val] of Object.entries(headers)) {
// TODO (fix): What if val is Array
Expand Down
4 changes: 2 additions & 2 deletions types/cache-interceptor.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ declare namespace CacheHandler {
export interface CacheValue {
statusCode: number
statusMessage: string
rawHeaders: Buffer[]
headers: Record<string, string | string[]>
vary?: Record<string, string | string[]>
etag?: string
cachedAt: number
Expand All @@ -49,7 +49,7 @@ declare namespace CacheHandler {
type GetResult = {
statusCode: number
statusMessage: string
rawHeaders: Buffer[]
headers: Record<string, string | string[]>
etag?: string
body: null | Readable | Iterable<Buffer> | AsyncIterable<Buffer> | Buffer | Iterable<string> | AsyncIterable<string> | string
cachedAt: number
Expand Down

0 comments on commit 41c3b90

Please # to comment.