Skip to content

Commit

Permalink
Add an event name as an optional field to the experimental interface;…
Browse files Browse the repository at this point in the history
… make sure context resources are registered as lineage resources in the global namespace
  • Loading branch information
znicholasbrown committed Jan 23, 2025
1 parent 2b6f0d1 commit 2afce8d
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 9 deletions.
9 changes: 5 additions & 4 deletions src/prefect/_experimental/lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ async def emit_result_write_event(


async def emit_external_resource_lineage(
event_name: str = "prefect.lineage.event",
upstream_resources: Optional[UpstreamResources] = None,
downstream_resources: Optional[DownstreamResources] = None,
) -> None:
Expand Down Expand Up @@ -210,15 +211,15 @@ async def emit_external_resource_lineage(
context_resources = await related_resources_from_run_context(client)

# Add lineage group label to all resources
for res in upstream_resources + downstream_resources:
for res in upstream_resources + downstream_resources + context_resources:
if "prefect.resource.lineage-group" not in res:
res["prefect.resource.lineage-group"] = "global"

# For each context resource, emit an event showing it as downstream of upstream resources
if upstream_resources:
for context_resource in context_resources:
emit_kwargs: Dict[str, Any] = {
"event": "prefect.resource.consumed",
"event": "prefect.lineage.upstream-interaction",
"resource": context_resource,
"related": upstream_resources,
}
Expand All @@ -227,7 +228,7 @@ async def emit_external_resource_lineage(
# For each downstream resource, emit an event showing it as downstream of context resources
for downstream_resource in downstream_resources:
emit_kwargs: Dict[str, Any] = {
"event": "prefect.resource.produced",
"event": "prefect.lineage.downstream-interaction",
"resource": downstream_resource,
"related": context_resources,
}
Expand All @@ -236,7 +237,7 @@ async def emit_external_resource_lineage(
# For each downstream resource, emit an event showing it as downstream of upstream resources
if upstream_resources:
direct_emit_kwargs = {
"event": "prefect.resource.direct-lineage",
"event": event_name,
"resource": downstream_resource,
"related": upstream_resources,
}
Expand Down
71 changes: 66 additions & 5 deletions tests/experimental/test_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ async def test_flow():
# Should emit one event per context resource (flow and flow-run)
assert mock_emit_event.call_count == 2
for call in mock_emit_event.call_args_list:
assert call.kwargs["event"] == "prefect.resource.consumed"
assert call.kwargs["event"] == "prefect.lineage.upstream-interaction"
assert "prefect.resource.role" in call.kwargs["resource"]
assert call.kwargs["related"] == [
{
Expand Down Expand Up @@ -262,7 +262,7 @@ async def test_flow():

mock_emit_event.assert_called_once()
call_args = mock_emit_event.call_args.kwargs
assert call_args["event"] == "prefect.resource.produced"
assert call_args["event"] == "prefect.lineage.downstream-interaction"
assert call_args["resource"] == {
"prefect.resource.id": "downstream1",
"prefect.resource.role": "data-destination",
Expand Down Expand Up @@ -306,7 +306,7 @@ async def test_flow():
# Check context resources consuming upstream (first two events)
context_calls = mock_emit_event.call_args_list[:2]
for call in context_calls:
assert call.kwargs["event"] == "prefect.resource.consumed"
assert call.kwargs["event"] == "prefect.lineage.upstream-interaction"
assert "prefect.resource.role" in call.kwargs["resource"]
assert call.kwargs["related"] == [
{
Expand All @@ -318,7 +318,7 @@ async def test_flow():

# Check downstream produced by context (third event)
produced_call = mock_emit_event.call_args_list[2]
assert produced_call.kwargs["event"] == "prefect.resource.produced"
assert produced_call.kwargs["event"] == "prefect.lineage.downstream-interaction"
assert produced_call.kwargs["resource"] == {
"prefect.resource.id": "downstream1",
"prefect.resource.role": "data-destination",
Expand All @@ -336,7 +336,7 @@ async def test_flow():

# Check direct lineage event (fourth event)
direct_call = mock_emit_event.call_args_list[3]
assert direct_call.kwargs["event"] == "prefect.resource.direct-lineage"
assert direct_call.kwargs["event"] == "prefect.lineage.event"
assert direct_call.kwargs["resource"] == {
"prefect.resource.id": "downstream1",
"prefect.resource.role": "data-destination",
Expand Down Expand Up @@ -374,6 +374,67 @@ async def test_flow():
await test_flow()
mock_emit_event.assert_not_called()

async def test_emit_external_resource_lineage_custom_event_name(
self, enable_lineage_events, mock_emit_event
):
upstream_resources = [
{
"prefect.resource.id": "upstream1",
"prefect.resource.role": "data-source",
}
]
downstream_resources = [
{
"prefect.resource.id": "downstream1",
"prefect.resource.role": "data-destination",
}
]

@flow
async def test_flow():
# Test with custom event name
await emit_external_resource_lineage(
event_name="custom.lineage.event",
upstream_resources=upstream_resources,
downstream_resources=downstream_resources,
)

# Test with default event name
await emit_external_resource_lineage(
upstream_resources=upstream_resources,
downstream_resources=downstream_resources,
)

await test_flow()

# We expect 8 total events (4 for each call):
# - 2 upstream interaction events
# - 1 downstream interaction event
# - 1 direct lineage event
assert mock_emit_event.call_count == 8

# Check the direct lineage events (4th and 8th calls)
custom_direct_call = mock_emit_event.call_args_list[3]
assert custom_direct_call.kwargs["event"] == "custom.lineage.event"

default_direct_call = mock_emit_event.call_args_list[7]
assert default_direct_call.kwargs["event"] == "prefect.lineage.event"

# Verify the rest of the event structure remains correct
for direct_call in [custom_direct_call, default_direct_call]:
assert direct_call.kwargs["resource"] == {
"prefect.resource.id": "downstream1",
"prefect.resource.role": "data-destination",
"prefect.resource.lineage-group": "global",
}
assert direct_call.kwargs["related"] == [
{
"prefect.resource.id": "upstream1",
"prefect.resource.role": "data-source",
"prefect.resource.lineage-group": "global",
}
]


class TestEmitResultEvents:
async def test_emit_result_read_event(
Expand Down

0 comments on commit 2afce8d

Please # to comment.