Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Fix serializtion for FlowRunContext for flow runs kicked off from a deployment #16831

Merged
merged 3 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions src/prefect/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,15 @@ class EngineContext(RunContext):
flow: The flow instance associated with the run
flow_run: The API metadata for the flow run
task_runner: The task runner instance being used for the flow run
task_run_futures: A list of futures for task runs submitted within this flow run
task_run_states: A list of states for task runs created within this flow run
task_run_results: A mapping of result ids to task run states for this flow run
flow_run_states: A list of states for flow runs created within this flow run
log_prints: Whether to log print statements from the flow run
parameters: The parameters passed to the flow run
detached: Flag indicating if context has been serialized and sent to remote infrastructure
result_store: The result store used to persist results
persist_result: Whether to persist the flow run result
task_run_dynamic_keys: Counter for task calls allowing unique keys
observed_flow_pauses: Counter for flow pauses
events: Events worker to emit events
"""

flow: Optional["Flow[Any, Any]"] = None
Expand Down Expand Up @@ -373,21 +378,26 @@ class EngineContext(RunContext):
__var__: ClassVar[ContextVar[Self]] = ContextVar("flow_run")

def serialize(self: Self, include_secrets: bool = True) -> dict[str, Any]:
return self.model_dump(
serialized = self.model_dump(
include={
"flow_run",
"flow",
"parameters",
"log_prints",
"start_time",
"input_keyset",
"result_store",
"persist_result",
},
exclude_unset=True,
serialize_as_any=True,
context={"include_secrets": include_secrets},
)
if self.result_store:
serialized["result_store"] = self.result_store.model_dump(
serialize_as_any=True,
exclude_unset=True,
context={"include_secrets": include_secrets},
)
return serialized


FlowRunContext = EngineContext # for backwards compatibility
Expand Down
79 changes: 78 additions & 1 deletion tests/test_context.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import inspect
import multiprocessing
import os
import textwrap
import uuid
from contextvars import ContextVar
from typing import cast
from typing import TYPE_CHECKING, Any, cast
from unittest.mock import MagicMock

import pytest

import prefect.settings
from prefect import flow, task
from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.objects import FlowRun, StateType
from prefect.context import (
GLOBAL_SETTINGS_CONTEXT,
ContextModel,
Expand All @@ -26,6 +30,8 @@
)
from prefect.exceptions import MissingContextError
from prefect.filesystems import LocalFileSystem
from prefect.flow_engine import run_flow
from prefect.flows import Flow
from prefect.results import ResultStore, get_result_store
from prefect.settings import (
PREFECT_API_KEY,
Expand All @@ -37,9 +43,12 @@
save_profiles,
temporary_settings,
)
from prefect.settings.context import get_current_settings
from prefect.settings.models.root import Settings
from prefect.states import Running
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.types import DateTime
from prefect.utilities.callables import cloudpickle_wrapped_call


class ExampleContext(ContextModel):
Expand Down Expand Up @@ -403,6 +412,74 @@ def foo():
"settings_context": SettingsContext.get().serialize(),
}

async def test_serialize_from_subprocess_with_flow_run_from_deployment(
self, prefect_client: PrefectClient
):
"""
Regression test for https://github.com/PrefectHQ/prefect/issues/16766 and https://github.com/PrefectHQ/prefect/issues/16756

This test ensures that context serialization works when the flow run is running in a subprocess, which replicates
the behavior of a flow run that is created from a deployment.
"""

# Our hero, the flow
@flow
def foo():
serialize_context()

# Create a deployment and avoid red squiggles
to_deployment_coro = foo.to_deployment(name="foo")
if TYPE_CHECKING:
assert inspect.iscoroutine(to_deployment_coro)
deployment = await to_deployment_coro
deployment_id_coro = deployment.apply()
if TYPE_CHECKING:
assert inspect.iscoroutine(deployment_id_coro)
deployment_id = await deployment_id_coro

# Create a flow run from the deployment
flow_run = await prefect_client.create_flow_run_from_deployment(
deployment_id=deployment_id
)

# Define a wrapper function to ensure environment variables and settings propagate because
# PYTHON WON'T DO IT FOR US
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🫠

def run_flow_with_env(
flow: Flow[Any, Any], flow_run: FlowRun, env: dict[str, Any]
):
"""
This whole song and dance is to ensure that the test settings get to the engine running in a subprocess.
"""
os.environ.update(env)
settings_context = get_settings_context()
with SettingsContext(
profile=settings_context.profile,
# Create a new settings object to pick up the new environment variables
settings=Settings(),
):
return run_flow(flow, flow_run)

# Run the flow in a subprocess. Need to use cloudpickle to serialize the flow because
# to flow wasn't declared in the global scope and we can't pickle flows right now. If
# you're reading this and you're thinking "we should be able to pickle flows", you're
# right, and you should try and fix it.
process = multiprocessing.Process(
target=cloudpickle_wrapped_call(
run_flow_with_env,
foo,
flow_run,
os.environ
| get_current_settings().to_environment_variables(exclude_unset=True),
)
)
process.start()
process.join()

# Check that the flow run completed successfully
flow_run = await prefect_client.read_flow_run(flow_run.id)
assert flow_run.state
assert flow_run.state.type == StateType.COMPLETED

async def test_with_task_run_context(self, prefect_client, flow_run):
@task
def bar():
Expand Down
Loading