dataclasses.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. from dateutil.parser import parse as dateutil_parse
  3. from django.utils import timezone
  4. from dj_cqrs.correlation import get_correlation_id
  5. from dj_cqrs.utils import get_json_valid_value, get_message_expiration_dt
  6. class TransportPayload:
  7. """Transport message payload.
  8. Args:
  9. signal_type (dj_cqrs.constants.SignalType): Type of the signal for this message.
  10. cqrs_id (str): The unique CQRS identifier of the model.
  11. instance_data (dict): Serialized data of the instance that
  12. generates the event.
  13. instance_pk (str): Primary key of the instance.
  14. queue (str): Queue to synchronize, defaults to None.
  15. previous_data (dict): Previous values for fields tracked for changes,
  16. defaults to None.
  17. correlation_id (str): Correlation ID of process, where this payload is used.
  18. retries (int): Current number of message retries.
  19. expires (datetime): Message expiration datetime, infinite if None
  20. meta (dict): Payload metadata
  21. """
  22. def __init__(
  23. self,
  24. signal_type,
  25. cqrs_id: str,
  26. instance_data: dict,
  27. instance_pk: str,
  28. queue: str = None,
  29. previous_data: dict = None,
  30. correlation_id: str = None,
  31. expires=None,
  32. retries: int = 0,
  33. meta: dict = None,
  34. ):
  35. self.__signal_type = signal_type
  36. self.__cqrs_id = cqrs_id
  37. self.__instance_data = instance_data
  38. self.__instance_pk = instance_pk
  39. self.__queue = queue
  40. self.__previous_data = previous_data
  41. self.__meta = meta
  42. if correlation_id:
  43. self.__correlation_id = correlation_id
  44. else:
  45. self.__correlation_id = get_correlation_id(signal_type, cqrs_id, instance_pk, queue)
  46. self.__expires = expires
  47. self.__retries = retries
  48. @classmethod
  49. def from_message(cls, dct):
  50. """Builds payload from message data.
  51. Args:
  52. dct (dict): Deserialized message body data.
  53. Returns:
  54. (TransportPayload): TransportPayload instance.
  55. """
  56. if 'expires' in dct:
  57. expires = dct['expires']
  58. if dct['expires'] is not None:
  59. expires = dateutil_parse(dct['expires'])
  60. else:
  61. # Backward compatibility for old messages otherwise they are infinite by default.
  62. expires = get_message_expiration_dt()
  63. return cls(
  64. dct['signal_type'],
  65. dct['cqrs_id'],
  66. dct['instance_data'],
  67. dct.get('instance_pk'),
  68. previous_data=dct.get('previous_data'),
  69. correlation_id=dct.get('correlation_id'),
  70. expires=expires,
  71. retries=dct.get('retries') or 0,
  72. meta=dct.get('meta'),
  73. )
  74. @property
  75. def signal_type(self):
  76. return self.__signal_type
  77. @property
  78. def cqrs_id(self):
  79. return self.__cqrs_id
  80. @property
  81. def instance_data(self):
  82. return self.__instance_data
  83. @property
  84. def pk(self):
  85. return self.__instance_pk
  86. @property
  87. def queue(self):
  88. return self.__queue
  89. @property
  90. def previous_data(self):
  91. return self.__previous_data
  92. @property
  93. def correlation_id(self):
  94. return self.__correlation_id
  95. @property
  96. def meta(self):
  97. return self.__meta
  98. @property
  99. def expires(self):
  100. return self.__expires
  101. @property
  102. def retries(self):
  103. return self.__retries
  104. @retries.setter
  105. def retries(self, value):
  106. assert value >= 0, 'Payload retries field should be 0 or positive integer.'
  107. self.__retries = value
  108. def to_dict(self) -> dict:
  109. """Return the payload as a dictionary.
  110. Returns:
  111. (dict): This payload.
  112. """
  113. expires = self.__expires
  114. if expires:
  115. expires = expires.replace(microsecond=0).isoformat()
  116. return {
  117. 'signal_type': self.__signal_type,
  118. 'cqrs_id': self.__cqrs_id,
  119. 'instance_data': self.__instance_data,
  120. 'previous_data': self.__previous_data,
  121. 'instance_pk': get_json_valid_value(self.__instance_pk),
  122. 'correlation_id': get_json_valid_value(self.__correlation_id),
  123. 'retries': self.__retries,
  124. 'expires': expires,
  125. 'meta': self.__meta,
  126. }
  127. def is_expired(self):
  128. """Checks if this payload is expired.
  129. Returns:
  130. (bool): True if payload is expired, False otherwise.
  131. """
  132. return self.__expires is not None and self.__expires <= timezone.now()