diff --git a/procrastinate/contrib/django/migrations/0031_add_indexes_for_fetch_job.py b/procrastinate/contrib/django/migrations/0031_add_indexes_for_fetch_job.py new file mode 100644 index 000000000..794363039 --- /dev/null +++ b/procrastinate/contrib/django/migrations/0031_add_indexes_for_fetch_job.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from django.db import migrations + +from .. import migrations_utils + + +class Migration(migrations.Migration): + operations = [ + migrations_utils.RunProcrastinateSQL( + name="02.14.01_01_add_indexes_for_fetch_job.sql" + ) + ] + name = "0031_add_indexes_for_fetch_job" + dependencies = [("procrastinate", "0030_alter_procrastinateevent_options")] diff --git a/procrastinate/sql/migrations/02.14.01_01_add_indexes_for_fetch_job.sql b/procrastinate/sql/migrations/02.14.01_01_add_indexes_for_fetch_job.sql new file mode 100644 index 000000000..0498567d5 --- /dev/null +++ b/procrastinate/sql/migrations/02.14.01_01_add_indexes_for_fetch_job.sql @@ -0,0 +1,7 @@ +-- recreate procrastinate_jobs_id_lock_idx index by adding aborting status to the filter so that it can be used by the fetch job function +CREATE INDEX procrastinate_jobs_id_lock_idx_temp ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status, 'aborting'::procrastinate_job_status]); +DROP INDEX procrastinate_jobs_id_lock_idx; +ALTER INDEX procrastinate_jobs_id_lock_idx_temp RENAME TO procrastinate_jobs_id_lock_idx; + +-- add index to avoid seq scan of outer query in the fetch job function +CREATE INDEX procrastinate_jobs_priority_idx ON procrastinate_jobs(priority desc, id asc) WHERE (status = 'todo'::procrastinate_job_status); diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index a2fe4d950..4dd7bd59a 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -65,7 +65,9 @@ CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs ( CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing'; CREATE INDEX procrastinate_jobs_queue_name_idx ON procrastinate_jobs(queue_name); -CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status]); +CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status, 'aborting'::procrastinate_job_status]); +CREATE INDEX procrastinate_jobs_priority_idx ON procrastinate_jobs(priority desc, id asc) WHERE (status = 'todo'::procrastinate_job_status); + CREATE INDEX procrastinate_events_job_id_fkey ON procrastinate_events(job_id); diff --git a/tests/acceptance/test_async.py b/tests/acceptance/test_async.py index 4056f4f07..953337d43 100644 --- a/tests/acceptance/test_async.py +++ b/tests/acceptance/test_async.py @@ -112,14 +112,14 @@ async def test_abort(async_app): @async_app.task(queue="default", name="task1", pass_context=True) async def task1(context): while True: - await asyncio.sleep(0.1) + await asyncio.sleep(0.02) if await context.should_abort_async(): raise JobAborted @async_app.task(queue="default", name="task2", pass_context=True) def task2(context): while True: - time.sleep(0.1) + time.sleep(0.02) if context.should_abort(): raise JobAborted @@ -130,11 +130,11 @@ def task2(context): async_app.run_worker_async(queues=["default"], wait=False) ) - await asyncio.sleep(0.1) + await asyncio.sleep(0.05) result = await async_app.job_manager.cancel_job_by_id_async(job1_id, abort=True) assert result is True - await asyncio.sleep(0.1) + await asyncio.sleep(0.05) result = await async_app.job_manager.cancel_job_by_id_async(job2_id, abort=True) assert result is True