123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- # Copyright © 2023 Ingram Micro Inc. All rights reserved.
- import logging
- from django.db import models, transaction
- from django.dispatch import Signal
- from django.utils.timezone import now
- from dj_cqrs.constants import SignalType
- from dj_cqrs.controller import producer
- from dj_cqrs.dataclasses import TransportPayload
- from dj_cqrs.utils import get_message_expiration_dt
- logger = logging.getLogger('django-cqrs')
- post_bulk_create = Signal()
- """
- Signal sent after a bulk create.
- See dj_cqrs.mixins.RawMasterMixin.call_post_bulk_create.
- """
- post_update = Signal()
- """
- Signal sent after a bulk update.
- See dj_cqrs.mixins.RawMasterMixin.call_post_update.
- """
- class MasterSignals:
- """Signals registry and handlers for CQRS master models."""
- @classmethod
- def register_model(cls, model_cls):
- """
- Registers signals for a model.
- Args:
- model_cls (dj_cqrs.mixins.MasterMixin): Model class inherited from CQRS MasterMixin.
- """
- models.signals.post_save.connect(cls.post_save, sender=model_cls)
- models.signals.post_delete.connect(cls.post_delete, sender=model_cls)
- post_bulk_create.connect(cls.post_bulk_create, sender=model_cls)
- post_update.connect(cls.post_bulk_update, sender=model_cls)
- @classmethod
- def post_save(cls, sender, **kwargs):
- """
- Args:
- sender (dj_cqrs.mixins.MasterMixin): Class or instance inherited from CQRS MasterMixin.
- """
- if not sender.CQRS_PRODUCE:
- return
- update_fields = kwargs.get('update_fields')
- if update_fields and ('cqrs_revision' not in update_fields):
- return
- instance = kwargs['instance']
- if not instance.is_sync_instance():
- return
- using = kwargs['using']
- sync = kwargs.get('sync', False)
- queue = kwargs.get('queue', None)
- connection = transaction.get_connection(using)
- if not connection.in_atomic_block or instance.is_initial_cqrs_save:
- transaction.on_commit(
- lambda: cls._post_save_produce(sender, instance, using, sync, queue),
- )
- @classmethod
- def _post_save_produce(cls, sender, instance, using, sync, queue):
- # As this method may run 'on_commit', the instance may not exist. In that case, log the
- # error but don't raise an exception.
- try:
- instance.reset_cqrs_saves_count()
- instance_data = instance.to_cqrs_dict(using, sync=sync)
- previous_data = instance.get_tracked_fields_data()
- signal_type = SignalType.SYNC if sync else SignalType.SAVE
- meta = instance.get_cqrs_meta(
- instance_data=instance_data,
- previous_data=previous_data,
- signal_type=signal_type,
- )
- except sender.DoesNotExist:
- logger.error(
- f"Can't produce message from master model '{sender.__name__}': "
- f"The instance doesn't exist (pk={instance.pk})",
- )
- return
- payload = TransportPayload(
- signal_type,
- sender.CQRS_ID,
- instance_data,
- instance.pk,
- queue,
- previous_data,
- expires=get_message_expiration_dt(),
- meta=meta,
- )
- producer.produce(payload)
- @classmethod
- def post_delete(cls, sender, **kwargs):
- """
- Args:
- sender (dj_cqrs.mixins.MasterMixin): Class or instance inherited from CQRS MasterMixin.
- """
- if not sender.CQRS_PRODUCE:
- return
- instance = kwargs['instance']
- if not instance.is_sync_instance():
- return
- instance_data = {
- 'id': instance.pk,
- 'cqrs_revision': instance.cqrs_revision + 1,
- 'cqrs_updated': str(now()),
- }
- data = instance.get_custom_cqrs_delete_data()
- if data:
- instance_data['custom'] = data
- signal_type = SignalType.DELETE
- meta = instance.get_cqrs_meta(
- instance_data=instance_data,
- signal_type=signal_type,
- )
- payload = TransportPayload(
- signal_type,
- sender.CQRS_ID,
- instance_data,
- instance.pk,
- expires=get_message_expiration_dt(),
- meta=meta,
- )
- # Delete is always in transaction!
- transaction.on_commit(lambda: producer.produce(payload))
- @classmethod
- def post_bulk_create(cls, sender, **kwargs):
- """
- Args:
- sender (dj_cqrs.mixins.MasterMixin): Class or instance inherited from CQRS MasterMixin.
- """
- cls._post_bulk(sender, **kwargs)
- @classmethod
- def post_bulk_update(cls, sender, **kwargs):
- """
- Args:
- sender (dj_cqrs.mixins.MasterMixin): Class or instance inherited from CQRS MasterMixin.
- """
- cls._post_bulk(sender, **kwargs)
- @classmethod
- def _post_bulk(cls, sender, **kwargs):
- for instance in kwargs['instances']:
- cls.post_save(sender, instance=instance, using=kwargs['using'])
|