conftest.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import psycopg2
  3. import pytest
  4. from pika import BlockingConnection, URLParameters
  5. from dj_cqrs.transport import current_transport
  6. from dj_cqrs.transport.rabbit_mq import RabbitMQTransport
  7. from integration_tests.tests.utils import REPLICA_TABLES
  8. @pytest.fixture
  9. def replica_cursor():
  10. connection = psycopg2.connect(host='postgres', database='replica', user='user', password='pswd')
  11. # That setting is extremely important for table truncations
  12. connection.set_isolation_level(0)
  13. cursor = connection.cursor()
  14. for table in REPLICA_TABLES:
  15. cursor.execute('TRUNCATE TABLE {0};'.format(table))
  16. yield cursor
  17. cursor.close()
  18. connection.close()
  19. @pytest.fixture
  20. def clean_rabbit_transport_connection():
  21. current_transport.clean_connection()
  22. yield
  23. @pytest.fixture
  24. def replica_channel(settings):
  25. if current_transport is not RabbitMQTransport:
  26. pytest.skip('Replica channel is implemented only for RabbitMQTransport.')
  27. connection = BlockingConnection(
  28. parameters=URLParameters(settings.CQRS['url']),
  29. )
  30. rabbit_mq_channel = connection.channel()
  31. rabbit_mq_channel.queue_purge('replica')
  32. rabbit_mq_channel.queue_purge('dead_letter_replica')
  33. yield rabbit_mq_channel
  34. connection.close()