Skip to content

Commit

Permalink
feat(worker): Retain meta on log entries forwarded from core (#1225)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjameswh authored Oct 12, 2023
1 parent 2b585b6 commit 35c6005
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 6 deletions.
1 change: 1 addition & 0 deletions packages/core-bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/core-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ opentelemetry = "0.18"
parking_lot = "0.12"
prost = "0.11"
prost-types = "0.11"
serde_json = "1.0"
tokio = "1.13"
once_cell = "1.7.2"
temporal-sdk-core = { version = "*", path = "./sdk-core/core", features = ["ephemeral-server"] }
Expand Down
67 changes: 67 additions & 0 deletions packages/core-bridge/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,3 +187,70 @@ where
cx.throw_type_error::<_, Vec<u8>>(format!("Invalid or missing {}", full_attr_path))
}
}

// Recursively convert a Serde value to a JS value
pub fn serde_value_to_js_value<'a>(cx: &mut impl Context<'a>, val: serde_json::Value) -> JsResult<'a, JsValue> {
match val {
serde_json::Value::String(s) => Ok(cx.string(s).upcast()),
serde_json::Value::Number(n) => Ok(cx.number(n.as_f64().unwrap()).upcast()),
serde_json::Value::Bool(b) => Ok(cx.boolean(b).upcast()),
serde_json::Value::Null => Ok(cx.null().upcast()),
serde_json::Value::Array(vec ) => {
let arr: Handle<'a, JsArray> = JsArray::new(cx, vec.len() as u32);
for (i, v) in vec.into_iter().enumerate() {
let v = serde_value_to_js_value(cx, v)?;
arr.set(cx, i as u32, v)?;
}
Ok(arr.upcast())
}
serde_json::Value::Object(map) => {
hashmap_to_js_value(cx, map).map(|v| v.upcast())
}
}
}

pub fn hashmap_to_js_value<'a>(cx: &mut impl Context<'a>, map: impl IntoIterator<Item = (String, serde_json::Value)>) -> JsResult<'a, JsObject> {
let obj: Handle<'a, JsObject> = cx.empty_object();
for (k, v) in map {
let k = cx.string(snake_to_camel(k));
let v = serde_value_to_js_value(cx, v)?;
obj.set(cx, k, v)?;
}
Ok(obj)
}

