consumer.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import copy
  3. import logging
  4. from contextlib import ExitStack
  5. from django.conf import settings
  6. from django.db import Error, close_old_connections, transaction
  7. from dj_cqrs.constants import SignalType
  8. from dj_cqrs.logger import log_timed_out_queries
  9. from dj_cqrs.registries import ReplicaRegistry
  10. from dj_cqrs.utils import apply_query_timeouts
  11. logger = logging.getLogger('django-cqrs')
  12. def consume(payload):
  13. """Consumer controller.
  14. :param dj_cqrs.dataclasses.TransportPayload payload: Consumed payload from master service.
  15. """
  16. payload = copy.deepcopy(payload)
  17. return route_signal_to_replica_model(
  18. payload.signal_type,
  19. payload.cqrs_id,
  20. payload.instance_data,
  21. previous_data=payload.previous_data,
  22. meta=payload.meta,
  23. queue=payload.queue,
  24. )
  25. def route_signal_to_replica_model(
  26. signal_type,
  27. cqrs_id,
  28. instance_data,
  29. previous_data=None,
  30. meta=None,
  31. queue=None,
  32. ):
  33. """Routes signal to model method to create/update/delete replica instance.
  34. :param dj_cqrs.constants.SignalType signal_type: Consumed signal type.
  35. :param str cqrs_id: Replica model CQRS unique identifier.
  36. :param dict instance_data: Master model data.
  37. :param dict or None previous_data: Previous model data for changed tracked fields, if exists.
  38. :param dict or None meta: Payload metadata, if exists.
  39. :param str or None queue: Synced queue.
  40. """
  41. if signal_type not in (SignalType.DELETE, SignalType.SAVE, SignalType.SYNC):
  42. logger.error('Bad signal type "{0}" for CQRS_ID "{1}".'.format(signal_type, cqrs_id))
  43. return
  44. model_cls = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
  45. if not model_cls:
  46. return
  47. this_queue = settings.CQRS['queue']
  48. if signal_type == SignalType.SYNC and model_cls.CQRS_ONLY_DIRECT_SYNCS and queue != this_queue:
  49. return True
  50. db_is_needed = not model_cls.CQRS_NO_DB_OPERATIONS
  51. if db_is_needed:
  52. close_old_connections()
  53. is_meta_supported = model_cls.CQRS_META
  54. try:
  55. if db_is_needed:
  56. apply_query_timeouts(model_cls)
  57. with transaction.atomic(savepoint=False) if db_is_needed else ExitStack():
  58. if signal_type == SignalType.DELETE:
  59. if is_meta_supported:
  60. return model_cls.cqrs_delete(instance_data, meta=meta)
  61. return model_cls.cqrs_delete(instance_data)
  62. f_kw = {'previous_data': previous_data}
  63. if is_meta_supported:
  64. f_kw['meta'] = meta
  65. if signal_type == SignalType.SAVE:
  66. return model_cls.cqrs_save(instance_data, **f_kw)
  67. if signal_type == SignalType.SYNC:
  68. f_kw['sync'] = True
  69. return model_cls.cqrs_save(instance_data, **f_kw)
  70. except Error as e:
  71. pk_name = getattr(model_cls._meta.pk, 'name', 'id')
  72. pk_value = instance_data.get(pk_name)
  73. cqrs_revision = instance_data.get('cqrs_revision')
  74. logger.error(
  75. '{0}\nCQRS {1} error: pk = {2}, cqrs_revision = {3} ({4}).'.format(
  76. str(e),
  77. signal_type,
  78. pk_value,
  79. cqrs_revision,
  80. model_cls.CQRS_ID,
  81. ),
  82. )
  83. log_timed_out_queries(e, model_cls)