Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix fetch job performance regression #1225

Merged
merged 2 commits into from
Oct 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from __future__ import annotations

from django.db import migrations

from .. import migrations_utils


class Migration(migrations.Migration):
    """Django migration 0031: apply the fetch-job index SQL script.

    Runs the SQL file ``02.14.01_01_add_indexes_for_fetch_job.sql`` through
    ``migrations_utils.RunProcrastinateSQL`` and must be applied after
    ``0030_alter_procrastinateevent_options`` of the ``procrastinate`` app.
    """

    operations = [
        migrations_utils.RunProcrastinateSQL(
            name="02.14.01_01_add_indexes_for_fetch_job.sql"
        )
    ]
    name = "0031_add_indexes_for_fetch_job"
    dependencies = [("procrastinate", "0030_alter_procrastinateevent_options")]
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Recreate the procrastinate_jobs_id_lock_idx index with the 'aborting' status
-- added to its filter, so that it can be used by the fetch job function.
-- The replacement index is first built under a temporary name (_temp suffix).
CREATE INDEX procrastinate_jobs_id_lock_idx_temp ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status, 'aborting'::procrastinate_job_status]);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to self: don't forget to tell people in the release notes to create the index concurrently, if applicable.

-- Swap the indexes: drop the old procrastinate_jobs_id_lock_idx, then rename
-- the _temp index so that it takes over the original index name.
DROP INDEX procrastinate_jobs_id_lock_idx;
ALTER INDEX procrastinate_jobs_id_lock_idx_temp RENAME TO procrastinate_jobs_id_lock_idx;

-- add index to avoid seq scan of outer query in the fetch job function
CREATE INDEX procrastinate_jobs_priority_idx ON procrastinate_jobs(priority desc, id asc) WHERE (status = 'todo'::procrastinate_job_status);
4 changes: 3 additions & 1 deletion procrastinate/sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs (
CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing';

CREATE INDEX procrastinate_jobs_queue_name_idx ON procrastinate_jobs(queue_name);
CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status]);
CREATE INDEX procrastinate_jobs_id_lock_idx ON procrastinate_jobs (id, lock) WHERE status = ANY (ARRAY['todo'::procrastinate_job_status, 'doing'::procrastinate_job_status, 'aborting'::procrastinate_job_status]);
CREATE INDEX procrastinate_jobs_priority_idx ON procrastinate_jobs(priority desc, id asc) WHERE (status = 'todo'::procrastinate_job_status);


CREATE INDEX procrastinate_events_job_id_fkey ON procrastinate_events(job_id);

Expand Down
8 changes: 4 additions & 4 deletions tests/acceptance/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ async def test_abort(async_app):
@async_app.task(queue="default", name="task1", pass_context=True)
async def task1(context):
while True:
await asyncio.sleep(0.1)
await asyncio.sleep(0.02)
if await context.should_abort_async():
raise JobAborted

@async_app.task(queue="default", name="task2", pass_context=True)
def task2(context):
while True:
time.sleep(0.1)
time.sleep(0.02)
if context.should_abort():
raise JobAborted

Expand All @@ -130,11 +130,11 @@ def task2(context):
async_app.run_worker_async(queues=["default"], wait=False)
)

await asyncio.sleep(0.1)
await asyncio.sleep(0.05)
result = await async_app.job_manager.cancel_job_by_id_async(job1_id, abort=True)
assert result is True

await asyncio.sleep(0.1)
await asyncio.sleep(0.05)
result = await async_app.job_manager.cancel_job_by_id_async(job2_id, abort=True)
assert result is True

Expand Down
Loading