test_flow.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import pytest
  3. from django.db import transaction
  4. from tests.dj_master import models as master_models
  5. from tests.dj_replica import models as replica_models
  6. @pytest.mark.django_db(transaction=True)
  7. def test_create():
  8. master_model = master_models.BasicFieldsModel.objects.create(
  9. int_field=1,
  10. char_field='text',
  11. )
  12. assert replica_models.BasicFieldsModelRef.objects.count() == 1
  13. replica_model = replica_models.BasicFieldsModelRef.objects.first()
  14. for field_name in ('cqrs_revision', 'cqrs_updated', 'int_field', 'char_field'):
  15. assert getattr(master_model, field_name) == getattr(replica_model, field_name)
  16. @pytest.mark.django_db(transaction=True)
  17. def test_update():
  18. master_model = master_models.BasicFieldsModel.objects.create(
  19. int_field=1,
  20. char_field='text',
  21. )
  22. assert replica_models.BasicFieldsModelRef.objects.count() == 1
  23. master_model.char_field = 'new_text'
  24. master_model.save()
  25. master_model.refresh_from_db()
  26. assert replica_models.BasicFieldsModelRef.objects.count() == 1
  27. replica_model = replica_models.BasicFieldsModelRef.objects.first()
  28. for field_name in ('cqrs_revision', 'cqrs_updated', 'int_field', 'char_field'):
  29. assert getattr(master_model, field_name) == getattr(replica_model, field_name)
  30. @pytest.mark.django_db(transaction=True)
  31. def test_delete():
  32. master_model = master_models.BasicFieldsModel.objects.create(
  33. int_field=1,
  34. char_field='text',
  35. )
  36. assert replica_models.BasicFieldsModelRef.objects.count() == 1
  37. master_model.delete()
  38. assert replica_models.BasicFieldsModelRef.objects.count() == 0
  39. @pytest.mark.django_db(transaction=True)
  40. def test_create_serialized():
  41. with transaction.atomic():
  42. publisher = master_models.Publisher.objects.create(id=1, name='publisher')
  43. author = master_models.Author.objects.create(id=1, name='author', publisher=publisher)
  44. for index in range(1, 3):
  45. master_models.Book.objects.create(id=index, title=str(index), author=author)
  46. replica_publisher = replica_models.Publisher.objects.first()
  47. replica_author = replica_models.AuthorRef.objects.first()
  48. books = list(replica_models.Book.objects.all())
  49. assert replica_publisher.id == 1
  50. assert replica_publisher.name == 'publisher'
  51. assert replica_author.id == 1
  52. assert replica_author.name == 'author'
  53. assert replica_author.publisher == replica_publisher
  54. assert replica_author.cqrs_revision == 0
  55. assert len(books) == 2
  56. assert {1} == {book.author_id for book in books}
  57. assert {1, 2} == {book.id for book in books}
  58. @pytest.mark.django_db(transaction=True)
  59. def test_update_serialized():
  60. author = master_models.Author.objects.create(id=1, name='author')
  61. author.refresh_from_db()
  62. assert replica_models.AuthorRef.objects.count() == 1
  63. with transaction.atomic():
  64. publisher = master_models.Publisher.objects.create(id=1, name='publisher')
  65. author.publisher = publisher
  66. author.save()
  67. replica_publisher = replica_models.Publisher.objects.first()
  68. replica_author = replica_models.AuthorRef.objects.first()
  69. assert replica_publisher.id == 1
  70. assert replica_publisher.name == 'publisher'
  71. assert replica_author.id == 1
  72. assert replica_author.name == 'author'
  73. assert replica_author.publisher == replica_publisher
  74. assert replica_author.cqrs_revision == 1
  75. @pytest.mark.django_db(transaction=True)
  76. def test_delete_serialized():
  77. with transaction.atomic():
  78. author = master_models.Author.objects.create(id=1, name='author')
  79. for index in range(1, 3):
  80. master_models.Book.objects.create(id=index, title=str(index), author=author)
  81. assert replica_models.AuthorRef.objects.count() == 1
  82. assert replica_models.Book.objects.count() == 2
  83. author.refresh_from_db()
  84. author.delete()
  85. assert replica_models.AuthorRef.objects.count() == 0
  86. assert replica_models.Book.objects.count() == 0
  87. @pytest.mark.django_db(transaction=True)
  88. def test_sync_is_deleted():
  89. author = master_models.Author.objects.create(id=1, name='author')
  90. author.refresh_from_db()
  91. assert replica_models.AuthorRef.objects.count() == 1
  92. replica_models.AuthorRef.objects.all().delete()
  93. author.cqrs_sync()
  94. assert replica_models.AuthorRef.objects.count() == 1
  95. @pytest.mark.django_db(transaction=True)
  96. def test_sync_exists(mocker):
  97. author = master_models.Author.objects.create(id=1, name='author')
  98. author.refresh_from_db()
  99. assert replica_models.AuthorRef.objects.count() == 1
  100. # We simulate transport error
  101. mocker.patch('dj_cqrs.controller.producer.produce')
  102. author.name = 'new'
  103. author.save()
  104. mocker.stopall()
  105. author.refresh_from_db()
  106. author.cqrs_sync()
  107. assert replica_models.AuthorRef.objects.count() == 1
  108. replica_author = replica_models.AuthorRef.objects.first()
  109. assert replica_author.cqrs_revision == 1
  110. assert replica_author.name == 'new'
  111. @pytest.mark.django_db(transaction=True)
  112. def test_sync_downgrade(mocker, caplog):
  113. author = master_models.Author.objects.create(id=1, name='author')
  114. author.name = 'new'
  115. author.save()
  116. assert replica_models.AuthorRef.objects.count() == 1
  117. replica_author = replica_models.AuthorRef.objects.first()
  118. assert replica_author.cqrs_revision == 1
  119. assert replica_author.name == 'new'
  120. mocker.patch('dj_cqrs.controller.producer.produce')
  121. author.delete()
  122. author = master_models.Author.objects.create(id=1, name='other')
  123. mocker.stopall()
  124. author.cqrs_sync()
  125. assert replica_models.AuthorRef.objects.count() == 1
  126. replica_author = replica_models.AuthorRef.objects.first()
  127. assert replica_author.cqrs_revision == 0
  128. assert replica_author.name == 'other'
  129. e = 'CQRS revision downgrade on sync: pk = 1, cqrs_revision = new 0 / existing 1 (author).'
  130. assert e in caplog.text