fix CelerySignalProcessor delete handling #473

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account


Open · wants to merge 2 commits into base: master

115 changes: 81 additions & 34 deletions django_elasticsearch_dsl/signals.py
@@ -144,58 +144,98 @@ def handle_delete(self, sender, instance, **kwargs):
"""
self.prepare_registry_delete_task(instance)


@shared_task()
def registry_delete_related_task(doc_module, doc_class, object_ids, action):
    """
    A Celery task that fetches the latest data for the given object IDs and
    performs the required indexing action, using the custom get_queryset()
    method defined on the document class.

    Instead of deleting the related objects, we reindex them so that the
    removed connection between the deleted model and the related model is
    reflected in Elasticsearch.
    """
    doc_instance = getattr(import_module(doc_module), doc_class)()

    # Fetch the latest instances from the database through the document's
    # queryset so that any custom filtering still applies
    object_list = doc_instance.get_queryset().filter(pk__in=object_ids)
    if not object_list:
        return

    # Generate the bulk update data
    bulk_data = list(doc_instance._get_actions(object_list, action))

    if bulk_data:
        doc_instance._bulk(bulk_data, parallel=True)
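
For illustration, the call that prepare_registry_delete_related_task (below) ends up enqueuing looks like the following; the module path, document name, and IDs are made-up examples, not anything in this diff:

# Hypothetical invocation: reindex two related Car documents after the
# Manufacturer they pointed at is deleted. 'myapp.documents' and
# 'CarDocument' are assumed names.
registry_delete_related_task.delay(
    'myapp.documents',  # doc_module, resolved via import_module()
    'CarDocument',      # doc_class, resolved via getattr()
    [11, 42],           # object_ids still present in the database
    'index',            # action: reindex rather than delete
)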


def prepare_registry_delete_related_task(self, instance):
    """
    Collect the IDs of related instances before the main instance is deleted,
    and queue those IDs for reindexing in Elasticsearch through
    registry_delete_related_task.
    """
    related_docs = list(registry._get_related_doc(instance))
    if not related_docs:
        return

    for doc_class in related_docs:
        doc_instance = doc_class()
        try:
            related = doc_instance.get_instances_from_related(instance)
        except ObjectDoesNotExist:
            related = None

        if related:
            if isinstance(related, models.Model):
                object_ids = [related.pk]
            else:
                object_ids = [obj.pk for obj in related if hasattr(obj, 'pk')]

            action = 'index'  # Reindex the related objects instead of deleting them
            # Send only the IDs to the task; it re-reads the rows itself
            self.registry_delete_related_task.delay(doc_class.__module__, doc_class.__name__, object_ids, action)
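
These prepare_* helpers only work if they run while the row still exists; only the handle_delete hook at the top of this hunk is visible in the diff. A minimal sketch of the assumed wiring, where the signal choice and handler names other than prepare_registry_delete_related_task are assumptions:

# Sketch of the assumed signal wiring (BaseSignalProcessor is defined
# earlier in this module); illustrative only.
from django.db.models.signals import post_save, pre_delete

class CelerySignalProcessor(BaseSignalProcessor):
    def setup(self):
        post_save.connect(self.handle_save)
        pre_delete.connect(self.handle_pre_delete)  # fire while the row still exists

    def handle_pre_delete(self, sender, instance, **kwargs):
        # Capture related IDs before the FK rows disappear.
        self.prepare_registry_delete_related_task(instance)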


@shared_task()
def registry_delete_task(doc_module, doc_class, bulk_data):
    """
    Handle the bulk delete data on the registry as a Celery task.

    Delete and update are implemented differently: an update can re-read the
    changed rows from the database to ensure eventual consistency, but delete
    actions must be collected before the database record disappears.
    """
    doc_instance = getattr(import_module(doc_module), doc_class)()
    doc_instance._bulk(bulk_data, parallel=True)
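
The bulk_data argument is the list of action dicts yielded by _get_actions. Roughly, with made-up index and document names:

# Illustrative delete payload; the exact dict shape comes from
# Document._get_actions, and these field values are assumptions.
bulk_data = [
    {'_op_type': 'delete', '_index': 'manufacturers', '_id': 7},
]
registry_delete_task.delay('myapp.documents', 'ManufacturerDocument', bulk_data)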


def prepare_registry_delete_task(self, instance):
    """
    Prepare deletion of the instance itself from Elasticsearch, before the
    database record is deleted.
    """
    action = 'delete'

    # Find all documents in the registry that index the instance's model class
    if instance.__class__ not in registry._models:
        return

    for doc_class in registry._models[instance.__class__]:
        doc_instance = doc_class()  # Create an instance of the document
        if isinstance(instance, models.Model):
            object_list = [instance]
        else:
            object_list = instance

        # _get_actions prepares the delete actions for Elasticsearch while
        # the database record still exists
        bulk_data = list(doc_instance._get_actions(object_list, action))
        if bulk_data:
            # Dispatch one task per document class so each payload is
            # executed by its matching Document
            self.registry_delete_task.delay(doc_class.__module__, doc_class.__name__, bulk_data)
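
The point of building the actions eagerly is ordering: only the already-serialized payload crosses the Celery boundary. Illustratively, with an assumed Manufacturer model:

# Illustrative flow; "Manufacturer" is an assumed model name.
manufacturer = Manufacturer.objects.get(pk=7)
manufacturer.delete()  # handle_delete -> prepare_registry_delete_task(manufacturer)
# The delete actions were built from the in-memory instance above, so the
# Celery worker can replay them via _bulk after the row is gone.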


@shared_task()
def registry_update_task(pk, app_label, model_name):
    try:
        model = apps.get_model(app_label, model_name)
    except LookupError:
        pass
    else:
        try:
            registry.update(
                model.objects.get(pk=pk)
            )
        except ObjectDoesNotExist as e:
            print(f'Error registry_update_task: {e}')
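
The new try/except covers the race where a row vanishes between the signal firing and the worker picking the task up; a sketch of the failure it guards against, with an assumed Article model:

# "Article" is an assumed model name; this illustrates the guarded race.
article = Article.objects.create(title='x')  # post_save enqueues registry_update_task
article.delete()                             # row removed before the worker runs
# Worker: model.objects.get(pk=pk) raises ObjectDoesNotExist, which is now
# logged instead of crashing the task.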


@shared_task()
def registry_update_related_task(pk, app_label, model_name):
    try:
        model = apps.get_model(app_label, model_name)
    except LookupError:
        pass
    else:
        try:
            registry.update_related(
                model.objects.get(pk=pk)
            )
        except ObjectDoesNotExist as e:
            print(f'Error registry_update_related_task: {e}')
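
For completeness: these tasks only run if the project routes signal handling through the Celery processor, via the library's signal-processor setting. A minimal sketch, assuming CelerySignalProcessor keeps its location in this module:

# settings.py: route index maintenance through the Celery processor.
# The dotted path assumes the class lives in django_elasticsearch_dsl.signals.
ELASTICSEARCH_DSL_SIGNAL_PROCESSOR = (
    'django_elasticsearch_dsl.signals.CelerySignalProcessor'
)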