Skip to content

Commit

Permalink
Update the SDK to use PATCH /deployments/{id} endpoint for existing…
Browse files Browse the repository at this point in the history
… deployments (#17050)

Co-authored-by: Chris White <chris@prefect.io>
Co-authored-by: nate nowack <thrast36@gmail.com>
  • Loading branch information
3 people authored Feb 8, 2025
1 parent 0794427 commit c8986ed
Show file tree
Hide file tree
Showing 13 changed files with 580 additions and 70 deletions.
80 changes: 80 additions & 0 deletions flows/schedule_statefulness.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
this integration test should
- start serve w a schedule (no slug)
- stop the serve process
- start serve with no schedule
- observe that deployment has no schedules
- start serve with schedule (slug)
- stop the serve process
- start serve with no schedule
- stop the serve process
- observe that deployment still has that named schedule
"""

import signal
from typing import Any

from prefect import flow, get_client
from prefect.client.schemas.objects import DeploymentSchedule
from prefect.schedules import Cron

DEPLOYMENT_NAME = "my-deployment"


@flow
def my_flow():
print("Hello, world!")


def _handler(signum: int, frame: Any):
raise KeyboardInterrupt("Simulating user interruption")


def run_serve_with_schedule(
timeout: int = 2, serve_kwargs: dict[str, Any] | None = None
):
signal.signal(signal.SIGALRM, _handler)
signal.alarm(timeout)
try:
my_flow.serve(name=DEPLOYMENT_NAME, **(serve_kwargs or {}))
except KeyboardInterrupt:
print("Serve interrupted")
finally:
signal.alarm(0)


def check_deployment_schedules(deployment_name: str) -> list[DeploymentSchedule]:
with get_client(sync_client=True) as client:
deployment = client.read_deployment_by_name(deployment_name)
return deployment.schedules


def main():
# case 1: Schedule without slug
print("\nTest case 1: Schedule without slug")
run_serve_with_schedule(serve_kwargs={"schedules": [Cron("0 9 * * *")]})
run_serve_with_schedule(serve_kwargs={"schedules": []})
schedules = check_deployment_schedules(f"my-flow/{DEPLOYMENT_NAME}")
assert not schedules, (
f"Expected no schedules after removing unnamed schedule: {schedules}"
)

# case 2: Schedule with slug
print("\nTest case 2: Schedule with slug")
run_serve_with_schedule(
serve_kwargs={"schedules": [Cron("0 9 * * *", slug="every-day-at-9am")]}
)
run_serve_with_schedule(serve_kwargs={"schedules": []})
schedules = check_deployment_schedules(f"my-flow/{DEPLOYMENT_NAME}")
assert any(s.slug == "every-day-at-9am" for s in schedules), (
f"Expected named schedule to persist: {schedules}"
)
print("All tests passed!")


if __name__ == "__main__":
main()
76 changes: 57 additions & 19 deletions src/prefect/cli/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import inspect
import json
import os
import re
Expand Down Expand Up @@ -58,6 +59,7 @@
_format_deployment_for_saving_to_prefect_file,
_save_deployment_to_prefect_file,
)
from prefect.deployments.runner import RunnerDeployment
from prefect.deployments.steps.core import run_steps
from prefect.events import DeploymentTriggerTypes, TriggerTypes
from prefect.exceptions import ObjectNotFound, PrefectHTTPStatusError
Expand Down Expand Up @@ -89,6 +91,36 @@
SlaAdapter: TypeAdapter[SlaTypes] = TypeAdapter(SlaTypes)


class _PullStepStorage:
"""
A shim storage class that allows passing pull steps to a `RunnerDeployment`.
"""

def __init__(self, pull_steps: list[dict[str, Any]]):
self._base_path = Path.cwd()
self.pull_steps = pull_steps

def set_base_path(self, path: Path):
self._base_path = path

@property
def destination(self):
return self._base_path

@property
def pull_interval(self):
return 60

async def pull_code(self):
pass

def to_pull_step(self):
return self.pull_steps

def __eq__(self, other: Any) -> bool:
return self.pull_steps == getattr(other, "pull_steps", None)


@app.command()
async def init(
name: Optional[str] = None,
Expand Down Expand Up @@ -730,35 +762,41 @@ async def _run_single_deploy(

pull_steps = apply_values(pull_steps, step_outputs, remove_notset=False)

flow_id = await client.create_flow_from_name(deploy_config["flow_name"])

deployment_id = await client.create_deployment(
flow_id=flow_id,
name=deploy_config.get("name"),
work_queue_name=get_from_dict(deploy_config, "work_pool.work_queue_name"),
deployment = RunnerDeployment(
name=deploy_config["name"],
flow_name=deploy_config.get("flow_name"),
entrypoint=deploy_config.get("entrypoint"),
work_pool_name=get_from_dict(deploy_config, "work_pool.name"),
version=deploy_config.get("version"),
schedules=deploy_config.get("schedules"),
paused=deploy_config.get("paused"),
enforce_parameter_schema=deploy_config.get("enforce_parameter_schema", True),
parameter_openapi_schema=deploy_config.get(
"parameter_openapi_schema"
).model_dump_for_openapi(),
work_queue_name=get_from_dict(deploy_config, "work_pool.work_queue_name"),
parameters=deploy_config.get("parameters"),
description=deploy_config.get("description"),
tags=deploy_config.get("tags", []),
version=deploy_config.get("version"),
tags=deploy_config.get("tags"),
concurrency_limit=deploy_config.get("concurrency_limit"),
concurrency_options=deploy_config.get("concurrency_options"),
entrypoint=deploy_config.get("entrypoint"),
pull_steps=pull_steps,
parameter_openapi_schema=deploy_config.get("parameter_openapi_schema"),
schedules=deploy_config.get("schedules"),
paused=deploy_config.get("paused"),
storage=_PullStepStorage(pull_steps),
job_variables=get_from_dict(deploy_config, "work_pool.job_variables"),
)

if deploy_config.get("enforce_parameter_schema") is not None:
deployment.enforce_parameter_schema = deploy_config.get(
"enforce_parameter_schema"
)

apply_coro = deployment.apply()
if TYPE_CHECKING:
assert inspect.isawaitable(apply_coro)

deployment_id = await apply_coro

await _create_deployment_triggers(client, deployment_id, triggers)

# We want to ensure that if a user passes an empty list of SLAs, we call the
# apply endpoint to remove existing SLAs for the deployment.
# If the argument is not provided, we will not call the endpoint.
# # We want to ensure that if a user passes an empty list of SLAs, we call the
# # apply endpoint to remove existing SLAs for the deployment.
# # If the argument is not provided, we will not call the endpoint.
sla_specs = _gather_deployment_sla_definitions(
options.get("sla"), deploy_config.get("sla")
)
Expand Down
12 changes: 10 additions & 2 deletions src/prefect/client/orchestration/_deployments/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ def update_deployment(
"PATCH",
"/deployments/{id}",
path_params={"id": deployment_id},
json=deployment.model_dump(mode="json", exclude_unset=True),
json=deployment.model_dump(
mode="json",
exclude_unset=True,
exclude={"name", "flow_name", "triggers"},
),
)

def _create_deployment_from_schema(self, schema: "DeploymentCreate") -> "UUID":
Expand Down Expand Up @@ -708,7 +712,11 @@ async def update_deployment(
"PATCH",
"/deployments/{id}",
path_params={"id": deployment_id},
json=deployment.model_dump(mode="json", exclude_unset=True),
json=deployment.model_dump(
mode="json",
exclude_unset=True,
exclude={"name", "flow_name", "triggers"},
),
)

async def _create_deployment_from_schema(
Expand Down
2 changes: 2 additions & 0 deletions src/prefect/client/schemas/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ def remove_old_fields(cls, values: dict[str, Any]) -> dict[str, Any]:
"Whether or not the deployment should enforce the parameter schema."
),
)
parameter_openapi_schema: Optional[dict[str, Any]] = Field(default_factory=dict)
pull_steps: Optional[list[dict[str, Any]]] = Field(default=None)

def check_valid_configuration(self, base_job_template: dict[str, Any]) -> None:
"""Check that the combination of base_job_template defaults
Expand Down
Loading

0 comments on commit c8986ed

Please # to comment.