fn snake_to_camel(input: String) -> String {
match input.find('_') {
None => input,
Some(first) => {
let mut result = String::with_capacity(input.len());
if first > 0 {
result.push_str(&input[..first]);
}
let mut capitalize = true;
for c in input[first+1..].chars() {
if c == '_' {
capitalize = true;
} else if capitalize {
result.push(c.to_ascii_uppercase());
capitalize = false;
} else {
result.push(c.to_ascii_lowercase());
}
}
result
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn snake_to_camel_works() {
assert_eq!(snake_to_camel("this_is_a_test".into()), "thisIsATest");
assert_eq!(snake_to_camel("this___IS_a_TEST".into()), "thisIsATest");
assert_eq!(snake_to_camel("éàç_this_is_a_test".into()), "éàçThisIsATest");
}
}
13 changes: 11 additions & 2 deletions packages/core-bridge/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,24 @@ pub fn start_bridge_loop(
send_result(channel.clone(), callback, |cx| {
let logarr = cx.empty_array();
for (i, cl) in logs.into_iter().enumerate() {
// Not much to do here except for panic when there's an
// error here.
// Not much to do here except for panic when there's an error here.
let logobj = cx.empty_object();

let level = cx.string(cl.level.to_string());
logobj.set(cx, "level", level).unwrap();

let ts = system_time_to_js(cx, cl.timestamp).unwrap();
logobj.set(cx, "timestamp", ts).unwrap();

let msg = cx.string(cl.message);
logobj.set(cx, "message", msg).unwrap();

let fieldsobj = hashmap_to_js_value(cx, cl.fields);
logobj.set(cx, "fields", fieldsobj.unwrap()).unwrap();

let target = cx.string(cl.target);
logobj.set(cx, "target", target).unwrap();

logarr.set(cx, i as u32, logobj).unwrap();
}
Ok(logarr)
Expand Down
11 changes: 11 additions & 0 deletions packages/core-bridge/ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ export interface WorkerOptions {
maxActivitiesPerSecond?: number;
}

export type LogEntryMetadata = {
[key: string]: string | number | boolean | LogEntryMetadata;
};

export interface LogEntry {
/** Log message */
message: string;
Expand All @@ -353,8 +357,15 @@ export interface LogEntry {
* Should be switched to bigint once it is supported in neon.
*/
timestamp: [number, number];

/** Log level */
level: LogLevel;

/** Name of the Core subsystem that emitted that log entry */
target: string;

/*** Metadata fields */
fields: LogEntryMetadata;
}

/**
Expand Down
49 changes: 47 additions & 2 deletions packages/test/src/test-runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@
*/
import test from 'ava';
import { v4 as uuid4 } from 'uuid';
import { Runtime, DefaultLogger } from '@temporalio/worker';
import { WorkflowClient } from '@temporalio/client';
import asyncRetry from 'async-retry';
import { Runtime, DefaultLogger, LogEntry, makeTelemetryFilterString } from '@temporalio/worker';
import { Client, WorkflowClient } from '@temporalio/client';
import { defaultOptions } from './mock-native-worker';
import * as workflows from './workflows';
import { RUN_INTEGRATION_TESTS, Worker } from './helpers';
Expand Down Expand Up @@ -73,6 +74,50 @@ if (RUN_INTEGRATION_TESTS) {
}
});

test.serial('Runtime.instance() Core forwarded logs contains metadata', async (t) => {
const logEntries: LogEntry[] = [];
const logger = new DefaultLogger('DEBUG', (entry) => logEntries.push(entry));
Runtime.install({
logger,
telemetryOptions: { logging: { forward: {}, filter: makeTelemetryFilterString({ core: 'DEBUG' }) } },
});
try {
{
const runtime = Runtime.instance();
t.is(runtime.options.logger, logger);
}
await new Client().workflow.start('not-existant', { taskQueue: 'q1', workflowId: uuid4() });
const worker = await Worker.create({
...defaultOptions,
taskQueue: 'q1',
});
await worker.runUntil(() =>
asyncRetry(
() => {
if (!logEntries.some((x) => x.message === 'Failing workflow task'))
throw new Error('Waiting for failing workflow task');
},
{ minTimeout: 100, factor: 1, maxTimeout: 5000 }
)
);

const initWorkerEntry = logEntries.filter((x) => x.message === 'Initializing worker')?.[0];
t.true(initWorkerEntry !== undefined);
t.is(initWorkerEntry.meta?.['taskQueue'], 'q1');

const failingWftEntry = logEntries.filter((x) => x.message === 'Failing workflow task')?.[0];
t.true(failingWftEntry !== undefined);
t.is(failingWftEntry.meta?.['taskQueue'], 'q1');
t.is(typeof failingWftEntry.meta?.['completion'], 'string');
t.is(typeof failingWftEntry.meta?.['failure'], 'string');
t.is(typeof failingWftEntry.meta?.['runId'], 'string');
t.is(typeof failingWftEntry.meta?.['workflowId'], 'string');
t.is(typeof failingWftEntry.meta?.['subsystem'], 'string');
} finally {
await Runtime.instance().shutdown();
}
});

test.serial('Runtime.instance() throws meaningful error when passed invalid tracing.otel.url', (t) => {
t.throws(() => Runtime.install({ telemetryOptions: { tracing: { otel: { url: ':invalid' } } } }), {
instanceOf: TypeError,
Expand Down
6 changes: 4 additions & 2 deletions packages/worker/src/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
OtelCollectorExporter,
} from '@temporalio/core-bridge';
import { filterNullAndUndefined, normalizeTlsConfig } from '@temporalio/common/lib/internal-non-workflow';
import { IllegalStateError } from '@temporalio/common';
import { IllegalStateError, LogMetadata } from '@temporalio/common';
import { temporal } from '@temporalio/proto';
import { History } from '@temporalio/common/lib/proto-utils';
import { msToNumber } from '@temporalio/common/lib/time';
Expand Down Expand Up @@ -273,8 +273,10 @@ export class Runtime {
const doPoll = async () => {
const logs = await poll(this.native);
for (const log of logs) {
const meta: Record<string | symbol, unknown> = {
const meta: LogMetadata = {
[LogTimestamp]: timeOfDayToBigint(log.timestamp),
subsystem: log.target,
...log.fields,
};
logger.log(log.level, log.message, meta);
}
Expand Down

0 comments on commit 35c6005

Please # to comment.