test_asynchronous_consuming.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import pytest
  3. from integration_tests.tests.utils import (
  4. REPLICA_BASIC_TABLE,
  5. REPLICA_EVENT_TABLE,
  6. count_replica_rows,
  7. get_replica_all,
  8. transport_delay,
  9. )
  10. from tests.dj_master.models import BasicFieldsModel
  11. @pytest.mark.django_db(transaction=True)
  12. def test_both_consumers_consume(replica_cursor, clean_rabbit_transport_connection):
  13. assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 0
  14. assert count_replica_rows(replica_cursor, REPLICA_EVENT_TABLE) == 0
  15. BasicFieldsModel.cqrs.bulk_create(
  16. [
  17. BasicFieldsModel(
  18. int_field=index,
  19. char_field='text',
  20. )
  21. for index in range(1, 10)
  22. ],
  23. )
  24. transport_delay(3)
  25. assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 9
  26. assert count_replica_rows(replica_cursor, REPLICA_EVENT_TABLE) == 9
  27. events_data = get_replica_all(replica_cursor, REPLICA_EVENT_TABLE, ('pid',))
  28. assert len({d[0] for d in events_data}) == 2
  29. @pytest.mark.django_db(transaction=True)
  30. def test_de_duplication(replica_cursor, clean_rabbit_transport_connection):
  31. assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 0
  32. assert count_replica_rows(replica_cursor, REPLICA_EVENT_TABLE) == 0
  33. master_instance = BasicFieldsModel.objects.create(int_field=21, char_field='text')
  34. BasicFieldsModel.call_post_bulk_create([master_instance])
  35. transport_delay(3)
  36. replica_cursor.execute('TRUNCATE TABLE {0};'.format(REPLICA_EVENT_TABLE))
  37. BasicFieldsModel.call_post_bulk_create([master_instance for _ in range(10)])
  38. transport_delay(3)
  39. assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 1
  40. assert count_replica_rows(replica_cursor, REPLICA_EVENT_TABLE) == 10