# mixins.py (17 KB)
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import logging
  3. from django.conf import settings
  4. from django.db import router, transaction
  5. from django.db.models import (
  6. DateField,
  7. DateTimeField,
  8. F,
  9. IntegerField,
  10. Manager,
  11. Model,
  12. UUIDField,
  13. )
  14. from django.db.models.expressions import CombinedExpression
  15. from django.utils.module_loading import import_string
  16. from dj_cqrs.constants import ALL_BASIC_FIELDS, FIELDS_TRACKER_FIELD_NAME, TRACKED_FIELDS_ATTR_NAME
  17. from dj_cqrs.managers import MasterManager, ReplicaManager
  18. from dj_cqrs.metas import MasterMeta, ReplicaMeta
  19. from dj_cqrs.signals import MasterSignals, post_bulk_create, post_update
# Module-level logger; the fixed 'django-cqrs' name (instead of __name__) lets
# one logging configuration entry cover the whole package.
logger = logging.getLogger('django-cqrs')
  21. class RawMasterMixin(Model):
  22. """Base class for MasterMixin. **Users shouldn't use this
  23. class directly.**"""
  24. CQRS_ID = None
  25. """Unique CQRS identifier for all microservices."""
  26. CQRS_PRODUCE = True
  27. """If false, no cqrs data is sent through the transport."""
  28. CQRS_FIELDS = ALL_BASIC_FIELDS
  29. """
  30. List of fields to include in the CQRS payload.
  31. You can also set the fields attribute to the special value '__all__'
  32. to indicate that all fields in the model should be used.
  33. """
  34. CQRS_SERIALIZER = None
  35. """
  36. Optional serializer used to create the instance representation.
  37. Must be expressed as a module dotted path string like
  38. `mymodule.serializers.MasterModelSerializer`.
  39. """
  40. CQRS_TRACKED_FIELDS = None
  41. """
  42. List of fields of the main model for which you want to track the changes
  43. and send the previous values via transport. You can also set the field
  44. attribute to the special value "__all__" to indicate that all fields in
  45. the model must be used.
  46. """
  47. objects = Manager()
  48. cqrs = MasterManager()
  49. """Manager that adds needed CQRS queryset methods."""
  50. cqrs_revision = IntegerField(
  51. default=0,
  52. help_text='This field must be incremented on any model update. '
  53. "It's used to for CQRS sync.",
  54. )
  55. cqrs_updated = DateTimeField(
  56. auto_now=True,
  57. help_text='This field must be incremented on every model update. '
  58. "It's used to for CQRS sync.",
  59. )
  60. class Meta:
  61. abstract = True
  62. @property
  63. def cqrs_saves_count(self):
  64. """Shows how many times this instance has been saved within the transaction."""
  65. return getattr(self, '_cqrs_saves_count', 0)
  66. @property
  67. def is_initial_cqrs_save(self):
  68. """This flag is used to check if instance has already been registered for CQRS update."""
  69. return self.cqrs_saves_count < 2
  70. def reset_cqrs_saves_count(self):
  71. """This method is used to automatically reset instance CQRS counters on transaction commit.
  72. But this can also be used to control custom behaviour within transaction
  73. or in case of rollback,
  74. when several sequential transactions are used to change the same instance.
  75. """
  76. if hasattr(self, '_cqrs_saves_count'):
  77. self._cqrs_saves_count = 0
  78. def save(self, *args, **kwargs):
  79. update_fields = kwargs.pop('update_fields', None)
  80. update_cqrs_fields = kwargs.pop('update_cqrs_fields', self._update_cqrs_fields_default)
  81. using = kwargs.get('using') or router.db_for_write(self.__class__, instance=self)
  82. connection = transaction.get_connection(using)
  83. if connection.in_atomic_block:
  84. _cqrs_saves_count = self.cqrs_saves_count
  85. self._cqrs_saves_count = _cqrs_saves_count + 1
  86. else:
  87. self.reset_cqrs_saves_count()
  88. if (not update_fields) and self.is_initial_cqrs_save and (not self._state.adding):
  89. self.cqrs_revision = F('cqrs_revision') + 1
  90. elif update_fields and update_cqrs_fields:
  91. self.cqrs_revision = F('cqrs_revision') + 1
  92. update_fields = set(update_fields)
  93. update_fields.update({'cqrs_revision', 'cqrs_updated'})
  94. kwargs['update_fields'] = update_fields
  95. self.save_tracked_fields()
  96. return super(RawMasterMixin, self).save(*args, **kwargs)
  97. def save_tracked_fields(self):
  98. if hasattr(self, FIELDS_TRACKER_FIELD_NAME):
  99. tracker = getattr(self, FIELDS_TRACKER_FIELD_NAME)
  100. if self.is_initial_cqrs_save:
  101. if self._state.adding:
  102. data = tracker.changed_initial()
  103. else:
  104. data = tracker.changed()
  105. setattr(self, TRACKED_FIELDS_ATTR_NAME, data)
  106. @property
  107. def _update_cqrs_fields_default(self):
  108. return settings.CQRS['master']['CQRS_AUTO_UPDATE_FIELDS']
  109. def to_cqrs_dict(self, using: str = None, sync: bool = False) -> dict:
  110. """CQRS serialization for transport payload.
  111. Args:
  112. using (str): The using argument can be used to force the database to use,
  113. defaults to None.
  114. sync (bool): optional
  115. Returns:
  116. (dict): The serialized instance data.
  117. """
  118. if self.CQRS_SERIALIZER:
  119. data = self._class_serialization(using, sync=sync)
  120. else:
  121. self._refresh_f_expr_values(using)
  122. data = self._common_serialization(using)
  123. return data
  124. def get_tracked_fields_data(self) -> dict:
  125. """CQRS serialization for tracked fields to include
  126. in the transport payload.
  127. Returns:
  128. (dict): Previous values for tracked fields.
  129. """
  130. return getattr(self, TRACKED_FIELDS_ATTR_NAME, None)
  131. def cqrs_sync(self, using: str = None, queue: str = None) -> bool:
  132. """Manual instance synchronization.
  133. Args:
  134. using (str): The using argument can be used to force the database
  135. to use, defaults to None.
  136. queue (str): Syncing can be executed just for a single queue, defaults to None
  137. (all queues).
  138. Returns:
  139. (bool): True if instance can be synced, False otherwise.
  140. """
  141. if self._state.adding:
  142. return False
  143. if not self.CQRS_SERIALIZER:
  144. try:
  145. self.refresh_from_db()
  146. except self._meta.model.DoesNotExist:
  147. return False
  148. MasterSignals.post_save(
  149. self._meta.model,
  150. instance=self,
  151. using=using,
  152. queue=queue,
  153. sync=True,
  154. )
  155. return True
  156. def is_sync_instance(self) -> bool:
  157. """
  158. This method can be overridden to apply syncing only to instances by some rules.
  159. For example, only objects with special status or after some creation date, etc.
  160. Returns:
  161. (bool): True if this instance needs to be synced, False otherwise.
  162. """
  163. return True
  164. def get_cqrs_meta(self, **kwargs: dict) -> dict:
  165. """
  166. This method can be overridden to collect model/instance specific metadata.
  167. Args:
  168. kwargs (dict): Signal type, payload data, etc.
  169. Returns:
  170. (dict): Metadata dictionary if it's provided.
  171. """
  172. generic_meta_func = settings.CQRS['master']['meta_function']
  173. if generic_meta_func:
  174. return generic_meta_func(obj=self, **kwargs)
  175. return {}
  176. @classmethod
  177. def relate_cqrs_serialization(cls, queryset):
  178. """
  179. This method shoud be overriden to optimize database access
  180. for example using `select_related` and `prefetch_related`
  181. when related models must be included into the master model
  182. representation.
  183. Args:
  184. queryset (django.db.models.QuerySet): The initial queryset.
  185. Returns:
  186. (django.db.models.QuerySet): The optimized queryset.
  187. """
  188. return queryset
  189. def get_custom_cqrs_delete_data(self):
  190. """This method should be overridden when additional data is needed in DELETE payload."""
  191. pass
  192. @classmethod
  193. def call_post_bulk_create(cls, instances: list, using=None):
  194. """Post bulk create signal caller (django doesn't support it by default).
  195. ``` py3
  196. # Used automatically by cqrs.bulk_create()
  197. instances = model.cqrs.bulk_create(instances)
  198. ```
  199. """
  200. post_bulk_create.send(cls, instances=instances, using=using)
  201. @classmethod
  202. def call_post_update(cls, instances, using=None):
  203. """Post bulk update signal caller (django doesn't support it by default).
  204. ``` py3
  205. # Used automatically by cqrs.bulk_update()
  206. qs = model.objects.filter(k1=v1)
  207. model.cqrs.bulk_update(qs, k2=v2)
  208. ```
  209. """
  210. post_update.send(cls, instances=instances, using=using)
  211. def _common_serialization(self, using):
  212. opts = self._meta
  213. if isinstance(self.CQRS_FIELDS, str) and self.CQRS_FIELDS == ALL_BASIC_FIELDS:
  214. included_fields = None
  215. else:
  216. included_fields = self.CQRS_FIELDS
  217. data = {}
  218. for f in opts.fields:
  219. if included_fields and (f.name not in included_fields):
  220. continue
  221. value = f.value_from_object(self)
  222. if value is not None and isinstance(f, (DateField, DateTimeField, UUIDField)):
  223. value = str(value)
  224. data[f.name] = value
  225. # We need to include additional fields for synchronisation, f.e. to prevent de-duplication
  226. data['cqrs_revision'] = self.cqrs_revision
  227. data['cqrs_updated'] = str(self.cqrs_updated)
  228. return data
  229. def _class_serialization(self, using, sync=False):
  230. if sync:
  231. instance = self
  232. else:
  233. db = using if using is not None else self._state.db
  234. if hasattr(self.__class__, 'objects_all'): # wxl 2023-8-29
  235. qs = self.__class__.objects_all.using(db)
  236. else:
  237. qs = self.__class__._default_manager.using(db)
  238. instance = self.relate_cqrs_serialization(qs).get(pk=self.pk)
  239. data = self._cqrs_serializer_cls(instance).data
  240. data['cqrs_revision'] = instance.cqrs_revision
  241. data['cqrs_updated'] = str(instance.cqrs_updated)
  242. return data
  243. def _refresh_f_expr_values(self, using):
  244. opts = self._meta
  245. fields_to_refresh = []
  246. if isinstance(self.cqrs_revision, CombinedExpression):
  247. fields_to_refresh.append('cqrs_revision')
  248. if isinstance(self.CQRS_FIELDS, str) and self.CQRS_FIELDS == ALL_BASIC_FIELDS:
  249. included_fields = None
  250. else:
  251. included_fields = self.CQRS_FIELDS
  252. for f in opts.fields:
  253. if included_fields and (f.name not in included_fields):
  254. continue
  255. value = f.value_from_object(self)
  256. if value is not None and isinstance(value, CombinedExpression):
  257. fields_to_refresh.append(f.name)
  258. if fields_to_refresh:
  259. self.refresh_from_db(fields=fields_to_refresh)
  260. @property
  261. def _cqrs_serializer_cls(self):
  262. """Serialization class loader."""
  263. if hasattr(self.__class__, '_cqrs_serializer_class'):
  264. return self.__class__._cqrs_serializer_class
  265. try:
  266. serializer = import_string(self.CQRS_SERIALIZER)
  267. self.__class__._cqrs_serializer_class = serializer
  268. return serializer
  269. except ImportError:
  270. raise ImportError(
  271. "Model {0}: CQRS_SERIALIZER can't be imported.".format(self.__class__),
  272. )
  273. class MasterMixin(RawMasterMixin, metaclass=MasterMeta):
  274. """
  275. Mixin for the master CQRS model, that will send data updates to it's replicas.
  276. """
  277. class Meta:
  278. abstract = True
  279. class RawReplicaMixin:
  280. CQRS_ID = None
  281. CQRS_NO_DB_OPERATIONS = True
  282. CQRS_META = False
  283. CQRS_ONLY_DIRECT_SYNCS = False
  284. @classmethod
  285. def cqrs_save(cls, master_data, **kwargs):
  286. raise NotImplementedError
  287. @classmethod
  288. def cqrs_delete(cls, master_data, **kwargs):
  289. raise NotImplementedError
  290. @staticmethod
  291. def should_retry_cqrs(current_retry: int, exception=None) -> bool:
  292. """Checks if we should retry the message after current attempt.
  293. Args:
  294. current_retry (int): Current number of message retries.
  295. exception (Exception): Exception instance raised during message consume.
  296. Returns:
  297. (bool): True if message should be retried, False otherwise.
  298. """
  299. max_retries = settings.CQRS['replica']['CQRS_MAX_RETRIES']
  300. if max_retries is None:
  301. # Infinite
  302. return True
  303. return current_retry < max_retries
  304. @staticmethod
  305. def get_cqrs_retry_delay(current_retry: int) -> int:
  306. """Returns number of seconds to wait before requeuing the message.
  307. Args:
  308. current_retry (int): Current number of message retries.
  309. Returns:
  310. (int): Delay in seconds.
  311. """
  312. return settings.CQRS['replica']['CQRS_RETRY_DELAY']
  313. class ReplicaMixin(RawReplicaMixin, Model, metaclass=ReplicaMeta):
  314. """
  315. Mixin for the replica CQRS model, that will receive data updates from master. Models, using
  316. this mixin should be readonly, but this is not enforced (f.e. for admin).
  317. """
  318. CQRS_ID = None
  319. """Unique CQRS identifier for all microservices."""
  320. CQRS_MAPPING = None
  321. """Mapping of master data field name to replica model field name."""
  322. CQRS_CUSTOM_SERIALIZATION = False
  323. """Set it to True to skip default data check."""
  324. CQRS_SELECT_FOR_UPDATE = False
  325. """Set it to True to acquire lock on instance creation/update."""
  326. CQRS_NO_DB_OPERATIONS = False
  327. """Set it to True to disable any default DB operations for this model."""
  328. CQRS_META = False
  329. """Set it to True to receive meta data for this model."""
  330. CQRS_ONLY_DIRECT_SYNCS = False
  331. """Set it to True to ignore broadcast sync packages and to receive only direct queue syncs."""
  332. objects = Manager()
  333. cqrs = ReplicaManager()
  334. """Manager that adds needed CQRS queryset methods."""
  335. cqrs_revision = IntegerField()
  336. cqrs_updated = DateTimeField()
  337. class Meta:
  338. abstract = True
  339. @classmethod
  340. def cqrs_save(
  341. cls,
  342. master_data: dict,
  343. previous_data: dict = None,
  344. sync: bool = False,
  345. meta: dict = None,
  346. ):
  347. """This method saves (creates or updates) model instance from CQRS master instance data.
  348. This method must not be overridden. Otherwise, sync checks need to be implemented manually.
  349. Args:
  350. master_data (dict): CQRS master instance data.
  351. previous_data (dict): Previous values for tracked fields.
  352. sync (bool): Sync package flag.
  353. meta (dict): Payload metadata, if exists.
  354. Returns:
  355. (django.db.models.Model): Model instance.
  356. """
  357. if cls.CQRS_NO_DB_OPERATIONS:
  358. return super().cqrs_save(master_data, previous_data=previous_data, sync=sync, meta=meta)
  359. return cls.cqrs.save_instance(master_data, previous_data, sync, meta)
  360. @classmethod
  361. def cqrs_create(
  362. cls,
  363. sync: bool,
  364. mapped_data: dict,
  365. previous_data: dict = None,
  366. meta: dict = None,
  367. ):
  368. """This method creates model instance from CQRS mapped instance data. It must be overridden
  369. by replicas of master models with custom serialization.
  370. Args:
  371. sync (dict): Sync package flag.
  372. mapped_data (dict): CQRS mapped instance data.
  373. previous_data (dict): Previous mapped values for tracked fields.
  374. meta (dict): Payload metadata, if exists.
  375. Returns:
  376. (django.db.models.Model): Model instance.
  377. """
  378. if hasattr(cls, 'objects_all'): # wxl 2023-8-16
  379. return cls.objects_all.create(**mapped_data)
  380. else:
  381. return cls._default_manager.create(**mapped_data)
  382. def cqrs_update(
  383. self,
  384. sync: bool,
  385. mapped_data: dict,
  386. previous_data: dict = None,
  387. meta: dict = None,
  388. ):
  389. """This method updates model instance from CQRS mapped instance data. It must be overridden
  390. by replicas of master models with custom serialization.
  391. Args:
  392. sync (dict): Sync package flag.
  393. mapped_data (dict): CQRS mapped instance data.
  394. previous_data (dict): Previous mapped values for tracked fields.
  395. meta (dict): Payload metadata, if exists.
  396. Returns:
  397. (django.db.models.Model): Model instance.
  398. """
  399. for key, value in mapped_data.items():
  400. setattr(self, key, value)
  401. self.save()
  402. return self
  403. @classmethod
  404. def cqrs_delete(cls, master_data: dict, meta: dict = None) -> bool:
  405. """This method deletes model instance from mapped CQRS master instance data.
  406. Args:
  407. master_data (dict): CQRS master instance data.
  408. meta (dict): Payload metadata, if exists.
  409. Returns:
  410. (bool): Flag, if delete operation is successful (even if nothing was deleted).
  411. """
  412. if cls.CQRS_NO_DB_OPERATIONS:
  413. return super().cqrs_delete(master_data, meta=meta)
  414. return cls.cqrs.delete_instance(master_data)