test_bulk_operations.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import pytest
  3. from integration_tests.tests.utils import (
  4. REPLICA_BASIC_TABLE,
  5. count_replica_rows,
  6. get_replica_all,
  7. get_replica_first,
  8. transport_delay,
  9. )
  10. from tests.dj_master.models import BasicFieldsModel
  11. @pytest.mark.django_db(transaction=True)
  12. def test_flow(replica_cursor, clean_rabbit_transport_connection):
  13. assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 0
  14. # Create
  15. BasicFieldsModel.cqrs.bulk_create(
  16. [
  17. BasicFieldsModel(
  18. int_field=index,
  19. char_field='text',
  20. )
  21. for index in range(1, 4)
  22. ],
  23. )
  24. transport_delay()
  25. assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 3
  26. assert {'text'} == {
  27. t[0] for t in get_replica_all(replica_cursor, REPLICA_BASIC_TABLE, ('char_field',))
  28. }
  29. # Update 1 and 2
  30. BasicFieldsModel.cqrs.bulk_update(
  31. BasicFieldsModel.objects.filter(int_field__in=(1, 2)),
  32. char_field='new_text',
  33. )
  34. transport_delay()
  35. assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 3
  36. assert ['new_text', 'new_text', 'text'] == [
  37. t[0]
  38. for t in get_replica_all(
  39. replica_cursor,
  40. REPLICA_BASIC_TABLE,
  41. ('char_field',),
  42. order_asc_by='int_field',
  43. )
  44. ]
  45. # Delete 1 and 3
  46. BasicFieldsModel.objects.filter(int_field__in=(1, 3)).delete()
  47. transport_delay()
  48. assert count_replica_rows(replica_cursor, REPLICA_BASIC_TABLE) == 1
  49. assert (2, 'new_text', 1) == get_replica_first(
  50. replica_cursor,
  51. REPLICA_BASIC_TABLE,
  52. ('int_field', 'char_field', 'cqrs_revision'),
  53. )