- # Copyright © 2023 Ingram Micro Inc. All rights reserved.
- import logging
- from datetime import datetime, timedelta, timezone
- from importlib import import_module, reload
- import pytest
- import ujson
- from django.db import DatabaseError
- from pika.adapters.utils.connection_workflow import AMQPConnectorException
- from pika.exceptions import (
- AMQPError,
- ChannelError,
- ReentrancyError,
- StreamLostError,
- )
- from dj_cqrs.constants import (
- DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
- DEFAULT_MASTER_MESSAGE_TTL,
- DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
- DEFAULT_REPLICA_MAX_RETRIES,
- DEFAULT_REPLICA_RETRY_DELAY,
- SignalType,
- )
- from dj_cqrs.dataclasses import TransportPayload
- from dj_cqrs.delay import DelayMessage, DelayQueue
- from dj_cqrs.transport.rabbit_mq import RabbitMQTransport
- from tests.utils import db_error
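- # Thin wrapper exposing the transport's protected classmethods so the tests below can call them directly.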
- class PublicRabbitMQTransport(RabbitMQTransport):
- @classmethod
- def get_common_settings(cls):
- return cls._get_common_settings()
- @classmethod
- def get_consumer_settings(cls):
- return cls._get_consumer_settings()
- @classmethod
- def get_produced_message_routing_key(cls, *args):
- return cls._get_produced_message_routing_key(*args)
- @classmethod
- def consume_message(cls, *args):
- return cls._consume_message(*args)
- @classmethod
- def delay_message(cls, *args):
- return cls._delay_message(*args)
- @classmethod
- def fail_message(cls, *args):
- return cls._fail_message(*args)
- @classmethod
- def process_delay_messages(cls, *args):
- return cls._process_delay_messages(*args)
- @classmethod
- def produce_message(cls, *args):
- return cls._produce_message(*args)
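- # _get_common_settings() returns a (host, port, credentials, exchange) tuple; the tests below cover default, explicit and URL-based configuration.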
- def test_default_settings():
- s = PublicRabbitMQTransport.get_common_settings()
- assert s[0] == 'localhost'
- assert s[1] == 5672
- assert s[2].username == 'guest' and s[2].password == 'guest'
- assert s[3] == 'cqrs'
- def test_non_default_settings(settings, caplog):
- settings.CQRS = {
- 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
- 'host': 'rabbit',
- 'port': 8000,
- 'user': 'usr',
- 'password': 'pswd',
- 'exchange': 'exchange',
- }
- s = PublicRabbitMQTransport.get_common_settings()
- assert s[0] == 'rabbit'
- assert s[1] == 8000
- assert s[2].username == 'usr' and s[2].password == 'pswd'
- assert s[3] == 'exchange'
- def test_default_url_settings(settings):
- settings.CQRS = {
- 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
- 'url': 'amqp://localhost',
- }
- s = PublicRabbitMQTransport.get_common_settings()
- assert s[0] == 'localhost'
- assert s[1] == 5672
- assert s[2].username == 'guest' and s[2].password == 'guest'
- assert s[3] == 'cqrs'
- def test_non_default_url_settings(settings):
- settings.CQRS = {
- 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
- 'url': 'amqp://usr:pswd@rabbit:8000',
- 'exchange': 'exchange',
- }
- s = PublicRabbitMQTransport.get_common_settings()
- assert s[0] == 'rabbit'
- assert s[1] == 8000
- assert s[2].username == 'usr' and s[2].password == 'pswd'
- assert s[3] == 'exchange'
- def test_invalid_url_settings(settings):
- settings.CQRS = {
- 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
- 'url': 'rabbit://localhost',
- }
- with pytest.raises(AssertionError) as ei:
- PublicRabbitMQTransport.get_common_settings()
- assert ei.match('Scheme must be "amqp" for RabbitMQTransport.')
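- # _get_consumer_settings() returns a (queue, dead_letter_queue, prefetch_count) tuple; the prefetch count is derived from delay_queue_max_size, not from consumer_prefetch_count.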
- def test_consumer_default_settings(settings):
- settings.CQRS['queue'] = 'replica'
- settings.CQRS['replica'].pop('dead_letter_queue', None)
- s = PublicRabbitMQTransport.get_consumer_settings()
- assert s[1] == 'dead_letter_replica'
- assert s[2] == 1001
- def test_consumer_non_default_settings(settings, caplog):
- settings.CQRS = {
- 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
- 'queue': 'q',
- 'consumer_prefetch_count': 2,
- 'replica': {
- 'delay_queue_max_size': None, # Infinite
- },
- }
- s = PublicRabbitMQTransport.get_consumer_settings()
- assert s[0] == 'q'
- assert s[1] == 'dead_letter_q'
- assert s[2] == 0 # Infinite
- assert "The 'consumer_prefetch_count' setting is ignored for RabbitMQTransport." in caplog.text
- @pytest.fixture
- def rabbit_transport(settings):
- settings.CQRS = {
- 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
- 'queue': 'replica',
- 'master': {
- 'CQRS_AUTO_UPDATE_FIELDS': DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
- 'CQRS_MESSAGE_TTL': DEFAULT_MASTER_MESSAGE_TTL,
- 'correlation_function': None,
- 'meta_function': None,
- },
- 'replica': {
- 'CQRS_MAX_RETRIES': DEFAULT_REPLICA_MAX_RETRIES,
- 'CQRS_RETRY_DELAY': DEFAULT_REPLICA_RETRY_DELAY,
- 'delay_queue_max_size': DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
- },
- }
- module = reload(import_module('dj_cqrs.transport'))
- yield module.current_transport
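- # produce() must not raise on broker errors: it retries the connection once, then logs the failure.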
- @pytest.mark.parametrize(
- 'exception',
- (AMQPError, ChannelError, ReentrancyError, AMQPConnectorException, AssertionError),
- )
- def test_produce_connection_error(exception, rabbit_transport, mocker, caplog):
- mocker.patch.object(RabbitMQTransport, '_get_producer_rmq_objects', side_effect=exception)
- rabbit_transport.produce(
- TransportPayload(
- SignalType.SAVE,
- 'CQRS_ID',
- {'id': 1},
- 1,
- ),
- )
- assert "CQRS couldn't be published: pk = 1 (CQRS_ID)." in caplog.text
- def test_produce_publish_error(rabbit_transport, mocker, caplog):
- mocker.patch.object(
- RabbitMQTransport,
- '_get_producer_rmq_objects',
- return_value=(mocker.MagicMock(), None),
- )
- mocker.patch.object(RabbitMQTransport, '_produce_message', side_effect=AMQPError)
- rabbit_transport.produce(
- TransportPayload(
- SignalType.SAVE,
- 'CQRS_ID',
- {'id': 1},
- 1,
- ),
- )
- assert "CQRS couldn't be published: pk = 1 (CQRS_ID)." in caplog.text
- def test_produce_ok(rabbit_transport, mocker, caplog):
- caplog.set_level(logging.INFO)
- mocker.patch.object(
- RabbitMQTransport,
- '_get_producer_rmq_objects',
- return_value=(mocker.MagicMock(), None),
- )
- mocker.patch.object(RabbitMQTransport, '_produce_message', return_value=True)
- rabbit_transport.produce(
- TransportPayload(
- SignalType.SAVE,
- 'CQRS_ID',
- {'id': 1},
- 1,
- ),
- )
- assert 'CQRS is published: pk = 1 (CQRS_ID)' in caplog.text
- def test_produce_retry_on_error(rabbit_transport, mocker, caplog):
- caplog.set_level(logging.INFO)
- mocker.patch.object(
- RabbitMQTransport,
- '_get_producer_rmq_objects',
- side_effect=[
- AMQPConnectorException,
- (mocker.MagicMock(), None),
- ],
- )
- mocker.patch.object(RabbitMQTransport, '_produce_message', return_value=True)
- rabbit_transport.produce(
- TransportPayload(
- SignalType.SAVE,
- 'CQRS_ID',
- {'id': 1},
- 1,
- ),
- )
- assert caplog.record_tuples == [
- (
- 'django-cqrs',
- logging.WARNING,
- "CQRS couldn't be published: pk = 1 (CQRS_ID)."
- ' Error: AMQPConnectorException. Reconnect...',
- ),
- (
- 'django-cqrs',
- logging.INFO,
- 'CQRS is published: pk = 1 (CQRS_ID), correlation_id = None.',
- ),
- ]
- def test_produce_retry_on_error_1(rabbit_transport, mocker, caplog):
- mocker.patch.object(
- RabbitMQTransport,
- '_get_producer_rmq_objects',
- side_effect=[
- StreamLostError,
- StreamLostError,
- ],
- )
- mocker.patch.object(RabbitMQTransport, '_produce_message', return_value=True)
- rabbit_transport.produce(
- TransportPayload(
- SignalType.SAVE,
- 'CQRS_ID',
- {'id': 1},
- 1,
- ),
- )
- assert caplog.record_tuples == [
- (
- 'django-cqrs',
- logging.WARNING,
- "CQRS couldn't be published: pk = 1 (CQRS_ID). Error: StreamLostError. Reconnect...",
- ),
- (
- 'django-cqrs',
- logging.ERROR,
- "CQRS couldn't be published: pk = 1 (CQRS_ID).",
- ),
- ]
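- # _produce_message() publishes the ujson-serialized payload as a persistent (delivery_mode=2) message, routed by cqrs_id (or by queue for SYNC signals).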
- def test_produce_message_ok(mocker):
- expires = datetime(2100, 1, 1, tzinfo=timezone.utc)
- expected_expires = '2100-01-01T00:00:00+00:00'
- channel = mocker.MagicMock()
- payload = TransportPayload(
- SignalType.SAVE,
- cqrs_id='cqrs_id',
- instance_data={},
- instance_pk='id',
- previous_data={'e': 'f'},
- expires=expires,
- retries=2,
- )
- PublicRabbitMQTransport.produce_message(channel, 'exchange', payload)
- assert channel.basic_publish.call_count == 1
- basic_publish_kwargs = channel.basic_publish.call_args[1]
- assert ujson.loads(basic_publish_kwargs['body']) == {
- 'signal_type': SignalType.SAVE,
- 'cqrs_id': 'cqrs_id',
- 'instance_data': {},
- 'instance_pk': 'id',
- 'previous_data': {'e': 'f'},
- 'correlation_id': None,
- 'expires': expected_expires,
- 'retries': 2,
- 'meta': None,
- }
- assert basic_publish_kwargs['exchange'] == 'exchange'
- assert basic_publish_kwargs['mandatory']
- assert basic_publish_kwargs['routing_key'] == 'cqrs_id'
- assert basic_publish_kwargs['properties'].content_type == 'text/plain'
- assert basic_publish_kwargs['properties'].delivery_mode == 2
- def test_produce_sync_message_no_queue(mocker):
- channel = mocker.MagicMock()
- payload = TransportPayload(SignalType.SYNC, 'cqrs_id', {}, None)
- PublicRabbitMQTransport.produce_message(channel, 'exchange', payload)
- basic_publish_kwargs = channel.basic_publish.call_args[1]
- assert ujson.loads(basic_publish_kwargs['body']) == {
- 'signal_type': SignalType.SYNC,
- 'cqrs_id': 'cqrs_id',
- 'instance_data': {},
- 'instance_pk': None,
- 'previous_data': None,
- 'correlation_id': None,
- 'expires': None,
- 'retries': 0,
- 'meta': None,
- }
- assert basic_publish_kwargs['routing_key'] == 'cqrs_id'
- def test_produce_sync_message_queue(mocker):
- channel = mocker.MagicMock()
- payload = TransportPayload(SignalType.SYNC, 'cqrs_id', {}, 'id', 'queue')
- PublicRabbitMQTransport.produce_message(channel, 'exchange', payload)
- basic_publish_kwargs = channel.basic_publish.call_args[1]
- assert ujson.loads(basic_publish_kwargs['body']) == {
- 'signal_type': SignalType.SYNC,
- 'cqrs_id': 'cqrs_id',
- 'instance_data': {},
- 'instance_pk': 'id',
- 'previous_data': None,
- 'correlation_id': None,
- 'expires': None,
- 'retries': 0,
- 'meta': None,
- }
- assert basic_publish_kwargs['routing_key'] == 'cqrs.queue.cqrs_id'
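- # consume() reconnects on AMQP errors; db_error is patched in only to break out of the endless consume loop.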
- def test_consume_connection_error(rabbit_transport, mocker, caplog):
- mocker.patch.object(
- RabbitMQTransport,
- '_get_consumer_rmq_objects',
- side_effect=AMQPError,
- )
- mocker.patch('time.sleep', side_effect=db_error)
- with pytest.raises(DatabaseError):
- rabbit_transport.consume()
- assert 'AMQP connection error. Reconnecting...' in caplog.text
- def test_consume_ok(rabbit_transport, mocker):
- consumer_generator = (v for v in [(1, None, None)])
- mocker.patch.object(
- RabbitMQTransport,
- '_get_consumer_rmq_objects',
- return_value=(None, None, consumer_generator),
- )
- mocker.patch.object(
- RabbitMQTransport,
- '_consume_message',
- db_error,
- )
- with pytest.raises(DatabaseError):
- rabbit_transport.consume()
- def test_consume_message_ack(mocker, caplog):
- caplog.set_level(logging.INFO)
- consumer_mock = mocker.patch('dj_cqrs.controller.consumer.consume')
- PublicRabbitMQTransport.consume_message(
- mocker.MagicMock(),
- mocker.MagicMock(),
- None,
- '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},'
- '"instance_pk":1, "previous_data":{}, "correlation_id":"abc",'
- '"expires":"2100-01-01T00:00:00+00:00", "retries":1}',
- mocker.MagicMock(),
- )
- assert consumer_mock.call_count == 1
- payload = consumer_mock.call_args[0][0]
- assert payload.signal_type == 'signal'
- assert payload.cqrs_id == 'cqrs_id'
- assert payload.instance_data == {}
- assert payload.previous_data == {}
- assert payload.pk == 1
- assert payload.correlation_id == 'abc'
- assert payload.expires == datetime(2100, 1, 1, tzinfo=timezone.utc)
- assert payload.retries == 1
- assert 'CQRS is received: pk = 1 (cqrs_id), correlation_id = abc.' in caplog.text
- assert 'CQRS is applied: pk = 1 (cqrs_id), correlation_id = abc.' in caplog.text
- def test_consume_message_nack(mocker, caplog):
- caplog.set_level(logging.INFO)
- mocker.patch('dj_cqrs.controller.consumer.consume', return_value=None)
- PublicRabbitMQTransport.consume_message(
- mocker.MagicMock(),
- mocker.MagicMock(),
- None,
- '{"signal_type":"signal","cqrs_id":"basic","instance_data":{},'
- '"instance_pk":1,"previous_data":null,'
- '"expires":"2100-01-01T00:00:00+00:00", "retries":0}',
- mocker.MagicMock(),
- )
- assert 'CQRS is received: pk = 1 (basic), correlation_id = None.' in caplog.text
- assert 'CQRS is failed: pk = 1 (basic), correlation_id = None, retries = 0.' in caplog.text
- def test_consume_message_nack_deprecated_structure(mocker, caplog):
- caplog.set_level(logging.INFO)
- consumer_mock = mocker.patch('dj_cqrs.controller.consumer.consume', return_value=None)
- PublicRabbitMQTransport.consume_message(
- mocker.MagicMock(),
- mocker.MagicMock(),
- None,
- '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{}}',
- mocker.MagicMock(),
- )
- assert consumer_mock.call_count == 0
- assert "CQRS couldn't proceed, instance_pk isn't found in body" in caplog.text
- def test_consume_message_expired(mocker, caplog):
- caplog.set_level(logging.INFO)
- channel = mocker.MagicMock()
- PublicRabbitMQTransport.consume_message(
- channel,
- mocker.MagicMock(),
- None,
- '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},'
- '"instance_pk":1,"previous_data":null,'
- '"expires":"2000-01-01T00:00:00+00:00", "retries":0}',
- mocker.MagicMock(),
- )
- assert channel.basic_nack.call_count == 1
- assert 'CQRS is received: pk = 1 (cqrs_id)' in caplog.text
- assert 'CQRS is added to dead letter queue: pk = 1 (cqrs_id)' in caplog.text
- def test_consume_message_json_parsing_error(mocker, caplog):
- PublicRabbitMQTransport.consume_message(
- mocker.MagicMock(),
- mocker.MagicMock(),
- None,
- '{bad_payload:',
- mocker.MagicMock(),
- )
- assert ': {bad_payload:.' in caplog.text
- def test_consume_message_package_structure_error(mocker, caplog):
- PublicRabbitMQTransport.consume_message(
- mocker.MagicMock(),
- mocker.MagicMock(),
- None,
- 'inv{"pk":"1"}',
- mocker.MagicMock(),
- )
- assert """CQRS couldn't be parsed: inv{"pk":"1"}""" in caplog.text
- def test_fail_message_with_retry(mocker):
- payload = TransportPayload(SignalType.SAVE, 'basic', {'id': 1}, 1)
- delay_queue = DelayQueue()
- PublicRabbitMQTransport.fail_message(mocker.MagicMock(), 100, payload, None, delay_queue)
- assert delay_queue.qsize() == 1
- delay_message = delay_queue.get()
- assert delay_message.delivery_tag == 100
- assert delay_message.payload is payload
- def test_fail_message_without_retry_dead_letter(settings, mocker, caplog):
- settings.CQRS['replica']['CQRS_MAX_RETRIES'] = 1
- produce_message = mocker.patch(
- 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport._produce_message',
- )
- channel = mocker.MagicMock()
- payload = TransportPayload(
- SignalType.SAVE,
- 'basic',
- {'id': 1},
- 1,
- correlation_id='abc',
- retries=2,
- )
- delay_queue = DelayQueue()
- PublicRabbitMQTransport.fail_message(channel, 1, payload, None, delay_queue)
- assert delay_queue.qsize() == 0
- assert channel.basic_nack.call_count == 1
- assert produce_message.call_count == 1
- produce_payload = produce_message.call_args[0][2]
- assert produce_payload is payload
- assert getattr(produce_payload, 'is_dead_letter', False)
- assert 'CQRS is failed: pk = 1 (basic), correlation_id = abc, retries = 2.' in caplog.text
- assert 'CQRS is added to dead letter queue: pk = 1 (basic), correlation_id = abc' in caplog.text
- def test_fail_message_invalid_model(mocker, caplog):
- nack = mocker.patch(
- 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport._nack',
- )
- payload = TransportPayload(SignalType.SAVE, 'not_existing', {'id': 1}, 1)
- delay_queue = DelayQueue()
- delivery_tag = 101
- PublicRabbitMQTransport.fail_message(
- mocker.MagicMock(),
- delivery_tag,
- payload,
- None,
- delay_queue,
- )
- assert delay_queue.qsize() == 0
- assert nack.call_count == 1
- assert nack.call_args[0][1] == delivery_tag
- assert 'Model for cqrs_id not_existing is not found.' in caplog.text
- def test_get_produced_message_routing_key_dead_letter(settings):
- settings.CQRS['replica']['dead_letter_queue'] = 'dead_letter_replica'
- payload = TransportPayload(SignalType.SYNC, 'CQRS_ID', {}, None)
- payload.is_dead_letter = True
- routing_key = PublicRabbitMQTransport.get_produced_message_routing_key(payload)
- assert routing_key == 'cqrs.dead_letter_replica.CQRS_ID'
- def test_get_produced_message_routing_key_requeue(settings):
- settings.CQRS['queue'] = 'replica'
- payload = TransportPayload(SignalType.SAVE, 'CQRS_ID', {}, None)
- payload.is_requeue = True
- routing_key = PublicRabbitMQTransport.get_produced_message_routing_key(payload)
- assert routing_key == 'cqrs.replica.CQRS_ID'
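- # _process_delay_messages() nacks the original delivery and re-produces due messages with an incremented retry counter and the is_requeue flag set.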
- def test_process_delay_messages(mocker, caplog):
- channel = mocker.MagicMock()
- produce = mocker.patch('dj_cqrs.transport.rabbit_mq.RabbitMQTransport.produce')
- payload = TransportPayload(SignalType.SAVE, 'CQRS_ID', {'id': 1}, 1)
- delay_queue = DelayQueue()
- delay_queue.put(
- DelayMessage(delivery_tag=1, payload=payload, eta=datetime.now(tz=timezone.utc)),
- )
- PublicRabbitMQTransport.process_delay_messages(channel, delay_queue)
- assert delay_queue.qsize() == 0
- assert channel.basic_nack.call_count == 1
- assert produce.call_count == 1
- produce_payload = produce.call_args[0][0]
- assert produce_payload is payload
- assert produce_payload.retries == 1
- assert getattr(produce_payload, 'is_requeue', False)
- assert 'CQRS is requeued: pk = 1 (CQRS_ID)' in caplog.text
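- # When the delay queue is full, _delay_message() makes room by requeuing the delayed message with the earliest ETA.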
- def test_delay_message_with_requeue(mocker, caplog):
- channel = mocker.MagicMock()
- requeue_message = mocker.patch(
- 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport._requeue_message',
- )
- delay_messages = []
- for delay in (2, 1, 3):
- payload = TransportPayload(SignalType.SAVE, 'CQRS_ID', {'id': delay}, delay)
- eta = datetime.now(tz=timezone.utc) + timedelta(hours=delay)
- delay_message = DelayMessage(delivery_tag=delay, payload=payload, eta=eta)
- delay_messages.append(delay_message)
- delay_queue = DelayQueue(max_size=3)
- for delay_message in delay_messages:
- delay_queue.put(delay_message)
- exceeding_delay = 0
- exceeding_payload = TransportPayload(SignalType.SAVE, 'CQRS_ID', {'id': 4}, 4)
- PublicRabbitMQTransport.delay_message(
- channel,
- 4,
- exceeding_payload,
- exceeding_delay,
- delay_queue,
- )
- assert delay_queue.qsize() == 3
- assert delay_queue.get().payload is exceeding_payload
- assert 'CQRS is delayed: pk = 4 (CQRS_ID), correlation_id = None, delay = 0 sec' in caplog.text
- assert requeue_message.call_count == 1
- requeue_payload = requeue_message.call_args[0][2]
- min_eta_delay_message = sorted(delay_messages, key=lambda x: x.eta)[0]
- assert requeue_payload is min_eta_delay_message.payload
|