From ed614510edb7cf300234b3227f89d380d3376c79 Mon Sep 17 00:00:00 2001 From: Daniele Bacarella Date: Sat, 2 Jul 2022 12:47:57 +0200 Subject: [PATCH 01/10] added tests --- test/fixtures/transport-transform.js | 2 +- test/transport/pipeline.test.js | 110 +++++++++++++++++++++++++++ 2 files changed, 111 insertions(+), 1 deletion(-) diff --git a/test/fixtures/transport-transform.js b/test/fixtures/transport-transform.js index 2ba524bfc..294f95604 100644 --- a/test/fixtures/transport-transform.js +++ b/test/fixtures/transport-transform.js @@ -8,7 +8,7 @@ module.exports = (options) => { autoDestroy: true, objectMode: true, transform (chunk, enc, cb) { - chunk.service = 'pino' + chunk.service = (options && options.payload) || 'pino' this.push(JSON.stringify(chunk)) cb() } diff --git a/test/transport/pipeline.test.js b/test/transport/pipeline.test.js index 25b244e06..d7d001414 100644 --- a/test/transport/pipeline.test.js +++ b/test/transport/pipeline.test.js @@ -34,3 +34,113 @@ test('pino.transport with a pipeline', async ({ same, teardown }) => { service: 'pino' // this property was added by the transform }) }) + +test('pino.transport with targets and a shared pipeline', async ({ same, teardown }) => { + const destinationA = join( + os.tmpdir(), + '_' + Math.random().toString(36).substr(2, 9) + ) + const destinationB = join( + os.tmpdir(), + '_' + Math.random().toString(36).substr(2, 9) + ) + const transport = pino.transport({ + targets: [ + { + target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), + options: { destination: destinationA } + }, + { + target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), + options: { destination: destinationB } + } + ], + pipeline: [ + { + target: join(__dirname, '..', 'fixtures', 'transport-transform.js'), + options: { payload: 'foobar' } + } + ] + }) + + teardown(transport.end.bind(transport)) + const instance = pino(transport) + instance.info('hello') + await watchFileCreated(destinationA) + await watchFileCreated(destinationB) + const resultA = JSON.parse(await readFile(destinationA)) + const resultB = JSON.parse(await readFile(destinationB)) + delete resultA.time + delete resultB.time + same(resultA, { + pid, + hostname, + level: 30, + msg: 'hello', + service: 'foobar' // this property was added by the transform + }) + same(resultB, { + pid, + hostname, + level: 30, + msg: 'hello', + service: 'foobar' // this property was added by the transform + }) +}) + +test('pino.transport with targets and a custom pipeline', async ({ same, teardown }) => { + const destinationA = join( + os.tmpdir(), + '_' + Math.random().toString(36).substr(2, 9) + ) + const destinationB = join( + os.tmpdir(), + '_' + Math.random().toString(36).substr(2, 9) + ) + const transport = pino.transport({ + targets: [ + { + target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), + options: { destination: destinationA }, + pipeline: [{ + target: join(__dirname, '..', 'fixtures', 'transport-transform.js'), + options: { payload: 'customPipeline' } + }] + }, + { + target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), + options: { destination: destinationB } + } + ], + pipeline: [ + { + target: join(__dirname, '..', 'fixtures', 'transport-transform.js'), + options: { payload: 'globalPipeline' } + } + ] + }) + + teardown(transport.end.bind(transport)) + const instance = pino(transport) + instance.info('hello') + await watchFileCreated(destinationA) + await watchFileCreated(destinationB) + const resultA = JSON.parse(await readFile(destinationA)) + const 
resultB = JSON.parse(await readFile(destinationB)) + delete resultA.time + delete resultB.time + same(resultA, { + pid, + hostname, + level: 30, + msg: 'hello', + service: 'customPipeline' // this property was added by the transform + }) + same(resultB, { + pid, + hostname, + level: 30, + msg: 'hello', + service: 'globalPipeline' // this property was added by the transform + }) +}) From 36a71dceb69ed5a6ac6f2f81dba6e5697337f568 Mon Sep 17 00:00:00 2001 From: Daniele Bacarella Date: Fri, 8 Jul 2022 13:56:37 +0200 Subject: [PATCH 02/10] Added test --- test/transport/pipeline.test.js | 56 +++++++++++++++++++++++++++++++-- 1 file changed, 54 insertions(+), 2 deletions(-) diff --git a/test/transport/pipeline.test.js b/test/transport/pipeline.test.js index d7d001414..dceb6ff4a 100644 --- a/test/transport/pipeline.test.js +++ b/test/transport/pipeline.test.js @@ -35,7 +35,7 @@ test('pino.transport with a pipeline', async ({ same, teardown }) => { }) }) -test('pino.transport with targets and a shared pipeline', async ({ same, teardown }) => { +test('pino.transport with targets using a shared pipeline', async ({ same, teardown }) => { const destinationA = join( os.tmpdir(), '_' + Math.random().toString(36).substr(2, 9) @@ -88,7 +88,59 @@ test('pino.transport with targets and a shared pipeline', async ({ same, teardow }) }) -test('pino.transport with targets and a custom pipeline', async ({ same, teardown }) => { +test('pino.transport with shared pipeline and target with excludeFromPipeline flag', async ({ same, teardown }) => { + const destinationA = join( + os.tmpdir(), + '_' + Math.random().toString(36).substr(2, 9) + ) + const destinationB = join( + os.tmpdir(), + '_' + Math.random().toString(36).substr(2, 9) + ) + const transport = pino.transport({ + targets: [ + { + target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), + options: { destination: destinationA } + }, + { + target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), + options: { destination: destinationB, excludeFromPipeline: true } + } + ], + pipeline: [ + { + target: join(__dirname, '..', 'fixtures', 'transport-transform.js'), + options: { payload: 'foobar' } + } + ] + }) + + teardown(transport.end.bind(transport)) + const instance = pino(transport) + instance.info('hello') + await watchFileCreated(destinationA) + await watchFileCreated(destinationB) + const resultA = JSON.parse(await readFile(destinationA)) + const resultB = JSON.parse(await readFile(destinationB)) + delete resultA.time + delete resultB.time + same(resultA, { + pid, + hostname, + level: 30, + msg: 'hello', + service: 'foobar' // this property was added by the transform + }) + same(resultB, { + pid, + hostname, + level: 30, + msg: 'hello' + }) +}) + +test('pino.transport with target using a custom pipeline', async ({ same, teardown }) => { const destinationA = join( os.tmpdir(), '_' + Math.random().toString(36).substr(2, 9) From cb63778e069e32296d8bebd129f9d65320379a5e Mon Sep 17 00:00:00 2001 From: Daniele Bacarella Date: Sat, 27 Apr 2024 11:58:11 +0200 Subject: [PATCH 03/10] Implemented support for named 'pipelines' property in pino.transport --- lib/transport.js | 50 +++++++- lib/worker-pipeline.js | 154 +++++++++++++++++++++---- test/fixtures/transport-transform.js | 2 +- test/transport/pipeline.test.js | 163 +++++++++++++++------------ 4 files changed, 266 insertions(+), 103 deletions(-) diff --git a/lib/transport.js b/lib/transport.js index 6be075fdf..00a867eb4 100644 --- a/lib/transport.js +++ b/lib/transport.js @@ -71,7 +71,7 @@ 
function flush (stream) { } function transport (fullOptions) { - const { pipeline, targets, levels, dedupe, options = {}, worker = {}, caller = getCallers() } = fullOptions + const { pipeline, pipelines, targets, levels, dedupe, options = {}, worker = {}, caller = getCallers() } = fullOptions // Backwards compatibility const callers = typeof caller === 'string' ? [caller] : caller @@ -85,14 +85,54 @@ function transport (fullOptions) { throw new Error('only one of target or targets can be specified') } + // Avoid having to deal with combinatorial explosion of cases to handle + if (pipeline && pipelines) { + throw new Error('only one of pipeline or pipelines can be specified') + } + if (targets) { target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker.js') - options.targets = targets.map((dest) => { - return { - ...dest, - target: fixTarget(dest.target) + options.targets = targets + .map((dest) => { + return { + ...dest, + target: fixTarget(dest.target) + } + }) + } + + /** + * In case 'pipelines' property is defined + */ + if (pipelines) { + target = bundlerOverrides['pino-pipeline-worker'] || join(__dirname, 'worker-pipeline.js') + + // Prepare the different pipelines + options.pipelines = [] + + const pipelineTargets = {} + for (const pKey in pipelines) { + pipelineTargets[pKey] = pipelines[pKey].map(p => { + return { target: fixTarget(p.target) } + }) + } + + // We wrap each target to be a logical pipeline and + // match with an actual pipeline, if target.pipeline is defined + options.targets.forEach((t) => { + if (t.pipeline) { + const pipeline = pipelineTargets[t.pipeline] + // Build an array of targets of the form: + // [{target: transform1}, .. {target: transformN}, {target: destinationStream}] + options.pipelines.push(pipeline.concat(t)) + } else { + // [{target: destinationStream}] + options.pipelines.push([t]) } }) + + // Signal for the worker + options.targets = undefined } else if (pipeline) { target = bundlerOverrides['pino-pipeline-worker'] || join(__dirname, 'worker-pipeline.js') options.targets = pipeline.map((dest) => { diff --git a/lib/worker-pipeline.js b/lib/worker-pipeline.js index 76cb3b888..0c2e5f18e 100644 --- a/lib/worker-pipeline.js +++ b/lib/worker-pipeline.js @@ -9,30 +9,140 @@ const { pipeline, PassThrough } = require('stream') /* istanbul ignore file */ -module.exports = async function ({ targets }) { - const streams = await Promise.all(targets.map(async (t) => { - const fn = await loadTransportStreamBuilder(t.target) - const stream = await fn(t.options) - return stream - })) - const ee = new EE() - - const stream = new PassThrough({ - autoDestroy: true, - destroy (_, cb) { - ee.on('error', cb) - ee.on('closed', cb) - } - }) +module.exports = async function ({ targets, pipelines }) { + if (targets === undefined && pipelines) { + // Create an instance of ThreadStream using target (worker-pipeline.js) as internal thread worker + /** + * In case a pipeline is defined, a single ThreadStream is created as usual + * representing the PassThrough(stream1 + .. + streamN) + * + * If we want to implement a `tee`, the quickest way is to create as many + * pipelines as paths that can be created from a transport configuration + * and return a PassThrough stream wrapping all the pass through streams + * as entry point for each pipeline. 
+ * // TODO: to rephrase + * + * // TODO: propose to deprecate "pipeline" property only + * + * Given for example the following transport configuration: + * + * const transport = pino.transport({ + * targets: [ + * { + * // targetA + * target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), + * options: { destination: destinationA } + * // An undefined "pipeline" property means do not use a pipeline as source + * }, + * { + * // targetB + * target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), + * options: { destination: destinationB }, + * pipeline: "pipeA" // use pipeA as source + * } + * ], + * pipelines: { + * "pipeA": [ + * { + * target: join(__dirname, '..', 'fixtures', 'transport-transform.js'), + * } + * ] + * } + * }) + * + * We would have 3 pipelines: + * + * 2 actual ones: + * p1 = pipeline(passThrough1, pipeA, targetA) + * p2 = pipeline(passThrough2, pipeA, targetB) + * + * PassThroughEntry // a stream that serves purely as entry point for ThreadStream + * + * 1 pipeline + * p3 = pipeline(PassThroughEntry, passThrough1, passThrough2) + * + * return PassThroughEntry + */ + + const entryStreams = [] + for (let i = 0; i < pipelines.length; i++) { + const pipelineTargets = pipelines[i] + const streams = await Promise.all(pipelineTargets.map(async (t) => { + const fn = await loadTransportStreamBuilder(t.target) + const stream = await fn(t.options) + return stream + })) + + const ee = new EE() + + const stream = new PassThrough({ + autoDestroy: true, + destroy (_, cb) { + ee.on('error', cb) + ee.on('closed', cb) + } + }) - pipeline(stream, ...streams, function (err) { - if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - ee.emit('error', err) - return + pipeline(stream, ...streams, function (err) { + if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { + ee.emit('error', err) + return + } + + ee.emit('closed') + }) + + entryStreams.push(stream) } - ee.emit('closed') - }) + const ee = new EE() + const sourceStream = new PassThrough({ + autoDestroy: true, + destroy (_, cb) { + ee.on('error', cb) + ee.on('closed', cb) + } + }) + + pipeline(sourceStream, ...entryStreams, function (err) { + if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { + ee.emit('error', err) + return + } - return stream + ee.emit('closed') + }) + + return sourceStream + } else { + // Maintaining bakcwards compatibility + const streams = await Promise.all(targets.map(async (t) => { + const fn = await loadTransportStreamBuilder(t.target) + const stream = await fn(t.options) + return stream + })) + + if (targets) { + const ee = new EE() + + const stream = new PassThrough({ + autoDestroy: true, + destroy (_, cb) { + ee.on('error', cb) + ee.on('closed', cb) + } + }) + + pipeline(stream, ...streams, function (err) { + if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { + ee.emit('error', err) + return + } + + ee.emit('closed') + }) + + return stream + } + } } diff --git a/test/fixtures/transport-transform.js b/test/fixtures/transport-transform.js index 294f95604..2ba524bfc 100644 --- a/test/fixtures/transport-transform.js +++ b/test/fixtures/transport-transform.js @@ -8,7 +8,7 @@ module.exports = (options) => { autoDestroy: true, objectMode: true, transform (chunk, enc, cb) { - chunk.service = (options && options.payload) || 'pino' + chunk.service = 'pino' this.push(JSON.stringify(chunk)) cb() } diff --git a/test/transport/pipeline.test.js b/test/transport/pipeline.test.js index dceb6ff4a..a990d237a 100644 --- a/test/transport/pipeline.test.js +++ b/test/transport/pipeline.test.js 
@@ -10,6 +10,14 @@ const pino = require('../../') const { pid } = process const hostname = os.hostname() +/** + * The idea to address the requirement of mix&match target and pipeline + * https://github.com/pinojs/pino/issues/1302 + * is to implement a sort of `tee`. + * Targets defined in `pipelines` property will be used to generate + * an output value which will be then written to different end targets. + * + */ test('pino.transport with a pipeline', async ({ same, teardown }) => { const destination = file() const transport = pino.transport({ @@ -48,19 +56,22 @@ test('pino.transport with targets using a shared pipeline', async ({ same, teard targets: [ { target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), - options: { destination: destinationA } + options: { destination: destinationA }, + pipeline: 'pipelineA' }, { target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), - options: { destination: destinationB } + options: { destination: destinationB }, + pipeline: 'pipelineA' } ], - pipeline: [ - { - target: join(__dirname, '..', 'fixtures', 'transport-transform.js'), - options: { payload: 'foobar' } - } - ] + pipelines: { + pipelineA: [ + { + target: join(__dirname, '..', 'fixtures', 'transport-transform.js') + } + ] + } }) teardown(transport.end.bind(transport)) @@ -77,18 +88,18 @@ test('pino.transport with targets using a shared pipeline', async ({ same, teard hostname, level: 30, msg: 'hello', - service: 'foobar' // this property was added by the transform + service: 'pino' // this property was added by the transform }) same(resultB, { pid, hostname, level: 30, msg: 'hello', - service: 'foobar' // this property was added by the transform + service: 'pino' // this property was added by the transform }) }) -test('pino.transport with shared pipeline and target with excludeFromPipeline flag', async ({ same, teardown }) => { +test('pino.transport with shared pipeline and target with pipeline=undefined', async ({ same, teardown }) => { const destinationA = join( os.tmpdir(), '_' + Math.random().toString(36).substr(2, 9) @@ -101,19 +112,21 @@ test('pino.transport with shared pipeline and target with excludeFromPipeline fl targets: [ { target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), - options: { destination: destinationA } + options: { destination: destinationA }, + pipeline: 'pipelineA' }, { target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), - options: { destination: destinationB, excludeFromPipeline: true } + options: { destination: destinationB } } ], - pipeline: [ - { - target: join(__dirname, '..', 'fixtures', 'transport-transform.js'), - options: { payload: 'foobar' } - } - ] + pipelines: { + pipelineA: [ + { + target: join(__dirname, '..', 'fixtures', 'transport-transform.js') + } + ] + } }) teardown(transport.end.bind(transport)) @@ -130,7 +143,7 @@ test('pino.transport with shared pipeline and target with excludeFromPipeline fl hostname, level: 30, msg: 'hello', - service: 'foobar' // this property was added by the transform + service: 'pino' // this property was added by the transform }) same(resultB, { pid, @@ -140,59 +153,59 @@ test('pino.transport with shared pipeline and target with excludeFromPipeline fl }) }) -test('pino.transport with target using a custom pipeline', async ({ same, teardown }) => { - const destinationA = join( - os.tmpdir(), - '_' + Math.random().toString(36).substr(2, 9) - ) - const destinationB = join( - os.tmpdir(), - '_' + Math.random().toString(36).substr(2, 9) - ) - const transport = 
pino.transport({ - targets: [ - { - target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), - options: { destination: destinationA }, - pipeline: [{ - target: join(__dirname, '..', 'fixtures', 'transport-transform.js'), - options: { payload: 'customPipeline' } - }] - }, - { - target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), - options: { destination: destinationB } - } - ], - pipeline: [ - { - target: join(__dirname, '..', 'fixtures', 'transport-transform.js'), - options: { payload: 'globalPipeline' } - } - ] - }) +// test('pino.transport with target using a custom pipeline', async ({ same, teardown }) => { +// const destinationA = join( +// os.tmpdir(), +// '_' + Math.random().toString(36).substr(2, 9) +// ) +// const destinationB = join( +// os.tmpdir(), +// '_' + Math.random().toString(36).substr(2, 9) +// ) +// const transport = pino.transport({ +// targets: [ +// { +// target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), +// options: { destination: destinationA }, +// pipeline: [{ +// target: join(__dirname, '..', 'fixtures', 'transport-transform.js'), +// options: { payload: 'customPipeline' } +// }] +// }, +// { +// target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), +// options: { destination: destinationB } +// } +// ], +// pipeline: [ +// { +// target: join(__dirname, '..', 'fixtures', 'transport-transform.js'), +// options: { payload: 'globalPipeline' } +// } +// ] +// }) - teardown(transport.end.bind(transport)) - const instance = pino(transport) - instance.info('hello') - await watchFileCreated(destinationA) - await watchFileCreated(destinationB) - const resultA = JSON.parse(await readFile(destinationA)) - const resultB = JSON.parse(await readFile(destinationB)) - delete resultA.time - delete resultB.time - same(resultA, { - pid, - hostname, - level: 30, - msg: 'hello', - service: 'customPipeline' // this property was added by the transform - }) - same(resultB, { - pid, - hostname, - level: 30, - msg: 'hello', - service: 'globalPipeline' // this property was added by the transform - }) -}) +// teardown(transport.end.bind(transport)) +// const instance = pino(transport) +// instance.info('hello') +// await watchFileCreated(destinationA) +// await watchFileCreated(destinationB) +// const resultA = JSON.parse(await readFile(destinationA)) +// const resultB = JSON.parse(await readFile(destinationB)) +// delete resultA.time +// delete resultB.time +// same(resultA, { +// pid, +// hostname, +// level: 30, +// msg: 'hello', +// service: 'customPipeline' // this property was added by the transform +// }) +// same(resultB, { +// pid, +// hostname, +// level: 30, +// msg: 'hello', +// service: 'globalPipeline' // this property was added by the transform +// }) +// }) From db239f58f00c49fa70ab65e7cb0e0d42f46b2139 Mon Sep 17 00:00:00 2001 From: Daniele Bacarella Date: Sat, 4 May 2024 06:43:48 +0200 Subject: [PATCH 04/10] Updated pino.d.ts and api.md to include pipeline within targets --- docs/api.md | 40 ++++++++++++++++++++++++++++++++-------- pino.d.ts | 2 +- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/docs/api.md b/docs/api.md index 1fc438423..80fa249d8 100644 --- a/docs/api.md +++ b/docs/api.md @@ -241,13 +241,13 @@ child.info('this will have both `foo: 1` and `bar: 2`') logger.info('this will still only have `foo: 1`') ``` -As of pino 7.x, when the `mixin` is used with the [`nestedKey` option](#opt-nestedkey), -the object returned from the `mixin` method will also be nested. 
Prior versions would mix -this object into the root. +As of pino 7.x, when the `mixin` is used with the [`nestedKey` option](#opt-nestedkey), +the object returned from the `mixin` method will also be nested. Prior versions would mix +this object into the root. ```js const logger = pino({ - nestedKey: 'payload', + nestedKey: 'payload', mixin() { return { requestId: requestId.currentId() } } @@ -590,7 +590,7 @@ when using the `transport` option. In this case, an `Error` will be thrown. #### `onChild` (Function) -The `onChild` function is a synchronous callback that will be called on each creation of a new child, passing the child instance as its first argument. +The `onChild` function is a synchronous callback that will be called on each creation of a new child, passing the child instance as its first argument. Any error thrown inside the callback will be uncaught and should be handled inside the callback. ```js const parent = require('pino')({ onChild: (instance) => { @@ -609,7 +609,7 @@ Default: `pino.destination(1)` (STDOUT) The `destination` parameter can be a file descriptor, a file path, or an object with `dest` property pointing to a fd or path. An ordinary Node.js `stream` file descriptor can be passed as the -destination (such as the result +destination (such as the result of `fs.createWriteStream`) but for peak log writing performance, it is strongly recommended to use `pino.destination` to create the destination stream. Note that the `destination` parameter can be the result of `pino.transport()`. @@ -1001,7 +1001,7 @@ Adds to the bindings of this logger instance. **Note:** Does not overwrite bindings. Can potentially result in duplicate keys in log lines. -* See [`bindings` parameter in `logger.child`](#logger-child-bindings) +* See [`bindings` parameter in `logger.child`](#logger-child-bindings) ### `logger.flush([cb])` @@ -1239,6 +1239,30 @@ const transport = pino.transport({ pino(transport) ``` +Multiple transports can now be defined to include pipelines: + +```js +const pino = require('pino') +const transport = pino.transport({ + targets: [{ + level: 'info', + target: 'pino-pretty' // must be installed separately + }, { + level: 'trace', + target: 'pino/file', + options: { destination: '/path/to/store/logs' } + }, { + pipeline: [{ + target: 'pino-syslog' // must be installed separately + }, { + target: 'pino-socket' // must be installed separately + }] + } + ] +}) +pino(transport) +``` + If `WeakRef`, `WeakMap`, and `FinalizationRegistry` are available in the current runtime (v14.5.0+), then the thread will be automatically terminated in case the stream or logger goes out of scope. The `transport()` function adds a listener to `process.on('beforeExit')` and `process.on('exit')` to ensure the worker @@ -1276,7 +1300,7 @@ For more on transports, how they work, and how to create them see the [`Transpor * `target`: The transport to pass logs through. This may be an installed module name or an absolute path. * `options`: An options object which is serialized (see [Structured Clone Algorithm](https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm)), passed to the worker thread, parsed and then passed to the exported transport function. * `worker`: [Worker thread](https://nodejs.org/api/worker_threads.html#worker_threads_new_worker_filename_options) configuration options. Additionally, the `worker` option supports `worker.autoEnd`. If this is set to `false` logs will not be flushed on process exit. 
It is then up to the developer to call `transport.end()` to flush logs.
-* `targets`: May be specified instead of `target`. Must be an array of transport configurations. Transport configurations include the aforementioned `options` and `target` options plus a `level` option which will send only logs above a specified level to a transport.
+* `targets`: May be specified instead of `target`. Must be an array of transport configurations and/or pipelines. Transport configurations include the aforementioned `options` and `target` options plus a `level` option which will send only logs above a specified level to a transport.
 * `pipeline`: May be specified instead of `target`. Must be an array of transport configurations. Transport configurations include the aforementioned `options` and `target` options. All intermediate steps in the pipeline _must_ be `Transform` streams and not `Writable`.
 * `dedupe`: See [pino.multistream options](#pino-multistream)
 
diff --git a/pino.d.ts b/pino.d.ts
index 41ffdfb4c..841192c49 100644
--- a/pino.d.ts
+++ b/pino.d.ts
@@ -262,7 +262,7 @@ declare namespace pino {
     }
 
     interface TransportMultiOptions<Options = Record<string, any>> extends TransportBaseOptions<Options>{
-        targets: readonly TransportTargetOptions<Options>[],
+        targets: readonly (TransportTargetOptions<Options>|TransportPipelineOptions<Options>)[],
         levels?: Record<string, number>
         dedupe?: boolean
     }

From c3e9fec553d0c869f7a72cc43d1524e448586d6a Mon Sep 17 00:00:00 2001
From: Daniele Bacarella
Date: Sun, 5 May 2024 11:52:50 +0200
Subject: [PATCH 05/10] - Reverted changes related to the support of named
 pipelines - Implemented support for mixed target&pipeline definitions within
 `targets` in `transport.js` - Merged logic from both `worker.js` and
 `worker-pipeline.js` into `worker.js` - Fixed `pipeline.test.js` - Fixed docs
 to reflect changes above

TODO:
- Remove `worker-pipeline.js`
- Fix `transport.js` to use only `worker.js`
- Fix related docs
- Fix UTs

---
 lib/transport.js                |  54 +++---------
 lib/worker.js                   |  61 ++++++++++++-
 test/transport/pipeline.test.js | 148 +++-----------------------------
 3 files changed, 82 insertions(+), 181 deletions(-)

diff --git a/lib/transport.js b/lib/transport.js
index 00a867eb4..19ab496bf 100644
--- a/lib/transport.js
+++ b/lib/transport.js
@@ -71,7 +71,7 @@ function flush (stream) {
 }
 
 function transport (fullOptions) {
-  const { pipeline, pipelines, targets, levels, dedupe, options = {}, worker = {}, caller = getCallers() } = fullOptions
+  const { pipeline, targets, levels, dedupe, options = {}, worker = {}, caller = getCallers() } = fullOptions
 
   // Backwards compatibility
   const callers = typeof caller === 'string' ?
[caller] : caller @@ -85,54 +85,22 @@ function transport (fullOptions) { throw new Error('only one of target or targets can be specified') } - // Avoid having to deal with combinatorial explosion of cases to handle - if (pipeline && pipelines) { - throw new Error('only one of pipeline or pipelines can be specified') - } - if (targets) { target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker.js') - options.targets = targets - .map((dest) => { + options.targets = targets.filter(dest => dest.target).map((dest) => { + return { + ...dest, + target: fixTarget(dest.target) + } + }) + options.pipelines = targets.filter(dest => dest.pipeline).map((dest) => { + return dest.pipeline.map((t) => { return { - ...dest, - target: fixTarget(dest.target) + ...t, + target: fixTarget(t.target) } }) - } - - /** - * In case 'pipelines' property is defined - */ - if (pipelines) { - target = bundlerOverrides['pino-pipeline-worker'] || join(__dirname, 'worker-pipeline.js') - - // Prepare the different pipelines - options.pipelines = [] - - const pipelineTargets = {} - for (const pKey in pipelines) { - pipelineTargets[pKey] = pipelines[pKey].map(p => { - return { target: fixTarget(p.target) } - }) - } - - // We wrap each target to be a logical pipeline and - // match with an actual pipeline, if target.pipeline is defined - options.targets.forEach((t) => { - if (t.pipeline) { - const pipeline = pipelineTargets[t.pipeline] - // Build an array of targets of the form: - // [{target: transform1}, .. {target: transformN}, {target: destinationStream}] - options.pipelines.push(pipeline.concat(t)) - } else { - // [{target: destinationStream}] - options.pipelines.push([t]) - } }) - - // Signal for the worker - options.targets = undefined } else if (pipeline) { target = bundlerOverrides['pino-pipeline-worker'] || join(__dirname, 'worker-pipeline.js') options.targets = pipeline.map((dest) => { diff --git a/lib/worker.js b/lib/worker.js index c20c19add..4f7be9431 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -1,5 +1,7 @@ 'use strict' +const EE = require('events') +const { pipeline, PassThrough } = require('stream') const pino = require('../pino.js') const build = require('pino-abstract-transport') const loadTransportStreamBuilder = require('./transport-stream') @@ -9,7 +11,8 @@ const loadTransportStreamBuilder = require('./transport-stream') /* istanbul ignore file */ -module.exports = async function ({ targets, levels, dedupe }) { +module.exports = async function ({ targets, pipelines, levels, dedupe }) { + // Process targets targets = await Promise.all(targets.map(async (t) => { const fn = await loadTransportStreamBuilder(t.target) const stream = await fn(t.options) @@ -18,12 +21,33 @@ module.exports = async function ({ targets, levels, dedupe }) { stream } })) + + const targetStreams = [...targets] + + // Process pipelines + if (pipelines && pipelines.length) { + pipelines = await Promise.all( + pipelines.map(async (p) => { + const pipeDests = await Promise.all( + p.map(async (t) => { + const fn = await loadTransportStreamBuilder(t.target) + const stream = await fn(t.options) + return stream + } + )) + + return { stream: createPipeline(pipeDests) } + }) + ) + targetStreams.push(...pipelines) + } + return build(process, { parse: 'lines', metadata: true, close (err, cb) { let expected = 0 - for (const transport of targets) { + for (const transport of targetStreams) { expected++ transport.stream.on('close', closeCb) transport.stream.end() @@ -37,8 +61,9 @@ module.exports = async function ({ targets, levels, 
dedupe }) { } }) + // TODO: Why split2 was not used for pipelines? function process (stream) { - const multi = pino.multistream(targets, { levels, dedupe }) + const multi = pino.multistream(targetStreams, { levels, dedupe }) // TODO manage backpressure stream.on('data', function (chunk) { const { lastTime, lastMsg, lastObj, lastLevel } = this @@ -51,4 +76,34 @@ module.exports = async function ({ targets, levels, dedupe }) { multi.write(chunk + '\n') }) } + +/** + * Creates a pipeline using the provided streams and return an instance of `PassThrough` stream + * as a source for the pipeline. + * + * @param {(TransformStream|WritableStream)[]} streams An array of streams. + * All intermediate streams in the array *MUST* be `Transform` streams and only the last one `Writable`. + * @returns A `PassThrough` stream instance representing the source stream of the pipeline + */ + function createPipeline (streams) { + const ee = new EE() + const stream = new PassThrough({ + autoDestroy: true, + destroy (_, cb) { + ee.on('error', cb) + ee.on('closed', cb) + } + }) + + pipeline(stream, ...streams, function (err) { + if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { + ee.emit('error', err) + return + } + + ee.emit('closed') + }) + + return stream + } } diff --git a/test/transport/pipeline.test.js b/test/transport/pipeline.test.js index a990d237a..14d435ae5 100644 --- a/test/transport/pipeline.test.js +++ b/test/transport/pipeline.test.js @@ -10,14 +10,6 @@ const pino = require('../../') const { pid } = process const hostname = os.hostname() -/** - * The idea to address the requirement of mix&match target and pipeline - * https://github.com/pinojs/pino/issues/1302 - * is to implement a sort of `tee`. - * Targets defined in `pipelines` property will be used to generate - * an output value which will be then written to different end targets. 
- * - */ test('pino.transport with a pipeline', async ({ same, teardown }) => { const destination = file() const transport = pino.transport({ @@ -43,7 +35,7 @@ test('pino.transport with a pipeline', async ({ same, teardown }) => { }) }) -test('pino.transport with targets using a shared pipeline', async ({ same, teardown }) => { +test('pino.transport with targets containing pipelines', async ({ same, teardown }) => { const destinationA = join( os.tmpdir(), '_' + Math.random().toString(36).substr(2, 9) @@ -56,22 +48,20 @@ test('pino.transport with targets using a shared pipeline', async ({ same, teard targets: [ { target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), - options: { destination: destinationA }, - pipeline: 'pipelineA' + options: { destination: destinationA } }, { - target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), - options: { destination: destinationB }, - pipeline: 'pipelineA' + pipeline: [ + { + target: join(__dirname, '..', 'fixtures', 'transport-transform.js') + }, + { + target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), + options: { destination: destinationB } + } + ] } - ], - pipelines: { - pipelineA: [ - { - target: join(__dirname, '..', 'fixtures', 'transport-transform.js') - } - ] - } + ] }) teardown(transport.end.bind(transport)) @@ -87,8 +77,7 @@ test('pino.transport with targets using a shared pipeline', async ({ same, teard pid, hostname, level: 30, - msg: 'hello', - service: 'pino' // this property was added by the transform + msg: 'hello' }) same(resultB, { pid, @@ -98,114 +87,3 @@ test('pino.transport with targets using a shared pipeline', async ({ same, teard service: 'pino' // this property was added by the transform }) }) - -test('pino.transport with shared pipeline and target with pipeline=undefined', async ({ same, teardown }) => { - const destinationA = join( - os.tmpdir(), - '_' + Math.random().toString(36).substr(2, 9) - ) - const destinationB = join( - os.tmpdir(), - '_' + Math.random().toString(36).substr(2, 9) - ) - const transport = pino.transport({ - targets: [ - { - target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), - options: { destination: destinationA }, - pipeline: 'pipelineA' - }, - { - target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), - options: { destination: destinationB } - } - ], - pipelines: { - pipelineA: [ - { - target: join(__dirname, '..', 'fixtures', 'transport-transform.js') - } - ] - } - }) - - teardown(transport.end.bind(transport)) - const instance = pino(transport) - instance.info('hello') - await watchFileCreated(destinationA) - await watchFileCreated(destinationB) - const resultA = JSON.parse(await readFile(destinationA)) - const resultB = JSON.parse(await readFile(destinationB)) - delete resultA.time - delete resultB.time - same(resultA, { - pid, - hostname, - level: 30, - msg: 'hello', - service: 'pino' // this property was added by the transform - }) - same(resultB, { - pid, - hostname, - level: 30, - msg: 'hello' - }) -}) - -// test('pino.transport with target using a custom pipeline', async ({ same, teardown }) => { -// const destinationA = join( -// os.tmpdir(), -// '_' + Math.random().toString(36).substr(2, 9) -// ) -// const destinationB = join( -// os.tmpdir(), -// '_' + Math.random().toString(36).substr(2, 9) -// ) -// const transport = pino.transport({ -// targets: [ -// { -// target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), -// options: { destination: destinationA }, -// pipeline: [{ -// target: join(__dirname, '..', 
'fixtures', 'transport-transform.js'), -// options: { payload: 'customPipeline' } -// }] -// }, -// { -// target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), -// options: { destination: destinationB } -// } -// ], -// pipeline: [ -// { -// target: join(__dirname, '..', 'fixtures', 'transport-transform.js'), -// options: { payload: 'globalPipeline' } -// } -// ] -// }) - -// teardown(transport.end.bind(transport)) -// const instance = pino(transport) -// instance.info('hello') -// await watchFileCreated(destinationA) -// await watchFileCreated(destinationB) -// const resultA = JSON.parse(await readFile(destinationA)) -// const resultB = JSON.parse(await readFile(destinationB)) -// delete resultA.time -// delete resultB.time -// same(resultA, { -// pid, -// hostname, -// level: 30, -// msg: 'hello', -// service: 'customPipeline' // this property was added by the transform -// }) -// same(resultB, { -// pid, -// hostname, -// level: 30, -// msg: 'hello', -// service: 'globalPipeline' // this property was added by the transform -// }) -// }) From dbb40d5039250c93cda1f3c0fcb9a3a3100a45a3 Mon Sep 17 00:00:00 2001 From: Daniele Bacarella Date: Sun, 5 May 2024 12:54:59 +0200 Subject: [PATCH 06/10] - Removed `worker-pipeline.js` - Updated docs to remove mentions of `worker-pipeline.js` - Fixed failing UTs - Fixed `transport.js` to use only `worker.js` also when `pipeline` is defined - Fixed `worker.js` to work properly when only `pipeline` is defined --- docs/bundling.md | 4 +- lib/transport.js | 6 +- lib/worker-pipeline.js | 148 ------------------------ lib/worker.js | 26 +++-- test/transport/bundlers-support.test.js | 30 ----- 5 files changed, 19 insertions(+), 195 deletions(-) delete mode 100644 lib/worker-pipeline.js diff --git a/docs/bundling.md b/docs/bundling.md index 6467b8e52..1c326a936 100644 --- a/docs/bundling.md +++ b/docs/bundling.md @@ -7,7 +7,6 @@ In particular, a bundler must ensure that the following files are also bundled s * `lib/worker.js` from the `thread-stream` dependency * `file.js` * `lib/worker.js` -* `lib/worker-pipeline.js` * Any transport used by the user (like `pino-pretty`) Once the files above have been generated, the bundler must also add information about the files above by injecting a code that sets `__bundlerPathsOverrides` in the `globalThis` object. @@ -22,12 +21,11 @@ globalThis.__bundlerPathsOverrides = { 'thread-stream-worker': pinoWebpackAbsolutePath('./thread-stream-worker.js') 'pino/file': pinoWebpackAbsolutePath('./pino-file.js'), 'pino-worker': pinoWebpackAbsolutePath('./pino-worker.js'), - 'pino-pipeline-worker': pinoWebpackAbsolutePath('./pino-pipeline-worker.js'), 'pino-pretty': pinoWebpackAbsolutePath('./pino-pretty.js'), }; ``` -Note that `pino/file`, `pino-worker`, `pino-pipeline-worker`, and `thread-stream-worker` are required identifiers. Other identifiers are possible based on the user configuration. +Note that `pino/file`, `pino-worker` and `thread-stream-worker` are required identifiers. Other identifiers are possible based on the user configuration. 
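If no plugin is available for a given bundler, the same map can also be assembled by hand. A minimal sketch, assuming the worker files are emitted next to the final bundle (the file names below are placeholders, not part of pino itself, and must be adapted to the bundler's actual output):

```js
// Hypothetical sketch: the emitted file names are assumptions,
// adjust them to whatever your bundler actually produces.
const { join } = require('path')

globalThis.__bundlerPathsOverrides = {
  'thread-stream-worker': join(__dirname, 'thread-stream-worker.js'),
  'pino/file': join(__dirname, 'pino-file.js'),
  'pino-worker': join(__dirname, 'pino-worker.js'),
  'pino-pretty': join(__dirname, 'pino-pretty.js')
}
```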
## Webpack Plugin diff --git a/lib/transport.js b/lib/transport.js index 19ab496bf..ee480abf9 100644 --- a/lib/transport.js +++ b/lib/transport.js @@ -102,13 +102,13 @@ function transport (fullOptions) { }) }) } else if (pipeline) { - target = bundlerOverrides['pino-pipeline-worker'] || join(__dirname, 'worker-pipeline.js') - options.targets = pipeline.map((dest) => { + target = bundlerOverrides['pino-worker'] || join(__dirname, 'worker.js') + options.pipelines = [pipeline.map((dest) => { return { ...dest, target: fixTarget(dest.target) } - }) + })] } if (levels) { diff --git a/lib/worker-pipeline.js b/lib/worker-pipeline.js deleted file mode 100644 index 0c2e5f18e..000000000 --- a/lib/worker-pipeline.js +++ /dev/null @@ -1,148 +0,0 @@ -'use strict' - -const EE = require('events') -const loadTransportStreamBuilder = require('./transport-stream') -const { pipeline, PassThrough } = require('stream') - -// This file is not checked by the code coverage tool, -// as it is not reliable. - -/* istanbul ignore file */ - -module.exports = async function ({ targets, pipelines }) { - if (targets === undefined && pipelines) { - // Create an instance of ThreadStream using target (worker-pipeline.js) as internal thread worker - /** - * In case a pipeline is defined, a single ThreadStream is created as usual - * representing the PassThrough(stream1 + .. + streamN) - * - * If we want to implement a `tee`, the quickest way is to create as many - * pipelines as paths that can be created from a transport configuration - * and return a PassThrough stream wrapping all the pass through streams - * as entry point for each pipeline. - * // TODO: to rephrase - * - * // TODO: propose to deprecate "pipeline" property only - * - * Given for example the following transport configuration: - * - * const transport = pino.transport({ - * targets: [ - * { - * // targetA - * target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), - * options: { destination: destinationA } - * // An undefined "pipeline" property means do not use a pipeline as source - * }, - * { - * // targetB - * target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), - * options: { destination: destinationB }, - * pipeline: "pipeA" // use pipeA as source - * } - * ], - * pipelines: { - * "pipeA": [ - * { - * target: join(__dirname, '..', 'fixtures', 'transport-transform.js'), - * } - * ] - * } - * }) - * - * We would have 3 pipelines: - * - * 2 actual ones: - * p1 = pipeline(passThrough1, pipeA, targetA) - * p2 = pipeline(passThrough2, pipeA, targetB) - * - * PassThroughEntry // a stream that serves purely as entry point for ThreadStream - * - * 1 pipeline - * p3 = pipeline(PassThroughEntry, passThrough1, passThrough2) - * - * return PassThroughEntry - */ - - const entryStreams = [] - for (let i = 0; i < pipelines.length; i++) { - const pipelineTargets = pipelines[i] - const streams = await Promise.all(pipelineTargets.map(async (t) => { - const fn = await loadTransportStreamBuilder(t.target) - const stream = await fn(t.options) - return stream - })) - - const ee = new EE() - - const stream = new PassThrough({ - autoDestroy: true, - destroy (_, cb) { - ee.on('error', cb) - ee.on('closed', cb) - } - }) - - pipeline(stream, ...streams, function (err) { - if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - ee.emit('error', err) - return - } - - ee.emit('closed') - }) - - entryStreams.push(stream) - } - - const ee = new EE() - const sourceStream = new PassThrough({ - autoDestroy: true, - destroy (_, cb) { - ee.on('error', cb) - 
ee.on('closed', cb) - } - }) - - pipeline(sourceStream, ...entryStreams, function (err) { - if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - ee.emit('error', err) - return - } - - ee.emit('closed') - }) - - return sourceStream - } else { - // Maintaining bakcwards compatibility - const streams = await Promise.all(targets.map(async (t) => { - const fn = await loadTransportStreamBuilder(t.target) - const stream = await fn(t.options) - return stream - })) - - if (targets) { - const ee = new EE() - - const stream = new PassThrough({ - autoDestroy: true, - destroy (_, cb) { - ee.on('error', cb) - ee.on('closed', cb) - } - }) - - pipeline(stream, ...streams, function (err) { - if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') { - ee.emit('error', err) - return - } - - ee.emit('closed') - }) - - return stream - } - } -} diff --git a/lib/worker.js b/lib/worker.js index 4f7be9431..a6511c6c7 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -12,20 +12,24 @@ const loadTransportStreamBuilder = require('./transport-stream') /* istanbul ignore file */ module.exports = async function ({ targets, pipelines, levels, dedupe }) { + const targetStreams = [] + // Process targets - targets = await Promise.all(targets.map(async (t) => { - const fn = await loadTransportStreamBuilder(t.target) - const stream = await fn(t.options) - return { - level: t.level, - stream - } - })) + if (targets) { + targets = await Promise.all(targets.map(async (t) => { + const fn = await loadTransportStreamBuilder(t.target) + const stream = await fn(t.options) + return { + level: t.level, + stream + } + })) - const targetStreams = [...targets] + targetStreams.push(...targets) + } // Process pipelines - if (pipelines && pipelines.length) { + if (pipelines) { pipelines = await Promise.all( pipelines.map(async (p) => { const pipeDests = await Promise.all( @@ -77,7 +81,7 @@ module.exports = async function ({ targets, pipelines, levels, dedupe }) { }) } -/** + /** * Creates a pipeline using the provided streams and return an instance of `PassThrough` stream * as a source for the pipeline. 
* diff --git a/test/transport/bundlers-support.test.js b/test/transport/bundlers-support.test.js index efe8a5edc..cb10221c0 100644 --- a/test/transport/bundlers-support.test.js +++ b/test/transport/bundlers-support.test.js @@ -95,33 +95,3 @@ test('pino.transport with worker destination overridden by bundler and mjs trans globalThis.__bundlerPathsOverrides = undefined }) - -test('pino.transport with worker-pipeline destination overridden by bundler', async ({ same, teardown }) => { - globalThis.__bundlerPathsOverrides = { - 'pino-pipeline-worker': join(__dirname, '..', '..', 'lib/worker-pipeline.js') - } - - const destination = file() - const transport = pino.transport({ - pipeline: [ - { - target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), - options: { destination } - } - ] - }) - teardown(transport.end.bind(transport)) - const instance = pino(transport) - instance.info('hello') - await watchFileCreated(destination) - const result = JSON.parse(await readFile(destination)) - delete result.time - same(result, { - pid, - hostname, - level: 30, - msg: 'hello' - }) - - globalThis.__bundlerPathsOverrides = undefined -}) From ddbbafb995646266ccef43b13aa29648e381b04c Mon Sep 17 00:00:00 2001 From: Daniele Bacarella Date: Sun, 5 May 2024 13:28:02 +0200 Subject: [PATCH 07/10] added a simple flow schema to worker.js --- lib/worker.js | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/lib/worker.js b/lib/worker.js index a6511c6c7..bd51a5231 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -11,6 +11,37 @@ const loadTransportStreamBuilder = require('./transport-stream') /* istanbul ignore file */ + +// The worker flow can be schematized as follows +// +// ┌──────────────────────────────────────────────────┐ ┌─────┐ +// │ │ │ │ +// │ │ │ p │ +// │ target ┌───────────────┐ │ │ i │ +// │ ──────────► │ │ │ │ n │ +// │ targets target │ pino. │ │ │ o │ +// │ ────────────► ──────────► │ multistream ├────┼──►│ . 
│ source +// │ target │ │ │ │ m │ │ +// │ ──────────► └───────────────┘ │ │ u │ │write +// │ │ │ l │ ▼ +// │ pipeline ┌───────────────┐ │ │ t │ ┌────────┐ +// │ ──────────► │ PassThrough ├────┼──►│ i ├──────┤ │ +// │ └───────────────┘ │ │ s │ write│ Thread │ +// │ │ │ t │◄─────┤ Stream │ +// │ pipeline ┌───────────────┐ │ │ r │ │ │ +// │ ──────────► │ PassThrough ├────┼──►│ e │ └────────┘ +// │ └───────────────┘ │ │ a │ +// │ │ │ m │ +// │ │ │ │ +// │ OR │ │ │ +// │ │ │ │ +// │ pipeline ┌──────────────┐ │ │ │ +// │ ────────────► │ PassThrough ├───────────────────┼──►│ │ +// │ └──────────────┘ │ │ │ +// │ │ │ │ +// └──────────────────────────────────────────────────┘ └─────┘ + + module.exports = async function ({ targets, pipelines, levels, dedupe }) { const targetStreams = [] From 31faf4ed081a16b8c6cc57e9d59c810edd35a5e4 Mon Sep 17 00:00:00 2001 From: Daniele Bacarella Date: Sun, 5 May 2024 13:28:58 +0200 Subject: [PATCH 08/10] added a simple flow schema to worker.js --- lib/worker.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/worker.js b/lib/worker.js index bd51a5231..42e0a9998 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -11,7 +11,6 @@ const loadTransportStreamBuilder = require('./transport-stream') /* istanbul ignore file */ - // The worker flow can be schematized as follows // // ┌──────────────────────────────────────────────────┐ ┌─────┐ @@ -41,7 +40,6 @@ const loadTransportStreamBuilder = require('./transport-stream') // │ │ │ │ // └──────────────────────────────────────────────────┘ └─────┘ - module.exports = async function ({ targets, pipelines, levels, dedupe }) { const targetStreams = [] From 40297e31fecbf14910c16175630e174a10c61e0a Mon Sep 17 00:00:00 2001 From: Daniele Bacarella Date: Sat, 11 May 2024 12:14:18 +0200 Subject: [PATCH 09/10] Added a special case in worker.js to skip the multistream instance when a single target or pipeline is defined --- lib/worker.js | 136 +++++++++++++++++++++++++++++++++----------------- 1 file changed, 91 insertions(+), 45 deletions(-) diff --git a/lib/worker.js b/lib/worker.js index 42e0a9998..a0864b765 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -11,40 +11,73 @@ const loadTransportStreamBuilder = require('./transport-stream') /* istanbul ignore file */ -// The worker flow can be schematized as follows -// -// ┌──────────────────────────────────────────────────┐ ┌─────┐ -// │ │ │ │ -// │ │ │ p │ -// │ target ┌───────────────┐ │ │ i │ -// │ ──────────► │ │ │ │ n │ -// │ targets target │ pino. │ │ │ o │ -// │ ────────────► ──────────► │ multistream ├────┼──►│ . │ source -// │ target │ │ │ │ m │ │ -// │ ──────────► └───────────────┘ │ │ u │ │write -// │ │ │ l │ ▼ -// │ pipeline ┌───────────────┐ │ │ t │ ┌────────┐ -// │ ──────────► │ PassThrough ├────┼──►│ i ├──────┤ │ -// │ └───────────────┘ │ │ s │ write│ Thread │ -// │ │ │ t │◄─────┤ Stream │ -// │ pipeline ┌───────────────┐ │ │ r │ │ │ -// │ ──────────► │ PassThrough ├────┼──►│ e │ └────────┘ -// │ └───────────────┘ │ │ a │ -// │ │ │ m │ -// │ │ │ │ -// │ OR │ │ │ -// │ │ │ │ -// │ pipeline ┌──────────────┐ │ │ │ -// │ ────────────► │ PassThrough ├───────────────────┼──►│ │ -// │ └──────────────┘ │ │ │ -// │ │ │ │ -// └──────────────────────────────────────────────────┘ └─────┘ +/* + * > Multiple targets & pipelines + * + * + * ┌─────────────────────────────────────────────────┐ ┌─────┐ + * │ │ │ p │ + * │ │ │ i │ + * │ target │ │ n │ + * │ │ ────────────────────────────────┼────┤ o │ + * │ targets │ target │ │ . 
│ + * │ ────────────► │ ────────────────────────────────┼────┤ m │ source + * │ │ target │ │ u │ │ + * │ │ ────────────────────────────────┼────┤ l │ │write + * │ │ │ │ t │ ▼ + * │ │ pipeline ┌───────────────┐ │ │ i │ ┌────────┐ + * │ │ ──────────► │ PassThrough ├───┼────┤ s ├──────┤ │ + * │ │ └───────────────┘ │ │ t │ write│ Thread │ + * │ │ │ │ r │◄─────┤ Stream │ + * │ │ pipeline ┌───────────────┐ │ │ e │ │ │ + * │ │ ──────────► │ PassThrough ├───┼────┤ a │ └────────┘ + * │ └───────────────┘ │ │ m │ + * │ │ │ │ + * └─────────────────────────────────────────────────┘ └─────┘ + * + * + * + * > One single pipeline or target + * + * + * source + * │ + * ┌────────────────────────────────────────────────┐ │write + * │ │ ▼ + * │ │ ┌────────┐ + * │ targets │ target │ │ │ + * │ ────────────► │ ──────────────────────────────┤ │ │ + * │ │ │ │ │ + * │ ├──────┤ │ + * │ │ │ │ + * │ │ │ │ + * │ OR │ │ │ + * │ │ │ │ + * │ │ │ │ + * │ ┌──────────────┐ │ │ │ + * │ targets │ pipeline │ │ │ │ Thread │ + * │ ────────────► │ ────────────►│ PassThrough ├─┤ │ Stream │ + * │ │ │ │ │ │ │ + * │ └──────────────┘ │ │ │ + * │ │ │ │ + * │ OR │ write│ │ + * │ │◄─────┤ │ + * │ │ │ │ + * │ ┌──────────────┐ │ │ │ + * │ pipeline │ │ │ │ │ + * │ ──────────────►│ PassThrough ├────────────────┤ │ │ + * │ │ │ │ │ │ + * │ └──────────────┘ │ └────────┘ + * │ │ + * │ │ + * └────────────────────────────────────────────────┘ + */ module.exports = async function ({ targets, pipelines, levels, dedupe }) { const targetStreams = [] // Process targets - if (targets) { + if (targets && targets.length) { targets = await Promise.all(targets.map(async (t) => { const fn = await loadTransportStreamBuilder(t.target) const stream = await fn(t.options) @@ -58,7 +91,7 @@ module.exports = async function ({ targets, pipelines, levels, dedupe }) { } // Process pipelines - if (pipelines) { + if (pipelines && pipelines.length) { pipelines = await Promise.all( pipelines.map(async (p) => { const pipeDests = await Promise.all( @@ -75,24 +108,37 @@ module.exports = async function ({ targets, pipelines, levels, dedupe }) { targetStreams.push(...pipelines) } - return build(process, { - parse: 'lines', - metadata: true, - close (err, cb) { - let expected = 0 - for (const transport of targetStreams) { - expected++ - transport.stream.on('close', closeCb) - transport.stream.end() - } + // Skip building the multistream step if either one single pipeline or target is defined and + // return directly the stream instance back to TreadStream. + // This is equivalent to define either: + // + // pino.transport({ target: ... }) + // + // OR + // + // pino.transport({ pipeline: ... }) + if (targetStreams.length === 1) { + return targetStreams[0].stream + } else { + return build(process, { + parse: 'lines', + metadata: true, + close (err, cb) { + let expected = 0 + for (const transport of targetStreams) { + expected++ + transport.stream.on('close', closeCb) + transport.stream.end() + } - function closeCb () { - if (--expected === 0) { - cb(err) + function closeCb () { + if (--expected === 0) { + cb(err) + } } } - } - }) + }) + } // TODO: Why split2 was not used for pipelines? 
   function process (stream) {

From 57bcd7d5d5faf4314a178cf2a1463d8bd23d2c4a Mon Sep 17 00:00:00 2001
From: Daniele Bacarella
Date: Sat, 11 May 2024 14:37:56 +0200
Subject: [PATCH 10/10] - Added optional 'level' property to
 TransportPipelineOptions interface - A level can now be defined for pipelines
 defined inside 'targets' - Added UT in 'pipeline.test.js' to check expected
 behaviour with 'dedupe'

---
 lib/transport.js                |  1 +
 lib/worker.js                   |  8 +++-
 pino.d.ts                       |  1 +
 test/transport/pipeline.test.js | 68 +++++++++++++++++++++++++++------
 4 files changed, 66 insertions(+), 12 deletions(-)

diff --git a/lib/transport.js b/lib/transport.js
index ee480abf9..b8ce347f2 100644
--- a/lib/transport.js
+++ b/lib/transport.js
@@ -97,6 +97,7 @@ function transport (fullOptions) {
       return dest.pipeline.map((t) => {
         return {
           ...t,
+          level: dest.level, // duplicate the pipeline `level` property defined in the upper level
           target: fixTarget(t.target)
         }
       })
diff --git a/lib/worker.js b/lib/worker.js
index a0864b765..dcbd5d391 100644
--- a/lib/worker.js
+++ b/lib/worker.js
@@ -94,15 +94,21 @@ module.exports = async function ({ targets, pipelines, levels, dedupe }) {
   if (pipelines && pipelines.length) {
     pipelines = await Promise.all(
       pipelines.map(async (p) => {
+        let level
         const pipeDests = await Promise.all(
           p.map(async (t) => {
+            // level assigned to pipeline is duplicated over all its targets, just store it
+            level = t.level
             const fn = await loadTransportStreamBuilder(t.target)
             const stream = await fn(t.options)
             return stream
           }
         ))
 
-        return { stream: createPipeline(pipeDests) }
+        return {
+          level,
+          stream: createPipeline(pipeDests)
+        }
       })
     )
     targetStreams.push(...pipelines)
diff --git a/pino.d.ts b/pino.d.ts
index 33f9788ca..3efe72276 100644
--- a/pino.d.ts
+++ b/pino.d.ts
@@ -259,6 +259,7 @@ declare namespace pino {
 
     interface TransportPipelineOptions<Options = Record<string, any>> extends TransportBaseOptions<Options>{
         pipeline: TransportSingleOptions<Options>[]
+        level?: LevelWithSilentOrString
     }
 
diff --git a/test/transport/pipeline.test.js b/test/transport/pipeline.test.js
index 14d435ae5..a845d4425 100644
--- a/test/transport/pipeline.test.js
+++ b/test/transport/pipeline.test.js
@@ -6,6 +6,7 @@ const { readFile } = require('fs').promises
 const { watchFileCreated, file } = require('../helper')
 const { test } = require('tap')
 const pino = require('../../')
+const { DEFAULT_LEVELS } = require('../../lib/constants')
 
 const { pid } = process
 const hostname = os.hostname()
@@ -29,21 +30,15 @@ test('pino.transport with a pipeline', async ({ same, teardown }) => {
   same(result, {
     pid,
     hostname,
-    level: 30,
+    level: DEFAULT_LEVELS.info,
     msg: 'hello',
     service: 'pino' // this property was added by the transform
   })
 })
 
 test('pino.transport with targets containing pipelines', async ({ same, teardown }) => {
-  const destinationA = join(
-    os.tmpdir(),
-    '_' + Math.random().toString(36).substr(2, 9)
-  )
-  const destinationB = join(
-    os.tmpdir(),
-    '_' + Math.random().toString(36).substr(2, 9)
-  )
+  const destinationA = file()
+  const destinationB = file()
   const transport = pino.transport({
     targets: [
       {
@@ -76,14 +71,65 @@ test('pino.transport with targets containing pipelines', async ({ same, teardown }) => {
   same(resultA, {
     pid,
     hostname,
-    level: 30,
+    level: DEFAULT_LEVELS.info,
     msg: 'hello'
   })
   same(resultB, {
     pid,
     hostname,
-    level: 30,
+    level: DEFAULT_LEVELS.info,
     msg: 'hello',
     service: 'pino' // this property was added by the transform
   })
 })
+
+test('pino.transport with targets containing pipelines with levels 
defined and dedupe', async ({ same, teardown }) => { + const destinationA = file() + const destinationB = file() + const transport = pino.transport({ + targets: [ + { + target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), + options: { destination: destinationA }, + level: DEFAULT_LEVELS.info + }, + { + pipeline: [ + { + target: join(__dirname, '..', 'fixtures', 'transport-transform.js') + }, + { + target: join(__dirname, '..', 'fixtures', 'to-file-transport.js'), + options: { destination: destinationB } + } + ], + level: DEFAULT_LEVELS.error + } + ], + dedupe: true + }) + + teardown(transport.end.bind(transport)) + const instance = pino(transport) + instance.info('hello info') + instance.error('hello error') + await watchFileCreated(destinationA) + await watchFileCreated(destinationB) + const resultA = JSON.parse(await readFile(destinationA)) + const resultB = JSON.parse(await readFile(destinationB)) + delete resultA.time + delete resultB.time + same(resultA, { + pid, + hostname, + level: DEFAULT_LEVELS.info, + msg: 'hello info' + }) + same(resultB, { + pid, + hostname, + level: DEFAULT_LEVELS.error, + msg: 'hello error', + service: 'pino' // this property was added by the transform + }) +})
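With the series applied, `targets` accepts plain transport entries and `pipeline` entries side by side, each with its own `level`. A minimal usage sketch of the resulting API, mirroring the example added to `docs/api.md` and the `dedupe` test above (`pino-syslog` and `pino-socket` must be installed separately; the destination path is a placeholder):

```js
const pino = require('pino')

const transport = pino.transport({
  targets: [
    {
      // plain target entry, written to via pino.multistream as before
      target: 'pino/file',
      options: { destination: '/path/to/store/logs' },
      level: 'info'
    },
    {
      // pipeline entry: Transform step(s) followed by a Writable destination,
      // wrapped by the worker in a single PassThrough source stream
      pipeline: [
        { target: 'pino-syslog' },
        { target: 'pino-socket' }
      ],
      level: 'error'
    }
  ],
  // with dedupe, each log line is routed only to the stream(s) with the
  // highest matching level: 'error' logs reach the pipeline, 'info' logs the file
  dedupe: true
})

pino(transport)
```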