From 2afce8d9710d769180397834ee61ea88d2d2525e Mon Sep 17 00:00:00 2001 From: nicholas Date: Thu, 23 Jan 2025 16:38:54 -0500 Subject: [PATCH] Add an event name as an optional field to the experimental interface; make sure context resources are registered as lineage resources in the global namespace --- src/prefect/_experimental/lineage.py | 9 ++-- tests/experimental/test_lineage.py | 71 ++++++++++++++++++++++++++-- 2 files changed, 71 insertions(+), 9 deletions(-) diff --git a/src/prefect/_experimental/lineage.py b/src/prefect/_experimental/lineage.py index 0300f6f7cd43..b188314fc392 100644 --- a/src/prefect/_experimental/lineage.py +++ b/src/prefect/_experimental/lineage.py @@ -181,6 +181,7 @@ async def emit_result_write_event( async def emit_external_resource_lineage( + event_name: str = "prefect.lineage.event", upstream_resources: Optional[UpstreamResources] = None, downstream_resources: Optional[DownstreamResources] = None, ) -> None: @@ -210,7 +211,7 @@ async def emit_external_resource_lineage( context_resources = await related_resources_from_run_context(client) # Add lineage group label to all resources - for res in upstream_resources + downstream_resources: + for res in upstream_resources + downstream_resources + context_resources: if "prefect.resource.lineage-group" not in res: res["prefect.resource.lineage-group"] = "global" @@ -218,7 +219,7 @@ async def emit_external_resource_lineage( if upstream_resources: for context_resource in context_resources: emit_kwargs: Dict[str, Any] = { - "event": "prefect.resource.consumed", + "event": "prefect.lineage.upstream-interaction", "resource": context_resource, "related": upstream_resources, } @@ -227,7 +228,7 @@ async def emit_external_resource_lineage( # For each downstream resource, emit an event showing it as downstream of context resources for downstream_resource in downstream_resources: emit_kwargs: Dict[str, Any] = { - "event": "prefect.resource.produced", + "event": "prefect.lineage.downstream-interaction", "resource": downstream_resource, "related": context_resources, } @@ -236,7 +237,7 @@ async def emit_external_resource_lineage( # For each downstream resource, emit an event showing it as downstream of upstream resources if upstream_resources: direct_emit_kwargs = { - "event": "prefect.resource.direct-lineage", + "event": event_name, "resource": downstream_resource, "related": upstream_resources, } diff --git a/tests/experimental/test_lineage.py b/tests/experimental/test_lineage.py index 59facf92322a..66b461fc2555 100644 --- a/tests/experimental/test_lineage.py +++ b/tests/experimental/test_lineage.py @@ -232,7 +232,7 @@ async def test_flow(): # Should emit one event per context resource (flow and flow-run) assert mock_emit_event.call_count == 2 for call in mock_emit_event.call_args_list: - assert call.kwargs["event"] == "prefect.resource.consumed" + assert call.kwargs["event"] == "prefect.lineage.upstream-interaction" assert "prefect.resource.role" in call.kwargs["resource"] assert call.kwargs["related"] == [ { @@ -262,7 +262,7 @@ async def test_flow(): mock_emit_event.assert_called_once() call_args = mock_emit_event.call_args.kwargs - assert call_args["event"] == "prefect.resource.produced" + assert call_args["event"] == "prefect.lineage.downstream-interaction" assert call_args["resource"] == { "prefect.resource.id": "downstream1", "prefect.resource.role": "data-destination", @@ -306,7 +306,7 @@ async def test_flow(): # Check context resources consuming upstream (first two events) context_calls = mock_emit_event.call_args_list[:2] for call in context_calls: - assert call.kwargs["event"] == "prefect.resource.consumed" + assert call.kwargs["event"] == "prefect.lineage.upstream-interaction" assert "prefect.resource.role" in call.kwargs["resource"] assert call.kwargs["related"] == [ { @@ -318,7 +318,7 @@ async def test_flow(): # Check downstream produced by context (third event) produced_call = mock_emit_event.call_args_list[2] - assert produced_call.kwargs["event"] == "prefect.resource.produced" + assert produced_call.kwargs["event"] == "prefect.lineage.downstream-interaction" assert produced_call.kwargs["resource"] == { "prefect.resource.id": "downstream1", "prefect.resource.role": "data-destination", @@ -336,7 +336,7 @@ async def test_flow(): # Check direct lineage event (fourth event) direct_call = mock_emit_event.call_args_list[3] - assert direct_call.kwargs["event"] == "prefect.resource.direct-lineage" + assert direct_call.kwargs["event"] == "prefect.lineage.event" assert direct_call.kwargs["resource"] == { "prefect.resource.id": "downstream1", "prefect.resource.role": "data-destination", @@ -374,6 +374,67 @@ async def test_flow(): await test_flow() mock_emit_event.assert_not_called() + async def test_emit_external_resource_lineage_custom_event_name( + self, enable_lineage_events, mock_emit_event + ): + upstream_resources = [ + { + "prefect.resource.id": "upstream1", + "prefect.resource.role": "data-source", + } + ] + downstream_resources = [ + { + "prefect.resource.id": "downstream1", + "prefect.resource.role": "data-destination", + } + ] + + @flow + async def test_flow(): + # Test with custom event name + await emit_external_resource_lineage( + event_name="custom.lineage.event", + upstream_resources=upstream_resources, + downstream_resources=downstream_resources, + ) + + # Test with default event name + await emit_external_resource_lineage( + upstream_resources=upstream_resources, + downstream_resources=downstream_resources, + ) + + await test_flow() + + # We expect 8 total events (4 for each call): + # - 2 upstream interaction events + # - 1 downstream interaction event + # - 1 direct lineage event + assert mock_emit_event.call_count == 8 + + # Check the direct lineage events (4th and 8th calls) + custom_direct_call = mock_emit_event.call_args_list[3] + assert custom_direct_call.kwargs["event"] == "custom.lineage.event" + + default_direct_call = mock_emit_event.call_args_list[7] + assert default_direct_call.kwargs["event"] == "prefect.lineage.event" + + # Verify the rest of the event structure remains correct + for direct_call in [custom_direct_call, default_direct_call]: + assert direct_call.kwargs["resource"] == { + "prefect.resource.id": "downstream1", + "prefect.resource.role": "data-destination", + "prefect.resource.lineage-group": "global", + } + assert direct_call.kwargs["related"] == [ + { + "prefect.resource.id": "upstream1", + "prefect.resource.role": "data-source", + "prefect.resource.lineage-group": "global", + } + ] + class TestEmitResultEvents: async def test_emit_result_read_event(