diff --git a/.changeset/calm-adults-roll.md b/.changeset/calm-adults-roll.md
new file mode 100644
index 00000000..98c56908
--- /dev/null
+++ b/.changeset/calm-adults-roll.md
@@ -0,0 +1,6 @@
+---
+"@tus/server": patch
+"@tus/utils": patch
+---
+
+Consistent cancellation across streams and locks, fixing the lock on a file never being unlocked when the request ends prematurely.
diff --git a/packages/server/src/handlers/BaseHandler.ts b/packages/server/src/handlers/BaseHandler.ts
index 1cb3a9d4..c7df950f 100644
--- a/packages/server/src/handlers/BaseHandler.ts
+++ b/packages/server/src/handlers/BaseHandler.ts
@@ -1,6 +1,6 @@
 import EventEmitter from 'node:events'
 import stream from 'node:stream/promises'
-import {addAbortSignal, PassThrough} from 'node:stream'
+import {PassThrough, Readable} from 'node:stream'
 import type http from 'node:http'

 import type {ServerOptions} from '../types'
@@ -121,7 +121,7 @@ export class BaseHandler extends EventEmitter {

     const lock = locker.newLock(id)

-    await lock.lock(() => {
+    await lock.lock(context.signal, () => {
       context.cancel()
     })

@@ -129,7 +129,7 @@ export class BaseHandler extends EventEmitter {
   }

   protected writeToStore(
-    req: http.IncomingMessage,
+    data: Readable,
     upload: Upload,
     maxFileSize: number,
     context: CancellationContext
@@ -145,16 +145,25 @@ export class BaseHandler extends EventEmitter {
       // Create a PassThrough stream as a proxy to manage the request stream.
       // This allows for aborting the write process without affecting the incoming request stream.
       const proxy = new PassThrough()
-      addAbortSignal(context.signal, proxy)
+
+      // gracefully terminate the proxy stream when the request is aborted
+      const onAbort = () => {
+        data.unpipe(proxy)
+
+        if (!proxy.closed) {
+          proxy.end()
+        }
+      }
+      context.signal.addEventListener('abort', onAbort, {once: true})

       proxy.on('error', (err) => {
-        req.unpipe(proxy)
+        data.unpipe(proxy)
         reject(err.name === 'AbortError' ? ERRORS.ABORTED : err)
       })

       const postReceive = throttle(
         (offset: number) => {
-          this.emit(EVENTS.POST_RECEIVE_V2, req, {...upload, offset})
+          this.emit(EVENTS.POST_RECEIVE_V2, data, {...upload, offset})
         },
         this.options.postReceiveInterval,
         {leading: false}
@@ -166,23 +175,18 @@ export class BaseHandler extends EventEmitter {
         postReceive(tempOffset)
       })

-      req.on('error', () => {
-        if (!proxy.closed) {
-          // we end the stream gracefully here so that we can upload the remaining bytes to the store
-          // as an incompletePart
-          proxy.end()
-        }
-      })
-
       // Pipe the request stream through the proxy. We use the proxy instead of the request stream directly
       // to ensure that errors in the pipeline do not cause the request stream to be destroyed,
       // which would result in a socket hangup error for the client.
       stream
-        .pipeline(req.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
+        .pipeline(data.pipe(proxy), new StreamLimiter(maxFileSize), async (stream) => {
           return this.store.write(stream as StreamLimiter, upload.id, upload.offset)
         })
         .then(resolve)
         .catch(reject)
+        .finally(() => {
+          context.signal.removeEventListener('abort', onAbort)
+        })
     })
   }
diff --git a/packages/server/src/lockers/MemoryLocker.ts b/packages/server/src/lockers/MemoryLocker.ts
index c1cf7d7d..393e0a89 100644
--- a/packages/server/src/lockers/MemoryLocker.ts
+++ b/packages/server/src/lockers/MemoryLocker.ts
@@ -49,11 +49,14 @@ class MemoryLock implements Lock {
     private timeout: number = 1000 * 30
   ) {}

-  async lock(requestRelease: RequestRelease): Promise<void> {
+  async lock(stopSignal: AbortSignal, requestRelease: RequestRelease): Promise<void> {
     const abortController = new AbortController()
+
+    const abortSignal = AbortSignal.any([stopSignal, abortController.signal])
+
     const lock = await Promise.race([
-      this.waitTimeout(abortController.signal),
-      this.acquireLock(this.id, requestRelease, abortController.signal),
+      this.waitTimeout(abortSignal),
+      this.acquireLock(this.id, requestRelease, abortSignal),
     ])

     abortController.abort()
@@ -68,12 +71,12 @@ class MemoryLock implements Lock {
     requestRelease: RequestRelease,
     signal: AbortSignal
   ): Promise<boolean> {
+    const lock = this.locker.locks.get(id)
+
     if (signal.aborted) {
-      return false
+      return typeof lock !== 'undefined'
     }

-    const lock = this.locker.locks.get(id)
-
     if (!lock) {
       const lock = {
         requestRelease,
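The `MemoryLocker` change above leans on `AbortSignal.any` to merge the caller's stop signal (the request lifecycle) with the locker's own timeout controller, so the wait for a lock ends as soon as either side gives up. Below is a minimal sketch of that combination pattern, not part of the patch itself; it assumes Node.js 20.3+ for `AbortSignal.any`, and `acquireWithStopSignal`/`tryAcquire` are hypothetical names standing in for the locker's internal retry loop.

```ts
// Sketch only: combine a per-request stop signal with a lock-acquisition timeout.
// Assumes Node.js >= 20.3 (AbortSignal.any); tryAcquire stands in for the
// locker's internal retry loop (e.g. MemoryLock's acquireLock above).
async function acquireWithStopSignal(
  tryAcquire: (signal: AbortSignal) => Promise<boolean>,
  stopSignal: AbortSignal, // aborted when the HTTP request goes away
  timeoutMs: number
): Promise<boolean> {
  const timeoutController = new AbortController()
  const timer = setTimeout(() => timeoutController.abort(), timeoutMs)

  // Aborted as soon as EITHER the request is gone OR the timeout fires,
  // so the retry loop stops waiting in both cases.
  const combined = AbortSignal.any([stopSignal, timeoutController.signal])

  try {
    return await tryAcquire(combined)
  } finally {
    clearTimeout(timer)
  }
}
```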
diff --git a/packages/server/src/server.ts b/packages/server/src/server.ts
index dd06d376..cc56be5b 100644
--- a/packages/server/src/server.ts
+++ b/packages/server/src/server.ts
@@ -151,6 +151,11 @@ export class Server extends EventEmitter {
   ): Promise {
     const context = this.createContext(req)

+    // Once the request is closed we abort the context to clean up underlying resources
+    req.on('close', () => {
+      context.abort()
+    })
+
     log(`[TusServer] handle: ${req.method} ${req.url}`)
     // Allow overriding the HTTP method. The reason for this is
     // that some libraries/environments do not support PATCH and
@@ -289,6 +294,17 @@ export class Server extends EventEmitter {

     res.writeHead(status, headers)
     res.write(body)
+
+    // Abort the context once the response is sent.
+    // Useful for clean-up when the server uses keep-alive
+    if (!isAborted) {
+      res.on('finish', () => {
+        if (!req.closed) {
+          context.abort()
+        }
+      })
+    }
+
     return res.end()
   }
diff --git a/packages/server/test/Locker.test.ts b/packages/server/test/Locker.test.ts
index 0f377025..bef07f0a 100644
--- a/packages/server/test/Locker.test.ts
+++ b/packages/server/test/Locker.test.ts
@@ -6,6 +6,7 @@ describe('MemoryLocker', () => {
   it('will acquire a lock by notifying another to release it', async () => {
     const locker = new MemoryLocker()
     const lockId = 'upload-id-1'
+    const abortController = new AbortController()

     const cancel = sinon.spy()
     const cancel2 = sinon.spy()
@@ -13,12 +14,12 @@ describe('MemoryLocker', () => {
     const lock1 = locker.newLock(lockId)
     const lock2 = locker.newLock(lockId)

-    await lock1.lock(async () => {
+    await lock1.lock(abortController.signal, async () => {
       await lock1.unlock()
       cancel()
     })

-    await lock2.lock(async () => {
+    await lock2.lock(abortController.signal, async () => {
       cancel2()
     })

@@ -32,19 +33,21 @@ describe('MemoryLocker', () => {
     const locker = new MemoryLocker({
       acquireLockTimeout: 500,
     })
+    const abortController = new AbortController()
+
     const lockId = 'upload-id-1'
     const lock = locker.newLock(lockId)
     const cancel = sinon.spy()

-    await lock.lock(async () => {
+    await lock.lock(abortController.signal, async () => {
       cancel()
       // We note that the function has been called, but do not
       // release the lock
     })

     try {
-      await lock.lock(async () => {
+      await lock.lock(abortController.signal, async () => {
         throw new Error('panic should not be called')
       })
     } catch (e) {
@@ -57,18 +60,20 @@ describe('MemoryLocker', () => {
   it('request lock and unlock', async () => {
     const locker = new MemoryLocker()
     const lockId = 'upload-id-1'
+    const abortController = new AbortController()
+
     const lock = locker.newLock(lockId)
     const lock2 = locker.newLock(lockId)

     const cancel = sinon.spy()

-    await lock.lock(() => {
+    await lock.lock(abortController.signal, () => {
       cancel()
       setTimeout(async () => {
         await lock.unlock()
       }, 50)
     })

-    await lock2.lock(() => {
+    await lock2.lock(abortController.signal, () => {
       throw new Error('should not be called')
     })

@@ -79,4 +84,38 @@ describe('MemoryLocker', () => {
       `request released called more times than expected - ${cancel.callCount}`
     )
   })
+
+  it('will stop trying to acquire the lock if the abort signal is aborted', async () => {
+    const locker = new MemoryLocker()
+    const lockId = 'upload-id-1'
+    const abortController = new AbortController()
+
+    const cancel = sinon.spy()
+    const cancel2 = sinon.spy()
+
+    const lock1 = locker.newLock(lockId)
+    const lock2 = locker.newLock(lockId)
+
+    await lock1.lock(abortController.signal, async () => {
+      // do not unlock when requested
+      cancel()
+    })
+
+    // Abort signal is aborted after lock2 tries to acquire the lock
+    setTimeout(() => {
+      abortController.abort()
+    }, 100)
+
+    try {
+      await lock2.lock(abortController.signal, async () => {
+        cancel2()
+      })
+      assert(false, 'lock2 should not have been acquired')
+    } catch (e) {
+      assert(e === ERRORS.ERR_LOCK_TIMEOUT, `error returned is not correct ${e}`)
+    }
+
+    assert(cancel.callCount > 1, `expected release to be requested more than once, got ${cancel.callCount}`)
+    assert(cancel2.callCount === 0, `expected cancel2 not to be called, got ${cancel2.callCount}`)
+  })
 })
diff --git a/packages/server/test/PatchHandler.test.ts b/packages/server/test/PatchHandler.test.ts
index 1ac3acbb..a17fe804 100644
--- a/packages/server/test/PatchHandler.test.ts
+++ b/packages/server/test/PatchHandler.test.ts
@@ -12,7 +12,7 @@ import {EventEmitter} from 'node:events'
 import {addPipableStreamBody} from './utils'
 import {MemoryLocker} from '../src'
 import streamP from 'node:stream/promises'
-import stream from 'node:stream'
+import stream, {PassThrough} from 'node:stream'

 describe('PatchHandler', () => {
   const path = '/test/output'
@@ -245,4 +245,69 @@ describe('PatchHandler', () => {
       assert.equal(context.signal.aborted, true)
     }
   })
+
+  it('should gracefully terminate request stream when context is cancelled', async () => {
+    handler = new PatchHandler(store, {path, locker: new MemoryLocker()})
+
+    const bodyStream = new PassThrough() // 20kb buffer
+    const req = addPipableStreamBody(
+      httpMocks.createRequest({
+        method: 'PATCH',
+        url: `${path}/1234`,
+        body: bodyStream,
+      })
+    )
+
+    const abortController = new AbortController()
+    context = {
+      cancel: () => abortController.abort(),
+      abort: () => abortController.abort(),
+      signal: abortController.signal,
+    }
+
+    const res = httpMocks.createResponse({req})
+    req.headers = {
+      'upload-offset': '0',
+      'content-type': 'application/offset+octet-stream',
+    }
+    req.url = `${path}/file`
+
+    let accumulatedBuffer: Buffer = Buffer.alloc(0)
+
+    store.getUpload.resolves(new Upload({id: '1234', offset: 0}))
+    store.write.callsFake(async (readable: http.IncomingMessage | stream.Readable) => {
+      const writeStream = new stream.PassThrough()
+      const chunks: Buffer[] = []
+
+      writeStream.on('data', (chunk) => {
+        chunks.push(chunk) // Accumulate chunks in the outer buffer
+      })
+
+      await streamP.pipeline(readable, writeStream)
+
+      accumulatedBuffer = Buffer.concat([accumulatedBuffer, ...chunks])
+
+      return writeStream.readableLength
+    })
+    store.declareUploadLength.resolves()
+
+    await new Promise((resolve, reject) => {
+      handler.send(req, res, context).then(resolve).catch(reject)
+
+      // sends the first 20kb
+      bodyStream.write(Buffer.alloc(1024 * 20))
+
+      // write 15kb
+      bodyStream.write(Buffer.alloc(1024 * 15))
+
+      // simulate that the request was cancelled
+      setTimeout(() => {
+        context.abort()
+      }, 200)
+    })
+
+    // We expect that all the data was written to the store, 35kb
+    assert.equal(accumulatedBuffer.byteLength, 35 * 1024)
+    bodyStream.end()
+  })
 })
diff --git a/packages/server/test/utils.ts b/packages/server/test/utils.ts
index deb06c84..aae8d03b 100644
--- a/packages/server/test/utils.ts
+++ b/packages/server/test/utils.ts
@@ -1,5 +1,5 @@
 import type httpMocks from 'node-mocks-http'
-import stream from 'node:stream'
+import stream, {Readable, Transform, TransformCallback} from 'node:stream'
 import type http from 'node:http'

 export function addPipableStreamBody<
@@ -8,15 +8,33 @@
   // Create a Readable stream that simulates the request body
   const bodyStream = new stream.Duplex({
     read() {
-      this.push(
-        mockRequest.body instanceof Buffer
-          ? mockRequest.body
-          : JSON.stringify(mockRequest.body)
-      )
-      this.push(null)
+      // This function is intentionally left empty since the data flow
+      // is controlled by event listeners registered outside of this method.
     },
   })

+  // Handle cases where the body is a Readable stream
+  if (mockRequest.body instanceof Readable) {
+    // Pipe the mockRequest.body to the bodyStream
+    mockRequest.body.on('data', (chunk) => {
+      bodyStream.push(chunk) // Push the chunk to the bodyStream
+    })
+
+    mockRequest.body.on('end', () => {
+      bodyStream.push(null) // Signal the end of the stream
+    })
+  } else {
+    // Handle cases where the body is not a stream (e.g., Buffer or plain object)
+    const bodyBuffer =
+      mockRequest.body instanceof Buffer
+        ? mockRequest.body
+        : Buffer.from(JSON.stringify(mockRequest.body))
+
+    // Push the bodyBuffer and signal the end of the stream
+    bodyStream.push(bodyBuffer)
+    bodyStream.push(null)
+  }
+
   // Add the pipe method to the mockRequest
   // @ts-ignore
   mockRequest.pipe = (dest: stream.Writable) => bodyStream.pipe(dest)
@@ -24,5 +42,6 @@
   // Add the unpipe method to the mockRequest
   // @ts-ignore
   mockRequest.unpipe = (dest: stream.Writable) => bodyStream.unpipe(dest)
+
   return mockRequest
 }
diff --git a/packages/utils/src/models/Locker.ts b/packages/utils/src/models/Locker.ts
index e4365f4b..c05ed54f 100644
--- a/packages/utils/src/models/Locker.ts
+++ b/packages/utils/src/models/Locker.ts
@@ -26,6 +26,6 @@ export interface Locker {
  *
  */
 export interface Lock {
-  lock(cancelReq: RequestRelease): Promise<void>
+  lock(signal: AbortSignal, cancelReq: RequestRelease): Promise<void>
   unlock(): Promise<void>
 }
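For custom `Locker` implementations outside this repository, the visible change is the extra `AbortSignal` parameter on `Lock.lock()`: the locker should stop waiting for a lock once that signal aborts, typically because the requesting client disconnected. Below is a minimal sketch honoring the new contract; `SimpleLocker`/`SimpleLock`, the in-memory `Map`, and the 100 ms back-off are illustrative assumptions, and only the `Lock`/`Locker`/`RequestRelease` shapes come from `packages/utils/src/models/Locker.ts` above.

```ts
// Sketch only: a single-process locker implementing the new
// lock(signal, requestRelease) contract. Class names and timings are hypothetical.
type RequestRelease = () => Promise<void> | void

interface Lock {
  lock(signal: AbortSignal, requestRelease: RequestRelease): Promise<void>
  unlock(): Promise<void>
}

interface Locker {
  newLock(id: string): Lock
}

class SimpleLocker implements Locker {
  readonly locks = new Map<string, RequestRelease>()

  newLock(id: string): Lock {
    return new SimpleLock(id, this)
  }
}

class SimpleLock implements Lock {
  constructor(
    private readonly id: string,
    private readonly locker: SimpleLocker
  ) {}

  async lock(signal: AbortSignal, requestRelease: RequestRelease): Promise<void> {
    while (this.locker.locks.has(this.id)) {
      if (signal.aborted) {
        // The requesting client is gone; stop waiting for the lock.
        throw new Error('lock acquisition aborted')
      }
      // Ask the current holder to release its work, then back off briefly.
      await this.locker.locks.get(this.id)?.()
      await new Promise((resolve) => setTimeout(resolve, 100))
    }
    this.locker.locks.set(this.id, requestRelease)
  }

  async unlock(): Promise<void> {
    this.locker.locks.delete(this.id)
  }
}

// Caller side mirrors BaseHandler.acquireLock above:
//   await locker.newLock(id).lock(context.signal, () => context.cancel())
```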