Skip to content

Commit

Permalink
bunch of em
Browse files Browse the repository at this point in the history
  • Loading branch information
zzstoatzz committed Jan 31, 2025
1 parent caee64f commit e73e1cb
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 32 deletions.
8 changes: 4 additions & 4 deletions src/prefect/_internal/compatibility/deprecated.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
import warnings
from typing import TYPE_CHECKING, Any, Callable, Optional, Union

import pendulum
from pydantic import BaseModel
from typing_extensions import ParamSpec, TypeAlias, TypeVar

from prefect.types._datetime import DateTime, from_format
from prefect.utilities.callables import get_call_parameters
from prefect.utilities.importtools import (
AliasedModuleDefinition,
Expand Down Expand Up @@ -60,18 +60,18 @@ def generate_deprecation_message(
if not start_date and not end_date:
raise ValueError(
"A start date is required if an end date is not provided. Suggested start"
f" date is {pendulum.now('UTC').format(DEPRECATED_DATEFMT)!r}"
f" date is {DateTime.now('UTC').format(DEPRECATED_DATEFMT)!r}"
)

if not end_date:
if TYPE_CHECKING:
assert start_date is not None
parsed_start_date = pendulum.from_format(start_date, DEPRECATED_DATEFMT)
parsed_start_date = from_format(start_date, DEPRECATED_DATEFMT)
parsed_end_date = parsed_start_date.add(months=6)
end_date = parsed_end_date.format(DEPRECATED_DATEFMT)
else:
# Validate format
pendulum.from_format(end_date, DEPRECATED_DATEFMT)
from_format(end_date, DEPRECATED_DATEFMT)

if when:
when = " when " + when
Expand Down
26 changes: 14 additions & 12 deletions src/prefect/cli/deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
from typing import TYPE_CHECKING, Any, Optional, TypedDict
from uuid import UUID

import pendulum
import pendulum.tz
import typer
import yaml
from rich.console import Console
Expand Down Expand Up @@ -42,6 +40,12 @@
)
from prefect.flow_runs import wait_for_flow_run
from prefect.states import Scheduled
from prefect.types._datetime import (
DateTime,
format_diff,
local_timezone,
parse_datetime,
)
from prefect.utilities import urls
from prefect.utilities.collections import listrepr

Expand Down Expand Up @@ -344,7 +348,7 @@ async def create_schedule(
if interval is not None:
if interval_anchor:
try:
pendulum.parse(interval_anchor)
parse_datetime(interval_anchor)
except ValueError:
return exit_with_error("The anchor date must be a valid date string.")

Expand Down Expand Up @@ -552,7 +556,7 @@ async def list_schedules(deployment_name: str):

def sort_by_created_key(schedule: DeploymentSchedule): # type: ignore
assert schedule.created is not None, "All schedules should have a created time."
return pendulum.now("utc") - schedule.created
return DateTime.now("utc") - schedule.created

def schedule_details(schedule: DeploymentSchedule) -> str:
if isinstance(schedule.schedule, IntervalSchedule):
Expand Down Expand Up @@ -643,7 +647,7 @@ def sort_by_name_keys(d: DeploymentResponse):

def sort_by_created_key(d: DeploymentResponse):
assert d.created is not None, "All deployments should have a created time."
return pendulum.now("utc") - d.created
return DateTime.now("utc") - d.created

table = Table(
title="Deployments",
Expand Down Expand Up @@ -751,7 +755,7 @@ async def run(
"""
import dateparser

now = pendulum.now("UTC")
now = DateTime.now("UTC")

multi_params: dict[str, Any] = {}
if multiparams:
Expand Down Expand Up @@ -802,7 +806,7 @@ async def run(
warnings.filterwarnings("ignore", module="dateparser")

try:
start_time_parsed = dateparser.parse(
start_time_parsed = dateparser.parse( # type: ignore[reportUnknownMemberType]
start_time_raw,
settings={
"TO_TIMEZONE": "UTC",
Expand All @@ -820,10 +824,8 @@ async def run(
if start_time_parsed is None:
exit_with_error(f"Unable to parse scheduled start time {start_time_raw!r}.")

scheduled_start_time = pendulum.instance(start_time_parsed)
human_dt_diff = (
" (" + pendulum.format_diff(scheduled_start_time.diff(now)) + ")"
)
scheduled_start_time = DateTime.instance(start_time_parsed)
human_dt_diff = " (" + format_diff(scheduled_start_time.diff(now)) + ")"

async with get_client() as client:
deployment = await get_deployment(client, name, deployment_id)
Expand Down Expand Up @@ -871,7 +873,7 @@ async def run(
raise

run_url = urls.url_for(flow_run) or "<no dashboard available>"
datetime_local_tz = scheduled_start_time.in_tz(pendulum.tz.local_timezone())
datetime_local_tz = scheduled_start_time.in_tz(local_timezone())
scheduled_display = datetime_local_tz.to_datetime_string()
tz_name = datetime_local_tz.tzname()
if tz_name:
Expand Down
17 changes: 10 additions & 7 deletions src/prefect/cli/work_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import json
import textwrap

import pendulum
import typer
from rich.pretty import Pretty
from rich.table import Table
Expand All @@ -20,12 +19,14 @@
from prefect.client.collections import get_collections_metadata_client
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import WorkPoolCreate, WorkPoolUpdate
from prefect.client.schemas.objects import FlowRun, WorkPool
from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound
from prefect.infrastructure.provisioners import (
_provisioners,
get_infrastructure_provisioner_for_work_pool_type,
)
from prefect.settings import update_current_profile
from prefect.types._datetime import DateTime, PendulumDuration
from prefect.utilities import urls
from prefect.workers.utilities import (
get_available_work_pool_types,
Expand Down Expand Up @@ -278,8 +279,9 @@ async def ls(
async with get_client() as client:
pools = await client.read_work_pools()

def sort_by_created_key(q):
return pendulum.now("utc") - q.created
def sort_by_created_key(q: WorkPool) -> PendulumDuration:
assert q.created is not None
return DateTime.now("utc") - q.created

for pool in sorted(pools, key=sort_by_created_key):
row = [
Expand Down Expand Up @@ -649,18 +651,19 @@ async def preview(
table.add_column("Name", style="green", no_wrap=True)
table.add_column("Deployment ID", style="blue", no_wrap=True)

pendulum.now("utc").add(hours=hours or 1)
DateTime.now("utc").add(hours=hours or 1)

now = pendulum.now("utc")
now = DateTime.now("utc")

def sort_by_created_key(r):
def sort_by_created_key(r: FlowRun) -> PendulumDuration:
assert r.created is not None
return now - r.created

for run in sorted(runs, key=sort_by_created_key):
table.add_row(
(
f"{run.expected_start_time} [red](**)"
if run.expected_start_time < now
if run.expected_start_time and run.expected_start_time < now
else f"{run.expected_start_time}"
),
str(run.id),
Expand Down
7 changes: 4 additions & 3 deletions src/prefect/locking/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing_extensions import TypedDict

from prefect.logging.loggers import get_logger
from prefect.types._datetime import DateTime, PendulumDuration

from .protocol import LockManager

Expand Down Expand Up @@ -116,7 +117,7 @@ def acquire_lock(
)
return self.acquire_lock(key, holder, acquire_timeout, hold_timeout)
expiration = (
pendulum.now("utc") + pendulum.duration(seconds=hold_timeout)
DateTime.now("utc") + PendulumDuration(seconds=hold_timeout)
if hold_timeout is not None
else None
)
Expand Down Expand Up @@ -165,7 +166,7 @@ async def aacquire_lock(
)
return self.acquire_lock(key, holder, acquire_timeout, hold_timeout)
expiration = (
pendulum.now("utc") + pendulum.duration(seconds=hold_timeout)
DateTime.now("utc") + PendulumDuration(seconds=hold_timeout)
if hold_timeout is not None
else None
)
Expand Down Expand Up @@ -207,7 +208,7 @@ def is_locked(self, key: str, use_cache: bool = False) -> bool:
if (expiration := lock_info.get("expiration")) is None:
return True

expired = expiration < pendulum.now("utc")
expired = expiration < DateTime.now("utc")
if expired:
Path(lock_info["path"]).unlink()
self._locks.pop(key, None)
Expand Down
6 changes: 3 additions & 3 deletions src/prefect/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def fast_flow():

import anyio
import anyio.abc
import pendulum
from cachetools import LRUCache
from typing_extensions import Self

Expand Down Expand Up @@ -105,6 +104,7 @@ def fast_flow():
Pending,
exception_to_failed_state,
)
from prefect.types._datetime import DateTime
from prefect.types.entrypoint import EntrypointType
from prefect.utilities.asyncutils import (
asyncnullcontext,
Expand Down Expand Up @@ -797,7 +797,7 @@ async def _get_and_submit_flow_runs(self):
if self.stopping:
return
runs_response = await self._get_scheduled_flow_runs()
self.last_polled: pendulum.DateTime = pendulum.now("UTC")
self.last_polled: DateTime = DateTime.now("UTC")
return await self._submit_scheduled_flow_runs(flow_run_response=runs_response)

async def _check_for_cancelled_flow_runs(
Expand Down Expand Up @@ -1063,7 +1063,7 @@ async def _get_scheduled_flow_runs(
"""
Retrieve scheduled flow runs for this runner.
"""
scheduled_before = pendulum.now("utc").add(seconds=int(self._prefetch_seconds))
scheduled_before = DateTime.now("utc").add(seconds=int(self._prefetch_seconds))
self._logger.debug(
f"Querying for flow runs scheduled before {scheduled_before}"
)
Expand Down
4 changes: 2 additions & 2 deletions src/prefect/runner/server.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import uuid
from typing import TYPE_CHECKING, Any, Callable, Coroutine, Hashable, Optional, Tuple

import pendulum
import uvicorn
from fastapi import APIRouter, FastAPI, HTTPException, status
from fastapi.responses import JSONResponse
Expand All @@ -22,6 +21,7 @@
PREFECT_RUNNER_SERVER_MISSED_POLLS_TOLERANCE,
PREFECT_RUNNER_SERVER_PORT,
)
from prefect.types._datetime import DateTime
from prefect.utilities.asyncutils import run_coro_as_sync
from prefect.utilities.importtools import load_script_as_module

Expand Down Expand Up @@ -54,7 +54,7 @@ def perform_health_check(
)

def _health_check():
now = pendulum.now("utc")
now = DateTime.now("utc")
poll_delay = (now - runner.last_polled).total_seconds()

if poll_delay > delay_threshold:
Expand Down
29 changes: 28 additions & 1 deletion src/prefect/types/_datetime.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from __future__ import annotations

from typing import Any

import pendulum
import pendulum.tz
from pendulum.date import Date as PendulumDate
from pendulum.datetime import DateTime as PendulumDateTime
from pendulum.duration import Duration as PendulumDuration
from pendulum.time import Time as PendulumTime
from pendulum.tz.timezone import FixedTimezone, Timezone
from pydantic_extra_types.pendulum_dt import Date as PydanticDate
from pydantic_extra_types.pendulum_dt import DateTime as PydanticDateTime
from typing_extensions import TypeAlias
Expand All @@ -15,5 +19,28 @@

def parse_datetime(
value: str,
**options: Any,
) -> PendulumDateTime | PendulumDate | PendulumTime | PendulumDuration:
return pendulum.parse(value)
return pendulum.parse(value, **options)


def format_diff(
diff: PendulumDuration,
is_now: bool = True,
absolute: bool = False,
locale: str | None = None,
) -> str:
return pendulum.format_diff(diff, is_now, absolute, locale)


def local_timezone() -> Timezone | FixedTimezone:
return pendulum.tz.local_timezone()


def from_format(
value: str,
fmt: str,
tz: str | Timezone = pendulum.tz.UTC,
locale: str | None = None,
) -> DateTime:
return DateTime.instance(pendulum.from_format(value, fmt, tz, locale))

0 comments on commit e73e1cb

Please # to comment.