# managers.py


# Copyright © 2023 Ingram Micro Inc. All rights reserved.
import logging
from typing import Optional

from django.core.exceptions import ValidationError
from django.db import Error, transaction
from django.db.models import F, Manager
from django.utils import timezone

from dj_cqrs.constants import FIELDS_TRACKER_FIELD_NAME, TRACKED_FIELDS_ATTR_NAME

  8. logger = logging.getLogger('django-cqrs')
  9. class MasterManager(Manager):
  10. def bulk_create(self, objs, **kwargs):
  11. """
  12. Custom bulk create method to support sending of create signals.
  13. This can be used only in cases, when IDs are generated on client or DB returns IDs.
  14. Args:
  15. objs (List[django.db.models.Model]): List of objects for creation.
  16. kwargs (dict): Bulk create kwargs.
  17. """
  18. for obj in objs:
  19. obj.save_tracked_fields()
  20. objs = super().bulk_create(objs, **kwargs)
  21. if objs:
  22. self.model.call_post_bulk_create(objs, using=self.db)
  23. return objs
  24. def bulk_update(self, queryset, **kwargs):
  25. """Custom update method to support sending of update signals.
  26. Args:
  27. queryset (django.db.models.QuerySet): Django Queryset (f.e. filter).
  28. kwargs (dict): Update kwargs.
  29. """
  30. prev_data_mapper = {}
  31. collect_prev_data = hasattr(self.model, FIELDS_TRACKER_FIELD_NAME)
  32. # Add filter by list of ids in case of update kwargs
  33. # are the same as the chain filter kwargs in the Queryset.
  34. # If that happen the .all() method will refresh after update and
  35. # result in an empty Queryset that will not send the signal.
  36. ids_list = list(queryset.values_list('pk', flat=True))
  37. def list_all():
  38. return list(queryset.model.objects.filter(pk__in=ids_list).all())
  39. with transaction.atomic(savepoint=False):
  40. if collect_prev_data:
  41. objs = list_all()
  42. if not objs:
  43. return
  44. for obj in objs:
  45. prev_data_mapper[obj.pk] = getattr(obj, FIELDS_TRACKER_FIELD_NAME).current()
  46. current_dt = timezone.now()
  47. result = queryset.update(
  48. cqrs_revision=F('cqrs_revision') + 1,
  49. cqrs_updated=current_dt,
  50. **kwargs,
  51. )
  52. objs = list_all()
  53. if collect_prev_data:
  54. for obj in objs:
  55. setattr(obj, TRACKED_FIELDS_ATTR_NAME, prev_data_mapper.get(obj.pk))
  56. queryset.model.call_post_update(objs, using=queryset.db)
  57. return result
  58. class ReplicaManager(Manager):
  59. def save_instance(
  60. self,
  61. master_data: dict,
  62. previous_data: dict = None,
  63. sync: bool = False,
  64. meta: dict = None,
  65. ):
  66. """This method saves (creates or updates) model instance from CQRS master instance data.
  67. Args:
  68. master_data (dict): CQRS master instance data.
  69. previous_data (dict): Previous values for tracked fields.
  70. sync (bool): Sync package flag.
  71. meta (dict): Payload metadata, if exists.
  72. Returns:
  73. (django.db.models.Model): Model instance.
  74. """
  75. mapped_data = self._map_save_data(master_data)
  76. mapped_previous_data = self._map_previous_data(previous_data) if previous_data else None
  77. if mapped_data:
  78. pk_name = self._get_model_pk_name()
  79. pk_value = mapped_data[pk_name]
  80. f_kwargs = {pk_name: pk_value}
  81. if hasattr(self.model, 'objects_all'): # wxl 2023-8-16
  82. qs = self.model.objects_all.filter(**f_kwargs).order_by()
  83. else:
  84. qs = self.model._default_manager.filter(**f_kwargs).order_by()
  85. if self.model.CQRS_SELECT_FOR_UPDATE:
  86. qs = qs.select_for_update()
  87. instance = qs.first()
  88. if instance:
  89. return self.update_instance(
  90. instance,
  91. mapped_data,
  92. previous_data=mapped_previous_data,
  93. sync=sync,
  94. meta=meta,
  95. )
  96. return self.create_instance(
  97. mapped_data,
  98. previous_data=mapped_previous_data,
  99. sync=sync,
  100. meta=meta,
  101. )
  102. def create_instance(
  103. self,
  104. mapped_data: dict,
  105. previous_data: dict = None,
  106. sync: bool = False,
  107. meta: dict = None,
  108. ):
  109. """This method creates model instance from mapped CQRS master instance data.
  110. Args:
  111. mapped_data (dict): Mapped CQRS master instance data.
  112. previous_data (dict): Previous values for tracked fields.
  113. sync (bool): Sync package flag.
  114. meta (dict): Payload metadata, if exists.
  115. Returns:
  116. (django.db.models.Model): ReplicaMixin instance.
  117. """
  118. f_kw = {'previous_data': previous_data}
  119. if self.model.CQRS_META:
  120. f_kw['meta'] = meta
  121. try:
  122. return self.model.cqrs_create(sync, mapped_data, **f_kw)
  123. except (Error, ValidationError) as e:
  124. pk_value = mapped_data[self._get_model_pk_name()]
  125. logger.error(
  126. '{0}\nCQRS create error: pk = {1} ({2}).'.format(
  127. str(e),
  128. pk_value,
  129. self.model.CQRS_ID,
  130. ),
  131. )
  132. def update_instance(
  133. self,
  134. instance,
  135. mapped_data: dict,
  136. previous_data: dict = None,
  137. sync: bool = False,
  138. meta: dict = None,
  139. ):
  140. """This method updates model instance from mapped CQRS master instance data.
  141. Args:
  142. instance (django.db.models.Model): ReplicaMixin model instance.
  143. mapped_data (dict): Mapped CQRS master instance data.
  144. previous_data (dict): Previous values for tracked fields.
  145. sync (bool): Sync package flag.
  146. meta (dict): Payload metadata, if exists.
  147. Returns:
  148. (django.db.models.Model): ReplicaMixin instance.
  149. """
  150. pk_value = mapped_data[self._get_model_pk_name()]
  151. current_cqrs_revision = mapped_data['cqrs_revision']
  152. existing_cqrs_revision = instance.cqrs_revision
  153. if sync:
  154. if existing_cqrs_revision > current_cqrs_revision:
  155. w_tpl = (
  156. 'CQRS revision downgrade on sync: pk = {0}, '
  157. 'cqrs_revision = new {1} / existing {2} ({3}).'
  158. )
  159. logger.warning(
  160. w_tpl.format(
  161. pk_value,
  162. current_cqrs_revision,
  163. existing_cqrs_revision,
  164. self.model.CQRS_ID,
  165. ),
  166. )
  167. else:
  168. if existing_cqrs_revision > current_cqrs_revision:
  169. e_tpl = (
  170. 'Wrong CQRS sync order: pk = {0}, '
  171. 'cqrs_revision = new {1} / existing {2} ({3}).'
  172. )
  173. logger.error(
  174. e_tpl.format(
  175. pk_value,
  176. current_cqrs_revision,
  177. existing_cqrs_revision,
  178. self.model.CQRS_ID,
  179. ),
  180. )
  181. return instance
  182. if existing_cqrs_revision == current_cqrs_revision:
  183. logger.error(
  184. 'Received duplicate CQRS data: pk = {0}, cqrs_revision = {1} ({2}).'.format(
  185. pk_value,
  186. current_cqrs_revision,
  187. self.model.CQRS_ID,
  188. ),
  189. )
  190. if current_cqrs_revision == 0:
  191. logger.warning(
  192. 'CQRS potential creation race condition: pk = {0} ({1}).'.format(
  193. pk_value,
  194. self.model.CQRS_ID,
  195. ),
  196. )
  197. return instance
  198. if current_cqrs_revision != instance.cqrs_revision + 1:
  199. w_tpl = (
  200. 'Lost or filtered out {0} CQRS packages: pk = {1}, cqrs_revision = {2} ({3})'
  201. )
  202. logger.warning(
  203. w_tpl.format(
  204. current_cqrs_revision - instance.cqrs_revision - 1,
  205. pk_value,
  206. current_cqrs_revision,
  207. self.model.CQRS_ID,
  208. ),
  209. )
  210. f_kw = {'previous_data': previous_data}
  211. if self.model.CQRS_META:
  212. f_kw['meta'] = meta
  213. try:
  214. return instance.cqrs_update(sync, mapped_data, **f_kw)
  215. except (Error, ValidationError) as e:
  216. logger.error(
  217. '{0}\nCQRS update error: pk = {1}, cqrs_revision = {2} ({3}).'.format(
  218. str(e),
  219. pk_value,
  220. current_cqrs_revision,
  221. self.model.CQRS_ID,
  222. ),
  223. )
  224. def delete_instance(self, master_data: dict) -> bool:
  225. """This method deletes model instance from mapped CQRS master instance data.
  226. Args:
  227. master_data (dict): CQRS master instance data.
  228. Returns:
  229. Flag, if delete operation is successful (even if nothing was deleted).
  230. """
  231. mapped_data = self._map_delete_data(master_data)
  232. if mapped_data:
  233. pk_name = self._get_model_pk_name()
  234. pk_value = mapped_data[pk_name]
  235. try:
  236. if hasattr(self.model, 'objects_all'): # wxl 2023-8-16
  237. self.model.objects_all.filter(**{pk_name: pk_value}).delete()
  238. else:
  239. self.model._default_manager.filter(**{pk_name: pk_value}).delete()
  240. return True
  241. except Error as e:
  242. logger.error(
  243. '{0}\nCQRS delete error: pk = {1} ({2}).'.format(
  244. str(e),
  245. pk_value,
  246. self.model.CQRS_ID,
  247. ),
  248. )
  249. return False
  250. def _map_previous_data(self, previous_data):
  251. if self.model.CQRS_MAPPING is None:
  252. return previous_data
  253. mapped_previous_data = {}
  254. for master_name, replica_name in self.model.CQRS_MAPPING.items():
  255. if master_name not in previous_data:
  256. continue
  257. mapped_previous_data[replica_name] = previous_data[master_name]
  258. mapped_previous_data = self._remove_excessive_data(mapped_previous_data)
  259. return mapped_previous_data
  260. def _map_save_data(self, master_data):
  261. if not self._cqrs_fields_are_filled(master_data):
  262. return
  263. mapped_data = self._make_initial_mapping(master_data)
  264. if not mapped_data:
  265. return
  266. if self._get_model_pk_name() not in mapped_data:
  267. self._log_pk_data_error()
  268. return
  269. if self.model.CQRS_CUSTOM_SERIALIZATION:
  270. return mapped_data
  271. mapped_data = self._remove_excessive_data(mapped_data)
  272. if self._all_required_fields_are_filled(mapped_data):
  273. return mapped_data
  274. def _make_initial_mapping(self, master_data):
  275. if self.model.CQRS_MAPPING is None:
  276. return master_data
  277. mapped_data = {
  278. 'cqrs_revision': master_data['cqrs_revision'],
  279. 'cqrs_updated': master_data['cqrs_updated'],
  280. }
  281. for master_name, replica_name in self.model.CQRS_MAPPING.items():
  282. if master_name not in master_data:
  283. logger.error(
  284. 'Bad master-replica mapping for {0} ({1}).'.format(
  285. master_name,
  286. self.model.CQRS_ID,
  287. ),
  288. )
  289. return
  290. mapped_data[replica_name] = master_data[master_name]
  291. return mapped_data
  292. def _remove_excessive_data(self, data):
  293. opts = self.model._meta
  294. possible_field_names = {f.name for f in opts.fields}
  295. return {k: v for k, v in data.items() if k in possible_field_names}
  296. def _all_required_fields_are_filled(self, mapped_data):
  297. opts = self.model._meta
  298. required_field_names = {f.name for f in opts.fields if not f.null}
  299. if not (required_field_names - set(mapped_data.keys())):
  300. return True
  301. logger.error(
  302. 'Not all required CQRS fields are provided in data ({0}).'.format(self.model.CQRS_ID),
  303. )
  304. return False
  305. def _map_delete_data(self, master_data):
  306. if 'id' not in master_data:
  307. self._log_pk_data_error()
  308. return
  309. if not self._cqrs_fields_are_filled(master_data):
  310. return
  311. return {
  312. self._get_model_pk_name(): master_data['id'],
  313. 'cqrs_revision': master_data['cqrs_revision'],
  314. 'cqrs_updated': master_data['cqrs_updated'],
  315. }
  316. def _cqrs_fields_are_filled(self, data):
  317. if 'cqrs_revision' in data and 'cqrs_updated' in data:
  318. return True
  319. logger.error('CQRS sync fields are not provided in data ({0}).'.format(self.model.CQRS_ID))
  320. return False
  321. def _log_pk_data_error(self):
  322. logger.error('CQRS PK is not provided in data ({0}).'.format(self.model.CQRS_ID))
  323. def _get_model_pk_name(self):
  324. return self.model._meta.pk.name