123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- # Copyright © 2023 Ingram Micro Inc. All rights reserved.
- from datetime import datetime, timezone
- import pytest
- import ujson
- from django.core.management import CommandError, call_command
- from dj_cqrs.constants import SignalType
- from dj_cqrs.management.commands.cqrs_dead_letters import Command, RabbitMQTransport
# Name of the management command under test, as passed to ``call_command``.
COMMAND_NAME = 'cqrs_dead_letters'
def test_dump(capsys, mocker):
    """Run the ``dump`` sub-command against a fully mocked RabbitMQ transport.

    The dead-letter queue is faked to report one message and to yield a single
    JSON payload; the test checks that exactly this payload is written to
    stdout.
    """
    payload = ujson.dumps({'cqrs_id': 'test'}).encode('utf-8')

    # Queue declaration result reporting one pending dead letter.
    declared_queue = mocker.MagicMock()
    declared_queue.method.message_count = 1

    def fake_consume(*args, **kwargs):
        # Generator producing one (method_frame, properties, body) triple.
        yield None, None, payload

    def fake_queue_declare(*args, **kwargs):
        return declared_queue

    rmq_channel = mocker.MagicMock()
    rmq_channel.consume = fake_consume
    rmq_channel.queue_declare = fake_queue_declare

    # The transport check is bypassed so the command proceeds with these mocks.
    mocker.patch.object(Command, 'check_transport')

    transport_stubs = {
        '_get_consumer_settings': ('queue', 'dead_letters_queue', 0),
        '_get_common_settings': ('host', 'port', mocker.MagicMock(), 'exchange'),
        '_create_connection': (mocker.MagicMock(), rmq_channel),
    }
    for method_name, stubbed_result in transport_stubs.items():
        mocker.patch.object(RabbitMQTransport, method_name, return_value=stubbed_result)
    mocker.patch.object(RabbitMQTransport, '_nack')

    call_command(COMMAND_NAME, 'dump')

    printed = capsys.readouterr().out
    assert printed.strip() == payload.decode('utf-8')
def test_handle_retry(settings, capsys, mocker):
    """Check that ``handle_retry`` republishes a dead letter and nacks the original.

    With ``django.utils.timezone.now`` frozen at 2020-01-01T00:00:00Z, the test
    expects the republished message to keep its instance data, to have
    ``retries`` reset to 0 and ``expires`` set to 2020-01-02T00:00:00+00:00,
    and the consumed message (delivery tag 12) to be nacked once.
    """
    # Freeze "now" so the expected expiration timestamp is deterministic.
    frozen_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
    mocker.patch('django.utils.timezone.now', return_value=frozen_now)

    publisher_channel = mocker.MagicMock()
    mocker.patch.object(
        RabbitMQTransport,
        '_get_producer_rmq_objects',
        return_value=(None, publisher_channel),
    )

    consumer_channel = mocker.MagicMock()
    frame = mocker.MagicMock()
    frame.delivery_tag = 12

    dead_letter = {
        'signal_type': SignalType.SAVE,
        'cqrs_id': 'test',
        'instance_data': {'id': 123},
        'instance_pk': 1,
        'previous_data': None,
        'correlation_id': None,
        'expires': '2020-01-01T00:00:00+00:00',
        'retries': 30,
    }
    raw_dead_letter = ujson.dumps(dead_letter)

    def single_dead_letter():
        # One (method_frame, properties, body) triple, as the consumer would yield.
        yield frame, None, raw_dead_letter

    Command().handle_retry(consumer_channel, single_dead_letter(), dead_letters_count=1)

    # The message was republished exactly once, to the replica's routing key.
    assert publisher_channel.basic_publish.call_count == 1
    _, published_kwargs = publisher_channel.basic_publish.call_args
    assert published_kwargs['routing_key'] == 'cqrs.replica.test'

    republished = ujson.loads(published_kwargs['body'])
    assert republished['instance_data'] == dead_letter['instance_data']
    assert republished['expires'] == '2020-01-02T00:00:00+00:00'
    assert republished['retries'] == 0

    # Console output: total, progress, then the message body.
    output_lines = capsys.readouterr().out.strip().split('\n')
    total_line, progress_line, body_line = output_lines
    assert total_line == 'Total dead letters: 1'
    assert progress_line == 'Retrying: 1/1'
    assert '2020-01-02T00:00:00+00:00' in body_line

    # The original dead letter is nacked once, by its delivery tag.
    assert consumer_channel.basic_nack.call_count == 1
    nack_args, _ = consumer_channel.basic_nack.call_args
    assert nack_args[0] == 12
def test_handle_purge(capsys, mocker):
    """Check that ``handle_purge`` purges a non-empty queue and reports it.

    With a dead-letter count of 3, the named queue must be purged exactly once
    and stdout must contain the total followed by ``Purged``.
    """
    queue_name = 'dead_letters_test'
    rmq_channel = mocker.MagicMock()

    Command().handle_purge(rmq_channel, queue_name, dead_letter_count=3)

    purge_mock = rmq_channel.queue_purge
    assert purge_mock.call_count == 1
    purge_args, _ = purge_mock.call_args
    assert purge_args[0] == queue_name

    output_lines = capsys.readouterr().out.strip().split('\n')
    total_line, purged_line = output_lines
    assert total_line == 'Total dead letters: 3'
    assert purged_line == 'Purged'
def test_handle_purge_empty_queue(capsys, mocker):
    """Check that ``handle_purge`` does not purge when there are no dead letters.

    With a dead-letter count of 0, ``queue_purge`` must not be called and only
    the total line is printed.
    """
    rmq_channel = mocker.MagicMock()

    Command().handle_purge(rmq_channel, 'dead_letters_test', dead_letter_count=0)

    assert rmq_channel.queue_purge.call_count == 0

    printed = capsys.readouterr().out
    assert printed.strip() == 'Total dead letters: 0'
def test_check_transport(settings):
    """Check that ``check_transport`` raises ``CommandError`` with its message.

    The ``settings`` fixture is requested but not modified here, so the command
    runs against whatever transport the test settings configure (presumably
    not RabbitMQ -- confirm against the test settings module).
    """
    command = Command()

    with pytest.raises(CommandError) as excinfo:
        command.check_transport()

    # Assert on the raised exception itself. ``str(excinfo)`` stringifies
    # pytest's ExceptionInfo wrapper, whose text is not guaranteed to be the
    # error message; ``excinfo.value`` is the actual CommandError instance.
    expected = 'Dead letters commands available only for RabbitMQTransport.'
    assert expected in str(excinfo.value)
|