From e73e1cb26602c5461f730c408de94f8d6c93541b Mon Sep 17 00:00:00 2001 From: zzstoatzz Date: Fri, 31 Jan 2025 11:29:24 -0600 Subject: [PATCH] bunch of em --- .../_internal/compatibility/deprecated.py | 8 ++--- src/prefect/cli/deployment.py | 26 +++++++++-------- src/prefect/cli/work_pool.py | 17 ++++++----- src/prefect/locking/filesystem.py | 7 +++-- src/prefect/runner/runner.py | 6 ++-- src/prefect/runner/server.py | 4 +-- src/prefect/types/_datetime.py | 29 ++++++++++++++++++- 7 files changed, 65 insertions(+), 32 deletions(-) diff --git a/src/prefect/_internal/compatibility/deprecated.py b/src/prefect/_internal/compatibility/deprecated.py index fcffa9e7ee52..05c5c19cfd25 100644 --- a/src/prefect/_internal/compatibility/deprecated.py +++ b/src/prefect/_internal/compatibility/deprecated.py @@ -15,10 +15,10 @@ import warnings from typing import TYPE_CHECKING, Any, Callable, Optional, Union -import pendulum from pydantic import BaseModel from typing_extensions import ParamSpec, TypeAlias, TypeVar +from prefect.types._datetime import DateTime, from_format from prefect.utilities.callables import get_call_parameters from prefect.utilities.importtools import ( AliasedModuleDefinition, @@ -60,18 +60,18 @@ def generate_deprecation_message( if not start_date and not end_date: raise ValueError( "A start date is required if an end date is not provided. Suggested start" - f" date is {pendulum.now('UTC').format(DEPRECATED_DATEFMT)!r}" + f" date is {DateTime.now('UTC').format(DEPRECATED_DATEFMT)!r}" ) if not end_date: if TYPE_CHECKING: assert start_date is not None - parsed_start_date = pendulum.from_format(start_date, DEPRECATED_DATEFMT) + parsed_start_date = from_format(start_date, DEPRECATED_DATEFMT) parsed_end_date = parsed_start_date.add(months=6) end_date = parsed_end_date.format(DEPRECATED_DATEFMT) else: # Validate format - pendulum.from_format(end_date, DEPRECATED_DATEFMT) + from_format(end_date, DEPRECATED_DATEFMT) if when: when = " when " + when diff --git a/src/prefect/cli/deployment.py b/src/prefect/cli/deployment.py index dbe229e58b7f..f8ecb475df8d 100644 --- a/src/prefect/cli/deployment.py +++ b/src/prefect/cli/deployment.py @@ -13,8 +13,6 @@ from typing import TYPE_CHECKING, Any, Optional, TypedDict from uuid import UUID -import pendulum -import pendulum.tz import typer import yaml from rich.console import Console @@ -42,6 +40,12 @@ ) from prefect.flow_runs import wait_for_flow_run from prefect.states import Scheduled +from prefect.types._datetime import ( + DateTime, + format_diff, + local_timezone, + parse_datetime, +) from prefect.utilities import urls from prefect.utilities.collections import listrepr @@ -344,7 +348,7 @@ async def create_schedule( if interval is not None: if interval_anchor: try: - pendulum.parse(interval_anchor) + parse_datetime(interval_anchor) except ValueError: return exit_with_error("The anchor date must be a valid date string.") @@ -552,7 +556,7 @@ async def list_schedules(deployment_name: str): def sort_by_created_key(schedule: DeploymentSchedule): # type: ignore assert schedule.created is not None, "All schedules should have a created time." - return pendulum.now("utc") - schedule.created + return DateTime.now("utc") - schedule.created def schedule_details(schedule: DeploymentSchedule) -> str: if isinstance(schedule.schedule, IntervalSchedule): @@ -643,7 +647,7 @@ def sort_by_name_keys(d: DeploymentResponse): def sort_by_created_key(d: DeploymentResponse): assert d.created is not None, "All deployments should have a created time." - return pendulum.now("utc") - d.created + return DateTime.now("utc") - d.created table = Table( title="Deployments", @@ -751,7 +755,7 @@ async def run( """ import dateparser - now = pendulum.now("UTC") + now = DateTime.now("UTC") multi_params: dict[str, Any] = {} if multiparams: @@ -802,7 +806,7 @@ async def run( warnings.filterwarnings("ignore", module="dateparser") try: - start_time_parsed = dateparser.parse( + start_time_parsed = dateparser.parse( # type: ignore[reportUnknownMemberType] start_time_raw, settings={ "TO_TIMEZONE": "UTC", @@ -820,10 +824,8 @@ async def run( if start_time_parsed is None: exit_with_error(f"Unable to parse scheduled start time {start_time_raw!r}.") - scheduled_start_time = pendulum.instance(start_time_parsed) - human_dt_diff = ( - " (" + pendulum.format_diff(scheduled_start_time.diff(now)) + ")" - ) + scheduled_start_time = DateTime.instance(start_time_parsed) + human_dt_diff = " (" + format_diff(scheduled_start_time.diff(now)) + ")" async with get_client() as client: deployment = await get_deployment(client, name, deployment_id) @@ -871,7 +873,7 @@ async def run( raise run_url = urls.url_for(flow_run) or "" - datetime_local_tz = scheduled_start_time.in_tz(pendulum.tz.local_timezone()) + datetime_local_tz = scheduled_start_time.in_tz(local_timezone()) scheduled_display = datetime_local_tz.to_datetime_string() tz_name = datetime_local_tz.tzname() if tz_name: diff --git a/src/prefect/cli/work_pool.py b/src/prefect/cli/work_pool.py index ff528d959458..66da4be95eae 100644 --- a/src/prefect/cli/work_pool.py +++ b/src/prefect/cli/work_pool.py @@ -5,7 +5,6 @@ import json import textwrap -import pendulum import typer from rich.pretty import Pretty from rich.table import Table @@ -20,12 +19,14 @@ from prefect.client.collections import get_collections_metadata_client from prefect.client.orchestration import get_client from prefect.client.schemas.actions import WorkPoolCreate, WorkPoolUpdate +from prefect.client.schemas.objects import FlowRun, WorkPool from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound from prefect.infrastructure.provisioners import ( _provisioners, get_infrastructure_provisioner_for_work_pool_type, ) from prefect.settings import update_current_profile +from prefect.types._datetime import DateTime, PendulumDuration from prefect.utilities import urls from prefect.workers.utilities import ( get_available_work_pool_types, @@ -278,8 +279,9 @@ async def ls( async with get_client() as client: pools = await client.read_work_pools() - def sort_by_created_key(q): - return pendulum.now("utc") - q.created + def sort_by_created_key(q: WorkPool) -> PendulumDuration: + assert q.created is not None + return DateTime.now("utc") - q.created for pool in sorted(pools, key=sort_by_created_key): row = [ @@ -649,18 +651,19 @@ async def preview( table.add_column("Name", style="green", no_wrap=True) table.add_column("Deployment ID", style="blue", no_wrap=True) - pendulum.now("utc").add(hours=hours or 1) + DateTime.now("utc").add(hours=hours or 1) - now = pendulum.now("utc") + now = DateTime.now("utc") - def sort_by_created_key(r): + def sort_by_created_key(r: FlowRun) -> PendulumDuration: + assert r.created is not None return now - r.created for run in sorted(runs, key=sort_by_created_key): table.add_row( ( f"{run.expected_start_time} [red](**)" - if run.expected_start_time < now + if run.expected_start_time and run.expected_start_time < now else f"{run.expected_start_time}" ), str(run.id), diff --git a/src/prefect/locking/filesystem.py b/src/prefect/locking/filesystem.py index c324f7d17aab..239ca798c5b8 100644 --- a/src/prefect/locking/filesystem.py +++ b/src/prefect/locking/filesystem.py @@ -9,6 +9,7 @@ from typing_extensions import TypedDict from prefect.logging.loggers import get_logger +from prefect.types._datetime import DateTime, PendulumDuration from .protocol import LockManager @@ -116,7 +117,7 @@ def acquire_lock( ) return self.acquire_lock(key, holder, acquire_timeout, hold_timeout) expiration = ( - pendulum.now("utc") + pendulum.duration(seconds=hold_timeout) + DateTime.now("utc") + PendulumDuration(seconds=hold_timeout) if hold_timeout is not None else None ) @@ -165,7 +166,7 @@ async def aacquire_lock( ) return self.acquire_lock(key, holder, acquire_timeout, hold_timeout) expiration = ( - pendulum.now("utc") + pendulum.duration(seconds=hold_timeout) + DateTime.now("utc") + PendulumDuration(seconds=hold_timeout) if hold_timeout is not None else None ) @@ -207,7 +208,7 @@ def is_locked(self, key: str, use_cache: bool = False) -> bool: if (expiration := lock_info.get("expiration")) is None: return True - expired = expiration < pendulum.now("utc") + expired = expiration < DateTime.now("utc") if expired: Path(lock_info["path"]).unlink() self._locks.pop(key, None) diff --git a/src/prefect/runner/runner.py b/src/prefect/runner/runner.py index 048d40125874..00567bdc1c0e 100644 --- a/src/prefect/runner/runner.py +++ b/src/prefect/runner/runner.py @@ -64,7 +64,6 @@ def fast_flow(): import anyio import anyio.abc -import pendulum from cachetools import LRUCache from typing_extensions import Self @@ -105,6 +104,7 @@ def fast_flow(): Pending, exception_to_failed_state, ) +from prefect.types._datetime import DateTime from prefect.types.entrypoint import EntrypointType from prefect.utilities.asyncutils import ( asyncnullcontext, @@ -797,7 +797,7 @@ async def _get_and_submit_flow_runs(self): if self.stopping: return runs_response = await self._get_scheduled_flow_runs() - self.last_polled: pendulum.DateTime = pendulum.now("UTC") + self.last_polled: DateTime = DateTime.now("UTC") return await self._submit_scheduled_flow_runs(flow_run_response=runs_response) async def _check_for_cancelled_flow_runs( @@ -1063,7 +1063,7 @@ async def _get_scheduled_flow_runs( """ Retrieve scheduled flow runs for this runner. """ - scheduled_before = pendulum.now("utc").add(seconds=int(self._prefetch_seconds)) + scheduled_before = DateTime.now("utc").add(seconds=int(self._prefetch_seconds)) self._logger.debug( f"Querying for flow runs scheduled before {scheduled_before}" ) diff --git a/src/prefect/runner/server.py b/src/prefect/runner/server.py index b58cbf02e4d5..ce7eb5ac62b8 100644 --- a/src/prefect/runner/server.py +++ b/src/prefect/runner/server.py @@ -1,7 +1,6 @@ import uuid from typing import TYPE_CHECKING, Any, Callable, Coroutine, Hashable, Optional, Tuple -import pendulum import uvicorn from fastapi import APIRouter, FastAPI, HTTPException, status from fastapi.responses import JSONResponse @@ -22,6 +21,7 @@ PREFECT_RUNNER_SERVER_MISSED_POLLS_TOLERANCE, PREFECT_RUNNER_SERVER_PORT, ) +from prefect.types._datetime import DateTime from prefect.utilities.asyncutils import run_coro_as_sync from prefect.utilities.importtools import load_script_as_module @@ -54,7 +54,7 @@ def perform_health_check( ) def _health_check(): - now = pendulum.now("utc") + now = DateTime.now("utc") poll_delay = (now - runner.last_polled).total_seconds() if poll_delay > delay_threshold: diff --git a/src/prefect/types/_datetime.py b/src/prefect/types/_datetime.py index 61b85dbe1423..b7679714eada 100644 --- a/src/prefect/types/_datetime.py +++ b/src/prefect/types/_datetime.py @@ -1,10 +1,14 @@ from __future__ import annotations +from typing import Any + import pendulum +import pendulum.tz from pendulum.date import Date as PendulumDate from pendulum.datetime import DateTime as PendulumDateTime from pendulum.duration import Duration as PendulumDuration from pendulum.time import Time as PendulumTime +from pendulum.tz.timezone import FixedTimezone, Timezone from pydantic_extra_types.pendulum_dt import Date as PydanticDate from pydantic_extra_types.pendulum_dt import DateTime as PydanticDateTime from typing_extensions import TypeAlias @@ -15,5 +19,28 @@ def parse_datetime( value: str, + **options: Any, ) -> PendulumDateTime | PendulumDate | PendulumTime | PendulumDuration: - return pendulum.parse(value) + return pendulum.parse(value, **options) + + +def format_diff( + diff: PendulumDuration, + is_now: bool = True, + absolute: bool = False, + locale: str | None = None, +) -> str: + return pendulum.format_diff(diff, is_now, absolute, locale) + + +def local_timezone() -> Timezone | FixedTimezone: + return pendulum.tz.local_timezone() + + +def from_format( + value: str, + fmt: str, + tz: str | Timezone = pendulum.tz.UTC, + locale: str | None = None, +) -> DateTime: + return DateTime.instance(pendulum.from_format(value, fmt, tz, locale))