From 920aa9ec2e44ce0bafbfa1f61864079313837020 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Tue, 16 May 2023 19:29:22 +0100 Subject: [PATCH] feat: add abstract stream class (#402) --- packages/interface-stream-muxer/package.json | 8 + packages/interface-stream-muxer/src/stream.ts | 321 ++++++++++++++++++ 2 files changed, 329 insertions(+) create mode 100644 packages/interface-stream-muxer/src/stream.ts diff --git a/packages/interface-stream-muxer/package.json b/packages/interface-stream-muxer/package.json index 9bb7d6b88..d805332b9 100644 --- a/packages/interface-stream-muxer/package.json +++ b/packages/interface-stream-muxer/package.json @@ -31,6 +31,10 @@ ".": { "types": "./dist/src/index.d.ts", "import": "./dist/src/index.js" + }, + "./stream": { + "types": "./dist/src/stream.d.ts", + "import": "./dist/src/stream.js" } }, "eslintConfig": { @@ -134,6 +138,10 @@ "dependencies": { "@libp2p/interface-connection": "^5.0.0", "@libp2p/interfaces": "^3.0.0", + "@libp2p/logger": "^2.0.7", + "abortable-iterator": "^5.0.1", + "any-signal": "^4.1.1", + "it-pushable": "^3.1.3", "it-stream-types": "^2.0.1", "uint8arraylist": "^2.4.3" }, diff --git a/packages/interface-stream-muxer/src/stream.ts b/packages/interface-stream-muxer/src/stream.ts new file mode 100644 index 000000000..a1d60ebfd --- /dev/null +++ b/packages/interface-stream-muxer/src/stream.ts @@ -0,0 +1,321 @@ +import { CodeError } from '@libp2p/interfaces/errors' +import { logger } from '@libp2p/logger' +import { abortableSource } from 'abortable-iterator' +import { anySignal } from 'any-signal' +import { type Pushable, pushable } from 'it-pushable' +import { Uint8ArrayList } from 'uint8arraylist' +import type { Direction, Stream, StreamStat } from '@libp2p/interface-connection' +import type { Source } from 'it-stream-types' + +const log = logger('libp2p:stream') + +const ERR_STREAM_RESET = 'ERR_STREAM_RESET' +const ERR_STREAM_ABORT = 'ERR_STREAM_ABORT' +const ERR_SINK_ENDED = 'ERR_SINK_ENDED' +const ERR_DOUBLE_SINK = 'ERR_DOUBLE_SINK' + +export interface AbstractStreamInit { + /** + * A unique identifier for this stream + */ + id: string + + /** + * The stream direction + */ + direction: Direction + + /** + * The maximum allowable data size, any data larger than this will be + * chunked and sent in multiple data messages + */ + maxDataSize: number + + /** + * User specific stream metadata + */ + metadata?: Record + + /** + * Invoked when the stream ends + */ + onEnd?: (err?: Error | undefined) => void +} + +export abstract class AbstractStream implements Stream { + public id: string + public stat: StreamStat + public metadata: Record + public source: AsyncGenerator + + private readonly abortController: AbortController + private readonly resetController: AbortController + private readonly closeController: AbortController + private sourceEnded: boolean + private sinkEnded: boolean + private sinkSunk: boolean + private endErr: Error | undefined + private readonly streamSource: Pushable + private readonly onEnd?: (err?: Error | undefined) => void + private readonly maxDataSize: number + + constructor (init: AbstractStreamInit) { + this.abortController = new AbortController() + this.resetController = new AbortController() + this.closeController = new AbortController() + this.sourceEnded = false + this.sinkEnded = false + this.sinkSunk = false + + this.id = init.id + this.metadata = init.metadata ?? {} + this.stat = { + direction: init.direction, + timeline: { + open: Date.now() + } + } + this.maxDataSize = init.maxDataSize + this.onEnd = init.onEnd + + this.source = this.streamSource = pushable({ + onEnd: () => { + // already sent a reset message + if (this.stat.timeline.reset !== null) { + this.sendCloseRead() + } + + this.onSourceEnd() + } + }) + + // necessary because the libp2p upgrader wraps the sink function + this.sink = this.sink.bind(this) + } + + protected onSourceEnd (err?: Error): void { + if (this.sourceEnded) { + return + } + + this.stat.timeline.closeRead = Date.now() + this.sourceEnded = true + log.trace('%s stream %s source end - err: %o', this.stat.direction, this.id, err) + + if (err != null && this.endErr == null) { + this.endErr = err + } + + if (this.sinkEnded) { + this.stat.timeline.close = Date.now() + + if (this.onEnd != null) { + this.onEnd(this.endErr) + } + } + } + + protected onSinkEnd (err?: Error): void { + if (this.sinkEnded) { + return + } + + this.stat.timeline.closeWrite = Date.now() + this.sinkEnded = true + log.trace('%s stream %s sink end - err: %o', this.stat.direction, this.id, err) + + if (err != null && this.endErr == null) { + this.endErr = err + } + + if (this.sourceEnded) { + this.stat.timeline.close = Date.now() + + if (this.onEnd != null) { + this.onEnd(this.endErr) + } + } + } + + // Close for both Reading and Writing + close (): void { + log.trace('%s stream %s close', this.stat.direction, this.id) + + this.closeRead() + this.closeWrite() + } + + // Close for reading + closeRead (): void { + log.trace('%s stream %s closeRead', this.stat.direction, this.id) + + if (this.sourceEnded) { + return + } + + this.streamSource.end() + } + + // Close for writing + closeWrite (): void { + log.trace('%s stream %s closeWrite', this.stat.direction, this.id) + + if (this.sinkEnded) { + return + } + + this.closeController.abort() + + try { + // need to call this here as the sink method returns in the catch block + // when the close controller is aborted + this.sendCloseWrite() + } catch (err) { + log.trace('%s stream %s error sending close', this.stat.direction, this.id, err) + } + + this.onSinkEnd() + } + + // Close for reading and writing (local error) + abort (err: Error): void { + log.trace('%s stream %s abort', this.stat.direction, this.id, err) + // End the source with the passed error + this.streamSource.end(err) + this.abortController.abort() + this.onSinkEnd(err) + } + + // Close immediately for reading and writing (remote error) + reset (): void { + const err = new CodeError('stream reset', ERR_STREAM_RESET) + this.resetController.abort() + this.streamSource.end(err) + this.onSinkEnd(err) + } + + async sink (source: Source): Promise { + if (this.sinkSunk) { + throw new CodeError('sink already called on stream', ERR_DOUBLE_SINK) + } + + this.sinkSunk = true + + if (this.sinkEnded) { + throw new CodeError('stream closed for writing', ERR_SINK_ENDED) + } + + const signal = anySignal([ + this.abortController.signal, + this.resetController.signal, + this.closeController.signal + ]) + + try { + source = abortableSource(source, signal) + + if (this.stat.direction === 'outbound') { // If initiator, open a new stream + this.sendNewStream() + } + + for await (let data of source) { + while (data.length > 0) { + if (data.length <= this.maxDataSize) { + this.sendData(data instanceof Uint8Array ? new Uint8ArrayList(data) : data) + break + } + data = data instanceof Uint8Array ? new Uint8ArrayList(data) : data + this.sendData(data.sublist(0, this.maxDataSize)) + data.consume(this.maxDataSize) + } + } + } catch (err: any) { + if (err.type === 'aborted' && err.message === 'The operation was aborted') { + if (this.closeController.signal.aborted) { + return + } + + if (this.resetController.signal.aborted) { + err.message = 'stream reset' + err.code = ERR_STREAM_RESET + } + + if (this.abortController.signal.aborted) { + err.message = 'stream aborted' + err.code = ERR_STREAM_ABORT + } + } + + // Send no more data if this stream was remotely reset + if (err.code === ERR_STREAM_RESET) { + log.trace('%s stream %s reset', this.stat.direction, this.id) + } else { + log.trace('%s stream %s error', this.stat.direction, this.id, err) + try { + this.sendReset() + this.stat.timeline.reset = Date.now() + } catch (err) { + log.trace('%s stream %s error sending reset', this.stat.direction, this.id, err) + } + } + + this.streamSource.end(err) + this.onSinkEnd(err) + return + } finally { + signal.clear() + } + + try { + this.sendCloseWrite() + } catch (err) { + log.trace('%s stream %s error sending close', this.stat.direction, this.id, err) + } + + this.onSinkEnd() + } + + /** + * When an extending class reads data from it's implementation-specific source, + * call this method to allow the stream consumer to read the data. + */ + sourcePush (data: Uint8ArrayList): void { + this.streamSource.push(data) + } + + /** + * Returns the amount of unread data - can be used to prevent large amounts of + * data building up when the stream consumer is too slow. + */ + sourceReadableLength (): number { + return this.streamSource.readableLength + } + + /** + * Send a message to the remote muxer informing them a new stream is being + * opened + */ + abstract sendNewStream (): void + + /** + * Send a data message to the remote muxer + */ + abstract sendData (buf: Uint8ArrayList): void + + /** + * Send a reset message to the remote muxer + */ + abstract sendReset (): void + + /** + * Send a message to the remote muxer, informing them no more data messages + * will be sent by this end of the stream + */ + abstract sendCloseWrite (): void + + /** + * Send a message to the remote muxer, informing them no more data messages + * will be read by this end of the stream + */ + abstract sendCloseRead (): void +}