test_signals.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. from datetime import datetime, timezone
  3. import pytest
  4. from django.db import transaction
  5. from django.db.models.signals import post_delete, post_save
  6. from dj_cqrs.constants import SignalType
  7. from dj_cqrs.signals import post_bulk_create, post_update
  8. from tests.dj_master import models
  9. from tests.utils import assert_is_sub_dict, assert_publisher_once_called_with_args
  10. @pytest.mark.parametrize('model', (models.AllFieldsModel, models.BasicFieldsModel))
  11. @pytest.mark.parametrize('signal', (post_delete, post_save, post_bulk_create, post_update))
  12. def test_signals_are_registered(model, signal):
  13. assert signal.has_listeners(model)
  14. @pytest.mark.django_db(transaction=True)
  15. def test_post_save_create(mocker):
  16. publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
  17. models.SimplestModel.objects.create(id=1)
  18. assert_publisher_once_called_with_args(
  19. publisher_mock,
  20. SignalType.SAVE,
  21. models.SimplestModel.CQRS_ID,
  22. {'id': 1, 'name': None},
  23. 1,
  24. )
  25. @pytest.mark.django_db(transaction=True)
  26. def test_post_save_create_with_retry_fields(settings, mocker):
  27. fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
  28. mocker.patch('django.utils.timezone.now', return_value=fake_now)
  29. settings.CQRS['master']['CQRS_MESSAGE_TTL'] = 10
  30. expected_expires = datetime(2020, 1, 1, second=10, tzinfo=timezone.utc)
  31. publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
  32. models.SimplestModel.objects.create(id=1)
  33. assert publisher_mock.call_count == 1
  34. call_t_payload = publisher_mock.call_args[0][0]
  35. assert call_t_payload.expires == expected_expires
  36. assert call_t_payload.retries == 0
  37. @pytest.mark.django_db(transaction=True)
  38. def test_post_save_update(mocker):
  39. m = models.SimplestModel.objects.create(id=1)
  40. publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
  41. m.name = 'new'
  42. m.save()
  43. assert_publisher_once_called_with_args(
  44. publisher_mock,
  45. SignalType.SAVE,
  46. models.SimplestModel.CQRS_ID,
  47. {'id': 1, 'name': 'new'},
  48. 1,
  49. )
  50. @pytest.mark.django_db(transaction=True)
  51. def test_post_save_delete(mocker):
  52. m = models.SimplestModel.objects.create(id=1)
  53. publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
  54. m.delete()
  55. assert_publisher_once_called_with_args(
  56. publisher_mock,
  57. SignalType.DELETE,
  58. models.SimplestModel.CQRS_ID,
  59. {'id': 1, 'cqrs_revision': 1},
  60. 1,
  61. )
  62. cqrs_updated = publisher_mock.call_args[0][0].to_dict()['instance_data']['cqrs_updated']
  63. assert isinstance(cqrs_updated, str)
  64. @pytest.mark.django_db(transaction=True)
  65. def test_post_save_instance_doesnt_exist(caplog):
  66. with transaction.atomic():
  67. models.Author.objects.create(id=1, name='The author')
  68. models.Author.objects.get(id=1).delete()
  69. assert (
  70. "Can't produce message from master model 'Author': " "The instance doesn't exist (pk=1)"
  71. ) in caplog.text
  72. @pytest.mark.django_db(transaction=True)
  73. def test_post_save_delete_with_retry_fields(settings, mocker):
  74. m = models.SimplestModel.objects.create(id=1)
  75. fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
  76. mocker.patch('django.utils.timezone.now', return_value=fake_now)
  77. settings.CQRS['master']['CQRS_MESSAGE_TTL'] = 10
  78. expected_expires = datetime(2020, 1, 1, second=10, tzinfo=timezone.utc)
  79. publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
  80. m.delete()
  81. assert publisher_mock.call_count == 1
  82. call_t_payload = publisher_mock.call_args[0][0]
  83. assert call_t_payload.expires == expected_expires
  84. assert call_t_payload.retries == 0
  85. @pytest.mark.django_db(transaction=True)
  86. def test_manual_post_bulk_create(mocker):
  87. models.AutoFieldsModel.objects.bulk_create([models.AutoFieldsModel() for _ in range(3)])
  88. created_models = list(models.AutoFieldsModel.objects.all())
  89. publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
  90. models.AutoFieldsModel.call_post_bulk_create(created_models)
  91. assert publisher_mock.call_count == 3
  92. @pytest.mark.django_db(transaction=True)
  93. def test_automatic_post_bulk_create(mocker):
  94. publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
  95. instances = models.SimplestTrackedModel.cqrs.bulk_create(
  96. [models.SimplestTrackedModel(id=i, status='new') for i in range(1, 4)],
  97. )
  98. assert len(instances) == 3
  99. for index in range(3):
  100. instance = instances[index]
  101. assert instance.id == index + 1
  102. assert instance.status == 'new'
  103. assert instance.cqrs_revision == 0
  104. assert publisher_mock.call_count == 3
  105. for index, call in enumerate(publisher_mock.call_args_list, start=1):
  106. payload = call[0][0]
  107. assert payload.signal_type == SignalType.SAVE
  108. assert payload.instance_data['id'] == index
  109. assert payload.instance_data['status'] == 'new'
  110. assert payload.instance_data['description'] is None
  111. assert payload.previous_data == {'status': None}
  112. @pytest.mark.parametrize(
  113. 'filter_kwargs',
  114. ({'name': 'old'}, {'id__in': {0, 1, 2}}),
  115. )
  116. @pytest.mark.django_db(transaction=True)
  117. def test_post_bulk_update_wout_prev_data(mocker, filter_kwargs):
  118. for i in range(3):
  119. models.SimplestModel.objects.create(id=i, name='old')
  120. cqrs_updated = list(
  121. models.SimplestModel.objects.all().values_list('cqrs_updated', flat=True),
  122. )
  123. publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
  124. models.SimplestModel.cqrs.bulk_update(
  125. queryset=models.SimplestModel.objects.filter(**filter_kwargs),
  126. name='new',
  127. )
  128. assert publisher_mock.call_count == 3
  129. for pk, t0_payload in enumerate(
  130. [x[0][0] for x in publisher_mock.call_args_list],
  131. ):
  132. assert t0_payload.signal_type == SignalType.SAVE
  133. assert t0_payload.cqrs_id == models.SimplestModel.CQRS_ID
  134. assert t0_payload.pk == pk
  135. assert_is_sub_dict(
  136. {'id': pk, 'name': 'new'},
  137. t0_payload.instance_data,
  138. )
  139. m = models.SimplestModel.objects.get(id=pk)
  140. assert m.cqrs_updated > cqrs_updated[pk]
  141. assert m.cqrs_revision == 1
  142. assert m.name == 'new'
  143. @pytest.mark.parametrize(
  144. ('filter_kwargs', 'cqrs_revision', 'data'),
  145. (
  146. (
  147. {'description': 'old'},
  148. 1,
  149. (
  150. (0, {'description': 'old', 'status': None}, 1),
  151. (1, {'description': 'old', 'status': 'x'}, 2),
  152. (2, {'description': 'old', 'status': None}, 1),
  153. ),
  154. ),
  155. (
  156. {'id__in': {0, 1}},
  157. 0,
  158. (
  159. (0, {'description': 'old', 'status': None}, 1),
  160. (1, {'description': 'old', 'status': 'x'}, 2),
  161. ),
  162. ),
  163. ),
  164. )
  165. @pytest.mark.django_db(transaction=True)
  166. def test_post_bulk_update_with_prev_data(mocker, filter_kwargs, cqrs_revision, data):
  167. for i in range(3):
  168. models.SimplestTrackedModel.objects.create(id=i, description='old')
  169. m = models.SimplestTrackedModel.objects.get(id=1)
  170. m.status = 'x'
  171. m.save()
  172. publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
  173. call_count = models.SimplestTrackedModel.cqrs.bulk_update(
  174. queryset=models.SimplestTrackedModel.objects.filter(**filter_kwargs).order_by('id'),
  175. description='new',
  176. status=None,
  177. )
  178. m = models.SimplestTrackedModel.objects.get(id=2)
  179. assert m.cqrs_revision == cqrs_revision
  180. assert publisher_mock.call_count == call_count
  181. publisher_call_args_list = sorted(publisher_mock.call_args_list, key=lambda x: x[0][0].pk)
  182. for pk, prev_data, revision in data:
  183. t0_payload = publisher_call_args_list[pk][0][0]
  184. assert t0_payload.signal_type == SignalType.SAVE
  185. assert t0_payload.cqrs_id == models.SimplestTrackedModel.CQRS_ID
  186. assert t0_payload.pk == pk
  187. assert_is_sub_dict(
  188. {'id': pk, 'description': 'new', 'status': None},
  189. t0_payload.instance_data,
  190. )
  191. assert t0_payload.previous_data == prev_data
  192. m = models.SimplestTrackedModel.objects.get(id=pk)
  193. assert m.cqrs_revision == revision
  194. assert m.description == 'new'
  195. assert m.status is None
  196. @pytest.mark.django_db
  197. def test_post_bulk_update_nothing_to_update(mocker):
  198. publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
  199. models.SimplestTrackedModel.cqrs.bulk_update(
  200. queryset=models.SimplestTrackedModel.objects.all(),
  201. description='something',
  202. )
  203. publisher_mock.assert_not_called()