diff --git a/src/prefect/context.py b/src/prefect/context.py
index 6e460ff2f2fa..b2852501ed9a 100644
--- a/src/prefect/context.py
+++ b/src/prefect/context.py
@@ -337,10 +337,15 @@ class EngineContext(RunContext):
         flow: The flow instance associated with the run
         flow_run: The API metadata for the flow run
         task_runner: The task runner instance being used for the flow run
-        task_run_futures: A list of futures for task runs submitted within this flow run
-        task_run_states: A list of states for task runs created within this flow run
         task_run_results: A mapping of result ids to task run states for this flow run
-        flow_run_states: A list of states for flow runs created within this flow run
+        log_prints: Whether to log print statements from the flow run
+        parameters: The parameters passed to the flow run
+        detached: Flag indicating if context has been serialized and sent to remote infrastructure
+        result_store: The result store used to persist results
+        persist_result: Whether to persist the flow run result
+        task_run_dynamic_keys: Counter for task calls allowing unique keys
+        observed_flow_pauses: Counter for flow pauses
+        events: Events worker to emit events
     """
 
     flow: Optional["Flow[Any, Any]"] = None
@@ -373,7 +378,7 @@ class EngineContext(RunContext):
     __var__: ClassVar[ContextVar[Self]] = ContextVar("flow_run")
 
     def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]:
-        return self.model_dump(
+        serialized = self.model_dump(
             include={
                 "flow_run",
                 "flow",
@@ -381,13 +386,18 @@ def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]:
                 "log_prints",
                 "start_time",
                 "input_keyset",
-                "result_store",
                 "persist_result",
             },
             exclude_unset=True,
-            serialize_as_any=True,
             context={"include_secrets": include_secrets},
         )
+        if self.result_store:
+            serialized["result_store"] = self.result_store.model_dump(
+                serialize_as_any=True,
+                exclude_unset=True,
+                context={"include_secrets": include_secrets},
+            )
+        return serialized
 
 
 FlowRunContext = EngineContext  # for backwards compatibility
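
Note on the serialize() change above: dropping the top-level serialize_as_any=True and duck-typing only the result_store dump means the rest of the context is serialized strictly by its declared annotations, while the result store, presumably the one field holding polymorphic block/filesystem types, keeps its subclass-only fields. A minimal sketch of the pydantic behavior in play, using hypothetical stand-in models rather than Prefect's real classes (serialize_as_any requires pydantic 2.7+):

    from typing import Optional

    from pydantic import BaseModel

    # Hypothetical stand-ins for a result store's polymorphic filesystem
    # field; these are NOT Prefect's real classes.
    class Filesystem(BaseModel):
        kind: str = "base"

    class LocalFilesystem(Filesystem):
        kind: str = "local"
        basepath: str = "/tmp/results"

    class Store(BaseModel):
        filesystem: Optional[Filesystem] = None

    store = Store(filesystem=LocalFilesystem())

    # By default pydantic serializes by the declared annotation, so
    # subclass-only fields like `basepath` are silently dropped:
    print(store.model_dump())
    # {'filesystem': {'kind': 'local'}}

    # With serialize_as_any=True the runtime type wins and subclass
    # fields survive the dump:
    print(store.model_dump(serialize_as_any=True))
    # {'filesystem': {'kind': 'local', 'basepath': '/tmp/results'}}
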
diff --git a/tests/test_context.py b/tests/test_context.py
index 8d51fddbb104..e67952b59aed 100644
--- a/tests/test_context.py
+++ b/tests/test_context.py
@@ -1,7 +1,10 @@
+import inspect
+import multiprocessing
+import os
 import textwrap
 import uuid
 from contextvars import ContextVar
-from typing import cast
+from typing import TYPE_CHECKING, Any, cast
 from unittest.mock import MagicMock
 
 import pytest
@@ -9,6 +12,7 @@
 import prefect.settings
 from prefect import flow, task
 from prefect.client.orchestration import PrefectClient
+from prefect.client.schemas.objects import FlowRun, StateType
 from prefect.context import (
     GLOBAL_SETTINGS_CONTEXT,
     ContextModel,
@@ -26,6 +30,8 @@
 )
 from prefect.exceptions import MissingContextError
 from prefect.filesystems import LocalFileSystem
+from prefect.flow_engine import run_flow
+from prefect.flows import Flow
 from prefect.results import ResultStore, get_result_store
 from prefect.settings import (
     PREFECT_API_KEY,
@@ -37,9 +43,12 @@
     save_profiles,
     temporary_settings,
 )
+from prefect.settings.context import get_current_settings
+from prefect.settings.models.root import Settings
 from prefect.states import Running
 from prefect.task_runners import ThreadPoolTaskRunner
 from prefect.types import DateTime
+from prefect.utilities.callables import cloudpickle_wrapped_call
 
 
 class ExampleContext(ContextModel):
@@ -403,6 +412,74 @@ def foo():
             "settings_context": SettingsContext.get().serialize(),
         }
 
+    async def test_serialize_from_subprocess_with_flow_run_from_deployment(
+        self, prefect_client: PrefectClient
+    ):
+        """
+        Regression test for https://github.com/PrefectHQ/prefect/issues/16766 and https://github.com/PrefectHQ/prefect/issues/16756
+
+        This test ensures that context serialization works when the flow run is running in a subprocess, which replicates
+        the behavior of a flow run that is created from a deployment.
+        """
+
+        # Our hero, the flow
+        @flow
+        def foo():
+            serialize_context()
+
+        # Create a deployment and avoid red squiggles
+        to_deployment_coro = foo.to_deployment(name="foo")
+        if TYPE_CHECKING:
+            assert inspect.iscoroutine(to_deployment_coro)
+        deployment = await to_deployment_coro
+        deployment_id_coro = deployment.apply()
+        if TYPE_CHECKING:
+            assert inspect.iscoroutine(deployment_id_coro)
+        deployment_id = await deployment_id_coro
+
+        # Create a flow run from the deployment
+        flow_run = await prefect_client.create_flow_run_from_deployment(
+            deployment_id=deployment_id
+        )
+
+        # Define a wrapper function to ensure environment variables and settings propagate because
+        # PYTHON WON'T DO IT FOR US
+        def run_flow_with_env(
+            flow: Flow[Any, Any], flow_run: FlowRun, env: dict[str, Any]
+        ):
+            """
+            This whole song and dance is to ensure that the test settings get to the engine running in a subprocess.
+            """
+            os.environ.update(env)
+            settings_context = get_settings_context()
+            with SettingsContext(
+                profile=settings_context.profile,
+                # Create a new settings object to pick up the new environment variables
+                settings=Settings(),
+            ):
+                return run_flow(flow, flow_run)
+
+        # Run the flow in a subprocess. We need to use cloudpickle to serialize the flow
+        # because the flow wasn't declared in the global scope and we can't pickle flows
+        # right now. If you're reading this and you're thinking "we should be able to
+        # pickle flows", you're right, and you should try to fix it.
+        process = multiprocessing.Process(
+            target=cloudpickle_wrapped_call(
+                run_flow_with_env,
+                foo,
+                flow_run,
+                os.environ
+                | get_current_settings().to_environment_variables(exclude_unset=True),
+            )
+        )
+        process.start()
+        process.join()
+
+        # Check that the flow run completed successfully
+        flow_run = await prefect_client.read_flow_run(flow_run.id)
+        assert flow_run.state
+        assert flow_run.state.type == StateType.COMPLETED
+
     async def test_with_task_run_context(self, prefect_client, flow_run):
         @task
         def bar():
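
The test above leans on cloudpickle_wrapped_call to ship the locally defined `foo` flow across the process boundary. As a rough sketch of how such a helper can work (an assumption about its shape, not necessarily Prefect's exact implementation):

    from functools import partial
    from typing import Any, Callable

    import cloudpickle

    def _run_serialized_call(payload: bytes) -> Any:
        # Rehydrate the function and its arguments inside the child process.
        fn, args, kwargs = cloudpickle.loads(payload)
        return fn(*args, **kwargs)

    def cloudpickle_wrapped_call(
        fn: Callable[..., Any], *args: Any, **kwargs: Any
    ) -> Callable[[], Any]:
        # A functools.partial over a module-level function is picklable by the
        # stdlib pickler, so multiprocessing's "spawn" start method can hand it
        # to the child process; the cloudpickle payload inside carries objects
        # (like a flow defined in a test body) that plain pickle would reject.
        return partial(_run_serialized_call, cloudpickle.dumps((fn, args, kwargs)))

This is also why the test passes the merged environment explicitly: the child process rebuilds its settings from its own environment, so the parent's test settings have to be copied in by hand, exactly as run_flow_with_env does.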