Skip to content

Commit d6a4992

Browse files
iravidmjameswh
andauthored
Instrument signals with OTel (#1449)
Co-authored-by: James Watkins-Harvey <mjameswh@users.noreply.github.com>
1 parent 00a48d2 commit d6a4992

File tree

5 files changed

+74
-3
lines changed

5 files changed

+74
-3
lines changed

packages/interceptors-opentelemetry/src/client/index.ts

+12-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import * as otel from '@opentelemetry/api';
2-
import { Next, WorkflowClientInterceptor, WorkflowStartInput } from '@temporalio/client';
2+
import { Next, WorkflowClientInterceptor, WorkflowSignalInput, WorkflowStartInput } from '@temporalio/client';
33
import { instrument, headersWithContext, RUN_ID_ATTR_KEY } from '../instrumentation';
44
import { SpanName, SPAN_DELIMITER } from '../workflow';
55

@@ -31,4 +31,15 @@ export class OpenTelemetryWorkflowClientInterceptor implements WorkflowClientInt
3131
},
3232
});
3333
}
34+
35+
async signal(input: WorkflowSignalInput, next: Next<WorkflowClientInterceptor, 'signal'>): Promise<void> {
36+
return await instrument({
37+
tracer: this.tracer,
38+
spanName: `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}${input.signalName}`,
39+
fn: async () => {
40+
const headers = await headersWithContext(input.headers);
41+
await next({ ...input, headers });
42+
},
43+
});
44+
}
3445
}

packages/interceptors-opentelemetry/src/workflow/definitions.ts

+5
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ export enum SpanName {
4444
*/
4545
WORKFLOW_START = 'StartWorkflow',
4646

47+
/**
48+
* Workflow is signalled
49+
*/
50+
WORKFLOW_SIGNAL = 'SignalWorkflow',
51+
4752
/**
4853
* Workflow is client calls signalWithStart
4954
*/

packages/interceptors-opentelemetry/src/workflow/index.ts

+32
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import {
99
ContinueAsNewInput,
1010
DisposeInput,
1111
Next,
12+
SignalInput,
13+
SignalWorkflowInput,
1214
StartChildWorkflowExecutionInput,
1315
WorkflowExecuteInput,
1416
WorkflowInboundCallsInterceptor,
@@ -61,6 +63,19 @@ export class OpenTelemetryInboundInterceptor implements WorkflowInboundCallsInte
6163
acceptableErrors: (err) => err instanceof ContinueAsNew,
6264
});
6365
}
66+
67+
public async handleSignal(
68+
input: SignalInput,
69+
next: Next<WorkflowInboundCallsInterceptor, 'handleSignal'>
70+
): Promise<void> {
71+
const context = await extractContextFromHeaders(input.headers);
72+
return await instrument({
73+
tracer: this.tracer,
74+
spanName: `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}${input.signalName}`,
75+
fn: () => next(input),
76+
context,
77+
});
78+
}
6479
}
6580

6681
/**
@@ -122,6 +137,23 @@ export class OpenTelemetryOutboundInterceptor implements WorkflowOutboundCallsIn
122137
acceptableErrors: (err) => err instanceof ContinueAsNew,
123138
});
124139
}
140+
141+
public async signalWorkflow(
142+
input: SignalWorkflowInput,
143+
next: Next<WorkflowOutboundCallsInterceptor, 'signalWorkflow'>
144+
): Promise<void> {
145+
return await instrument({
146+
tracer: this.tracer,
147+
spanName: `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}${input.signalName}`,
148+
fn: async () => {
149+
const headers = await headersWithContext(input.headers);
150+
return next({
151+
...input,
152+
headers,
153+
});
154+
},
155+
});
156+
}
125157
}
126158

127159
export class OpenTelemetryInternalsInterceptor implements WorkflowInternalsInterceptor {

packages/test/src/activities/helpers.ts

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,19 @@
1+
import { OpenTelemetryWorkflowClientCallsInterceptor } from '@temporalio/interceptors-opentelemetry';
12
import { Client, WorkflowHandle } from '@temporalio/client';
23
import { QueryDefinition } from '@temporalio/common';
34
import { getContext } from './interceptors';
45

56
function getSchedulingWorkflowHandle(): WorkflowHandle {
67
const { info, connection, dataConverter } = getContext();
78
const { workflowExecution } = info;
8-
const client = new Client({ connection, namespace: info.workflowNamespace, dataConverter });
9+
const client = new Client({
10+
connection,
11+
namespace: info.workflowNamespace,
12+
dataConverter,
13+
interceptors: {
14+
workflow: [new OpenTelemetryWorkflowClientCallsInterceptor()],
15+
},
16+
});
917
return client.workflow.getHandle(workflowExecution.workflowId, workflowExecution.runId);
1018
}
1119

packages/test/src/test-otel.ts

+16-1
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,22 @@ if (RUN_INTEGRATION_TESTS) {
232232
parentSpanId === childWorkflowStartSpan?.spanContext().spanId
233233
);
234234
t.true(childWorkflowExecuteSpan !== undefined);
235-
t.true(new Set(spans.map((span) => span.spanContext().traceId)).size === 1);
235+
236+
const signalChildWithUnblockSpan = spans.find(
237+
({ name, parentSpanId }) =>
238+
name === `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}unblock` &&
239+
parentSpanId === parentExecuteSpan?.spanContext().spanId
240+
);
241+
t.true(signalChildWithUnblockSpan !== undefined);
242+
243+
const activityStartedSignalSpan = spans.find(
244+
({ name, parentSpanId }) =>
245+
name === `${SpanName.WORKFLOW_SIGNAL}${SPAN_DELIMITER}activityStarted` &&
246+
parentSpanId === firstActivityExecuteSpan?.spanContext().spanId
247+
);
248+
t.true(activityStartedSignalSpan !== undefined);
249+
250+
t.deepEqual(new Set(spans.map((span) => span.spanContext().traceId)).size, 1);
236251
});
237252

238253
// Un-skip this test and run it by hand to inspect outputted traces

0 commit comments

Comments
 (0)