From 1a1290612b5b08c9bbd3409ee4b56ad5da07a2fa Mon Sep 17 00:00:00 2001 From: Alex Streed Date: Thu, 23 Jan 2025 11:05:55 -0600 Subject: [PATCH 1/3] Fix serializtion for `FlowRunContext` for flow runs kicked off from a deployment --- src/prefect/context.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/prefect/context.py b/src/prefect/context.py index 6e460ff2f2fa..6f102aabd78e 100644 --- a/src/prefect/context.py +++ b/src/prefect/context.py @@ -373,7 +373,7 @@ class EngineContext(RunContext): __var__: ClassVar[ContextVar[Self]] = ContextVar("flow_run") def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]: - return self.model_dump( + serialized = self.model_dump( include={ "flow_run", "flow", @@ -381,13 +381,18 @@ def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]: "log_prints", "start_time", "input_keyset", - "result_store", "persist_result", }, exclude_unset=True, - serialize_as_any=True, context={"include_secrets": include_secrets}, ) + if self.result_store: + serialized["result_store"] = self.result_store.model_dump( + serialize_as_any=True, + exclude_unset=True, + context={"include_secrets": include_secrets}, + ) + return serialized FlowRunContext = EngineContext # for backwards compatibility From f0ad6b10ef4be084d7b74867494f00cb21d4fbf1 Mon Sep 17 00:00:00 2001 From: Alex Streed Date: Thu, 23 Jan 2025 13:42:44 -0600 Subject: [PATCH 2/3] Adds a failing test --- src/prefect/context.py | 11 +++-- tests/runner/test_run_flow.py | 18 ++++++++ tests/test_context.py | 79 ++++++++++++++++++++++++++++++++++- 3 files changed, 104 insertions(+), 4 deletions(-) create mode 100644 tests/runner/test_run_flow.py diff --git a/src/prefect/context.py b/src/prefect/context.py index 6f102aabd78e..b2852501ed9a 100644 --- a/src/prefect/context.py +++ b/src/prefect/context.py @@ -337,10 +337,15 @@ class EngineContext(RunContext): flow: The flow instance associated with the run flow_run: The API metadata for the flow run task_runner: The task runner instance being used for the flow run - task_run_futures: A list of futures for task runs submitted within this flow run - task_run_states: A list of states for task runs created within this flow run task_run_results: A mapping of result ids to task run states for this flow run - flow_run_states: A list of states for flow runs created within this flow run + log_prints: Whether to log print statements from the flow run + parameters: The parameters passed to the flow run + detached: Flag indicating if context has been serialized and sent to remote infrastructure + result_store: The result store used to persist results + persist_result: Whether to persist the flow run result + task_run_dynamic_keys: Counter for task calls allowing unique keys + observed_flow_pauses: Counter for flow pauses + events: Events worker to emit events """ flow: Optional["Flow[Any, Any]"] = None diff --git a/tests/runner/test_run_flow.py b/tests/runner/test_run_flow.py new file mode 100644 index 000000000000..8e52637b3484 --- /dev/null +++ b/tests/runner/test_run_flow.py @@ -0,0 +1,18 @@ +from prefect.client.orchestration import PrefectClient +from prefect.flows import flow +from prefect.runner import Runner +from prefect.states import StateType + + +class TestRunFlowWithRunner: + async def test_run_flow_with_runner(self, prefect_client: PrefectClient): + @flow + def local_dummy_flow(): + pass + + flow_run = await prefect_client.create_flow_run(flow=local_dummy_flow) + runner = Runner() + await runner.run_flow(local_dummy_flow, flow_run) + + flow_run = await prefect_client.read_flow_run(flow_run.id) + assert flow_run.state.type == StateType.COMPLETED diff --git a/tests/test_context.py b/tests/test_context.py index 8d51fddbb104..e67952b59aed 100644 --- a/tests/test_context.py +++ b/tests/test_context.py @@ -1,7 +1,10 @@ +import inspect +import multiprocessing +import os import textwrap import uuid from contextvars import ContextVar -from typing import cast +from typing import TYPE_CHECKING, Any, cast from unittest.mock import MagicMock import pytest @@ -9,6 +12,7 @@ import prefect.settings from prefect import flow, task from prefect.client.orchestration import PrefectClient +from prefect.client.schemas.objects import FlowRun, StateType from prefect.context import ( GLOBAL_SETTINGS_CONTEXT, ContextModel, @@ -26,6 +30,8 @@ ) from prefect.exceptions import MissingContextError from prefect.filesystems import LocalFileSystem +from prefect.flow_engine import run_flow +from prefect.flows import Flow from prefect.results import ResultStore, get_result_store from prefect.settings import ( PREFECT_API_KEY, @@ -37,9 +43,12 @@ save_profiles, temporary_settings, ) +from prefect.settings.context import get_current_settings +from prefect.settings.models.root import Settings from prefect.states import Running from prefect.task_runners import ThreadPoolTaskRunner from prefect.types import DateTime +from prefect.utilities.callables import cloudpickle_wrapped_call class ExampleContext(ContextModel): @@ -403,6 +412,74 @@ def foo(): "settings_context": SettingsContext.get().serialize(), } + async def test_serialize_from_subprocess_with_flow_run_from_deployment( + self, prefect_client: PrefectClient + ): + """ + Regression test for https://github.com/PrefectHQ/prefect/issues/16766 and https://github.com/PrefectHQ/prefect/issues/16756 + + This test ensures that context serialization works when the flow run is running in a subprocess, which replicates + the behavior of a flow run that is created from a deployment. + """ + + # Our hero, the flow + @flow + def foo(): + serialize_context() + + # Create a deployment and avoid red squiggles + to_deployment_coro = foo.to_deployment(name="foo") + if TYPE_CHECKING: + assert inspect.iscoroutine(to_deployment_coro) + deployment = await to_deployment_coro + deployment_id_coro = deployment.apply() + if TYPE_CHECKING: + assert inspect.iscoroutine(deployment_id_coro) + deployment_id = await deployment_id_coro + + # Create a flow run from the deployment + flow_run = await prefect_client.create_flow_run_from_deployment( + deployment_id=deployment_id + ) + + # Define a wrapper function to ensure environment variables and settings propagate because + # PYTHON WON'T DO IT FOR US + def run_flow_with_env( + flow: Flow[Any, Any], flow_run: FlowRun, env: dict[str, Any] + ): + """ + This whole song and dance is to ensure that the test settings get to the engine running in a subprocess. + """ + os.environ.update(env) + settings_context = get_settings_context() + with SettingsContext( + profile=settings_context.profile, + # Create a new settings object to pick up the new environment variables + settings=Settings(), + ): + return run_flow(flow, flow_run) + + # Run the flow in a subprocess. Need to use cloudpickle to serialize the flow because + # to flow wasn't declared in the global scope and we can't pickle flows right now. If + # you're reading this and you're thinking "we should be able to pickle flows", you're + # right, and you should try and fix it. + process = multiprocessing.Process( + target=cloudpickle_wrapped_call( + run_flow_with_env, + foo, + flow_run, + os.environ + | get_current_settings().to_environment_variables(exclude_unset=True), + ) + ) + process.start() + process.join() + + # Check that the flow run completed successfully + flow_run = await prefect_client.read_flow_run(flow_run.id) + assert flow_run.state + assert flow_run.state.type == StateType.COMPLETED + async def test_with_task_run_context(self, prefect_client, flow_run): @task def bar(): From 7d757463f7577bbac9c5c8022cd679da6bacc6a0 Mon Sep 17 00:00:00 2001 From: Alex Streed Date: Thu, 23 Jan 2025 13:43:50 -0600 Subject: [PATCH 3/3] Remove stray file --- tests/runner/test_run_flow.py | 18 ------------------ 1 file changed, 18 deletions(-) delete mode 100644 tests/runner/test_run_flow.py diff --git a/tests/runner/test_run_flow.py b/tests/runner/test_run_flow.py deleted file mode 100644 index 8e52637b3484..000000000000 --- a/tests/runner/test_run_flow.py +++ /dev/null @@ -1,18 +0,0 @@ -from prefect.client.orchestration import PrefectClient -from prefect.flows import flow -from prefect.runner import Runner -from prefect.states import StateType - - -class TestRunFlowWithRunner: - async def test_run_flow_with_runner(self, prefect_client: PrefectClient): - @flow - def local_dummy_flow(): - pass - - flow_run = await prefect_client.create_flow_run(flow=local_dummy_flow) - runner = Runner() - await runner.run_flow(local_dummy_flow, flow_run) - - flow_run = await prefect_client.read_flow_run(flow_run.id) - assert flow_run.state.type == StateType.COMPLETED