test_dead_letter.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import json
  3. import pytest
  4. from integration_tests.tests.utils import transport_delay
  5. from tests.dj_master.models import FailModel
  6. @pytest.mark.django_db(transaction=True)
  7. def test_add_to_dead_letter(settings, replica_cursor, replica_channel):
  8. master_instance = FailModel.cqrs.create()
  9. transport_delay(5)
  10. queue = replica_channel.queue_declare('replica', durable=True, exclusive=False)
  11. assert queue.method.message_count == 0
  12. dead_queue = replica_channel.queue_declare(
  13. 'dead_letter_replica',
  14. durable=True,
  15. exclusive=False,
  16. )
  17. assert dead_queue.method.message_count == 1
  18. consumer_generator = replica_channel.consume(
  19. queue=dead_queue.method.queue,
  20. auto_ack=True,
  21. exclusive=False,
  22. )
  23. *_, body = next(consumer_generator)
  24. dead_letter = json.loads(body)
  25. assert dead_letter['instance_pk'] == master_instance.pk
  26. assert dead_letter['retries'] == 2
  27. @pytest.mark.django_db(transaction=True)
  28. def test_dead_letter_expire(settings, replica_cursor, replica_channel):
  29. FailModel.cqrs.create()
  30. transport_delay(5)
  31. dead_queue = replica_channel.queue_declare(
  32. 'dead_letter_replica',
  33. durable=True,
  34. exclusive=False,
  35. )
  36. assert dead_queue.method.message_count == 1
  37. transport_delay(5)
  38. dead_queue = replica_channel.queue_declare(
  39. 'dead_letter_replica',
  40. durable=True,
  41. exclusive=False,
  42. )
  43. assert dead_queue.method.message_count == 0