Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add function for task maintenance #18

Merged
merged 3 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions src/grimoirelab/core/runner/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@

import click
import django.core
import django.core.wsgi

from django.conf import settings

from grimoirelab.core.scheduler.scheduler import maintain_tasks

if typing.TYPE_CHECKING:
from click import Context
Expand Down Expand Up @@ -59,9 +64,6 @@ def server(ctx: Context, devel: bool):

if devel:
env["GRIMOIRELAB_DEBUG"] = "true"

from django.conf import settings

env["UWSGI_HTTP"] = env.get("GRIMOIRELAB_HTTP_DEV", "127.0.0.1:8000")
env["UWSGI_STATIC_MAP"] = settings.STATIC_URL + "=" + settings.STATIC_ROOT
else:
Expand All @@ -80,6 +82,11 @@ def server(ctx: Context, devel: bool):
env["UWSGI_LAZY_APPS"] = "true"
env["UWSGI_SINGLE_INTERPRETER"] = "true"

# Run maintenance tasks
_ = django.core.wsgi.get_wsgi_application()
maintain_tasks()

# Run the server
os.execvp("uwsgi", ("uwsgi",))


Expand All @@ -101,8 +108,6 @@ def eventizers(workers: int):
Workers get jobs from the Q_PERCEVAL_JOBS queue defined in the
configuration file.
"""
from django.conf import settings

django.core.management.call_command(
'rqworker-pool', settings.Q_PERCEVAL_JOBS,
num_workers=workers
Expand Down
14 changes: 13 additions & 1 deletion src/grimoirelab/core/scheduler/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,26 @@

import typing

from .models import get_all_registered_task_models
from .models import SchedulerStatus, get_all_registered_task_models
from .errors import NotFoundError


if typing.TYPE_CHECKING:
from typing import Iterator
from .models import Task, Job


def find_tasks_by_status(statuses: list[SchedulerStatus]) -> Iterator[Task]:
    """Yield the tasks whose status is one of the given ones.

    Every registered task model is queried in turn and its matching
    rows are yielded one by one. As this is a generator, no query is
    run until the result is iterated.

    :param statuses: statuses a task must have to be returned.

    :returns: iterator over the tasks of all registered task models
        that are in any of the given statuses.
    """
    # Each registry entry is a pair; only its first element (the task
    # model) is needed here.
    for task_model, _unused in get_all_registered_task_models():
        matching_tasks = task_model.objects.filter(status__in=statuses)

        for found_task in matching_tasks.iterator():
            yield found_task


def find_task(task_uuid: str) -> Task:
"""Find a task by its uuid.

Expand Down
63 changes: 56 additions & 7 deletions src/grimoirelab/core/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import uuid

import django_rq
import rq.exceptions
import rq.job

from django.conf import settings

from grimoirelab_toolkit.datetime import datetime_utcnow

from .db import (
find_tasks_by_status,
find_job,
find_task
)
Expand Down Expand Up @@ -104,6 +106,38 @@ def cancel_task(task_uuid: str) -> None:
task.delete()


def maintain_tasks() -> None:
    """Maintain the tasks that are scheduled to be executed.

    This function looks up every task in RUNNING, RECOVERY, ENQUEUED
    or NEW status and checks whether the first of its jobs, ordered by
    `scheduled_at`, can still be fetched from RQ. When RQ reports that
    the job does not exist, the job is scheduled again: at the time
    stored in the task if it is still in the future, or right now
    otherwise.

    Tasks without any job stored in the database are skipped, because
    there is no job to check or to reschedule for them.
    """
    tasks = find_tasks_by_status(
        [
            SchedulerStatus.RUNNING,
            SchedulerStatus.RECOVERY,
            SchedulerStatus.ENQUEUED,
            SchedulerStatus.NEW
        ]
    )

    for task in tasks:
        job_db = task.jobs.order_by('scheduled_at').first()

        # `first()` returns None on an empty queryset. Without this
        # guard the attribute accesses below would raise AttributeError
        # and abort the maintenance of the remaining tasks.
        if job_db is None:
            logger.warning(
                f"Task {task.task_id} has no jobs to check. Skipping."
            )
            continue

        try:
            rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection())
        except rq.exceptions.NoSuchJobError:
            logger.debug(
                f"Job #{job_db.job_id} in queue (task: {task.task_id}) missing. Rescheduling."
            )
        else:
            # The job is still known by RQ; nothing to do for this task.
            continue

        # Never schedule in the past: use the current time when the
        # task's scheduled time has already gone by.
        current_time = datetime_utcnow()
        scheduled_at = task.scheduled_at if task.scheduled_at > current_time else current_time

        _schedule_job(task, job_db, scheduled_at, job_db.job_args)


