From 3df0d604e1c9849758643f48ccbe71c608ea6023 Mon Sep 17 00:00:00 2001 From: Bragegs Date: Fri, 2 Feb 2024 01:36:36 +0100 Subject: [PATCH 1/2] fix CelerySignalProcessor delete handling 1. Removed unnecessary comma from setter of `bulk_data` variable. 2. Added `doc_module` to `registry_delete_task` function parameters so that we can init the registered document correctly. --- django_elasticsearch_dsl/signals.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/django_elasticsearch_dsl/signals.py b/django_elasticsearch_dsl/signals.py index 48f42249..259be739 100644 --- a/django_elasticsearch_dsl/signals.py +++ b/django_elasticsearch_dsl/signals.py @@ -162,18 +162,18 @@ def prepare_registry_delete_related_task(self, instance): object_list = [related] else: object_list = related - bulk_data = list(doc_instance._get_actions(object_list, action)), - self.registry_delete_task.delay(doc_instance.__class__.__name__, bulk_data) + bulk_data = list(doc_instance._get_actions(object_list, action)) + self.registry_delete_task.delay(doc_instance.__module__, doc_instance.__class__.__name__, bulk_data) @shared_task() - def registry_delete_task(doc_label, data): + def registry_delete_task(doc_module, doc_class, bulk_data): """ Handle the bulk delete data on the registry as a Celery task. The different implementations used are due to the difference between delete and update operations. The update operation can re-read the updated data from the database to ensure eventual consistency, but the delete needs to be processed before the database record is deleted to obtain the associated data. 
""" - doc_instance = import_module(doc_label) + doc_instance = getattr(import_module(doc_module), doc_class)() parallel = True doc_instance._bulk(bulk_data, parallel=parallel) @@ -194,8 +194,8 @@ def prepare_registry_delete_task(self, instance): object_list = [related] else: object_list = related - bulk_data = list(doc_instance.get_actions(object_list, action)), - self.registry_delete_task.delay(doc_instance.__class__.__name__, bulk_data) + bulk_data = list(doc_instance.get_actions(object_list, action)) + self.registry_delete_task.delay(doc_instance.__module__, doc_instance.__class__.__name__, bulk_data) @shared_task() def registry_update_task(pk, app_label, model_name): From 801fe3ee2f3441734445b71871c447b755715099 Mon Sep 17 00:00:00 2001 From: Bragegs Date: Wed, 12 Jun 2024 11:37:47 +0200 Subject: [PATCH 2/2] further work on signals.py This version seems to actually delete all indexes and related document instances correctly --- django_elasticsearch_dsl/signals.py | 111 ++++++++++++++++++++-------- 1 file changed, 79 insertions(+), 32 deletions(-) diff --git a/django_elasticsearch_dsl/signals.py b/django_elasticsearch_dsl/signals.py index 259be739..5701424c 100644 --- a/django_elasticsearch_dsl/signals.py +++ b/django_elasticsearch_dsl/signals.py @@ -144,26 +144,59 @@ def handle_delete(self, sender, instance, **kwargs): """ self.prepare_registry_delete_task(instance) + + @shared_task() + def registry_delete_related_task(doc_module, doc_class, object_ids, action): + """ + A Celery task that fetches the latest data for given object IDs and performs the required indexing action. + This version uses the custom `get_queryset()` method defined in the document class. + + Instead of deleting the related objects we update them so that the deleted connection between + the deleted model and the related model is updated into elasticsearch. 
+ """ + doc_instance = getattr(import_module(doc_module), doc_class)() + model = doc_instance.django.model + + # Fetch the latest instances from the database + #object_list = model.objects.filter(pk__in=object_ids).all() + # Use the custom queryset method if available + object_list = doc_instance.get_queryset().filter(pk__in=object_ids) + if not object_list: + return + + # Generate the bulk update data + bulk_data = list(doc_instance._get_actions(object_list, action)) + + if bulk_data: + doc_instance._bulk(bulk_data, parallel=True) + + def prepare_registry_delete_related_task(self, instance): """ - Select its related instance before this instance was deleted. - And pass that to celery. + Collect IDs of related instances before the main instance is deleted and queue these IDs + for indexing in Elasticsearch through a registry_delete_related_task. """ - action = 'index' - for doc in registry._get_related_doc(instance): - doc_instance = doc(related_instance_to_ignore=instance) + related_docs = list(registry._get_related_doc(instance)) + if not related_docs: + return + + for doc_class in related_docs: + doc_instance = doc_class() try: related = doc_instance.get_instances_from_related(instance) except ObjectDoesNotExist: related = None - if related is not None: - doc_instance.update(related) + + if related: if isinstance(related, models.Model): - object_list = [related] + object_ids = [related.pk] else: - object_list = related - bulk_data = list(doc_instance._get_actions(object_list, action)) - self.registry_delete_task.delay(doc_instance.__module__, doc_instance.__class__.__name__, bulk_data) + object_ids = [obj.pk for obj in related if hasattr(obj, 'pk')] + + action = 'index' # Set the operation as 'index' + # Send only the IDs to the task + self.registry_delete_related_task.delay(doc_class.__module__, doc_class.__name__, object_ids, action) + @shared_task() def registry_delete_task(doc_module, doc_class, bulk_data): @@ -177,25 +210,32 @@ def 
registry_delete_task(doc_module, doc_class, bulk_data): parallel = True doc_instance._bulk(bulk_data, parallel=parallel) + def prepare_registry_delete_task(self, instance): """ - Get the prepare did before database record deleted. + Prepare deletion of the instance itself from Elasticsearch. """ action = 'delete' - for doc in registry._get_related_doc(instance): - doc_instance = doc(related_instance_to_ignore=instance) - try: - related = doc_instance.get_instances_from_related(instance) - except ObjectDoesNotExist: - related = None - if related is not None: - doc_instance.update(related) - if isinstance(related, models.Model): - object_list = [related] - else: - object_list = related - bulk_data = list(doc_instance.get_actions(object_list, action)) - self.registry_delete_task.delay(doc_instance.__module__, doc_instance.__class__.__name__, bulk_data) + + # Find all documents in the registry that are related to the instance's model class + if instance.__class__ not in registry._models: + return + + bulk_data = [] + for doc_class in registry._models[instance.__class__]: + doc_instance = doc_class() # Create an instance of the document + if isinstance(instance, models.Model): + object_list = [instance] + else: + object_list = instance + + # Assuming get_actions method prepares the correct delete actions for Elasticsearch + bulk_data.extend(list(doc_instance._get_actions(object_list, action))) + + if bulk_data: + # Ensure registry_delete_task is prepared to handle bulk deletion + self.registry_delete_task.delay(doc_instance.__module__, doc_instance.__class__.__name__, bulk_data) + @shared_task() def registry_update_task(pk, app_label, model_name): @@ -205,9 +245,13 @@ def registry_update_task(pk, app_label, model_name): except LookupError: pass else: - registry.update( - model.objects.get(pk=pk) - ) + try: + registry.update( + model.objects.get(pk=pk) + ) + except ObjectDoesNotExist as e: + print(f'Error registry_update_task: {e}') + @shared_task() def 
registry_update_related_task(pk, app_label, model_name): @@ -217,6 +261,9 @@ def registry_update_related_task(pk, app_label, model_name): except LookupError: pass else: - registry.update_related( - model.objects.get(pk=pk) - ) + try: + registry.update_related( + model.objects.get(pk=pk) + ) + except ObjectDoesNotExist as e: + print(f'Error registry_update_related_task: {e}')