Skip to content

Commit

Permalink
feat: logs processing speed increased
Browse files Browse the repository at this point in the history
  • Loading branch information
makeros committed Mar 7, 2021
1 parent 215a6bb commit 36a06e2
Show file tree
Hide file tree
Showing 13 changed files with 5,221 additions and 3,509 deletions.
2 changes: 2 additions & 0 deletions bench/bench.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/bash
time node producer.js 1000000 | node --trace-uncaught ../index.js log -c ../__mocks__/custom-schema.json -v
5 changes: 5 additions & 0 deletions bench/producer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
'strict'

for (let i = 0; i < process.argv.slice(2)[0]; i++) {
process.stdout.write('{"level":30,"time":1531171074631,"msg":"hello world","nested":{"mock":666},"test2":"red","pid":657,"hostname":"box","name":"app","v":1}' + '\n')
}
13 changes: 9 additions & 4 deletions lib/gelf-transformer.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
const fastJsonParse = require('fast-json-parse')
const pipeline = require('./pipeline')
const pump = require('pump')
const processLogs = require('./process-logs')
const streamLogs = require('./stream-logs')
const split = require('split2')
const { pipeline } = require('readable-stream')

module.exports = function (opts) {
const pipe = pipeline(opts)
pump(process.stdin, split(fastJsonParse), pipe)
pipeline(
process.stdin,
split(fastJsonParse),
processLogs(opts),
...streamLogs(opts)
)
process.on('SIGINT', function () { process.exit(0) })
}
27 changes: 0 additions & 27 deletions lib/pipeline.js

This file was deleted.

30 changes: 30 additions & 0 deletions lib/process-logs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
const transformer = require('./transformer')
const { stringify } = require('./utils')
const { Transform } = require('readable-stream')

module.exports = function processLogs (opts) {
const transform = transformer(opts)
const stringified = stringify(opts)

return new Transform({
writableObjectMode: true,
decodeStrings: false,
transform: function (data, enc, cb) {
if (data.value !== null) {
this.push(stringified(transform(data.value)))
}
cb()
},
construct (callback) {
this.data = ''
callback()
},
flush (callback) {
try {
this.push('')
} catch (err) {
callback(err)
}
}
})
}
39 changes: 39 additions & 0 deletions lib/stream-logs.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
const Transport = require('./transport')
const { Transform } = require('readable-stream')

module.exports = function processLogs (opts) {
const streams = []
if (opts.verbose) {
streams.push(getVerbose())
}
if (opts.transportToGraylog) {
streams.push(getTransport(opts))
}
return streams
}

function getVerbose () {
return new Transform({
writableObjectMode: true,
decodeStrings: false,
transform: function (data, enc, cb) {
setImmediate(function () { process.stdout.write(data + '\n') })
cb()
}
})
}

function getTransport (opts) {
console.log('transport is prepared')
const transport = new Transport(opts)
return new Transform({
writableObjectMode: true,
decodeStrings: false,
transform: function (data, enc, cb) {
setImmediate(function () {
transport.emit('log', data)
})
cb()
}
})
}
15 changes: 8 additions & 7 deletions lib/transformer/custom-gelf.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
const get = require('lodash.get')

module.exports = function (data, customSchema) {
return Object.entries(customSchema.properties)
.reduce((acc, entry) => {
const sourcePath = entry[1].source ? entry[1].source : entry[0]
acc[entry[0]] = get(data, sourcePath, undefined)
return acc
}, {})
module.exports = function customGelf (properties) {
return function (data) {
const _data = {}
for (const property in properties) {
_data[property] = get(data, properties[property].source, data[property])
}
return _data
}
}
13 changes: 6 additions & 7 deletions lib/transformer/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ const buildCustomGelf = require('./custom-gelf')

module.exports = function (opts) {
const sanitizeData = setMessageField(opts)

return function (data) {
return {
...buildStandardGelf(sanitizeData(data)),
...opts.customSchema ? buildCustomGelf(data, opts.customSchema) : {}
}
const customProperties = opts.customSchema ? opts.customSchema.properties : {}
const build = buildCustomGelf(customProperties)
return function (value) {
return Object.assign(buildStandardGelf(sanitizeData(value)), build(value))
}
}

function setMessageField (opts) {
const _field = opts.messageField
return function (data) {
data.msg = data[opts.messageField] ? data[opts.messageField] : 'No msg property found or msg is empty'
data.msg = data[_field] !== undefined ? data[_field] : 'No msg property found or msg is empty'
return data
}
}
10 changes: 4 additions & 6 deletions lib/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ const EventEmitter = require('events').EventEmitter
const pick = require('lodash.pick')
const utils = require('./utils')

var Transport = function (opts) {
const Transport = function (opts) {
const self = this

self.config = pick(opts, ['host', 'port', 'maxChunkSize'])
self.stringify = utils.stringify(opts)

self.on('log', function (gelf) {
const msg = self.stringify(gelf)

self.on('log', function (msg) {
self.compress(msg, function (buffer) {
self.processMessage(buffer)
})
Expand All @@ -39,7 +37,7 @@ Transport.prototype.prepareDatagrams = function (chunks, cb) {
const length = chunks.length

const createDatagramArray = function (msgId) {
for (var i = 0; i < chunks.length; i++) {
for (let i = 0; i < chunks.length; i++) {
datagrams[i] = Buffer.from(gelfBytes.concat(msgId, i, length, chunks[i]))
}

Expand Down Expand Up @@ -98,7 +96,7 @@ Transport.prototype.processMessage = function (msg) {
Transport.prototype.sendMultipleMessages = function (datagrams) {
const self = this

for (var i = 0; i < datagrams.length; i++) {
for (let i = 0; i < datagrams.length; i++) {
self.sendMessage(datagrams[i])
}
}
Expand Down
15 changes: 5 additions & 10 deletions lib/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,15 @@ const mapPinoToSyslogLevel = {
50: syslogLevel.error,
60: syslogLevel.critical
}
function pinoLevelToSyslogLevel (level) {

exports.pinoLevelToSyslogLevel = function pinoLevelToSyslogLevel (level) {
return mapPinoToSyslogLevel[level] || syslogLevel.critical
}

const stringify = function (opts) {
const schema = standardSchema
exports.stringify = function stringify (opts) {
if (opts.customSchema) {
schema.properties = Object.assign(schema.properties, opts.customSchema.properties)
Object.assign(standardSchema.properties, opts.customSchema.properties)
}

return fastJsonStringify(schema)
}

module.exports = {
pinoLevelToSyslogLevel,
stringify
return fastJsonStringify(standardSchema)
}
Loading

0 comments on commit 36a06e2

Please # to comment.