def _enqueue_task(
task: Task,
scheduled_at: datetime.datetime | None = None
Expand Down Expand Up @@ -138,9 +172,29 @@ def _enqueue_task(
task=task
)

_schedule_job(task, job, scheduled_at, job_args)

logger.info(
f"Job #{job.job_id} (task: {task.task_id})"
f" enqueued in '{job.queue}' at {scheduled_at}"
)

return job


def _schedule_job(
task: Task,
job: Job,
scheduled_at: datetime.datetime,
job_args: dict[str, Any]
) -> rq.job.Job:
"""Schedule the job to be executed."""

queue = task.default_job_queue

try:
queue_rq = django_rq.get_queue(queue)
queue_rq.enqueue_at(
rq_job = queue_rq.enqueue_at(
datetime=scheduled_at,
f=task.job_function,
result_ttl=settings.GRIMOIRELAB_JOB_RESULT_TTL,
Expand All @@ -163,12 +217,7 @@ def _enqueue_task(
job.save()
task.save()

logger.info(
f"Job #{job.job_id} (task: {task.task_id})"
f" enqueued in '{job.queue}' at {scheduled_at}"
)

return job
return rq_job


def _on_success_callback(
Expand Down
88 changes: 88 additions & 0 deletions tests/scheduler/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import django.test.utils

from grimoirelab.core.scheduler.db import (
find_tasks_by_status,
find_task,
find_job
)
from grimoirelab.core.scheduler.errors import NotFoundError
from grimoirelab.core.scheduler.models import (
SchedulerStatus,
Task,
register_task_model,
GRIMOIRELAB_TASK_MODELS
Expand All @@ -45,6 +47,92 @@ class AnotherDummyTaskDB(Task):
TASK_TYPE = 'another_dummy_task'


class TestFindTasksByStatus(GrimoireLabTestCase):
    """Unit tests for find_tasks_by_status function"""

    @classmethod
    def setUpClass(cls):
        # Register both dummy task models before the base class set up
        # runs; the second item returned by the registration is kept
        # as the job model of each task.
        dummy_registration = register_task_model('dummy_task', DummyTaskDB)
        cls.DummyJobClass = dummy_registration[1]

        another_registration = register_task_model('another_dummy_task', AnotherDummyTaskDB)
        cls.AnotherDummyJobClass = another_registration[1]

        super().setUpClass()

    @classmethod
    def tearDownClass(cls):
        # Leave the registry empty for the rest of test cases.
        GRIMOIRELAB_TASK_MODELS.clear()
        super().tearDownClass()

    def setUp(self):
        """Create the tables of the test models"""

        task_job_models = (
            (DummyTaskDB, self.DummyJobClass),
            (AnotherDummyTaskDB, self.AnotherDummyJobClass),
        )

        def drop_test_models():
            # Job tables are removed before the table of their task.
            with django.db.connection.schema_editor() as editor:
                for task_model, job_model in task_job_models:
                    editor.delete_model(job_model)
                    editor.delete_model(task_model)

        with django.db.connection.schema_editor() as editor:
            for task_model, job_model in task_job_models:
                editor.create_model(task_model)
                editor.create_model(job_model)

        self.addCleanup(drop_test_models)
        super().setUp()

    @staticmethod
    def _create_task_with_status(task_model, status):
        """Create a task of the given model and store it with `status`"""

        new_task = task_model.create_task(
            {'arg': 'value'}, 15, 10
        )
        new_task.status = status
        new_task.save()

        return new_task

    def test_find_tasks_by_status(self):
        """Find tasks by status across all the registered models"""

        new_dummy = self._create_task_with_status(DummyTaskDB, SchedulerStatus.NEW)
        new_another = self._create_task_with_status(AnotherDummyTaskDB, SchedulerStatus.NEW)

        # This one is running, so it must not be part of the result.
        self._create_task_with_status(AnotherDummyTaskDB, SchedulerStatus.RUNNING)

        failed_dummy = self._create_task_with_status(DummyTaskDB, SchedulerStatus.FAILED)

        found = find_tasks_by_status([SchedulerStatus.NEW, SchedulerStatus.FAILED])

        self.assertSetEqual(
            set(found),
            {new_dummy, new_another, failed_dummy}
        )

    def test_find_tasks_empty(self):
        """No tasks are found for a given status"""

        self._create_task_with_status(DummyTaskDB, SchedulerStatus.NEW)
        self._create_task_with_status(AnotherDummyTaskDB, SchedulerStatus.RUNNING)

        found = find_tasks_by_status([SchedulerStatus.FAILED])

        self.assertSetEqual(set(found), set())


class TestFindTask(GrimoireLabTestCase):
"""Unit tests for find_task function"""

Expand Down
Loading
Loading