Skip to content

Commit

Permalink
feat: send config to transports on init (#1930)
Browse files Browse the repository at this point in the history
* feat: send config to transports on init

* test: integration for transports using pino config
  • Loading branch information
10xLaCroixDrinker committed Apr 24, 2024
1 parent d9911b0 commit 5ceb596
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 3 deletions.
10 changes: 10 additions & 0 deletions lib/multistream.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ function multistream (streamsArray, opts) {
const res = {
write,
add,
emit,
flushSync,
end,
minLevel: 0,
Expand Down Expand Up @@ -79,6 +80,14 @@ function multistream (streamsArray, opts) {
}
}

/**
 * Forward an event to every wrapped stream that is itself an emitter.
 * Called with `this` bound to the multistream holder, so each entry in
 * `this.streams` is visited; entries whose stream has no `emit` (e.g.
 * plain writable sinks) are skipped silently.
 */
function emit (...args) {
  this.streams.forEach(({ stream }) => {
    if (typeof stream.emit === 'function') {
      stream.emit(...args)
    }
  })
}

function flushSync () {
for (const { stream } of this.streams) {
if (typeof stream.flushSync === 'function') {
Expand Down Expand Up @@ -153,6 +162,7 @@ function multistream (streamsArray, opts) {
minLevel: level,
streams,
clone,
emit,
flushSync,
[metadata]: true
}
Expand Down
2 changes: 2 additions & 0 deletions lib/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ function transport (fullOptions) {
options.dedupe = dedupe
}

options.pinoWillSendConfig = true

return buildStream(fixTarget(target), options, worker)

function fixTarget (origin) {
Expand Down
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,14 @@
"atomic-sleep": "^1.0.0",
"fast-redact": "^3.1.1",
"on-exit-leak-free": "^2.1.0",
"pino-abstract-transport": "^1.1.0",
"pino-abstract-transport": "^1.2.0",
"pino-std-serializers": "^6.0.0",
"process-warning": "^3.0.0",
"quick-format-unescaped": "^4.0.3",
"real-require": "^0.2.0",
"safe-stable-stringify": "^2.3.1",
"sonic-boom": "^3.7.0",
"thread-stream": "^2.0.0"
"thread-stream": "^2.6.0"
},
"tsd": {
"directory": "test/types"
Expand Down
4 changes: 4 additions & 0 deletions pino.js
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ function pino (...args) {
assertDefaultLevelFound(level, customLevels, useOnlyCustomLevels)
const levels = mappings(customLevels, useOnlyCustomLevels)

if (typeof stream.emit === 'function') {
stream.emit('message', { code: 'PINO_CONFIG', config: { levels, messageKey, errorKey } })
}

assertLevelComparison(levelComparison)
const levelCompFunc = genLevelComparison(levelComparison)

Expand Down
33 changes: 33 additions & 0 deletions test/basic.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,39 @@ test('serializers can return undefined to strip field', async ({ equal }) => {
equal('test' in result, false)
})

test('streams receive a message event with PINO_CONFIG', ({ match, end }) => {
  // The destination stream must be told pino's configuration (level map
  // and key names) via a 'message' event emitted during pino() init,
  // so the listener is attached before the logger is constructed.
  const expectedConfig = {
    code: 'PINO_CONFIG',
    config: {
      errorKey: 'err',
      levels: {
        labels: {
          10: 'trace',
          20: 'debug',
          30: 'info',
          40: 'warn',
          50: 'error',
          60: 'fatal'
        },
        values: {
          debug: 20,
          error: 50,
          fatal: 60,
          info: 30,
          trace: 10,
          warn: 40
        }
      },
      messageKey: 'msg'
    }
  }
  const stream = sink()
  stream.once('message', (received) => {
    match(received, expectedConfig)
    end()
  })
  pino(stream)
})

test('does not explode with a circular ref', async ({ doesNotThrow }) => {
const stream = sink()
const instance = pino(stream)
Expand Down
33 changes: 33 additions & 0 deletions test/fixtures/transport-uses-pino-config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict'

// Test fixture: a pipeline-stage transport that relies on receiving pino's
// runtime configuration (messageKey, errorKey, level labels) instead of
// hard-coding them. `expectPinoConfig: true` asks pino-abstract-transport
// to populate `source.messageKey` / `source.errorKey` / `source.levels`
// from the PINO_CONFIG message before log records flow.
const build = require('pino-abstract-transport')
const { pipeline, Transform } = require('stream')
module.exports = () => {
  return build(function (source) {
    const myTransportStream = new Transform({
      autoDestroy: true,
      objectMode: true,
      // Reshape each parsed log record into an OTel-like envelope:
      // { severityText, body, attributes, [error] }.
      transform (chunk, enc, cb) {
        const {
          time,
          level,
          // Computed keys: pull the message/error out from under whatever
          // key names the logger was configured with.
          [source.messageKey]: body,
          [source.errorKey]: error,
          ...attributes
        } = chunk
        this.push(JSON.stringify({
          // Map the numeric level back to its label (incl. custom levels).
          severityText: source.levels.labels[level],
          body,
          attributes,
          // Only attach `error` when the record actually carried one.
          ...(error && { error })
        }))
        cb()
      }
    })
    pipeline(source, myTransportStream, () => {})
    // enablePipelining: the returned Transform becomes this stage's output,
    // feeding the next target in the pipeline.
    return myTransportStream
  }, {
    enablePipelining: true,
    expectPinoConfig: true
  })
}
19 changes: 19 additions & 0 deletions test/fixtures/transport-worker-data.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
'use strict'

const { parentPort, workerData } = require('worker_threads')
const { Writable } = require('stream')

module.exports = (options) => {
const myTransportStream = new Writable({
autoDestroy: true,
write (chunk, enc, cb) {
parentPort.postMessage({
code: 'EVENT',
name: 'workerData',
args: [workerData]
})
cb()
}
})
return myTransportStream
}
15 changes: 14 additions & 1 deletion test/multistream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const pino = require('../')
const multistream = pino.multistream
const proxyquire = require('proxyquire')
const strip = require('strip-ansi')
const { file } = require('./helper')
const { file, sink } = require('./helper')

test('sends to multiple streams using string levels', function (t) {
let messageCount = 0
Expand Down Expand Up @@ -246,6 +246,19 @@ test('supports pretty print', function (t) {
log.info('pretty print')
})

test('emit propagates events to each stream', function (t) {
  // One assertion per sink: emitting once on the multistream wrapper
  // must fan the event out to all three underlying streams.
  t.plan(3)
  const streams = [sink(), sink(), sink()]
  for (const s of streams) {
    s.once('hello', function (data) {
      t.equal(data.msg, 'world')
    })
  }
  const stream = multistream(streams)
  stream.emit('hello', { msg: 'world' })
})

test('children support custom levels', function (t) {
const stream = writeStream(function (data, enc, cb) {
t.equal(JSON.parse(data).msg, 'bar')
Expand Down
13 changes: 13 additions & 0 deletions test/transport/core.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,19 @@ test('pino.transport with target pino-pretty', async ({ match, teardown }) => {
match(strip(actual), /\[.*\] INFO.*hello/)
})

// Verifies pino.transport() sets `pinoWillSendConfig: true` in the worker's
// workerData, signalling the transport that a PINO_CONFIG message follows.
// The fixture echoes its workerData back on every write (see
// fixtures/transport-worker-data.js), surfaced here as a 'workerData' event.
test('sets worker data informing the transport that pino will send its config', ({ match, plan, teardown }) => {
  plan(1)
  const transport = pino.transport({
    target: join(__dirname, '..', 'fixtures', 'transport-worker-data.js')
  })
  teardown(transport.end.bind(transport))
  const instance = pino(transport)
  // Listener attached before the first write; the worker only posts the
  // event in response to a log line, so 'once' cannot miss it.
  transport.once('workerData', (workerData) => {
    match(workerData.workerData, { pinoWillSendConfig: true })
  })
  instance.info('hello')
})

test('stdout in worker', async ({ not }) => {
let actual = ''
const child = execa(process.argv[0], [join(__dirname, '..', 'fixtures', 'transport-main.js')])
Expand Down
167 changes: 167 additions & 0 deletions test/transport/uses-pino-config.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
'use strict'

const os = require('os')
const { join } = require('path')
const { readFile } = require('fs').promises
const writeStream = require('flush-write-stream')
const { watchFileCreated, file } = require('../helper')
const { test } = require('tap')
const pino = require('../../')

const { pid } = process
const hostname = os.hostname()

/**
 * Plain-object snapshot of an Error (type/message/stack) so tests can
 * compare it with deep equality against transport output.
 */
function serializeError (error) {
  const { name, message, stack } = error
  return { type: name, message, stack }
}

/**
 * Parse a buffer of back-to-back JSON objects ("{...}{...}") into an
 * array by inserting commas between adjacent objects and wrapping the
 * whole thing in brackets.
 */
function parseLogs (buffer) {
  const joined = buffer.toString().split('}{').join('},{')
  return JSON.parse(`[${joined}]`)
}

test('transport uses pino config', async ({ same, teardown, plan }) => {
  plan(1)
  // Pipeline: config-aware fixture stage -> pino/file writing to a temp file.
  // The fixture resolves custom messageKey/errorKey/levels from the
  // PINO_CONFIG message rather than hard-coding defaults.
  const destination = file()
  const pipeline = [
    { target: join(__dirname, '..', 'fixtures', 'transport-uses-pino-config.js') },
    { target: 'pino/file', options: { destination } }
  ]
  const transport = pino.transport({ pipeline })
  teardown(transport.end.bind(transport))
  const loggerOptions = {
    messageKey: 'customMessageKey',
    errorKey: 'customErrorKey',
    customLevels: { custom: 35 }
  }
  const instance = pino(loggerOptions, transport)

  const error = new Error('bar')
  instance.custom('foo')
  instance.error(error)
  await watchFileCreated(destination)
  const logs = parseLogs(await readFile(destination))

  const expected = [{
    severityText: 'custom',
    body: 'foo',
    attributes: {
      pid,
      hostname
    }
  }, {
    severityText: 'error',
    body: 'bar',
    attributes: {
      pid,
      hostname
    },
    error: serializeError(error)
  }]
  same(logs, expected)
})

test('transport uses pino config without customizations', async ({ same, teardown, plan }) => {
  plan(1)
  // Same pipeline as above but with default logger options: the fixture
  // must pick up the default 'msg'/'err' keys and built-in level labels.
  const destination = file()
  const pipeline = [
    { target: join(__dirname, '..', 'fixtures', 'transport-uses-pino-config.js') },
    { target: 'pino/file', options: { destination } }
  ]
  const transport = pino.transport({ pipeline })
  teardown(transport.end.bind(transport))
  const instance = pino(transport)

  const error = new Error('qux')
  instance.info('baz')
  instance.error(error)
  await watchFileCreated(destination)
  const logs = parseLogs(await readFile(destination))

  const expected = [{
    severityText: 'info',
    body: 'baz',
    attributes: {
      pid,
      hostname
    }
  }, {
    severityText: 'error',
    body: 'qux',
    attributes: {
      pid,
      hostname
    },
    error: serializeError(error)
  }]
  same(logs, expected)
})

// Two destinations via multistream: the worker-side pipeline transport (which
// reshapes records using the PINO_CONFIG it received) and an in-process raw
// stream (which should see the untouched JSON with the custom keys). Both
// outputs are asserted, hence plan(2).
test('transport uses pino config with multistream', async ({ same, teardown, plan }) => {
  plan(2)
  const destination = file()
  const messages = []
  // Raw in-process sink: collect each parsed record, dropping the volatile
  // `time` field so the comparison below can be exact.
  const stream = writeStream(function (data, enc, cb) {
    const message = JSON.parse(data)
    delete message.time
    messages.push(message)
    cb()
  })
  const transport = pino.transport({
    pipeline: [{
      target: join(__dirname, '..', 'fixtures', 'transport-uses-pino-config.js')
    }, {
      target: 'pino/file',
      options: { destination }
    }]
  })
  teardown(transport.end.bind(transport))
  const instance = pino({
    messageKey: 'customMessageKey',
    errorKey: 'customErrorKey',
    customLevels: { custom: 35 }
  }, pino.multistream([transport, { stream }]))

  const error = new Error('buzz')
  // Snapshot the serialized error once; it appears in both expectations.
  const serializedError = serializeError(error)
  instance.custom('fizz')
  instance.error(error)
  await watchFileCreated(destination)
  const result = parseLogs(await readFile(destination))

  // Transport output: reshaped using the custom keys/levels from PINO_CONFIG.
  same(result, [{
    severityText: 'custom',
    body: 'fizz',
    attributes: {
      pid,
      hostname
    }
  }, {
    severityText: 'error',
    body: 'buzz',
    attributes: {
      pid,
      hostname
    },
    error: serializedError
  }])

  // Raw stream output: untransformed records still keyed by the custom names.
  same(messages, [{
    level: 35,
    pid,
    hostname,
    customMessageKey: 'fizz'
  }, {
    level: 50,
    pid,
    hostname,
    customErrorKey: serializedError,
    customMessageKey: 'buzz'
  }])
})

0 comments on commit 5ceb596

Please sign in to comment.