From c3dd54d5dd20fb14ef84d2e17a58e64295f99d9d Mon Sep 17 00:00:00 2001 From: d3vyce Date: Tue, 20 May 2025 16:22:30 -0400 Subject: [PATCH 1/2] feat(scheduler): add interval --- taskiq/cli/scheduler/run.py | 87 +++++++++++++++++--------- taskiq/schedule_sources/label_based.py | 7 ++- taskiq/scheduler/scheduled_task/v1.py | 13 ++-- taskiq/scheduler/scheduled_task/v2.py | 9 +-- 4 files changed, 79 insertions(+), 37 deletions(-) diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py index 09467dc0..10063379 100644 --- a/taskiq/cli/scheduler/run.py +++ b/taskiq/cli/scheduler/run.py @@ -159,9 +159,12 @@ async def run_scheduler_loop( # noqa: C901 loop = asyncio.get_event_loop() running_schedules: Dict[str, asyncio.Task[Any]] = {} ran_cron_jobs: Set[str] = set() + interval_tasks_last_run: Dict[str, datetime] = {} current_minute = datetime.now(tz=pytz.UTC).minute + next_run: datetime = datetime.now(tz=pytz.UTC) while True: now = datetime.now(tz=pytz.UTC) + interval_next_run: datetime | None = None # If minute changed, we need to clear # ran_cron_jobs set and update current minute. if now.minute != current_minute: @@ -174,39 +177,62 @@ async def run_scheduler_loop( # noqa: C901 # otherwise we need assume that # we will run it at the start of the next minute. # as crontab does. - else: + elif next_run.minute == current_minute: next_run = (now + timedelta(minutes=1)).replace(second=1, microsecond=0) scheduled_tasks = await get_all_schedules(scheduler) for source, task_list in scheduled_tasks: logger.debug("Got %d schedules from source %s.", len(task_list), source) for task in task_list: - try: - task_delay = get_task_delay(task) - except ValueError: - logger.warning( - "Cannot parse cron: %s for task: %s, schedule_id: %s.", - task.cron, - task.task_name, - task.schedule_id, - ) - continue - # If task delay is None, we don't need to run it. - if task_delay is None: - continue - # If task is delayed for more than next_run, - # we don't need to run it, because we will - # run it in the next iteration. - if now + timedelta(seconds=task_delay) >= next_run: - continue - # If task is already running, we don't need to run it again. - if task.schedule_id in running_schedules and task_delay < 1: - continue - # If task is cron job, we need to check if - # we already ran it this minute. - if task.cron is not None: - if task.schedule_id in ran_cron_jobs: + if task.interval: + need_to_run = False + # If the task in not in interval_tasks_last_run we run it or + # if the task last run plus his interval in greater than now + if not interval_tasks_last_run.get(task.schedule_id, None) or ( + interval_tasks_last_run[task.schedule_id] + + timedelta(seconds=task.interval) + < now + ): + task_delay = 0 + interval_tasks_last_run[task.schedule_id] = now + need_to_run = True + # We calculate the minimum task interval next run + task_next_run = interval_tasks_last_run[ + task.schedule_id + ] + timedelta(seconds=task.interval) + if not interval_next_run or task_next_run < interval_next_run: + interval_next_run = task_next_run + # If the interval task don't need to run + if not need_to_run: continue - ran_cron_jobs.add(task.schedule_id) + else: + try: + task_delay = get_task_delay(task) + except ValueError: + logger.warning( + "Cannot parse cron: %s for task: %s, schedule_id: %s.", + task.cron, + task.task_name, + task.schedule_id, + ) + continue + # If task delay is None, we don't need to run it. + if task_delay is None: + continue + # If task is delayed for more than next_run, + # we don't need to run it, because we will + # run it in the next iteration. + if now + timedelta(seconds=task_delay) >= next_run: + continue + # If task is already running, we don't need to run it again. + if task.schedule_id in running_schedules and task_delay < 1: + continue + # If task is cron job, we need to check if + # we already ran it this minute. + if task.cron is not None: + if task.schedule_id in ran_cron_jobs: + continue + ran_cron_jobs.add(task.schedule_id) + send_task = loop.create_task( delayed_send(scheduler, source, task, task_delay), # We need to set the name of the task @@ -220,7 +246,12 @@ async def run_scheduler_loop( # noqa: C901 task_future.get_name().removeprefix("schedule_"), ), ) - delay = next_run - datetime.now(tz=pytz.UTC) + # We set the delay with the lowest value between next_run and interval_next_run + delay = ( + interval_next_run + if interval_next_run and interval_next_run < next_run + else next_run + ) - now logger.debug( "Sleeping for %.2f seconds before getting schedules.", delay.total_seconds(), diff --git a/taskiq/schedule_sources/label_based.py b/taskiq/schedule_sources/label_based.py index 94fd42de..aad233d3 100644 --- a/taskiq/schedule_sources/label_based.py +++ b/taskiq/schedule_sources/label_based.py @@ -43,7 +43,11 @@ async def startup(self) -> None: ) continue for schedule in task.labels.get("schedule", []): - if "cron" not in schedule and "time" not in schedule: + if ( + "cron" not in schedule + and "time" not in schedule + and "interval" not in schedule + ): continue labels = schedule.get("labels", {}) labels.update(task.labels) @@ -58,6 +62,7 @@ async def startup(self) -> None: cron=schedule.get("cron"), time=schedule.get("time"), cron_offset=schedule.get("cron_offset"), + interval=schedule.get("interval"), ) return await super().startup() diff --git a/taskiq/scheduler/scheduled_task/v1.py b/taskiq/scheduler/scheduled_task/v1.py index 5209f61e..205e66e8 100644 --- a/taskiq/scheduler/scheduled_task/v1.py +++ b/taskiq/scheduler/scheduled_task/v1.py @@ -16,15 +16,20 @@ class ScheduledTask(BaseModel): cron: Optional[str] = None cron_offset: Optional[Union[str, timedelta]] = None time: Optional[datetime] = None + interval: Optional[int] = None @root_validator(pre=False) # type: ignore @classmethod def __check(cls, values: Dict[str, Any]) -> Dict[str, Any]: """ - This method validates, that either `cron` or `time` field is present. + This method validates, that either `cron`, `time` or `interval` field is present. - :raises ValueError: if cron and time are none. + :raises ValueError: if cron, time and interval are none. """ - if values.get("cron") is None and values.get("time") is None: - raise ValueError("Either cron or datetime must be present.") + if ( + values.get("cron") is None + and values.get("time") is None + and values.get("interval") is None + ): + raise ValueError("Either cron, datetime or interval must be present.") return values diff --git a/taskiq/scheduler/scheduled_task/v2.py b/taskiq/scheduler/scheduled_task/v2.py index 332dce5d..2176969d 100644 --- a/taskiq/scheduler/scheduled_task/v2.py +++ b/taskiq/scheduler/scheduled_task/v2.py @@ -17,14 +17,15 @@ class ScheduledTask(BaseModel): cron: Optional[str] = None cron_offset: Optional[Union[str, timedelta]] = None time: Optional[datetime] = None + interval: Optional[int] = None @model_validator(mode="after") def __check(self) -> Self: """ - This method validates, that either `cron` or `time` field is present. + This method validates, that either `cron`, `time` or `interval` field is present. - :raises ValueError: if cron and time are none. + :raises ValueError: if cron, time and interval are none. """ - if self.cron is None and self.time is None: - raise ValueError("Either cron or datetime must be present.") + if self.cron is None and self.time is None and self.interval is None: + raise ValueError("Either cron, datetime or interval must be present.") return self From fcb1599bec08ec8c30d6a52c1321ef350f5b5e37 Mon Sep 17 00:00:00 2001 From: d3vyce Date: Fri, 27 Jun 2025 16:13:19 -0400 Subject: [PATCH 2/2] Add doc/tests for interval scheduler --- docs/available-components/schedule-sources.md | 6 ++++-- tests/schedule_sources/test_label_based.py | 2 ++ tests/scheduler/test_label_based_sched.py | 2 ++ 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/docs/available-components/schedule-sources.md b/docs/available-components/schedule-sources.md index 4502ac0b..148dcf23 100644 --- a/docs/available-components/schedule-sources.md +++ b/docs/available-components/schedule-sources.md @@ -34,9 +34,10 @@ The format of the schedule label is the following: @broker.task( schedule=[ { - "cron": "* * * * *", # type: str, either cron or time should be specified. + "cron": "* * * * *", # type: str, either cron, time or interval should be specified. "cron_offset": None # type: str | timedelta | None, can be omitted. - "time": None # type: datetime | None, either cron or time should be specified. + "time": None # type: datetime | None, either cron, time or interval should be specified. + "interval": None # type: int | NOne, either cron, time or interval should be specified. "args": [], # type List[Any] | None, can be omitted. "kwargs": {}, # type: Dict[str, Any] | None, can be omitted. "labels": {}, # type: Dict[str, Any] | None, can be omitted. @@ -52,6 +53,7 @@ Parameters: - `cron` - crontab string when to run the task. - `cron_offset` - timezone offset for cron values. Explained [here](../guide/scheduling-tasks.md#working-with-timezones) - `time` - specific time when send the task. +- `interval` - interval in second between sending the task. - `args` - args to use, when invoking the task. - `kwargs` - key-word arguments to use when invoking the task. - `labels` - additional labels to use when invoking the task. diff --git a/tests/schedule_sources/test_label_based.py b/tests/schedule_sources/test_label_based.py index fa621b07..bef2d24c 100644 --- a/tests/schedule_sources/test_label_based.py +++ b/tests/schedule_sources/test_label_based.py @@ -15,6 +15,7 @@ [ pytest.param([{"cron": "* * * * *"}], id="cron"), pytest.param([{"time": datetime.now(pytz.UTC)}], id="time"), + pytest.param([{"interval": "10"}], id="interval"), ], ) async def test_label_discovery(schedule_label: List[Dict[str, Any]]) -> None: @@ -35,6 +36,7 @@ def task() -> None: schedule_id=schedules[0].schedule_id, cron=schedule_label[0].get("cron"), time=schedule_label[0].get("time"), + interval=schedule_label[0].get("interval"), task_name="test_task", labels={"schedule": schedule_label}, args=[], diff --git a/tests/scheduler/test_label_based_sched.py b/tests/scheduler/test_label_based_sched.py index 506990b0..2076e936 100644 --- a/tests/scheduler/test_label_based_sched.py +++ b/tests/scheduler/test_label_based_sched.py @@ -20,6 +20,7 @@ [ pytest.param([{"cron": "* * * * *"}], id="cron"), pytest.param([{"time": datetime.now(pytz.UTC)}], id="time"), + pytest.param([{"interval": "10"}], id="interval"), ], ) async def test_label_discovery(schedule_label: List[Dict[str, Any]]) -> None: @@ -40,6 +41,7 @@ def task() -> None: schedule_id=schedules[0].schedule_id, cron=schedule_label[0].get("cron"), time=schedule_label[0].get("time"), + interval=schedule_label[0].get("interval"), task_name="test_task", labels={"schedule": schedule_label}, args=[],