Skip to content

Commit

Permalink
Merge pull request #58 from radexperts/dicom-worker
Browse files Browse the repository at this point in the history
Use custom dicom worker instead of Celery
  • Loading branch information
medihack authored Nov 12, 2023
2 parents 0e572a4 + 21e41da commit 6b819a5
Show file tree
Hide file tree
Showing 63 changed files with 1,420 additions and 1,089 deletions.
14 changes: 13 additions & 1 deletion TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@

## Top

- Before new release
-- test job_utils
-- move mixins stuff over to consumer and delete the stuff in slective transfer view (we never post there)
-- Rename Process... to ...Processor
-- Switch from Celery to Huey
-- Fix pyright
-- exclude autoreload when tests are saved (Custom Filter in server command watched files)
-- Text canceled task/job in test_workers.py

- Upgrade REDIS server on RADIS
- Unfix pyright and its VS code extension
- Move over to Celery with Redis backend and get rid of RabbitMQ
- Replace sherlock on RADIS
- Use DicomLogEntry during C-STORE
- Allow to restart or cancel specific dicom task
- Fix dicom explorer search over Accession Number
Expand Down Expand Up @@ -126,7 +139,6 @@
- <https://www.yergler.net/2009/09/27/nested-formsets-with-django/>
- <http://the-frey.github.io/2014/08/18/monitoring-docker-containers-with-monit>
- move or get rid of hijack_logger and store_log_in_task in task_utils
- Use LRU cache for dicom explorer / collector (don't we already do this?!)
- log debug -> info in connector also in production
- Link owner in templates to user profile
- Rewrite dicom_connector to use asyncio (wrap all pynetdicom calls in asyncio.to_thread)
Expand Down
12 changes: 11 additions & 1 deletion adit/batch_query/apps.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from django.apps import AppConfig
from django.db.models.signals import post_migrate

from adit.core.site import register_job_stats_collector, register_main_menu_item
from adit.core.site import (
register_dicom_processor,
register_job_stats_collector,
register_main_menu_item,
)

SECTION_NAME = "Batch Query"

Expand All @@ -17,11 +21,17 @@ def ready(self):


def register_app():
from .models import BatchQueryTask
from .processors import ProcessBatchQueryTask

register_main_menu_item(
url_name="batch_query_job_create",
label=SECTION_NAME,
)

model_label = f"{BatchQueryTask._meta.app_label}.{BatchQueryTask._meta.model_name}"
register_dicom_processor(model_label, ProcessBatchQueryTask)

register_job_stats_collector(collect_job_stats)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Generated by Django 4.2.7 on 2023-11-08 21:53

from django.db import migrations


class Migration(migrations.Migration):
dependencies = [
("batch_query", "0030_alter_batchqueryresult_series_number"),
]

operations = [
migrations.RemoveField(
model_name="batchquerysettings",
name="slot_begin_time",
),
migrations.RemoveField(
model_name="batchquerysettings",
name="slot_end_time",
),
migrations.RemoveField(
model_name="batchquerysettings",
name="transfer_timeout",
),
migrations.RemoveField(
model_name="batchquerytask",
name="celery_task_id",
),
]
4 changes: 0 additions & 4 deletions adit/batch_query/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from datetime import datetime
from typing import TYPE_CHECKING

from celery import current_app
from django.contrib.postgres.fields import ArrayField
from django.core.exceptions import ValidationError
from django.db import models
Expand Down Expand Up @@ -33,9 +32,6 @@ class BatchQueryJob(DicomJob):
tasks = RelatedManager["BatchQueryTask"]()
results = RelatedManager["BatchQueryResult"]()

def delay(self):
current_app.send_task("adit.batch_query.tasks.ProcessBatchQueryJob", (self.id,))

def get_absolute_url(self):
return reverse("batch_query_job_detail", args=[str(self.id)])

Expand Down
17 changes: 17 additions & 0 deletions adit/batch_query/processors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from adit.core.processors import ProcessDicomTask
from adit.core.types import DicomLogEntry

from .models import BatchQuerySettings, BatchQueryTask
from .utils.query_utils import QueryExecutor


class ProcessBatchQueryTask(ProcessDicomTask):
app_name = "Batch Query"
dicom_task_class = BatchQueryTask
app_settings_class = BatchQuerySettings

def process_dicom_task(
self, dicom_task
) -> tuple[BatchQueryTask.Status, str, list[DicomLogEntry]]:
assert isinstance(dicom_task, BatchQueryTask)
return QueryExecutor(dicom_task, self).start()
38 changes: 0 additions & 38 deletions adit/batch_query/tasks.py

This file was deleted.

4 changes: 2 additions & 2 deletions adit/batch_query/tests/integration/test_batch_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def test_urgent_batch_query_with_dimse_server(
page: Page,
poll: Callable[[Locator], Locator],
dimse_orthancs,
adit_celery_worker,
dicom_worker,
channels_live_server,
create_and_login_user,
grant_access,
Expand Down Expand Up @@ -48,7 +48,7 @@ def test_urgent_batch_query_with_dicomweb_server(
page: Page,
poll: Callable[[Locator], Locator],
dicomweb_orthancs,
adit_celery_worker,
dicom_worker,
channels_live_server,
create_and_login_user,
grant_access,
Expand Down
22 changes: 11 additions & 11 deletions adit/batch_query/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class BatchQueryJobCreateView(BatchQueryLockedMixin, DicomJobCreateView):
form_class = BatchQueryJobForm
template_name = "batch_query/batch_query_job_form.html"
permission_required = "batch_query.add_batchqueryjob"
default_priority = settings.BATCH_QUERY_DEFAULT_PRIORITY
urgent_priority = settings.BATCH_QUERY_URGENT_PRIORITY
object: BatchQueryJob

def get_initial(self) -> dict[str, Any]:
Expand All @@ -80,17 +82,7 @@ def get_initial(self) -> dict[str, Any]:
return initial

def form_valid(self, form):
user = self.request.user
form.instance.owner = user
response = super().form_valid(form)

job = self.object # set by super().form_valid(form)
if user.is_staff or settings.BATCH_QUERY_UNVERIFIED:
job.status = BatchQueryJob.Status.PENDING
job.save()
job.delay()

return response
return super().form_valid(form, settings.BATCH_QUERY_UNVERIFIED)


class BatchQueryJobDetailView(BatchQueryLockedMixin, DicomJobDetailView):
Expand All @@ -113,6 +105,8 @@ class BatchQueryJobDeleteView(BatchQueryLockedMixin, DicomJobDeleteView):

class BatchQueryJobVerifyView(BatchQueryLockedMixin, DicomJobVerifyView):
model = BatchQueryJob
default_priority = settings.BATCH_QUERY_DEFAULT_PRIORITY
urgent_priority = settings.BATCH_QUERY_URGENT_PRIORITY


class BatchQueryJobCancelView(BatchQueryLockedMixin, DicomJobCancelView):
Expand All @@ -121,14 +115,20 @@ class BatchQueryJobCancelView(BatchQueryLockedMixin, DicomJobCancelView):

class BatchQueryJobResumeView(BatchQueryLockedMixin, DicomJobResumeView):
model = BatchQueryJob
default_priority = settings.BATCH_QUERY_DEFAULT_PRIORITY
urgent_priority = settings.BATCH_QUERY_URGENT_PRIORITY


class BatchQueryJobRetryView(BatchQueryLockedMixin, DicomJobRetryView):
model = BatchQueryJob
default_priority = settings.BATCH_QUERY_DEFAULT_PRIORITY
urgent_priority = settings.BATCH_QUERY_URGENT_PRIORITY


class BatchQueryJobRestartView(BatchQueryLockedMixin, DicomJobRestartView):
model = BatchQueryJob
default_priority = settings.BATCH_QUERY_DEFAULT_PRIORITY
urgent_priority = settings.BATCH_QUERY_URGENT_PRIORITY


class BatchQueryTaskDetailView(
Expand Down
12 changes: 11 additions & 1 deletion adit/batch_transfer/apps.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from django.apps import AppConfig
from django.db.models.signals import post_migrate

from adit.core.site import register_job_stats_collector, register_main_menu_item
from adit.core.site import (
register_dicom_processor,
register_job_stats_collector,
register_main_menu_item,
)

SECTION_NAME = "Batch Transfer"

Expand All @@ -17,11 +21,17 @@ def ready(self):


def register_app():
from .models import BatchTransferTask
from .processors import ProcessBatchTransferTask

register_main_menu_item(
url_name="batch_transfer_job_create",
label=SECTION_NAME,
)

model_label = f"{BatchTransferTask._meta.app_label}.{BatchTransferTask._meta.model_name}"
register_dicom_processor(model_label, ProcessBatchTransferTask)

register_job_stats_collector(collect_job_stats)


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Generated by Django 4.2.7 on 2023-11-08 21:53

from django.db import migrations


class Migration(migrations.Migration):
dependencies = [
("batch_transfer", "0027_alter_batchtransfertask_series_uids"),
]

operations = [
migrations.RemoveField(
model_name="batchtransfersettings",
name="slot_begin_time",
),
migrations.RemoveField(
model_name="batchtransfersettings",
name="slot_end_time",
),
migrations.RemoveField(
model_name="batchtransfersettings",
name="transfer_timeout",
),
migrations.RemoveField(
model_name="batchtransfertask",
name="celery_task_id",
),
]
4 changes: 0 additions & 4 deletions adit/batch_transfer/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from typing import TYPE_CHECKING

from celery import current_app
from django.contrib.postgres.fields import ArrayField
from django.db import models
from django.urls import reverse
Expand All @@ -24,9 +23,6 @@ class BatchTransferJob(TransferJob):
if TYPE_CHECKING:
tasks = RelatedManager["BatchTransferTask"]()

def delay(self):
current_app.send_task("adit.batch_transfer.tasks.ProcessBatchTransferJob", (self.id,))

def get_absolute_url(self):
return reverse("batch_transfer_job_detail", args=[self.id])

Expand Down
17 changes: 17 additions & 0 deletions adit/batch_transfer/processors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from adit.core.processors import ProcessDicomTask
from adit.core.types import DicomLogEntry
from adit.core.utils.transfer_utils import TransferExecutor

from .models import BatchTransferSettings, BatchTransferTask


class ProcessBatchTransferTask(ProcessDicomTask):
app_name = "Batch Transfer"
dicom_task_class = BatchTransferTask
app_settings_class = BatchTransferSettings

def process_dicom_task(
self, dicom_task
) -> tuple[BatchTransferTask.Status, str, list[DicomLogEntry]]:
assert isinstance(dicom_task, BatchTransferTask)
return TransferExecutor(dicom_task, self).start()
38 changes: 0 additions & 38 deletions adit/batch_transfer/tasks.py

This file was deleted.

4 changes: 2 additions & 2 deletions adit/batch_transfer/tests/integration/test_batch_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def test_unpseudonymized_urgent_batch_transfer_with_dimse_server(
page: Page,
poll: Callable[[Locator], Locator],
dimse_orthancs,
adit_celery_worker,
dicom_worker,
channels_live_server,
create_and_login_user,
grant_access,
Expand Down Expand Up @@ -54,7 +54,7 @@ def test_unpseudonymized_urgent_batch_transfer_with_dicomweb_server(
page: Page,
poll: Callable[[Locator], Locator],
dicomweb_orthancs,
adit_celery_worker,
dicom_worker,
channels_live_server,
create_and_login_user,
grant_access,
Expand Down
Loading

0 comments on commit 6b819a5

Please # to comment.