Skip to content

feat(scheduler): add interval #466

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 59 additions & 28 deletions taskiq/cli/scheduler/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,12 @@ async def run_scheduler_loop( # noqa: C901
loop = asyncio.get_event_loop()
running_schedules: Dict[str, asyncio.Task[Any]] = {}
ran_cron_jobs: Set[str] = set()
interval_tasks_last_run: Dict[str, datetime] = {}
current_minute = datetime.now(tz=pytz.UTC).minute
next_run: datetime = datetime.now(tz=pytz.UTC)
while True:
now = datetime.now(tz=pytz.UTC)
interval_next_run: datetime | None = None
# If minute changed, we need to clear
# ran_cron_jobs set and update current minute.
if now.minute != current_minute:
Expand All @@ -174,39 +177,62 @@ async def run_scheduler_loop( # noqa: C901
# otherwise we need to assume that
# we will run it at the start of the next minute,
# as crontab does.
else:
elif next_run.minute == current_minute:
next_run = (now + timedelta(minutes=1)).replace(second=1, microsecond=0)
scheduled_tasks = await get_all_schedules(scheduler)
for source, task_list in scheduled_tasks:
logger.debug("Got %d schedules from source %s.", len(task_list), source)
for task in task_list:
try:
task_delay = get_task_delay(task)
except ValueError:
logger.warning(
"Cannot parse cron: %s for task: %s, schedule_id: %s.",
task.cron,
task.task_name,
task.schedule_id,
)
continue
# If task delay is None, we don't need to run it.
if task_delay is None:
continue
# If task is delayed for more than next_run,
# we don't need to run it, because we will
# run it in the next iteration.
if now + timedelta(seconds=task_delay) >= next_run:
continue
# If task is already running, we don't need to run it again.
if task.schedule_id in running_schedules and task_delay < 1:
continue
# If task is cron job, we need to check if
# we already ran it this minute.
if task.cron is not None:
if task.schedule_id in ran_cron_jobs:
if task.interval:
need_to_run = False
# Run the task if it has no recorded last run, or if its
# last run plus its interval is already earlier than now.
if not interval_tasks_last_run.get(task.schedule_id, None) or (
interval_tasks_last_run[task.schedule_id]
+ timedelta(seconds=task.interval)
< now
):
task_delay = 0
interval_tasks_last_run[task.schedule_id] = now
need_to_run = True
# Track the earliest upcoming run among the interval tasks.
task_next_run = interval_tasks_last_run[
task.schedule_id
] + timedelta(seconds=task.interval)
if not interval_next_run or task_next_run < interval_next_run:
interval_next_run = task_next_run
# If the interval task doesn't need to run, skip it.
if not need_to_run:
continue
ran_cron_jobs.add(task.schedule_id)
else:
try:
task_delay = get_task_delay(task)
except ValueError:
logger.warning(
"Cannot parse cron: %s for task: %s, schedule_id: %s.",
task.cron,
task.task_name,
task.schedule_id,
)
continue
# If task delay is None, we don't need to run it.
if task_delay is None:
continue
# If task is delayed for more than next_run,
# we don't need to run it, because we will
# run it in the next iteration.
if now + timedelta(seconds=task_delay) >= next_run:
continue
# If task is already running, we don't need to run it again.
if task.schedule_id in running_schedules and task_delay < 1:
continue
# If task is cron job, we need to check if
# we already ran it this minute.
if task.cron is not None:
if task.schedule_id in ran_cron_jobs:
continue
ran_cron_jobs.add(task.schedule_id)

send_task = loop.create_task(
delayed_send(scheduler, source, task, task_delay),
# We need to set the name of the task
Expand All @@ -220,7 +246,12 @@ async def run_scheduler_loop( # noqa: C901
task_future.get_name().removeprefix("schedule_"),
),
)
delay = next_run - datetime.now(tz=pytz.UTC)
# Use whichever comes first, next_run or interval_next_run, to compute the delay.
delay = (
interval_next_run
if interval_next_run and interval_next_run < next_run
else next_run
) - now
logger.debug(
"Sleeping for %.2f seconds before getting schedules.",
delay.total_seconds(),
Expand Down
7 changes: 6 additions & 1 deletion taskiq/schedule_sources/label_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,11 @@ async def startup(self) -> None:
)
continue
for schedule in task.labels.get("schedule", []):
if "cron" not in schedule and "time" not in schedule:
if (
"cron" not in schedule
and "time" not in schedule
and "interval" not in schedule
):
continue
labels = schedule.get("labels", {})
labels.update(task.labels)
Expand All @@ -58,6 +62,7 @@ async def startup(self) -> None:
cron=schedule.get("cron"),
time=schedule.get("time"),
cron_offset=schedule.get("cron_offset"),
interval=schedule.get("interval"),
)

return await super().startup()
Expand Down
13 changes: 9 additions & 4 deletions taskiq/scheduler/scheduled_task/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@ class ScheduledTask(BaseModel):
cron: Optional[str] = None
cron_offset: Optional[Union[str, timedelta]] = None
time: Optional[datetime] = None
interval: Optional[int] = None

@root_validator(pre=False) # type: ignore
@classmethod
def __check(cls, values: Dict[str, Any]) -> Dict[str, Any]:
"""
This method validates, that either `cron` or `time` field is present.
This method validates that at least one of `cron`, `time` or `interval` is present.

:raises ValueError: if cron and time are none.
:raises ValueError: if cron, time and interval are none.
"""
if values.get("cron") is None and values.get("time") is None:
raise ValueError("Either cron or datetime must be present.")
if (
values.get("cron") is None
and values.get("time") is None
and values.get("interval") is None
):
raise ValueError("Either cron, datetime or interval must be present.")
return values
9 changes: 5 additions & 4 deletions taskiq/scheduler/scheduled_task/v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ class ScheduledTask(BaseModel):
cron: Optional[str] = None
cron_offset: Optional[Union[str, timedelta]] = None
time: Optional[datetime] = None
interval: Optional[int] = None

@model_validator(mode="after")
def __check(self) -> Self:
"""
This method validates, that either `cron` or `time` field is present.
This method validates that at least one of `cron`, `time` or `interval` is present.

:raises ValueError: if cron and time are none.
:raises ValueError: if cron, time and interval are none.
"""
if self.cron is None and self.time is None:
raise ValueError("Either cron or datetime must be present.")
if self.cron is None and self.time is None and self.interval is None:
raise ValueError("Either cron, datetime or interval must be present.")
return self