Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/cache/entry.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
const { Request, Response } = require('minipass-fetch')
const Minipass = require('minipass')
const MinipassFlush = require('minipass-flush')
const MinipassPipeline = require('minipass-pipeline')
const cacache = require('cacache')
const url = require('url')

const CachingMinipassPipeline = require('../pipeline.js')
const CachePolicy = require('./policy.js')
const cacheKey = require('./key.js')
const remote = require('../remote.js')
Expand Down Expand Up @@ -269,7 +269,7 @@ class CacheEntry {
cacheWriteReject = reject
})

body = new MinipassPipeline(new MinipassFlush({
body = new CachingMinipassPipeline({ events: ['integrity', 'size'] }, new MinipassFlush({
flush () {
return cacheWritePromise
},
Expand Down
41 changes: 41 additions & 0 deletions lib/pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
'use strict'

const MinipassPipeline = require('minipass-pipeline')

class CachingMinipassPipeline extends MinipassPipeline {
#events = []
#data = new Map()

constructor (opts, ...streams) {
// CRITICAL: do NOT pass the streams to the call to super(), this will start
// the flow of data and potentially cause the events we need to catch to emit
// before we've finished our own setup. instead we call super() with no args,
// finish our setup, and then push the streams into ourselves to start the
// data flow
super()
this.#events = opts.events

/* istanbul ignore next - coverage disabled because this is pointless to test here */
if (streams.length) {
this.push(...streams)
}
}

on (event, handler) {
if (this.#events.includes(event) && this.#data.has(event)) {
return handler(...this.#data.get(event))
}

return super.on(event, handler)
}

emit (event, ...data) {
if (this.#events.includes(event)) {
this.#data.set(event, data)
}

return super.emit(event, ...data)
}
}

module.exports = CachingMinipassPipeline
6 changes: 4 additions & 2 deletions lib/remote.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
const Minipass = require('minipass')
const MinipassPipeline = require('minipass-pipeline')
const fetch = require('minipass-fetch')
const promiseRetry = require('promise-retry')
const ssri = require('ssri')

const CachingMinipassPipeline = require('./pipeline.js')
const getAgent = require('./agent.js')
const pkg = require('../package.json')

Expand Down Expand Up @@ -53,7 +53,9 @@ const remoteFetch = (request, options) => {
// we got a 200 response and the user has specified an expected
// integrity value, so wrap the response in an ssri stream to verify it
const integrityStream = ssri.integrityStream({ integrity: _opts.integrity })
const pipeline = new MinipassPipeline(res.body, integrityStream)
const pipeline = new CachingMinipassPipeline({
events: ['integrity', 'size'],
}, res.body, integrityStream)
// we also propagate the integrity and size events out to the pipeline so we can use
// this new response body as an integrityEmitter for cacache
integrityStream.on('integrity', i => pipeline.emit('integrity', i))
Expand Down
40 changes: 40 additions & 0 deletions test/pipeline.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict'

const events = require('events')
const ssri = require('ssri')
const t = require('tap')

const CachingMinipassPipeline = require('../lib/pipeline.js')

t.test('caches events and emits them again for new listeners', async (t) => {
const INTEGRITY = ssri.fromData('foobarbazbuzz')
const integrityStream = ssri.integrityStream()
const pipeline = new CachingMinipassPipeline({ events: ['integrity', 'size'] }, integrityStream)
integrityStream.on('size', s => pipeline.emit('size', s))
integrityStream.on('integrity', i => pipeline.emit('integrity', i))

pipeline.write('foobarbazbuzz')
pipeline.resume()
// delay ending the stream so the early listeners will get the first events
setImmediate(() => pipeline.end())

const [earlySize, earlyIntegrity] = await Promise.all([
events.once(pipeline, 'size').then(res => res[0]),
events.once(pipeline, 'integrity').then(res => res[0]),
])

// now wait for the stream itself to have ended
await pipeline.promise()

// and add new listeners
const [lateSize, lateIntegrity] = await Promise.all([
events.once(pipeline, 'size').then(res => res[0]),
events.once(pipeline, 'integrity').then(res => res[0]),
])

// and make sure we got the same results
t.equal(earlySize, 13, 'got the right size')
t.same(earlyIntegrity, INTEGRITY, 'got the right integrity')
t.same(earlySize, lateSize, 'got the same size early and late')
t.same(earlyIntegrity, lateIntegrity, 'got the same integrity early and late')
})