From c8986edebb2dde3e2a931adbe24d2eaefcb799cb Mon Sep 17 00:00:00 2001 From: Alex Streed Date: Fri, 7 Feb 2025 20:02:21 -0600 Subject: [PATCH] Update the SDK to use `PATCH /deployments/{id}` endpoint for existing deployments (#17050) Co-authored-by: Chris White Co-authored-by: nate nowack --- flows/schedule_statefulness.py | 80 ++++++++ src/prefect/cli/deploy.py | 76 ++++++-- .../orchestration/_deployments/client.py | 12 +- src/prefect/client/schemas/actions.py | 2 + src/prefect/deployments/runner.py | 134 +++++++++---- src/prefect/runner/storage.py | 4 +- src/prefect/server/api/deployments.py | 4 +- src/prefect/server/schemas/actions.py | 1 + src/prefect/testing/utilities.py | 8 +- src/prefect/types/__init__.py | 3 +- tests/cli/test_deploy.py | 141 +++++++++++++- tests/conftest.py | 1 - tests/runner/test_runner.py | 184 +++++++++++++++++- 13 files changed, 580 insertions(+), 70 deletions(-) create mode 100644 flows/schedule_statefulness.py diff --git a/flows/schedule_statefulness.py b/flows/schedule_statefulness.py new file mode 100644 index 000000000000..66e674cabab2 --- /dev/null +++ b/flows/schedule_statefulness.py @@ -0,0 +1,80 @@ +""" +this integration test should + +- start serve w a schedule (no slug) +- stop the serve process +- start serve with no schedule +- observe that deployment has no schedules + + +- start serve with schedule (slug) +- stop the serve process +- start serve with no schedule +- stop the serve process +- observe that deployment still has that named schedule + +""" + +import signal +from typing import Any + +from prefect import flow, get_client +from prefect.client.schemas.objects import DeploymentSchedule +from prefect.schedules import Cron + +DEPLOYMENT_NAME = "my-deployment" + + +@flow +def my_flow(): + print("Hello, world!") + + +def _handler(signum: int, frame: Any): + raise KeyboardInterrupt("Simulating user interruption") + + +def run_serve_with_schedule( + timeout: int = 2, serve_kwargs: dict[str, Any] | None = None +): + signal.signal(signal.SIGALRM, _handler) + signal.alarm(timeout) + try: + my_flow.serve(name=DEPLOYMENT_NAME, **(serve_kwargs or {})) + except KeyboardInterrupt: + print("Serve interrupted") + finally: + signal.alarm(0) + + +def check_deployment_schedules(deployment_name: str) -> list[DeploymentSchedule]: + with get_client(sync_client=True) as client: + deployment = client.read_deployment_by_name(deployment_name) + return deployment.schedules + + +def main(): + # case 1: Schedule without slug + print("\nTest case 1: Schedule without slug") + run_serve_with_schedule(serve_kwargs={"schedules": [Cron("0 9 * * *")]}) + run_serve_with_schedule(serve_kwargs={"schedules": []}) + schedules = check_deployment_schedules(f"my-flow/{DEPLOYMENT_NAME}") + assert not schedules, ( + f"Expected no schedules after removing unnamed schedule: {schedules}" + ) + + # case 2: Schedule with slug + print("\nTest case 2: Schedule with slug") + run_serve_with_schedule( + serve_kwargs={"schedules": [Cron("0 9 * * *", slug="every-day-at-9am")]} + ) + run_serve_with_schedule(serve_kwargs={"schedules": []}) + schedules = check_deployment_schedules(f"my-flow/{DEPLOYMENT_NAME}") + assert any(s.slug == "every-day-at-9am" for s in schedules), ( + f"Expected named schedule to persist: {schedules}" + ) + print("All tests passed!") + + +if __name__ == "__main__": + main() diff --git a/src/prefect/cli/deploy.py b/src/prefect/cli/deploy.py index 509b6677d5a6..6c0426996096 100644 --- a/src/prefect/cli/deploy.py +++ b/src/prefect/cli/deploy.py @@ -2,6 +2,7 @@ from __future__ import annotations +import inspect import json import os import re @@ -58,6 +59,7 @@ _format_deployment_for_saving_to_prefect_file, _save_deployment_to_prefect_file, ) +from prefect.deployments.runner import RunnerDeployment from prefect.deployments.steps.core import run_steps from prefect.events import DeploymentTriggerTypes, TriggerTypes from prefect.exceptions import ObjectNotFound, PrefectHTTPStatusError @@ -89,6 +91,36 @@ SlaAdapter: TypeAdapter[SlaTypes] = TypeAdapter(SlaTypes) +class _PullStepStorage: + """ + A shim storage class that allows passing pull steps to a `RunnerDeployment`. + """ + + def __init__(self, pull_steps: list[dict[str, Any]]): + self._base_path = Path.cwd() + self.pull_steps = pull_steps + + def set_base_path(self, path: Path): + self._base_path = path + + @property + def destination(self): + return self._base_path + + @property + def pull_interval(self): + return 60 + + async def pull_code(self): + pass + + def to_pull_step(self): + return self.pull_steps + + def __eq__(self, other: Any) -> bool: + return self.pull_steps == getattr(other, "pull_steps", None) + + @app.command() async def init( name: Optional[str] = None, @@ -730,35 +762,41 @@ async def _run_single_deploy( pull_steps = apply_values(pull_steps, step_outputs, remove_notset=False) - flow_id = await client.create_flow_from_name(deploy_config["flow_name"]) - - deployment_id = await client.create_deployment( - flow_id=flow_id, - name=deploy_config.get("name"), - work_queue_name=get_from_dict(deploy_config, "work_pool.work_queue_name"), + deployment = RunnerDeployment( + name=deploy_config["name"], + flow_name=deploy_config.get("flow_name"), + entrypoint=deploy_config.get("entrypoint"), work_pool_name=get_from_dict(deploy_config, "work_pool.name"), - version=deploy_config.get("version"), - schedules=deploy_config.get("schedules"), - paused=deploy_config.get("paused"), - enforce_parameter_schema=deploy_config.get("enforce_parameter_schema", True), - parameter_openapi_schema=deploy_config.get( - "parameter_openapi_schema" - ).model_dump_for_openapi(), + work_queue_name=get_from_dict(deploy_config, "work_pool.work_queue_name"), parameters=deploy_config.get("parameters"), description=deploy_config.get("description"), - tags=deploy_config.get("tags", []), + version=deploy_config.get("version"), + tags=deploy_config.get("tags"), concurrency_limit=deploy_config.get("concurrency_limit"), concurrency_options=deploy_config.get("concurrency_options"), - entrypoint=deploy_config.get("entrypoint"), - pull_steps=pull_steps, + parameter_openapi_schema=deploy_config.get("parameter_openapi_schema"), + schedules=deploy_config.get("schedules"), + paused=deploy_config.get("paused"), + storage=_PullStepStorage(pull_steps), job_variables=get_from_dict(deploy_config, "work_pool.job_variables"), ) + if deploy_config.get("enforce_parameter_schema") is not None: + deployment.enforce_parameter_schema = deploy_config.get( + "enforce_parameter_schema" + ) + + apply_coro = deployment.apply() + if TYPE_CHECKING: + assert inspect.isawaitable(apply_coro) + + deployment_id = await apply_coro + await _create_deployment_triggers(client, deployment_id, triggers) - # We want to ensure that if a user passes an empty list of SLAs, we call the - # apply endpoint to remove existing SLAs for the deployment. - # If the argument is not provided, we will not call the endpoint. + # # We want to ensure that if a user passes an empty list of SLAs, we call the + # # apply endpoint to remove existing SLAs for the deployment. + # # If the argument is not provided, we will not call the endpoint. sla_specs = _gather_deployment_sla_definitions( options.get("sla"), deploy_config.get("sla") ) diff --git a/src/prefect/client/orchestration/_deployments/client.py b/src/prefect/client/orchestration/_deployments/client.py index 05b0f68da563..e3eaf85a33f7 100644 --- a/src/prefect/client/orchestration/_deployments/client.py +++ b/src/prefect/client/orchestration/_deployments/client.py @@ -165,7 +165,11 @@ def update_deployment( "PATCH", "/deployments/{id}", path_params={"id": deployment_id}, - json=deployment.model_dump(mode="json", exclude_unset=True), + json=deployment.model_dump( + mode="json", + exclude_unset=True, + exclude={"name", "flow_name", "triggers"}, + ), ) def _create_deployment_from_schema(self, schema: "DeploymentCreate") -> "UUID": @@ -708,7 +712,11 @@ async def update_deployment( "PATCH", "/deployments/{id}", path_params={"id": deployment_id}, - json=deployment.model_dump(mode="json", exclude_unset=True), + json=deployment.model_dump( + mode="json", + exclude_unset=True, + exclude={"name", "flow_name", "triggers"}, + ), ) async def _create_deployment_from_schema( diff --git a/src/prefect/client/schemas/actions.py b/src/prefect/client/schemas/actions.py index 771718b9beb3..7dbc5f9d8bf7 100644 --- a/src/prefect/client/schemas/actions.py +++ b/src/prefect/client/schemas/actions.py @@ -326,6 +326,8 @@ def remove_old_fields(cls, values: dict[str, Any]) -> dict[str, Any]: "Whether or not the deployment should enforce the parameter schema." ), ) + parameter_openapi_schema: Optional[dict[str, Any]] = Field(default_factory=dict) + pull_steps: Optional[list[dict[str, Any]]] = Field(default=None) def check_valid_configuration(self, base_job_template: dict[str, Any]) -> None: """Check that the combination of base_job_template defaults diff --git a/src/prefect/deployments/runner.py b/src/prefect/deployments/runner.py index 9c12b3bf66ef..81ef05d0e9cf 100644 --- a/src/prefect/deployments/runner.py +++ b/src/prefect/deployments/runner.py @@ -57,7 +57,7 @@ def fast_flow(): ) from prefect.client.base import ServerType from prefect.client.orchestration import PrefectClient, get_client -from prefect.client.schemas.actions import DeploymentScheduleCreate +from prefect.client.schemas.actions import DeploymentScheduleCreate, DeploymentUpdate from prefect.client.schemas.filters import WorkerFilter, WorkerFilterStatus from prefect.client.schemas.objects import ( ConcurrencyLimitConfig, @@ -230,6 +230,10 @@ class RunnerDeployment(BaseModel): def entrypoint_type(self) -> EntrypointType: return self._entrypoint_type + @property + def full_name(self) -> str: + return f"{self.flow_name}/{self.name}" + @field_validator("name", mode="before") @classmethod def validate_name(cls, value: str) -> str: @@ -256,24 +260,9 @@ def reconcile_paused(cls, values): def reconcile_schedules(cls, values): return reconcile_schedules_runner(values) - @sync_compatible - async def apply( + async def _create( self, work_pool_name: Optional[str] = None, image: Optional[str] = None ) -> UUID: - """ - Registers this deployment with the API and returns the deployment's ID. - - Args: - work_pool_name: The name of the work pool to use for this - deployment. - image: The registry, name, and tag of the Docker image to - use for this deployment. Only used when the deployment is - deployed to a work pool. - - Returns: - The ID of the created deployment. - """ - work_pool_name = work_pool_name or self.work_pool_name if image and not work_pool_name: @@ -325,9 +314,14 @@ async def apply( if image: create_payload["job_variables"]["image"] = image create_payload["path"] = None if self.storage else self._path - create_payload["pull_steps"] = ( - [self.storage.to_pull_step()] if self.storage else [] - ) + if self.storage: + pull_steps = self.storage.to_pull_step() + if isinstance(pull_steps, list): + create_payload["pull_steps"] = pull_steps + else: + create_payload["pull_steps"] = [pull_steps] + else: + create_payload["pull_steps"] = [] try: deployment_id = await client.create_deployment(**create_payload) @@ -340,25 +334,7 @@ async def apply( f"Error while applying deployment: {str(exc)}" ) from exc - try: - # The triggers defined in the deployment spec are, essentially, - # anonymous and attempting truly sync them with cloud is not - # feasible. Instead, we remove all automations that are owned - # by the deployment, meaning that they were created via this - # mechanism below, and then recreate them. - await client.delete_resource_owned_automations( - f"prefect.deployment.{deployment_id}" - ) - except PrefectHTTPStatusError as e: - if e.response.status_code == 404: - # This Prefect server does not support automations, so we can safely - # ignore this 404 and move on. - return deployment_id - raise e - - for trigger in self.triggers: - trigger.set_deployment_id(deployment_id) - await client.create_automation(trigger.as_automation()) + await self._create_triggers(deployment_id, client) # We plan to support SLA configuration on the Prefect Server in the future. # For now, we only support it on Prefect Cloud. @@ -371,6 +347,86 @@ async def apply( return deployment_id + async def _update(self, deployment_id: UUID, client: PrefectClient): + parameter_openapi_schema = self._parameter_openapi_schema.model_dump( + exclude_unset=True + ) + await client.update_deployment( + deployment_id, + deployment=DeploymentUpdate( + parameter_openapi_schema=parameter_openapi_schema, + **self.model_dump( + mode="json", + exclude_unset=True, + exclude={"storage", "name", "flow_name", "triggers"}, + ), + ), + ) + + await self._create_triggers(deployment_id, client) + + # We plan to support SLA configuration on the Prefect Server in the future. + # For now, we only support it on Prefect Cloud. + + # If we're provided with an empty list, we will call the apply endpoint + # to remove existing SLAs for the deployment. If the argument is not provided, + # we will not call the endpoint. + if self._sla or self._sla == []: + await self._create_slas(deployment_id, client) + + return deployment_id + + async def _create_triggers(self, deployment_id: UUID, client: PrefectClient): + try: + # The triggers defined in the deployment spec are, essentially, + # anonymous and attempting truly sync them with cloud is not + # feasible. Instead, we remove all automations that are owned + # by the deployment, meaning that they were created via this + # mechanism below, and then recreate them. + await client.delete_resource_owned_automations( + f"prefect.deployment.{deployment_id}" + ) + except PrefectHTTPStatusError as e: + if e.response.status_code == 404: + # This Prefect server does not support automations, so we can safely + # ignore this 404 and move on. + return deployment_id + raise e + + for trigger in self.triggers: + trigger.set_deployment_id(deployment_id) + await client.create_automation(trigger.as_automation()) + + @sync_compatible + async def apply( + self, work_pool_name: Optional[str] = None, image: Optional[str] = None + ) -> UUID: + """ + Registers this deployment with the API and returns the deployment's ID. + + Args: + work_pool_name: The name of the work pool to use for this + deployment. + image: The registry, name, and tag of the Docker image to + use for this deployment. Only used when the deployment is + deployed to a work pool. + + Returns: + The ID of the created deployment. + """ + + async with get_client() as client: + try: + deployment = await client.read_deployment_by_name(self.full_name) + except ObjectNotFound: + return await self._create(work_pool_name, image) + else: + if image: + self.job_variables["image"] = image + if work_pool_name: + self.work_pool_name = work_pool_name + return await self._update(deployment.id, client) + async def _create_slas(self, deployment_id: UUID, client: PrefectClient): if not isinstance(self._sla, list): self._sla = [self._sla] diff --git a/src/prefect/runner/storage.py b/src/prefect/runner/storage.py index 9f9cf53778c7..efb17ecdb945 100644 --- a/src/prefect/runner/storage.py +++ b/src/prefect/runner/storage.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import shutil import subprocess from copy import deepcopy @@ -61,7 +63,7 @@ async def pull_code(self) -> None: """ ... - def to_pull_step(self) -> dict[str, Any]: + def to_pull_step(self) -> dict[str, Any] | list[dict[str, Any]]: """ Returns a dictionary representation of the storage object that can be used as a deployment pull step. diff --git a/src/prefect/server/api/deployments.py b/src/prefect/server/api/deployments.py index de579ae14ee9..a0cff90b5fea 100644 --- a/src/prefect/server/api/deployments.py +++ b/src/prefect/server/api/deployments.py @@ -198,8 +198,8 @@ async def update_deployment( # Otherwise, we'll use the existing slugs and the provided slugs to make targeted updates to the deployment's schedules. schedules_to_patch: list[schemas.actions.DeploymentScheduleUpdate] = [] schedules_to_create: list[schemas.actions.DeploymentScheduleUpdate] = [] - all_provided_have_slugs = deployment.schedules and all( - schedule.slug is not None for schedule in deployment.schedules + all_provided_have_slugs = all( + schedule.slug is not None for schedule in deployment.schedules or [] ) all_existing_have_slugs = existing_deployment.schedules and all( schedule.slug is not None for schedule in existing_deployment.schedules diff --git a/src/prefect/server/schemas/actions.py b/src/prefect/server/schemas/actions.py index cbb135e8df06..442452f3f98a 100644 --- a/src/prefect/server/schemas/actions.py +++ b/src/prefect/server/schemas/actions.py @@ -326,6 +326,7 @@ def remove_old_fields(cls, values: dict[str, Any]) -> dict[str, Any]: "Whether or not the deployment should enforce the parameter schema." ), ) + pull_steps: Optional[List[dict[str, Any]]] = Field(None) model_config: ClassVar[ConfigDict] = ConfigDict(populate_by_name=True) def check_valid_configuration(self, base_job_template: dict[str, Any]) -> None: diff --git a/src/prefect/testing/utilities.py b/src/prefect/testing/utilities.py index 2ed0b99a4b6f..d7a397d895f4 100644 --- a/src/prefect/testing/utilities.py +++ b/src/prefect/testing/utilities.py @@ -281,7 +281,13 @@ async def assert_uses_result_storage( def a_test_step(**kwargs: Any) -> dict[str, Any]: - kwargs.update({"output1": 1, "output2": ["b", 2, 3]}) + kwargs.update( + { + "output1": 1, + "output2": ["b", 2, 3], + "output3": "This one is actually a string", + } + ) return kwargs diff --git a/src/prefect/types/__init__.py b/src/prefect/types/__init__.py index cf4b68e08c3c..f03049319e97 100644 --- a/src/prefect/types/__init__.py +++ b/src/prefect/types/__init__.py @@ -106,7 +106,8 @@ def cast_none_to_empty_dict(value: Any) -> dict[str, Any]: ListOfNonEmptyStrings = Annotated[ - List[str], BeforeValidator(lambda x: [s for s in x if s.strip()]) + List[str], + BeforeValidator(lambda x: [str(s) for s in x if str(s).strip()]), ] diff --git a/tests/cli/test_deploy.py b/tests/cli/test_deploy.py index 894f220f3781..556f11077f48 100644 --- a/tests/cli/test_deploy.py +++ b/tests/cli/test_deploy.py @@ -27,7 +27,7 @@ _initialize_deployment_triggers, ) from prefect.client.orchestration import PrefectClient, ServerType -from prefect.client.schemas.actions import WorkPoolCreate +from prefect.client.schemas.actions import DeploymentScheduleCreate, WorkPoolCreate from prefect.client.schemas.objects import Worker, WorkerStatus, WorkPool from prefect.client.schemas.schedules import ( CronSchedule, @@ -1453,7 +1453,7 @@ async def test_project_deploy_templates_values(self, work_pool, prefect_client): contents["deployments"][0]["name"] = "test-name" contents["deployments"][0]["version"] = "{{ input }}" contents["deployments"][0]["tags"] = "{{ output2 }}" - contents["deployments"][0]["description"] = "{{ output1 }}" + contents["deployments"][0]["description"] = "{{ output3 }}" # save it back with prefect_file.open(mode="w") as f: @@ -1488,7 +1488,7 @@ async def test_project_deploy_templates_values(self, work_pool, prefect_client): assert deployment.work_pool_name == work_pool.name assert deployment.version == "foo" assert deployment.tags == ["b", "2", "3"] - assert deployment.description == "1" + assert deployment.description == "This one is actually a string" @pytest.mark.usefixtures("project_dir") async def test_project_deploy_templates_env_var_values( @@ -2331,6 +2331,66 @@ async def test_deploy_respects_yaml_enforce_parameter_schema( ) assert not deployment.enforce_parameter_schema + @pytest.mark.usefixtures("project_dir") + async def test_deploy_update_does_not_override_enforce_parameter_schema( + self, work_pool: WorkPool, prefect_client: PrefectClient + ): + # Create a deployment with enforce_parameter_schema set to False + prefect_yaml_file = Path("prefect.yaml") + with prefect_yaml_file.open(mode="r") as f: + deploy_config = yaml.safe_load(f) + + deploy_config["deployments"] = [ + { + "name": "test-name", + "entrypoint": "flows/hello.py:my_flow", + "work_pool": { + "name": work_pool.name, + }, + "enforce_parameter_schema": False, + } + ] + + with prefect_yaml_file.open(mode="w") as f: + yaml.safe_dump(deploy_config, f) + + await run_sync_in_worker_thread( + invoke_and_assert, + command="deploy -n test-name", + ) + + deployment = await prefect_client.read_deployment_by_name( + "An important name/test-name" + ) + assert not deployment.enforce_parameter_schema + + prefect_yaml_file = Path("prefect.yaml") + with prefect_yaml_file.open(mode="r") as f: + deploy_config = yaml.safe_load(f) + + deploy_config["deployments"] = [ + { + "name": "test-name", + "entrypoint": "flows/hello.py:my_flow", + "work_pool": { + "name": work_pool.name, + }, + } + ] + + with prefect_yaml_file.open(mode="w") as f: + yaml.safe_dump(deploy_config, f) + + await run_sync_in_worker_thread( + invoke_and_assert, + command="deploy -n test-name", + ) + + deployment = await prefect_client.read_deployment_by_name( + "An important name/test-name" + ) + assert not deployment.enforce_parameter_schema + class TestSchedules: @pytest.mark.usefixtures("project_dir") @@ -2913,6 +2973,81 @@ async def test_deploy_with_inactive_schedule( assert deployment_schedule.schedule.cron == "0 4 * * *" assert deployment_schedule.schedule.timezone == "America/Chicago" + @pytest.mark.usefixtures("project_dir") + async def test_deploy_does_not_activate_schedule_outside_of_yaml( + self, prefect_client: PrefectClient, work_pool: WorkPool + ): + prefect_file = Path("prefect.yaml") + with prefect_file.open(mode="r") as f: + deploy_config = yaml.safe_load(f) + + # Create a deployment with a schedule that is not active + deploy_config["deployments"][0]["name"] = "test-name" + deploy_config["deployments"][0]["schedules"] = [ + { + "cron": "0 4 * * *", + "timezone": "America/Chicago", + "active": False, + "slug": "test-yaml-slug", + } + ] + + with prefect_file.open(mode="w") as f: + yaml.safe_dump(deploy_config, f) + + result = await run_sync_in_worker_thread( + invoke_and_assert, + command=f"deploy ./flows/hello.py:my_flow -n test-name --pool {work_pool.name}", + ) + + assert result.exit_code == 0 + + deployment = await prefect_client.read_deployment_by_name( + "An important name/test-name" + ) + + deployment_schedule = deployment.schedules[0] + assert deployment_schedule.active is False + assert deployment_schedule.schedule.cron == "0 4 * * *" + assert deployment_schedule.schedule.timezone == "America/Chicago" + + # Create another schedule outside of the yaml + # Using the https client directly because the PrefectClient does not support + # creating schedules with slugs + await prefect_client._client.post( + f"/deployments/{deployment.id}/schedules", + json=[ + DeploymentScheduleCreate( + schedule=CronSchedule(cron="0 4 * * *"), + active=False, + slug="test-client-slug", + ).model_dump(mode="json"), + ], + ) + + deploy_config["deployments"][0]["schedules"][0]["active"] = True + + with prefect_file.open(mode="w") as f: + yaml.safe_dump(deploy_config, f) + + result = await run_sync_in_worker_thread( + invoke_and_assert, + command=f"deploy ./flows/hello.py:my_flow -n test-name --pool {work_pool.name}", + ) + + assert result.exit_code == 0 + + deployment = await prefect_client.read_deployment_by_name( + "An important name/test-name" + ) + + assert len(deployment.schedules) == 2 + expected_slug_active = {("test-yaml-slug", True), ("test-client-slug", False)} + actual_slug_active = { + (schedule.slug, schedule.active) for schedule in deployment.schedules + } + assert actual_slug_active == expected_slug_active + @pytest.mark.usefixtures("project_dir") async def test_yaml_null_schedules( self, prefect_client: PrefectClient, work_pool: WorkPool diff --git a/tests/conftest.py b/tests/conftest.py index ae3b94ed7e5b..33052921db43 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -140,7 +140,6 @@ def pytest_addoption(parser): "tests/_internal", "tests/server/orchestration/test_rules.py", "tests/test_flows.py", - "tests/runner/test_runner.py", ] diff --git a/tests/runner/test_runner.py b/tests/runner/test_runner.py index 1f4e81a83c2f..5ac7d89d006a 100644 --- a/tests/runner/test_runner.py +++ b/tests/runner/test_runner.py @@ -39,6 +39,8 @@ ) from prefect.docker.docker_image import DockerImage from prefect.events.clients import AssertingEventsClient +from prefect.events.schemas.automations import Posture +from prefect.events.schemas.deployment_triggers import DeploymentEventTrigger from prefect.events.worker import EventsWorker from prefect.flows import load_flow_from_entrypoint from prefect.logging.loggers import flow_run_logger @@ -426,6 +428,46 @@ async def test_add_flows_to_runner(self, prefect_client: PrefectClient): assert deployment_2.name == "test_runner" assert deployment_2.schedules[0].schedule.cron == "* * * * *" + async def test_add_flow_to_runner_always_updates_openapi_schema( + self, prefect_client: PrefectClient + ): + """Runner.add should create a deployment for the flow passed to it""" + runner = Runner() + + @flow + def one(num: int): + pass + + deployment_id = await runner.add_flow(one, name="test-openapi") + deployment = await prefect_client.read_deployment(deployment_id) + + assert deployment.name == "test-openapi" + assert deployment.description == "None" + assert set(deployment.parameter_openapi_schema["properties"].keys()) == {"num"} + + @flow(name="one") + def two(num: int): + "description now" + pass + + deployment_id = await runner.add_flow(two, name="test-openapi") + deployment = await prefect_client.read_deployment(deployment_id) + + assert deployment.name == "test-openapi" + assert deployment.description == "description now" + assert set(deployment.parameter_openapi_schema["properties"].keys()) == {"num"} + + @flow(name="one") + def three(name: str): + pass + + deployment_id = await runner.add_flow(three, name="test-openapi") + deployment = await prefect_client.read_deployment(deployment_id) + + assert deployment.name == "test-openapi" + assert deployment.description is None + assert set(deployment.parameter_openapi_schema["properties"].keys()) == {"name"} + @pytest.mark.parametrize( "kwargs", [ @@ -1572,7 +1614,9 @@ async def test_apply(self, prefect_client: PrefectClient): assert deployment.paused is False assert deployment.global_concurrency_limit is None - async def test_apply_with_work_pool(self, prefect_client: PrefectClient, work_pool): + async def test_apply_with_work_pool( + self, prefect_client: PrefectClient, work_pool, process_work_pool + ): deployment = RunnerDeployment.from_flow( dummy_flow_1, __file__, @@ -1591,6 +1635,57 @@ async def test_apply_with_work_pool(self, prefect_client: PrefectClient, work_po } assert deployment.work_queue_name == "default" + # should result in the same deployment ID + deployment2 = RunnerDeployment.from_flow( + dummy_flow_1, + __file__, + interval=3600, + ) + + deployment_id = await deployment2.apply(work_pool_name=process_work_pool.name) + deployment2 = await prefect_client.read_deployment(deployment_id) + + assert deployment2.work_pool_name == process_work_pool.name + + # this may look weird with a process pool but update's job isn't to enforce that schema + assert deployment2.job_variables == { + "image": "my-repo/my-image:latest", + } + assert deployment2.work_queue_name == "default" + + async def test_apply_with_image(self, prefect_client: PrefectClient, work_pool): + deployment = RunnerDeployment.from_flow( + dummy_flow_1, + "test-image", + ) + + deployment_id = await deployment.apply( + work_pool_name=work_pool.name, image="my-repo/my-image:latest" + ) + + deployment = await prefect_client.read_deployment(deployment_id) + + assert deployment.work_pool_name == work_pool.name + assert deployment.job_variables == { + "image": "my-repo/my-image:latest", + } + assert deployment.work_queue_name == "default" + + # should result in the same deployment ID + deployment2 = RunnerDeployment.from_flow( + dummy_flow_1, + "test-image", + ) + + deployment_id = await deployment2.apply(image="my-other-repo/my-image:latest") + deployment2 = await prefect_client.read_deployment(deployment_id) + + assert deployment2.work_pool_name == work_pool.name + assert deployment2.job_variables == { + "image": "my-other-repo/my-image:latest", + } + assert deployment2.work_queue_name == "default" + async def test_apply_paused(self, prefect_client: PrefectClient): deployment = RunnerDeployment.from_flow( dummy_flow_1, __file__, interval=3600, paused=True @@ -2609,6 +2704,93 @@ async def test_deploy_to_process_work_pool_with_storage( "Looks like you're deploying to a process work pool." in console_output ) + async def test_deploy_with_triggers( + self, + mock_build_image, + mock_docker_client, + mock_generate_default_dockerfile, + work_pool_with_image_variable, + prefect_client: PrefectClient, + ): + deployment_ids = await deploy( + await dummy_flow_1.to_deployment( + __file__, + triggers=[ + DeploymentEventTrigger( + name="test-trigger", + enabled=True, + posture=Posture.Reactive, + match={"prefect.resource.id": "prefect.flow-run.*"}, + expect=["prefect.flow-run.Completed"], + ) + ], + ), + work_pool_name=work_pool_with_image_variable.name, + image="test-registry/test-image", + ) + assert len(deployment_ids) == 1 + triggers = await prefect_client._client.get( + f"/automations/related-to/prefect.deployment.{deployment_ids[0]}" + ) + assert len(triggers.json()) == 1 + assert triggers.json()[0]["name"] == "test-trigger" + + async def test_deploy_with_triggers_and_update( + self, + mock_build_image, + mock_docker_client, + mock_generate_default_dockerfile, + work_pool_with_image_variable, + prefect_client: PrefectClient, + ): + deployment_ids = await deploy( + await dummy_flow_1.to_deployment( + __file__, + triggers=[ + DeploymentEventTrigger( + name="test-trigger", + enabled=True, + posture=Posture.Reactive, + match={"prefect.resource.id": "prefect.flow-run.*"}, + expect=["prefect.flow-run.Completed"], + ) + ], + ), + work_pool_name=work_pool_with_image_variable.name, + image="test-registry/test-image", + ) + assert len(deployment_ids) == 1 + triggers = await prefect_client._client.get( + f"/automations/related-to/prefect.deployment.{deployment_ids[0]}" + ) + assert len(triggers.json()) == 1 + assert triggers.json()[0]["name"] == "test-trigger" + assert triggers.json()[0]["enabled"] + + deployment_ids = await deploy( + await dummy_flow_1.to_deployment( + __file__, + triggers=[ + DeploymentEventTrigger( + name="test-trigger-2", + enabled=False, + posture=Posture.Reactive, + match={"prefect.resource.id": "prefect.flow-run.*"}, + expect=["prefect.flow-run.Completed"], + ) + ], + ), + work_pool_name=work_pool_with_image_variable.name, + image="test-registry/test-image", + ) + assert len(deployment_ids) == 1 + triggers = await prefect_client._client.get( + f"/automations/related-to/prefect.deployment.{deployment_ids[0]}" + ) + assert len(triggers.json()) == 1 + assert triggers.json()[0]["name"] == "test-trigger-2" + assert not triggers.json()[0]["enabled"] + class TestDockerImage: def test_adds_default_registry_url(self):