signals.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import logging
  3. from django.db import models, transaction
  4. from django.dispatch import Signal
  5. from django.utils.timezone import now
  6. from dj_cqrs.constants import SignalType
  7. from dj_cqrs.controller import producer
  8. from dj_cqrs.dataclasses import TransportPayload
  9. from dj_cqrs.utils import get_message_expiration_dt
# Module-wide logger, registered under the 'django-cqrs' name.
logger = logging.getLogger('django-cqrs')

# Custom signal; MasterSignals.register_model connects it to MasterSignals.post_bulk_create.
post_bulk_create = Signal()
"""
Signal sent after a bulk create.
See dj_cqrs.mixins.RawMasterMixin.call_post_bulk_create.
"""

# Custom signal; MasterSignals.register_model connects it to MasterSignals.post_bulk_update.
post_update = Signal()
"""
Signal sent after a bulk update.
See dj_cqrs.mixins.RawMasterMixin.call_post_update.
"""
  21. class MasterSignals:
  22. """Signals registry and handlers for CQRS master models."""
  23. @classmethod
  24. def register_model(cls, model_cls):
  25. """
  26. Registers signals for a model.
  27. Args:
  28. model_cls (dj_cqrs.mixins.MasterMixin): Model class inherited from CQRS MasterMixin.
  29. """
  30. models.signals.post_save.connect(cls.post_save, sender=model_cls)
  31. models.signals.post_delete.connect(cls.post_delete, sender=model_cls)
  32. post_bulk_create.connect(cls.post_bulk_create, sender=model_cls)
  33. post_update.connect(cls.post_bulk_update, sender=model_cls)
  34. @classmethod
  35. def post_save(cls, sender, **kwargs):
  36. """
  37. Args:
  38. sender (dj_cqrs.mixins.MasterMixin): Class or instance inherited from CQRS MasterMixin.
  39. """
  40. if not sender.CQRS_PRODUCE:
  41. return
  42. update_fields = kwargs.get('update_fields')
  43. if update_fields and ('cqrs_revision' not in update_fields):
  44. return
  45. instance = kwargs['instance']
  46. if not instance.is_sync_instance():
  47. return
  48. using = kwargs['using']
  49. sync = kwargs.get('sync', False)
  50. queue = kwargs.get('queue', None)
  51. connection = transaction.get_connection(using)
  52. if not connection.in_atomic_block or instance.is_initial_cqrs_save:
  53. transaction.on_commit(
  54. lambda: cls._post_save_produce(sender, instance, using, sync, queue),
  55. )
  56. @classmethod
  57. def _post_save_produce(cls, sender, instance, using, sync, queue):
  58. # As this method may run 'on_commit', the instance may not exist. In that case, log the
  59. # error but don't raise an exception.
  60. try:
  61. instance.reset_cqrs_saves_count()
  62. instance_data = instance.to_cqrs_dict(using, sync=sync)
  63. previous_data = instance.get_tracked_fields_data()
  64. signal_type = SignalType.SYNC if sync else SignalType.SAVE
  65. meta = instance.get_cqrs_meta(
  66. instance_data=instance_data,
  67. previous_data=previous_data,
  68. signal_type=signal_type,
  69. )
  70. except sender.DoesNotExist:
  71. logger.error(
  72. f"Can't produce message from master model '{sender.__name__}': "
  73. f"The instance doesn't exist (pk={instance.pk})",
  74. )
  75. return
  76. payload = TransportPayload(
  77. signal_type,
  78. sender.CQRS_ID,
  79. instance_data,
  80. instance.pk,
  81. queue,
  82. previous_data,
  83. expires=get_message_expiration_dt(),
  84. meta=meta,
  85. )
  86. producer.produce(payload)
  87. @classmethod
  88. def post_delete(cls, sender, **kwargs):
  89. """
  90. Args:
  91. sender (dj_cqrs.mixins.MasterMixin): Class or instance inherited from CQRS MasterMixin.
  92. """
  93. if not sender.CQRS_PRODUCE:
  94. return
  95. instance = kwargs['instance']
  96. if not instance.is_sync_instance():
  97. return
  98. instance_data = {
  99. 'id': instance.pk,
  100. 'cqrs_revision': instance.cqrs_revision + 1,
  101. 'cqrs_updated': str(now()),
  102. }
  103. data = instance.get_custom_cqrs_delete_data()
  104. if data:
  105. instance_data['custom'] = data
  106. signal_type = SignalType.DELETE
  107. meta = instance.get_cqrs_meta(
  108. instance_data=instance_data,
  109. signal_type=signal_type,
  110. )
  111. payload = TransportPayload(
  112. signal_type,
  113. sender.CQRS_ID,
  114. instance_data,
  115. instance.pk,
  116. expires=get_message_expiration_dt(),
  117. meta=meta,
  118. )
  119. # Delete is always in transaction!
  120. transaction.on_commit(lambda: producer.produce(payload))
  121. @classmethod
  122. def post_bulk_create(cls, sender, **kwargs):
  123. """
  124. Args:
  125. sender (dj_cqrs.mixins.MasterMixin): Class or instance inherited from CQRS MasterMixin.
  126. """
  127. cls._post_bulk(sender, **kwargs)
  128. @classmethod
  129. def post_bulk_update(cls, sender, **kwargs):
  130. """
  131. Args:
  132. sender (dj_cqrs.mixins.MasterMixin): Class or instance inherited from CQRS MasterMixin.
  133. """
  134. cls._post_bulk(sender, **kwargs)
  135. @classmethod
  136. def _post_bulk(cls, sender, **kwargs):
  137. for instance in kwargs['instances']:
  138. cls.post_save(sender, instance=instance, using=kwargs['using'])