-
Notifications
You must be signed in to change notification settings - Fork 117
Description
Hello,
What are you really trying to do?
I want to see proper OTEL metadata (spans and traces) propagated from the client invocation site all the way through workflows and activities, so I can visualize the full path in something like Datalust Seq.
Describe the bug
When a workflow starts an activity from its __init__
method, the OpenTelemetry context is not propagated. As a result, the activity runs with a missing/empty span context, even though the client call is wrapped in a valid span.
Minimal Reproduction
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "temporalio[opentelemetry]>=1.16.0",
# ]
# ///
import asyncio
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.worker import Worker
from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.testing import WorkflowEnvironment
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor
# ---------- OpenTelemetry setup ----------
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)
@activity.defn
async def get_trace_id() -> int:
return trace.get_current_span().get_span_context().trace_id
# ---------- Workflow ----------
@workflow.defn
class DemoWorkflow:
@workflow.init
def __init__(self) -> None:
self._result = workflow.start_activity(
get_trace_id,
start_to_close_timeout=timedelta(seconds=5),
)
@workflow.run
async def run(self) -> int:
return await self._result
# ---------- Main ----------
async def main():
async with await WorkflowEnvironment.start_local(
interceptors=[TracingInterceptor()],
) as env:
queue_name = "some-queue"
async with Worker(
env.client,
task_queue=queue_name,
workflows=[DemoWorkflow],
activities=[get_trace_id],
):
with tracer.start_as_current_span("invocation-site-span") as span:
invocation_trace_id = span.get_span_context().trace_id
activity_trace_id = await env.client.execute_workflow(
DemoWorkflow.run,
id="otel-update-workflow-id-2",
task_queue=queue_name,
)
assert activity_trace_id == invocation_trace_id
if __name__ == "__main__":
asyncio.run(main())
Additional context
I think that WorkflowOutboundInterceptor should have a hook to wrap an __init__
of a workflow, but looking at how/where _instantiate_workflow_object
is called, I don't think that it was planned for. So fixing this might require a refactoring of unclear scale to me.
If this issue is considered worth fixing, I'm willing to try fixing this and would greatly appreciate any clues (e.g. whether or not introducing another hook is the way to go here... or any other advice, really :)