test_mixin.py 19 KB


  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import pytest
  3. from django.conf import settings
  4. from django.db.models import CharField, IntegerField, QuerySet
  5. from django.utils.timezone import now
  6. from dj_cqrs.constants import SignalType
  7. from dj_cqrs.dataclasses import TransportPayload
  8. from dj_cqrs.metas import ReplicaMeta
  9. from dj_cqrs.mixins import RawReplicaMixin
  10. from tests.dj.transport import TransportStub
  11. from tests.dj_replica import models
  12. from tests.utils import db_error
  13. class ReplicaMetaTest(ReplicaMeta):
  14. @classmethod
  15. def check_cqrs_mapping(cls, model_cls):
  16. return cls._check_cqrs_mapping(model_cls)
  17. def test_cqrs_fields_non_existing_field(mocker):
  18. with pytest.raises(AssertionError) as e:
  19. class Cls(object):
  20. CQRD_ID = 'ID'
  21. CQRS_MAPPING = {
  22. 'chr_field': 'char_field',
  23. 'integer_field': 'int_field',
  24. }
  25. char_field = CharField(max_length=100, primary_key=True)
  26. int_field = IntegerField()
  27. _meta = mocker.MagicMock(concrete_fields=(char_field, int_field), private_fields=())
  28. _meta.pk.name = 'char_field'
  29. ReplicaMetaTest.check_cqrs_mapping(Cls)
  30. assert str(e.value) == 'CQRS_MAPPING field is not correctly set for model Cls.'
  31. def test_cqrs_fields_id_is_not_included(mocker):
  32. with pytest.raises(AssertionError) as e:
  33. class Cls(object):
  34. CQRD_ID = 'ID'
  35. CQRS_MAPPING = {
  36. 'integer_field': 'int_field',
  37. }
  38. char_field = CharField(max_length=100, primary_key=True)
  39. int_field = IntegerField()
  40. _meta = mocker.MagicMock(concrete_fields=(char_field, int_field), private_fields=())
  41. _meta.pk.name = 'char_field'
  42. ReplicaMetaTest.check_cqrs_mapping(Cls)
  43. assert str(e.value) == 'PK is not in CQRS_MAPPING for model Cls.'
  44. def test_cqrs_fields_duplicates(mocker):
  45. with pytest.raises(AssertionError) as e:
  46. class Cls(object):
  47. CQRD_ID = 'ID'
  48. CQRS_MAPPING = {
  49. 'integer_field': 'char_field',
  50. 'char_field': 'char_field',
  51. }
  52. char_field = CharField(max_length=100, primary_key=True)
  53. int_field = IntegerField()
  54. _meta = mocker.MagicMock(concrete_fields=(char_field, int_field), private_fields=())
  55. _meta.pk.name = 'char_field'
  56. ReplicaMetaTest.check_cqrs_mapping(Cls)
  57. assert str(e.value) == 'Duplicate names in CQRS_MAPPING field for model Cls.'
  58. @pytest.mark.django_db
  59. def test_create_simple():
  60. instance = models.BasicFieldsModelRef.cqrs_save(
  61. {
  62. 'int_field': 1,
  63. 'cqrs_revision': 0,
  64. 'cqrs_updated': now(),
  65. 'char_field': 'text',
  66. 'bool_field': False,
  67. 'date_field': None,
  68. 'datetime_field': now(),
  69. 'float_field': 1.25,
  70. },
  71. )
  72. assert isinstance(instance, models.BasicFieldsModelRef)
  73. instance.refresh_from_db()
  74. assert instance.char_field == 'text'
  75. assert instance.float_field == 1.25
  76. @pytest.mark.django_db
  77. def test_create_simple_excessive_data():
  78. instance = models.BasicFieldsModelRef.cqrs_save(
  79. {
  80. 'int_field': 1,
  81. 'cqrs_revision': 0,
  82. 'cqrs_updated': now(),
  83. 'char_field': 'text',
  84. 'unexpected_field': 'value',
  85. },
  86. )
  87. assert isinstance(instance, models.BasicFieldsModelRef)
  88. @pytest.mark.django_db
  89. def test_create_simple_insufficient_data(caplog):
  90. models.BasicFieldsModelRef.cqrs_save(
  91. {
  92. 'int_field': 1,
  93. 'cqrs_revision': 0,
  94. 'cqrs_updated': now(),
  95. },
  96. )
  97. assert 'Not all required CQRS fields are provided in data (basic).' in caplog.text
  98. @pytest.mark.django_db
  99. def test_create_mapped(caplog):
  100. instance = models.MappedFieldsModelRef.cqrs_save(
  101. {
  102. 'int_field': 1,
  103. 'cqrs_revision': 0,
  104. 'cqrs_updated': now(),
  105. 'char_field': 'text',
  106. },
  107. )
  108. assert isinstance(instance, models.MappedFieldsModelRef)
  109. instance.refresh_from_db()
  110. assert instance.id == 1
  111. assert instance.name == 'text'
  112. @pytest.mark.django_db
  113. def test_create_mapped_bad_mapping(caplog):
  114. models.BadMappingModelRef.cqrs_save(
  115. {
  116. 'int_field': 1,
  117. 'cqrs_revision': 0,
  118. 'cqrs_updated': now(),
  119. 'char_field': 'text',
  120. },
  121. )
  122. assert 'Bad master-replica mapping for invalid_field (basic_3).' in caplog.text
  123. @pytest.mark.django_db
  124. def test_create_db_error(mocker, caplog):
  125. mocker.patch.object(models.BasicFieldsModelRef.objects, 'create', side_effect=db_error)
  126. models.BasicFieldsModelRef.cqrs_save(
  127. {
  128. 'int_field': 1,
  129. 'cqrs_revision': 0,
  130. 'cqrs_updated': now(),
  131. 'char_field': 'text',
  132. },
  133. )
  134. assert 'CQRS create error: pk = 1 (basic).' in caplog.text
  135. @pytest.mark.django_db
  136. def test_update_ok():
  137. models.BasicFieldsModelRef.objects.create(
  138. **{
  139. 'int_field': 1,
  140. 'cqrs_revision': 0,
  141. 'cqrs_updated': now(),
  142. 'char_field': 'text',
  143. }
  144. )
  145. instance = models.BasicFieldsModelRef.cqrs_save(
  146. {
  147. 'int_field': 1,
  148. 'cqrs_revision': 1,
  149. 'cqrs_updated': now(),
  150. 'char_field': 'new_text',
  151. 'float_field': 1.30,
  152. },
  153. )
  154. assert isinstance(instance, models.BasicFieldsModelRef)
  155. instance.refresh_from_db()
  156. assert instance.int_field == 1
  157. assert instance.char_field == 'new_text'
  158. assert instance.float_field == 1.30
  159. @pytest.mark.django_db
  160. def test_update_db_error(mocker, caplog):
  161. models.BasicFieldsModelRef.objects.create(
  162. **{
  163. 'int_field': 1,
  164. 'cqrs_revision': 0,
  165. 'cqrs_updated': now(),
  166. 'char_field': 'text',
  167. }
  168. )
  169. mocker.patch.object(models.BasicFieldsModelRef, 'save', side_effect=db_error)
  170. models.BasicFieldsModelRef.cqrs_save(
  171. {
  172. 'int_field': 1,
  173. 'cqrs_revision': 1,
  174. 'cqrs_updated': now(),
  175. 'char_field': 'text',
  176. },
  177. )
  178. assert 'CQRS update error: pk = 1, cqrs_revision = 1 (basic).' in caplog.text
  179. @pytest.mark.django_db
  180. def test_delete_ok():
  181. dt = now()
  182. models.BasicFieldsModelRef.objects.create(
  183. **{
  184. 'int_field': 1,
  185. 'cqrs_revision': 0,
  186. 'cqrs_updated': dt,
  187. 'char_field': 'text',
  188. }
  189. )
  190. is_deleted = models.BasicFieldsModelRef.cqrs_delete(
  191. {
  192. 'id': 1,
  193. 'cqrs_revision': 0,
  194. 'cqrs_updated': dt,
  195. },
  196. )
  197. assert is_deleted
  198. assert models.BasicFieldsModelRef.objects.count() == 0
  199. @pytest.mark.django_db
  200. def test_delete_non_existing_id():
  201. is_deleted = models.BasicFieldsModelRef.cqrs_delete(
  202. {
  203. 'id': 1,
  204. 'cqrs_revision': 0,
  205. 'cqrs_updated': now(),
  206. },
  207. )
  208. assert is_deleted
  209. assert models.BasicFieldsModelRef.objects.count() == 0
  210. @pytest.mark.django_db
  211. def test_delete_db_error(mocker, caplog):
  212. mocker.patch.object(models.BasicFieldsModelRef.objects, 'filter', side_effect=db_error)
  213. is_deleted = models.BasicFieldsModelRef.cqrs_delete(
  214. {
  215. 'id': 1,
  216. 'cqrs_revision': 0,
  217. 'cqrs_updated': now(),
  218. },
  219. )
  220. assert not is_deleted
  221. assert 'CQRS delete error: pk = 1' in caplog.text
  222. @pytest.mark.django_db
  223. def test_save_bad_master_data_field_type(caplog):
  224. models.BadTypeModelRef.cqrs_save(
  225. {
  226. 'int_field': 1,
  227. 'cqrs_revision': 0,
  228. 'cqrs_updated': now(),
  229. 'datetime_field': now(),
  230. },
  231. )
  232. assert 'CQRS create error: pk = 1 (basic_1).' in caplog.text
  233. @pytest.mark.django_db
  234. def test_save_no_pk_in_master_data(caplog):
  235. models.BasicFieldsModelRef.cqrs_save(
  236. {
  237. 'id': 1,
  238. 'cqrs_revision': 0,
  239. 'cqrs_updated': now(),
  240. 'char_field': 'text',
  241. },
  242. )
  243. assert 'CQRS PK is not provided in data (basic).' in caplog.text
  244. @pytest.mark.django_db
  245. def test_save_no_cqrs_fields_in_master_data(caplog):
  246. models.BasicFieldsModelRef.cqrs_save(
  247. {
  248. 'int_field': 1,
  249. 'cqrs_revision': 0,
  250. 'char_field': 'text',
  251. },
  252. )
  253. assert 'CQRS sync fields are not provided in data (basic).' in caplog.text
  254. @pytest.mark.django_db
  255. def test_delete_no_id_in_master_data(caplog):
  256. is_deleted = models.BasicFieldsModelRef.cqrs_delete(
  257. {
  258. 'cqrs_revision': 0,
  259. 'cqrs_updated': now(),
  260. },
  261. )
  262. assert not is_deleted
  263. assert 'CQRS PK is not provided in data (basic).' in caplog.text
  264. @pytest.mark.django_db
  265. def test_delete_no_cqrs_fields_in_master_data(caplog):
  266. is_deleted = models.BasicFieldsModelRef.cqrs_delete(
  267. {
  268. 'id': 1,
  269. 'cqrs_revision': 0,
  270. },
  271. )
  272. assert not is_deleted
  273. assert 'CQRS sync fields are not provided in data (basic).' in caplog.text
  274. @pytest.mark.django_db(transaction=True)
  275. def test_update_before_create_is_over(caplog):
  276. create_data = {
  277. 'int_field': 1,
  278. 'cqrs_revision': 0,
  279. 'cqrs_updated': now(),
  280. 'char_field': 'text',
  281. }
  282. update_data = {
  283. 'int_field': 1,
  284. 'cqrs_revision': 1,
  285. 'cqrs_updated': now(),
  286. 'char_field': 'new_text',
  287. }
  288. updated_instance = models.BasicFieldsModelRef.cqrs_save(update_data)
  289. created_instance = models.BasicFieldsModelRef.cqrs.create_instance(create_data)
  290. updated_instance.refresh_from_db()
  291. assert updated_instance.cqrs_revision == 1
  292. assert updated_instance.char_field == 'new_text'
  293. assert not created_instance
  294. errors = {
  295. 'sqlite': (
  296. 'UNIQUE constraint failed: dj_replica_basicfieldsmodelref.int_field\n'
  297. 'CQRS create error: pk = 1 (basic).\n'
  298. ),
  299. 'postgres': (
  300. 'duplicate key value violates unique constraint "dj_replica_basicfieldsmodelref_pkey"\n'
  301. 'DETAIL: Key (int_field)=(1) already exists.\n\n'
  302. 'CQRS create error: pk = 1 (basic).\n'
  303. ),
  304. 'mysql': (
  305. '(1062, "Duplicate entry \'1\' for key \'dj_replica_basicfieldsmodelref.PRIMARY\'")\n'
  306. 'CQRS create error: pk = 1 (basic).\n'
  307. ),
  308. }
  309. assert errors[settings.DB_ENGINE] in caplog.text
  310. @pytest.mark.django_db(transaction=True)
  311. def test_wrong_update_order(caplog):
  312. models.BasicFieldsModelRef.objects.create(
  313. **{
  314. 'int_field': 1,
  315. 'cqrs_revision': 0,
  316. 'cqrs_updated': now(),
  317. 'char_field': 'text',
  318. }
  319. )
  320. update_data_1 = {
  321. 'int_field': 1,
  322. 'cqrs_revision': 1,
  323. 'cqrs_updated': now(),
  324. 'char_field': 'new_text_1',
  325. }
  326. update_data_2 = {
  327. 'int_field': 1,
  328. 'cqrs_revision': 2,
  329. 'cqrs_updated': now(),
  330. 'char_field': 'new_text_2',
  331. }
  332. earlier_instance = models.BasicFieldsModelRef.cqrs_save(update_data_2)
  333. later_instance = models.BasicFieldsModelRef.cqrs_save(update_data_1)
  334. earlier_instance.refresh_from_db()
  335. assert earlier_instance.cqrs_revision == 2
  336. assert earlier_instance.char_field == 'new_text_2'
  337. assert later_instance
  338. e = 'Wrong CQRS sync order: pk = 1, cqrs_revision = new 1 / existing 2 (basic).'
  339. assert e in caplog.text
  340. @pytest.mark.django_db(transaction=True)
  341. def test_de_duplication(caplog):
  342. models.BasicFieldsModelRef.objects.create(
  343. **{
  344. 'int_field': 1,
  345. 'cqrs_revision': 0,
  346. 'cqrs_updated': now(),
  347. 'char_field': 'text',
  348. }
  349. )
  350. update_data = {
  351. 'int_field': 1,
  352. 'cqrs_revision': 1,
  353. 'cqrs_updated': now(),
  354. 'char_field': 'new_text',
  355. }
  356. earlier_instance = models.BasicFieldsModelRef.cqrs_save(update_data)
  357. duplicate_instance = models.BasicFieldsModelRef.cqrs_save(update_data)
  358. assert earlier_instance.cqrs_revision == 1
  359. assert earlier_instance.char_field == 'new_text'
  360. assert duplicate_instance.cqrs_revision == 1
  361. assert duplicate_instance.char_field == 'new_text'
  362. assert 'Received duplicate CQRS data: pk = 1, cqrs_revision = 1 (basic).' in caplog.text
  363. @pytest.mark.django_db(transaction=True)
  364. def test_create_before_delete_is_over(caplog):
  365. # This situation may extremely rarely happen, if the IDs are not auto incremented on master
  366. # and are not unique in the infinite timeline.
  367. # This will lead to expected inconsistency.
  368. models.BasicFieldsModelRef.objects.create(
  369. **{
  370. 'int_field': 1,
  371. 'cqrs_revision': 0,
  372. 'cqrs_updated': now(),
  373. 'char_field': 'text',
  374. }
  375. )
  376. delete_data = {
  377. 'id': 1,
  378. 'cqrs_revision': 1,
  379. 'cqrs_updated': now(),
  380. }
  381. new_create_data = {
  382. 'int_field': 1,
  383. 'cqrs_revision': 0,
  384. 'cqrs_updated': now(),
  385. 'char_field': 'other',
  386. }
  387. models.BasicFieldsModelRef.cqrs_save(new_create_data)
  388. is_deleted = models.BasicFieldsModelRef.cqrs_delete(delete_data)
  389. assert 'Received duplicate CQRS data: pk = 1, cqrs_revision = 0 (basic).' in caplog.text
  390. assert 'CQRS potential creation race condition: pk = 1 (basic).' in caplog.text
  391. assert is_deleted
  392. @pytest.mark.django_db
  393. def test_updates_were_lost(caplog):
  394. models.BasicFieldsModelRef.objects.create(
  395. **{
  396. 'int_field': 1,
  397. 'cqrs_revision': 0,
  398. 'cqrs_updated': now(),
  399. 'char_field': 'text',
  400. }
  401. )
  402. models.BasicFieldsModelRef.cqrs_save(
  403. {
  404. 'int_field': 1,
  405. 'cqrs_revision': 5,
  406. 'cqrs_updated': now(),
  407. 'char_field': 'text1',
  408. },
  409. )
  410. assert 'Lost or filtered out 4 CQRS packages: pk = 1, cqrs_revision = 5 (basic)' in caplog.text
  411. @pytest.mark.django_db()
  412. def test_tracked_fields_mapped(mocker):
  413. cqrs_update_mock = mocker.patch.object(models.MappedFieldsModelRef, 'cqrs_update')
  414. first_payload = {
  415. 'int_field': 1,
  416. 'cqrs_revision': 0,
  417. 'cqrs_updated': now(),
  418. 'char_field': 'text',
  419. }
  420. second_payload = {
  421. 'int_field': 1,
  422. 'cqrs_revision': 1,
  423. 'cqrs_updated': now(),
  424. 'char_field': 'new_text',
  425. }
  426. models.MappedFieldsModelRef.cqrs_save(first_payload)
  427. models.MappedFieldsModelRef.cqrs_save(second_payload, previous_data={'char_field': 'text'})
  428. assert cqrs_update_mock.call_count == 1
  429. _, kwargs = cqrs_update_mock.call_args
  430. assert 'previous_data' in kwargs
  431. assert kwargs['previous_data'] == {'name': 'text'}
  432. @pytest.mark.django_db
  433. def test_select_for_update_lock(mocker):
  434. m = mocker.patch.object(
  435. QuerySet,
  436. 'select_for_update',
  437. return_value=models.LockModelRef.objects.all(),
  438. )
  439. instance = models.LockModelRef.cqrs_save(
  440. {
  441. 'id': 1,
  442. 'cqrs_revision': 0,
  443. 'cqrs_updated': now(),
  444. },
  445. )
  446. assert instance.id == 1
  447. m.assert_called_once()
  448. @pytest.mark.django_db
  449. def test_nodb(mocker):
  450. with pytest.raises(NotImplementedError):
  451. models.NoDBModelRef.cqrs_save(None)
  452. with pytest.raises(NotImplementedError):
  453. models.NoDBModelRef.cqrs_delete(None)
  454. @pytest.mark.parametrize(
  455. 'cqrs_max_retries, current_retry, expected_result',
  456. [
  457. (5, 0, True),
  458. (5, 5, False),
  459. (-1, 0, False),
  460. (0, 0, False), # Disabled
  461. (None, 10000, True), # Infinite
  462. ],
  463. )
  464. def test_should_retry_cqrs(settings, cqrs_max_retries, current_retry, expected_result):
  465. settings.CQRS['replica']['CQRS_MAX_RETRIES'] = cqrs_max_retries
  466. result = models.BasicFieldsModelRef.should_retry_cqrs(current_retry)
  467. assert result is expected_result
  468. @pytest.mark.parametrize('retry_delay', (0, 5))
  469. @pytest.mark.parametrize('current_retry', (0, 1))
  470. def test_get_cqrs_retry_delay(settings, retry_delay, current_retry):
  471. settings.CQRS['replica']['CQRS_RETRY_DELAY'] = retry_delay
  472. result = models.BasicFieldsModelRef.get_cqrs_retry_delay(current_retry=current_retry)
  473. assert result is retry_delay
  474. @pytest.mark.django_db(transaction=True)
  475. def test_support_for_meta_create():
  476. meta = TransportStub.consume(
  477. TransportPayload(
  478. SignalType.SAVE,
  479. models.CQRSMetaModel.CQRS_ID,
  480. {
  481. 'id': 1,
  482. 'cqrs_revision': 0,
  483. 'cqrs_updated': now(),
  484. },
  485. 1,
  486. meta={'Hello': 'world'},
  487. ),
  488. )
  489. assert meta == {'Hello': 'world'}
  490. @pytest.mark.django_db(transaction=True)
  491. def test_support_for_meta_update():
  492. models.CQRSMetaModel.objects.create(id=2, cqrs_revision=0, cqrs_updated=now())
  493. cqrs_updated = now()
  494. t = TransportStub.consume(
  495. TransportPayload(
  496. SignalType.SYNC,
  497. models.CQRSMetaModel.CQRS_ID,
  498. {
  499. 'id': 2,
  500. 'cqrs_revision': 1,
  501. 'cqrs_updated': cqrs_updated,
  502. },
  503. 2,
  504. meta=[1, 2, 3],
  505. ),
  506. )
  507. assert t == (
  508. True,
  509. {'id': 2, 'cqrs_revision': 1, 'cqrs_updated': cqrs_updated},
  510. None,
  511. [1, 2, 3],
  512. )
  513. @pytest.mark.django_db(transaction=True)
  514. def test_support_for_meta_delete():
  515. models.CQRSMetaModel.objects.create(id=3, cqrs_revision=0, cqrs_updated=now())
  516. meta = TransportStub.consume(
  517. TransportPayload(
  518. SignalType.DELETE,
  519. models.CQRSMetaModel.CQRS_ID,
  520. {
  521. 'id': 3,
  522. 'cqrs_revision': 1,
  523. 'cqrs_updated': now(),
  524. },
  525. 3,
  526. meta={1: 2, 2: {}},
  527. ),
  528. )
  529. assert meta == {1: 2, 2: {}}
  530. assert not models.CQRSMetaModel.objects.exists()
  531. def test_raw_replica_mixin():
  532. assert RawReplicaMixin.CQRS_ID is None
  533. assert RawReplicaMixin.CQRS_NO_DB_OPERATIONS is True
  534. assert RawReplicaMixin.CQRS_META is False
  535. with pytest.raises(NotImplementedError):
  536. RawReplicaMixin.cqrs_save(None)
  537. with pytest.raises(NotImplementedError):
  538. RawReplicaMixin.cqrs_delete(None)
  539. @pytest.mark.parametrize('cqrs_id', ('document1', 'document2'))
  540. def test_support_for_non_models_save(cqrs_id):
  541. data = {
  542. 'slug': 'test',
  543. 'cqrs_revision': 0,
  544. 'cqrs_updated': now(),
  545. }
  546. meta = {'Hello': 'world'}
  547. _cqrs_id, _data, kwargs = TransportStub.consume(
  548. TransportPayload(SignalType.SAVE, cqrs_id, data, 'test', meta=meta),
  549. )
  550. assert _cqrs_id == cqrs_id
  551. assert _data == data
  552. assert kwargs == {'previous_data': None, 'meta': meta}
  553. @pytest.mark.parametrize('cqrs_id', ('document1', 'document2'))
  554. def test_support_for_non_models_delete(cqrs_id):
  555. data = {
  556. 'slug': 'test',
  557. 'cqrs_revision': 1,
  558. 'cqrs_updated': now(),
  559. }
  560. meta = {'other': 1}
  561. _cqrs_id, _data, kwargs = TransportStub.consume(
  562. TransportPayload(SignalType.DELETE, cqrs_id, data, 'test', meta=meta),
  563. )
  564. assert _cqrs_id == cqrs_id
  565. assert _data == data
  566. assert kwargs == {'meta': meta}