Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Periodic tasks on top of the per-second main loop runner #324

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from

Conversation

gencurrent
Copy link

@gencurrent gencurrent commented May 1, 2024

Updates

  1. Per-second main event loop cycle;
  2. ScheduledTask.period field is added;
  3. Periodic tasks execution feature has been added.

Design ways

  1. The current approach with iterating over scheduled tasks each second.
  2. The 2-stages approach:
    a. Iterate over each minute as in the AS-IS solution.
    b. Add a second 60-seconds over-each-second iteration cycle to iterates through to be launched each minute.
    The second approach looks like:
while True:
  ...
  next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(minutes=1)
  while datetime.now().replace(second=0, microsecond=0) != next_minute:
    ...
    next_second = datetime.now().replace(microsecond=0) + timedelta(seconds=1)
    await asyncio.sleep((next_second - datetime.now()).total_seconds())

Notes:

  • The Documentation has to be updated after the PR is merged.

The example to run the feature against is main_test.py in the root directory:

# # broker.py
import asyncio
from taskiq.brokers.inmemory_broker import InMemoryBroker

from taskiq.schedule_sources import LabelScheduleSource
from taskiq import TaskiqScheduler

broker = InMemoryBroker()

scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
)


@broker.task(schedule=[{"cron": "* * * * *", "args": [1]}])
async def each_minute_cron(value: int) -> int:
    print(f"The {each_minute_cron.__qualname__} task has been launched")
    await asyncio.sleep(0.5)
    return value + 1


@broker.task(schedule=[{"period": 2, "args": [1]}])
async def each_2_seconds_task(value: int) -> int:
    print(f"The {each_2_seconds_task.__qualname__} task has been launched")
    await asyncio.sleep(0.5)
    return value + 1

… added; 3) Periodic tasks execution feature has been added
# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant