From 3c986430831ecf7a21882df1a415289bb3f5417f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Santiago=20Due=C3=B1as?= Date: Fri, 13 Dec 2024 16:42:37 +0100 Subject: [PATCH 1/3] [scheduler:db] Add 'find_tasks_by_status' function MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This function allows finding a list of tasks that share the same status. Signed-off-by: Santiago Dueñas --- src/grimoirelab/core/scheduler/db.py | 14 ++++- tests/scheduler/test_db.py | 88 ++++++++++++++++++++++++++++ 2 files changed, 101 insertions(+), 1 deletion(-) diff --git a/src/grimoirelab/core/scheduler/db.py b/src/grimoirelab/core/scheduler/db.py index 54142e4..32fd83c 100644 --- a/src/grimoirelab/core/scheduler/db.py +++ b/src/grimoirelab/core/scheduler/db.py @@ -20,14 +20,26 @@ import typing -from .models import get_all_registered_task_models +from .models import SchedulerStatus, get_all_registered_task_models from .errors import NotFoundError if typing.TYPE_CHECKING: + from typing import Iterator from .models import Task, Job +def find_tasks_by_status(statuses: list[SchedulerStatus]) -> Iterator[Task]: + """Find tasks by their status. + + :param statuses: list of status to filter tasks by. + + :returns: iterator of tasks with the given status. + """ + for task_class, _ in get_all_registered_task_models(): + yield from task_class.objects.filter(status__in=statuses).iterator() + + def find_task(task_uuid: str) -> Task: """Find a task by its uuid. 
diff --git a/tests/scheduler/test_db.py b/tests/scheduler/test_db.py index 6bc060a..27b5006 100644 --- a/tests/scheduler/test_db.py +++ b/tests/scheduler/test_db.py @@ -20,11 +20,13 @@ import django.test.utils from grimoirelab.core.scheduler.db import ( + find_tasks_by_status, find_task, find_job ) from grimoirelab.core.scheduler.errors import NotFoundError from grimoirelab.core.scheduler.models import ( + SchedulerStatus, Task, register_task_model, GRIMOIRELAB_TASK_MODELS @@ -45,6 +47,92 @@ class AnotherDummyTaskDB(Task): TASK_TYPE = 'another_dummy_task' +class TestFindTasksByStatus(GrimoireLabTestCase): + """Unit tests for find_task_by_status function""" + + @classmethod + def setUpClass(cls): + _, cls.DummyJobClass = register_task_model('dummy_task', DummyTaskDB) + _, cls.AnotherDummyJobClass = register_task_model('another_dummy_task', AnotherDummyTaskDB) + super().setUpClass() + + @classmethod + def tearDownClass(cls): + GRIMOIRELAB_TASK_MODELS.clear() + super().tearDownClass() + + def setUp(self): + """Create the test model""" + + def cleanup_test_model(): + with django.db.connection.schema_editor() as schema_editor: + schema_editor.delete_model(self.DummyJobClass) + schema_editor.delete_model(DummyTaskDB) + schema_editor.delete_model(self.AnotherDummyJobClass) + schema_editor.delete_model(AnotherDummyTaskDB) + + with django.db.connection.schema_editor() as schema_editor: + schema_editor.create_model(DummyTaskDB) + schema_editor.create_model(self.DummyJobClass) + schema_editor.create_model(AnotherDummyTaskDB) + schema_editor.create_model(self.AnotherDummyJobClass) + + self.addCleanup(cleanup_test_model) + super().setUp() + + def test_find_tasks_by_status(self): + """Find a task by status""" + + dummy_task = DummyTaskDB.create_task( + {'arg': 'value'}, 15, 10 + ) + dummy_task.status = SchedulerStatus.NEW + dummy_task.save() + + another_dummy_task = AnotherDummyTaskDB.create_task( + {'arg': 'value'}, 15, 10 + ) + another_dummy_task.status = SchedulerStatus.NEW + 
another_dummy_task.save() + + task = AnotherDummyTaskDB.create_task( + {'arg': 'value'}, 15, 10 + ) + task.status = SchedulerStatus.RUNNING + task.save() + + task = DummyTaskDB.create_task( + {'arg': 'value'}, 15, 10 + ) + task.status = SchedulerStatus.FAILED + task.save() + + expected = { + dummy_task, another_dummy_task, task + } + + result = find_tasks_by_status([SchedulerStatus.NEW, SchedulerStatus.FAILED]) + self.assertSetEqual(set(result), expected) + + def test_find_tasks_empty(self): + """No tasks are found for a given status""" + + dummy_task = DummyTaskDB.create_task( + {'arg': 'value'}, 15, 10 + ) + dummy_task.status = SchedulerStatus.NEW + dummy_task.save() + + another_dummy_task = AnotherDummyTaskDB.create_task( + {'arg': 'value'}, 15, 10 + ) + another_dummy_task.status = SchedulerStatus.RUNNING + another_dummy_task.save() + + result = find_tasks_by_status([SchedulerStatus.FAILED]) + self.assertSetEqual(set(result), set()) + + class TestFindTask(GrimoireLabTestCase): """Unit tests for find_task function""" From 9176779be16b3053c3191f27d72d00f8e0debb46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Santiago=20Due=C3=B1as?= Date: Fri, 13 Dec 2024 16:46:04 +0100 Subject: [PATCH 2/3] [scheduler] Add function to maintain task MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This function acts like a garbage collector checking if there are inconsistencies in tasks and trying to fix these problems. For example, a task has a RUNNING status but there's no job running. In this case, the function will re-schedule the task, creating a new job. 
Signed-off-by: Santiago Dueñas --- src/grimoirelab/core/scheduler/scheduler.py | 63 +++++++++-- tests/scheduler/test_scheduler.py | 113 ++++++++++++++++++++ 2 files changed, 169 insertions(+), 7 deletions(-) diff --git a/src/grimoirelab/core/scheduler/scheduler.py b/src/grimoirelab/core/scheduler/scheduler.py index 501aeb4..387beeb 100644 --- a/src/grimoirelab/core/scheduler/scheduler.py +++ b/src/grimoirelab/core/scheduler/scheduler.py @@ -24,6 +24,7 @@ import uuid import django_rq +import rq.exceptions import rq.job from django.conf import settings @@ -31,6 +32,7 @@ from grimoirelab_toolkit.datetime import datetime_utcnow from .db import ( + find_tasks_by_status, find_job, find_task ) @@ -104,6 +106,38 @@ def cancel_task(task_uuid: str) -> None: task.delete() +def maintain_tasks() -> None: + """Maintain the tasks that are scheduled to be executed. + + This function will check the status of the tasks and jobs + that are scheduled, rescheduling them if necessary. + """ + tasks = find_tasks_by_status( + [ + SchedulerStatus.RUNNING, + SchedulerStatus.RECOVERY, + SchedulerStatus.ENQUEUED, + SchedulerStatus.NEW + ] + ) + + for task in tasks: + job_db = task.jobs.order_by('scheduled_at').first() + + try: + rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection()) + continue + except rq.exceptions.NoSuchJobError: + logger.debug( + f"Job #{job_db.job_id} in queue (task: {task.task_id}) missing. Rescheduling." 
+ ) + + current_time = datetime_utcnow() + scheduled_at = task.scheduled_at if task.scheduled_at > current_time else current_time + + _schedule_job(task, job_db, scheduled_at, job_db.job_args) + + def _enqueue_task( task: Task, scheduled_at: datetime.datetime | None = None @@ -138,9 +172,29 @@ def _enqueue_task( task=task ) + _schedule_job(task, job, scheduled_at, job_args) + + logger.info( + f"Job #{job.job_id} (task: {task.task_id})" + f" enqueued in '{job.queue}' at {scheduled_at}" + ) + + return job + + +def _schedule_job( + task: Task, + job: Job, + scheduled_at: datetime.datetime, + job_args: dict[str, Any] +) -> rq.job.Job: + """Schedule the job to be executed.""" + + queue = task.default_job_queue + try: queue_rq = django_rq.get_queue(queue) - queue_rq.enqueue_at( + rq_job = queue_rq.enqueue_at( datetime=scheduled_at, f=task.job_function, result_ttl=settings.GRIMOIRELAB_JOB_RESULT_TTL, @@ -163,12 +217,7 @@ def _enqueue_task( job.save() task.save() - logger.info( - f"Job #{job.job_id} (task: {task.task_id})" - f" enqueued in '{job.queue}' at {scheduled_at}" - ) - - return job + return rq_job def _on_success_callback( diff --git a/tests/scheduler/test_scheduler.py b/tests/scheduler/test_scheduler.py index 7ee1573..18e498e 100644 --- a/tests/scheduler/test_scheduler.py +++ b/tests/scheduler/test_scheduler.py @@ -35,6 +35,7 @@ from grimoirelab.core.scheduler.scheduler import ( schedule_task, cancel_task, + maintain_tasks, _enqueue_task, _on_success_callback, _on_failure_callback @@ -216,6 +217,118 @@ def test_error_enqueuing_task(self, mock_get_queue): _enqueue_task(task, scheduled_at=None) +class TestMaintainTasks(GrimoireLabTestCase): + """Class for testing the maintenance of tasks""" + + def setUp(self): + GRIMOIRELAB_TASK_MODELS.clear() + task_class, job_class = register_task_model('test_task', SchedulerTestTask) + + def cleanup_test_model(): + with django.db.connection.schema_editor() as schema_editor: + schema_editor.delete_model(job_class) + 
schema_editor.delete_model(task_class) + + with django.db.connection.schema_editor() as schema_editor: + schema_editor.create_model(task_class) + schema_editor.create_model(job_class) + + self.addCleanup(cleanup_test_model) + super().setUp() + + def test_maintain_tasks_reschedule(self): + """Tasks with inconsistent state are re-scheduled""" + + task_args = { + 'a': 1, + 'b': 2, + } + + task1 = schedule_task('test_task', task_args) + task2 = schedule_task('test_task', task_args) + + # Delete one of the jobs manually to create the inconsistent state + job_db = task2.jobs.first() + job_rq = rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection()) + job_rq.delete() + + # Run the maintenance tasks + before_dt = grimoirelab_toolkit.datetime.datetime_utcnow() + maintain_tasks() + after_dt = grimoirelab_toolkit.datetime.datetime_utcnow() + + # Check if jobs were re-scheduled + + # Task1 wasn't re-scheduled + job_db = task1.jobs.first() + self.assertLessEqual(job_db.last_modified, before_dt) + self.assertLessEqual(job_db.last_modified, after_dt) + + # Task2 was re-scheduled + job_db = task2.jobs.first() + self.assertGreaterEqual(job_db.last_modified, before_dt) + self.assertLessEqual(job_db.last_modified, after_dt) + + job_rq = rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection()) + self.assertEqual(job_rq.id, job_db.uuid) + + def test_maintain_tasks_reschedule_expired_scheduled_at(self): + """Tasks with inconsistent state with expired scheduled time are re-scheduled with current time""" + + task_args = { + 'a': 1, + 'b': 2, + } + + before_dt = grimoirelab_toolkit.datetime.datetime_utcnow() + task = schedule_task('test_task', task_args) + before_dt = grimoirelab_toolkit.datetime.datetime_utcnow() + + # Delete job manually to create the inconsistent state + job_db = task.jobs.first() + job_rq = rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection()) + job_rq.delete() + + # Run the maintenance tasks + before_dt = 
grimoirelab_toolkit.datetime.datetime_utcnow() + maintain_tasks() + after_dt = grimoirelab_toolkit.datetime.datetime_utcnow() + + # Check if jobs were re-scheduled with a different time + job_db = task.jobs.first() + self.assertLessEqual(task.scheduled_at, before_dt) + self.assertLessEqual(task.scheduled_at, after_dt) + + job_rq = rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection()) + self.assertEqual(job_rq.id, job_db.uuid) + + def test_maintain_tasks_reschedule_non_expired_scheduled_at(self): + """Tasks with inconsistent state are re-scheduled keeping the same scheduled time""" + + task_args = { + 'a': 1, + 'b': 2, + } + schedule_time = datetime.datetime(2100, 1, 1, tzinfo=datetime.timezone.utc) + + task = SchedulerTestTask.create_task(task_args, 360, 10) + job_db = _enqueue_task(task, scheduled_at=schedule_time) + + # Delete job manually to create the inconsistent state + job_rq = rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection()) + job_rq.delete() + + # Run the maintenance tasks + maintain_tasks() + + # Check if jobs were re-scheduled with a different time + job_db = task.jobs.first() + self.assertLessEqual(task.scheduled_at, schedule_time) + + job_rq = rq.job.Job.fetch(job_db.uuid, connection=django_rq.get_connection()) + self.assertEqual(job_rq.id, job_db.uuid) + + class OnSuccessCallbackTestTask(Task): """Class for testing on success callback calls""" From 7ba4cbeab23cc9393cb6463f6e42de37714a07b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Santiago=20Due=C3=B1as?= Date: Fri, 13 Dec 2024 16:57:18 +0100 Subject: [PATCH 3/3] [cmd] Run maintenance tasks when the server starts MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The maintenance tasks will ensure that the status of the tasks is in a consistent state. 
Signed-off-by: Santiago Dueñas --- src/grimoirelab/core/runner/commands/run.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/grimoirelab/core/runner/commands/run.py b/src/grimoirelab/core/runner/commands/run.py index a903ed9..e17c88e 100644 --- a/src/grimoirelab/core/runner/commands/run.py +++ b/src/grimoirelab/core/runner/commands/run.py @@ -23,6 +23,11 @@ import click import django.core +import django.core.wsgi + +from django.conf import settings + +from grimoirelab.core.scheduler.scheduler import maintain_tasks if typing.TYPE_CHECKING: from click import Context @@ -59,9 +64,6 @@ def server(ctx: Context, devel: bool): if devel: env["GRIMOIRELAB_DEBUG"] = "true" - - from django.conf import settings - env["UWSGI_HTTP"] = env.get("GRIMOIRELAB_HTTP_DEV", "127.0.0.1:8000") env["UWSGI_STATIC_MAP"] = settings.STATIC_URL + "=" + settings.STATIC_ROOT else: @@ -80,6 +82,11 @@ def server(ctx: Context, devel: bool): env["UWSGI_LAZY_APPS"] = "true" env["UWSGI_SINGLE_INTERPRETER"] = "true" + # Run maintenance tasks + _ = django.core.wsgi.get_wsgi_application() + maintain_tasks() + + # Run the server os.execvp("uwsgi", ("uwsgi",)) @@ -101,8 +108,6 @@ def eventizers(workers: int): Workers get jobs from the Q_PERCEVAL_JOBS queue defined in the configuration file. """ - from django.conf import settings - django.core.management.call_command( 'rqworker-pool', settings.Q_PERCEVAL_JOBS, num_workers=workers