From 5ffd414f060600681a666b808e8e88f91a76e5e7 Mon Sep 17 00:00:00 2001 From: Yann Normand Date: Sat, 19 Oct 2024 21:10:34 +1000 Subject: [PATCH 1/2] update timing of flaky acceptance test --- tests/acceptance/test_async.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/acceptance/test_async.py b/tests/acceptance/test_async.py index 4056f4f07..953337d43 100644 --- a/tests/acceptance/test_async.py +++ b/tests/acceptance/test_async.py @@ -112,14 +112,14 @@ async def test_abort(async_app): @async_app.task(queue="default", name="task1", pass_context=True) async def task1(context): while True: - await asyncio.sleep(0.1) + await asyncio.sleep(0.02) if await context.should_abort_async(): raise JobAborted @async_app.task(queue="default", name="task2", pass_context=True) def task2(context): while True: - time.sleep(0.1) + time.sleep(0.02) if context.should_abort(): raise JobAborted @@ -130,11 +130,11 @@ def task2(context): async_app.run_worker_async(queues=["default"], wait=False) ) - await asyncio.sleep(0.1) + await asyncio.sleep(0.05) result = await async_app.job_manager.cancel_job_by_id_async(job1_id, abort=True) assert result is True - await asyncio.sleep(0.1) + await asyncio.sleep(0.05) result = await async_app.job_manager.cancel_job_by_id_async(job2_id, abort=True) assert result is True From c815369160d80c36e2fbe49ab68b48121a8dc04d Mon Sep 17 00:00:00 2001 From: Yann Normand Date: Sat, 19 Oct 2024 21:14:46 +1000 Subject: [PATCH 2/2] Update indexes to improve performance of fetch_job --- .../migrations/0031_add_indexes_for_fetch_job.py | 15 +++++++++++++++ .../02.14.01_01_add_indexes_for_fetch_job.sql | 7 +++++++ procrastinate/sql/schema.sql | 4 +++- 3 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 procrastinate/contrib/django/migrations/0031_add_indexes_for_fetch_job.py create mode 100644 procrastinate/sql/migrations/02.14.01_01_add_indexes_for_fetch_job.sql diff --git a/procrastinate/contrib/django/migrations/0031_add_indexes_for_fetch_job.py b/procrastinate/contrib/django/migrations/0031_add_indexes_for_fetch_job.py new file mode 100644 index 000000000..794363039 --- /dev/null +++ b/procrastinate/contrib/django/migrations/0031_add_indexes_for_fetch_job.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from django.db import migrations + +from .. import migrations_utils + + +class Migration(migrations.Migration): + operations = [ + migrations_utils.RunProcrastinateSQL( + name="02.14.01_01_add_indexes_for_fetch_job.sql" + ) + ] + name = "0031_add_indexes_for_fetch_job" + dependencies = [("procrastinate", "0030_alter_procrastinateevent_options")] diff --git a/procrastinate/sql/migrations/02.14.01_01_add_indexes_for_fetch_job.sql b/procrastinate/sql/migrations/02.14.01_01_add_indexes_for_fetch_job.sql new file mode 100644 index 000000000..0498567d5 --- /dev/null +++ b/procrastinate/sql/migrations/02.14.01_01_add_indexes_for_fetch_job.sql @@ -0,0 +1,7 @@ +-- recreate procrastinate_jobs_id_lock_idx index by adding aborting status to the filter so that it can be used by the fetch job function +CREATE INDEX procrastinate_jobs_id_lock_idx_temp ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status, 'aborting'::procrastinate_job_status]); +DROP INDEX procrastinate_jobs_id_lock_idx; +ALTER INDEX procrastinate_jobs_id_lock_idx_temp RENAME TO procrastinate_jobs_id_lock_idx; + +-- add index to avoid seq scan of outer query in the fetch job function +CREATE INDEX procrastinate_jobs_priority_idx ON procrastinate_jobs(priority desc, id asc) WHERE (status = 'todo'::procrastinate_job_status); diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index a2fe4d950..4dd7bd59a 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -65,7 +65,9 @@ CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs ( CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing'; CREATE INDEX procrastinate_jobs_queue_name_idx ON procrastinate_jobs(queue_name); -CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status]); +CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status, 'aborting'::procrastinate_job_status]); +CREATE INDEX procrastinate_jobs_priority_idx ON procrastinate_jobs(priority desc, id asc) WHERE (status = 'todo'::procrastinate_job_status); + CREATE INDEX procrastinate_events_job_id_fkey ON procrastinate_events(job_id);