test_sync_to_a_certain_service.py 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import pytest
  3. from integration_tests.tests.utils import (
  4. REPLICA_BASIC_TABLE,
  5. count_replica_rows,
  6. get_replica_first,
  7. transport_delay,
  8. )
  9. from tests.dj_master.models import BasicFieldsModel
  10. @pytest.mark.django_db(transaction=True)
  11. def test_flow(replica_cursor, mocker, clean_rabbit_transport_connection):
  12. assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 0
  13. # Create
  14. master_instance = BasicFieldsModel.objects.create(
  15. int_field=1,
  16. char_field='text',
  17. )
  18. assert master_instance.cqrs_revision == 0
  19. transport_delay()
  20. assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 1
  21. replica_tuple = get_replica_first(
  22. replica_cursor,
  23. REPLICA_BASIC_TABLE,
  24. ('int_field', 'char_field', 'cqrs_revision', 'cqrs_updated'),
  25. )
  26. assert (
  27. master_instance.int_field,
  28. master_instance.char_field,
  29. master_instance.cqrs_revision,
  30. master_instance.cqrs_updated,
  31. ) == replica_tuple
  32. # We simulate transport error
  33. mocker.patch('dj_cqrs.controller.producer.produce')
  34. master_instance.char_field = 'new'
  35. master_instance.save()
  36. mocker.stopall()
  37. master_instance.refresh_from_db()
  38. assert master_instance.cqrs_revision == 1
  39. # Sync to other service
  40. master_instance.cqrs_sync(queue='other_replica')
  41. transport_delay()
  42. assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 1
  43. replica_tuple = get_replica_first(
  44. replica_cursor,
  45. REPLICA_BASIC_TABLE,
  46. ('int_field', 'char_field', 'cqrs_revision', 'cqrs_updated'),
  47. )
  48. assert replica_tuple[0] == 1
  49. assert replica_tuple[1] == 'text'
  50. assert replica_tuple[2] == 0
  51. # Sync to replica
  52. master_instance.cqrs_sync(queue='replica')
  53. transport_delay()
  54. assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 1
  55. replica_tuple = get_replica_first(
  56. replica_cursor,
  57. REPLICA_BASIC_TABLE,
  58. ('int_field', 'char_field', 'cqrs_revision', 'cqrs_updated'),
  59. )
  60. assert replica_tuple[0] == 1
  61. assert replica_tuple[1] == 'new'
  62. assert replica_tuple[2] == 1
  63. mocker.patch('dj_cqrs.controller.producer.produce')
  64. master_instance.char_field = 'new2'
  65. master_instance.save()
  66. mocker.stopall()
  67. # Sync to all
  68. master_instance.cqrs_sync()
  69. transport_delay()
  70. assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 1
  71. replica_tuple = get_replica_first(
  72. replica_cursor,
  73. REPLICA_BASIC_TABLE,
  74. ('int_field', 'char_field', 'cqrs_revision', 'cqrs_updated'),
  75. )
  76. assert replica_tuple[0] == 1
  77. assert replica_tuple[1] == 'new2'
  78. assert replica_tuple[2] == 2