test_dead_letters.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. from datetime import datetime, timezone
  3. import pytest
  4. import ujson
  5. from django.core.management import CommandError, call_command
  6. from dj_cqrs.constants import SignalType
  7. from dj_cqrs.management.commands.cqrs_dead_letters import Command, RabbitMQTransport
  8. COMMAND_NAME = 'cqrs_dead_letters'
  9. def test_dump(capsys, mocker):
  10. mocker.patch.object(Command, 'check_transport')
  11. mocker.patch.object(
  12. RabbitMQTransport,
  13. '_get_consumer_settings',
  14. return_value=('queue', 'dead_letters_queue', 0),
  15. )
  16. mocker.patch.object(
  17. RabbitMQTransport,
  18. '_get_common_settings',
  19. return_value=('host', 'port', mocker.MagicMock(), 'exchange'),
  20. )
  21. queue = mocker.MagicMock()
  22. queue.method.message_count = 1
  23. message_body = ujson.dumps({'cqrs_id': 'test'}).encode('utf-8')
  24. channel = mocker.MagicMock()
  25. channel.consume = lambda *args, **kwargs: (v for v in [(None, None, message_body)])
  26. channel.queue_declare = lambda *args, **kwargs: queue
  27. mocker.patch.object(
  28. RabbitMQTransport,
  29. '_create_connection',
  30. return_value=(mocker.MagicMock(), channel),
  31. )
  32. mocker.patch.object(RabbitMQTransport, '_nack')
  33. call_command(COMMAND_NAME, 'dump')
  34. captured = capsys.readouterr()
  35. assert captured.out.strip() == message_body.decode('utf-8')
  36. def test_handle_retry(settings, capsys, mocker):
  37. produce_channel = mocker.MagicMock()
  38. mocker.patch.object(
  39. RabbitMQTransport,
  40. '_get_producer_rmq_objects',
  41. return_value=(None, produce_channel),
  42. )
  43. channel = mocker.MagicMock()
  44. method_frame = mocker.MagicMock()
  45. method_frame.delivery_tag = 12
  46. fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
  47. mocker.patch('django.utils.timezone.now', return_value=fake_now)
  48. message = {
  49. 'signal_type': SignalType.SAVE,
  50. 'cqrs_id': 'test',
  51. 'instance_data': {'id': 123},
  52. 'instance_pk': 1,
  53. 'previous_data': None,
  54. 'correlation_id': None,
  55. 'expires': '2020-01-01T00:00:00+00:00',
  56. 'retries': 30,
  57. }
  58. consumer_generator = (v for v in [(method_frame, None, ujson.dumps(message))])
  59. command = Command()
  60. command.handle_retry(channel, consumer_generator, dead_letters_count=1)
  61. assert produce_channel.basic_publish.call_count == 1
  62. produce_kwargs = produce_channel.basic_publish.call_args[1]
  63. assert produce_kwargs['routing_key'] == 'cqrs.replica.test'
  64. produce_message = ujson.loads(produce_kwargs['body'])
  65. assert produce_message['instance_data'] == message['instance_data']
  66. assert produce_message['expires'] == '2020-01-02T00:00:00+00:00'
  67. assert produce_message['retries'] == 0
  68. captured = capsys.readouterr()
  69. total_msg, retrying_msg, body_msg = captured.out.strip().split('\n')
  70. assert total_msg == 'Total dead letters: 1'
  71. assert retrying_msg == 'Retrying: 1/1'
  72. assert '2020-01-02T00:00:00+00:00' in body_msg
  73. assert channel.basic_nack.call_count == 1
  74. assert channel.basic_nack.call_args[0][0] == 12
  75. def test_handle_purge(capsys, mocker):
  76. channel = mocker.MagicMock()
  77. command = Command()
  78. command.handle_purge(channel, 'dead_letters_test', dead_letter_count=3)
  79. assert channel.queue_purge.call_count == 1
  80. assert channel.queue_purge.call_args[0][0] == 'dead_letters_test'
  81. captured = capsys.readouterr()
  82. total_msg, purged_msg = captured.out.strip().split('\n')
  83. assert total_msg == 'Total dead letters: 3'
  84. assert purged_msg == 'Purged'
  85. def test_handle_purge_empty_queue(capsys, mocker):
  86. channel = mocker.MagicMock()
  87. command = Command()
  88. command.handle_purge(channel, 'dead_letters_test', dead_letter_count=0)
  89. assert channel.queue_purge.call_count == 0
  90. captured = capsys.readouterr()
  91. assert captured.out.strip() == 'Total dead letters: 0'
  92. def test_check_transport(settings):
  93. command = Command()
  94. with pytest.raises(CommandError) as e:
  95. command.check_transport()
  96. assert 'Dead letters commands available only for RabbitMQTransport.' in str(e)