test_rabbit_mq.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import logging
  3. from datetime import datetime, timedelta, timezone
  4. from importlib import import_module, reload
  5. import pytest
  6. import ujson
  7. from django.db import DatabaseError
  8. from pika.adapters.utils.connection_workflow import AMQPConnectorException
  9. from pika.exceptions import (
  10. AMQPError,
  11. ChannelError,
  12. ReentrancyError,
  13. StreamLostError,
  14. )
  15. from dj_cqrs.constants import (
  16. DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
  17. DEFAULT_MASTER_MESSAGE_TTL,
  18. DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
  19. DEFAULT_REPLICA_MAX_RETRIES,
  20. DEFAULT_REPLICA_RETRY_DELAY,
  21. SignalType,
  22. )
  23. from dj_cqrs.dataclasses import TransportPayload
  24. from dj_cqrs.delay import DelayMessage, DelayQueue
  25. from dj_cqrs.transport.rabbit_mq import RabbitMQTransport
  26. from tests.utils import db_error
  27. class PublicRabbitMQTransport(RabbitMQTransport):
  28. @classmethod
  29. def get_common_settings(cls):
  30. return cls._get_common_settings()
  31. @classmethod
  32. def get_consumer_settings(cls):
  33. return cls._get_consumer_settings()
  34. @classmethod
  35. def get_produced_message_routing_key(cls, *args):
  36. return cls._get_produced_message_routing_key(*args)
  37. @classmethod
  38. def consume_message(cls, *args):
  39. return cls._consume_message(*args)
  40. @classmethod
  41. def delay_message(cls, *args):
  42. return cls._delay_message(*args)
  43. @classmethod
  44. def fail_message(cls, *args):
  45. return cls._fail_message(*args)
  46. @classmethod
  47. def process_delay_messages(cls, *args):
  48. return cls._process_delay_messages(*args)
  49. @classmethod
  50. def produce_message(cls, *args):
  51. return cls._produce_message(*args)
  52. def test_default_settings():
  53. s = PublicRabbitMQTransport.get_common_settings()
  54. assert s[0] == 'localhost'
  55. assert s[1] == 5672
  56. assert s[2].username == 'guest' and s[2].password == 'guest'
  57. assert s[3] == 'cqrs'
  58. def test_non_default_settings(settings, caplog):
  59. settings.CQRS = {
  60. 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
  61. 'host': 'rabbit',
  62. 'port': 8000,
  63. 'user': 'usr',
  64. 'password': 'pswd',
  65. 'exchange': 'exchange',
  66. }
  67. s = PublicRabbitMQTransport.get_common_settings()
  68. assert s[0] == 'rabbit'
  69. assert s[1] == 8000
  70. assert s[2].username == 'usr' and s[2].password == 'pswd'
  71. assert s[3] == 'exchange'
  72. def test_default_url_settings(settings):
  73. settings.CQRS = {
  74. 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
  75. 'url': 'amqp://localhost',
  76. }
  77. s = PublicRabbitMQTransport.get_common_settings()
  78. assert s[0] == 'localhost'
  79. assert s[1] == 5672
  80. assert s[2].username == 'guest' and s[2].password == 'guest'
  81. assert s[3] == 'cqrs'
  82. def test_non_default_url_settings(settings):
  83. settings.CQRS = {
  84. 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
  85. 'url': 'amqp://usr:pswd@rabbit:8000',
  86. 'exchange': 'exchange',
  87. }
  88. s = PublicRabbitMQTransport.get_common_settings()
  89. assert s[0] == 'rabbit'
  90. assert s[1] == 8000
  91. assert s[2].username == 'usr' and s[2].password == 'pswd'
  92. assert s[3] == 'exchange'
  93. def test_invalid_url_settings(settings):
  94. settings.CQRS = {
  95. 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
  96. 'url': 'rabbit://localhost',
  97. }
  98. with pytest.raises(AssertionError) as ei:
  99. PublicRabbitMQTransport.get_common_settings()
  100. assert ei.match('Scheme must be "amqp" for RabbitMQTransport.')
  101. def test_consumer_default_settings(settings):
  102. settings.CQRS['queue'] = 'replica'
  103. settings.CQRS['replica'].pop('dead_letter_queue', None)
  104. s = PublicRabbitMQTransport.get_consumer_settings()
  105. assert s[1] == 'dead_letter_replica'
  106. assert s[2] == 1001
  107. def test_consumer_non_default_settings(settings, caplog):
  108. settings.CQRS = {
  109. 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
  110. 'queue': 'q',
  111. 'consumer_prefetch_count': 2,
  112. 'replica': {
  113. 'delay_queue_max_size': None, # Infinite
  114. },
  115. }
  116. s = PublicRabbitMQTransport.get_consumer_settings()
  117. assert s[0] == 'q'
  118. assert s[1] == 'dead_letter_q'
  119. assert s[2] == 0 # Infinite
  120. assert "The 'consumer_prefetch_count' setting is ignored for RabbitMQTransport." in caplog.text
  121. @pytest.fixture
  122. def rabbit_transport(settings):
  123. settings.CQRS = {
  124. 'transport': 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport',
  125. 'queue': 'replica',
  126. 'master': {
  127. 'CQRS_AUTO_UPDATE_FIELDS': DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
  128. 'CQRS_MESSAGE_TTL': DEFAULT_MASTER_MESSAGE_TTL,
  129. 'correlation_function': None,
  130. 'meta_function': None,
  131. },
  132. 'replica': {
  133. 'CQRS_MAX_RETRIES': DEFAULT_REPLICA_MAX_RETRIES,
  134. 'CQRS_RETRY_DELAY': DEFAULT_REPLICA_RETRY_DELAY,
  135. 'delay_queue_max_size': DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
  136. },
  137. }
  138. module = reload(import_module('dj_cqrs.transport'))
  139. yield module.current_transport
  140. @pytest.mark.parametrize(
  141. 'exception',
  142. (AMQPError, ChannelError, ReentrancyError, AMQPConnectorException, AssertionError),
  143. )
  144. def test_produce_connection_error(exception, rabbit_transport, mocker, caplog):
  145. mocker.patch.object(RabbitMQTransport, '_get_producer_rmq_objects', side_effect=exception)
  146. rabbit_transport.produce(
  147. TransportPayload(
  148. SignalType.SAVE,
  149. 'CQRS_ID',
  150. {'id': 1},
  151. 1,
  152. ),
  153. )
  154. assert "CQRS couldn't be published: pk = 1 (CQRS_ID)." in caplog.text
  155. def test_produce_publish_error(rabbit_transport, mocker, caplog):
  156. mocker.patch.object(
  157. RabbitMQTransport,
  158. '_get_producer_rmq_objects',
  159. return_value=(mocker.MagicMock(), None),
  160. )
  161. mocker.patch.object(RabbitMQTransport, '_produce_message', side_effect=AMQPError)
  162. rabbit_transport.produce(
  163. TransportPayload(
  164. SignalType.SAVE,
  165. 'CQRS_ID',
  166. {'id': 1},
  167. 1,
  168. ),
  169. )
  170. assert "CQRS couldn't be published: pk = 1 (CQRS_ID)." in caplog.text
  171. def test_produce_ok(rabbit_transport, mocker, caplog):
  172. caplog.set_level(logging.INFO)
  173. mocker.patch.object(
  174. RabbitMQTransport,
  175. '_get_producer_rmq_objects',
  176. return_value=(mocker.MagicMock(), None),
  177. )
  178. mocker.patch.object(RabbitMQTransport, '_produce_message', return_value=True)
  179. rabbit_transport.produce(
  180. TransportPayload(
  181. SignalType.SAVE,
  182. 'CQRS_ID',
  183. {'id': 1},
  184. 1,
  185. ),
  186. )
  187. assert 'CQRS is published: pk = 1 (CQRS_ID)' in caplog.text
  188. def test_produce_retry_on_error(rabbit_transport, mocker, caplog):
  189. caplog.set_level(logging.INFO)
  190. mocker.patch.object(
  191. RabbitMQTransport,
  192. '_get_producer_rmq_objects',
  193. side_effect=[
  194. AMQPConnectorException,
  195. (mocker.MagicMock(), None),
  196. ],
  197. )
  198. mocker.patch.object(RabbitMQTransport, '_produce_message', return_value=True)
  199. rabbit_transport.produce(
  200. TransportPayload(
  201. SignalType.SAVE,
  202. 'CQRS_ID',
  203. {'id': 1},
  204. 1,
  205. ),
  206. )
  207. assert caplog.record_tuples == [
  208. (
  209. 'django-cqrs',
  210. logging.WARNING,
  211. "CQRS couldn't be published: pk = 1 (CQRS_ID)."
  212. ' Error: AMQPConnectorException. Reconnect...',
  213. ),
  214. (
  215. 'django-cqrs',
  216. logging.INFO,
  217. 'CQRS is published: pk = 1 (CQRS_ID), correlation_id = None.',
  218. ),
  219. ]
  220. def test_produce_retry_on_error_1(rabbit_transport, mocker, caplog):
  221. mocker.patch.object(
  222. RabbitMQTransport,
  223. '_get_producer_rmq_objects',
  224. side_effect=[
  225. StreamLostError,
  226. StreamLostError,
  227. ],
  228. )
  229. mocker.patch.object(RabbitMQTransport, '_produce_message', return_value=True)
  230. rabbit_transport.produce(
  231. TransportPayload(
  232. SignalType.SAVE,
  233. 'CQRS_ID',
  234. {'id': 1},
  235. 1,
  236. ),
  237. )
  238. assert caplog.record_tuples == [
  239. (
  240. 'django-cqrs',
  241. logging.WARNING,
  242. "CQRS couldn't be published: pk = 1 (CQRS_ID). Error: StreamLostError. Reconnect...",
  243. ),
  244. (
  245. 'django-cqrs',
  246. logging.ERROR,
  247. "CQRS couldn't be published: pk = 1 (CQRS_ID).",
  248. ),
  249. ]
  250. def test_produce_message_ok(mocker):
  251. expires = datetime(2100, 1, 1, tzinfo=timezone.utc)
  252. expected_expires = '2100-01-01T00:00:00+00:00'
  253. channel = mocker.MagicMock()
  254. payload = TransportPayload(
  255. SignalType.SAVE,
  256. cqrs_id='cqrs_id',
  257. instance_data={},
  258. instance_pk='id',
  259. previous_data={'e': 'f'},
  260. expires=expires,
  261. retries=2,
  262. )
  263. PublicRabbitMQTransport.produce_message(channel, 'exchange', payload)
  264. assert channel.basic_publish.call_count == 1
  265. basic_publish_kwargs = channel.basic_publish.call_args[1]
  266. assert ujson.loads(basic_publish_kwargs['body']) == {
  267. 'signal_type': SignalType.SAVE,
  268. 'cqrs_id': 'cqrs_id',
  269. 'instance_data': {},
  270. 'instance_pk': 'id',
  271. 'previous_data': {'e': 'f'},
  272. 'correlation_id': None,
  273. 'expires': expected_expires,
  274. 'retries': 2,
  275. 'meta': None,
  276. }
  277. assert basic_publish_kwargs['exchange'] == 'exchange'
  278. assert basic_publish_kwargs['mandatory']
  279. assert basic_publish_kwargs['routing_key'] == 'cqrs_id'
  280. assert basic_publish_kwargs['properties'].content_type == 'text/plain'
  281. assert basic_publish_kwargs['properties'].delivery_mode == 2
  282. def test_produce_sync_message_no_queue(mocker):
  283. channel = mocker.MagicMock()
  284. payload = TransportPayload(SignalType.SYNC, 'cqrs_id', {}, None)
  285. PublicRabbitMQTransport.produce_message(channel, 'exchange', payload)
  286. basic_publish_kwargs = channel.basic_publish.call_args[1]
  287. assert ujson.loads(basic_publish_kwargs['body']) == {
  288. 'signal_type': SignalType.SYNC,
  289. 'cqrs_id': 'cqrs_id',
  290. 'instance_data': {},
  291. 'instance_pk': None,
  292. 'previous_data': None,
  293. 'correlation_id': None,
  294. 'expires': None,
  295. 'retries': 0,
  296. 'meta': None,
  297. }
  298. assert basic_publish_kwargs['routing_key'] == 'cqrs_id'
  299. def test_produce_sync_message_queue(mocker):
  300. channel = mocker.MagicMock()
  301. payload = TransportPayload(SignalType.SYNC, 'cqrs_id', {}, 'id', 'queue')
  302. PublicRabbitMQTransport.produce_message(channel, 'exchange', payload)
  303. basic_publish_kwargs = channel.basic_publish.call_args[1]
  304. assert ujson.loads(basic_publish_kwargs['body']) == {
  305. 'signal_type': SignalType.SYNC,
  306. 'cqrs_id': 'cqrs_id',
  307. 'instance_data': {},
  308. 'instance_pk': 'id',
  309. 'previous_data': None,
  310. 'correlation_id': None,
  311. 'expires': None,
  312. 'retries': 0,
  313. 'meta': None,
  314. }
  315. assert basic_publish_kwargs['routing_key'] == 'cqrs.queue.cqrs_id'
  316. def test_consume_connection_error(rabbit_transport, mocker, caplog):
  317. mocker.patch.object(
  318. RabbitMQTransport,
  319. '_get_consumer_rmq_objects',
  320. side_effect=AMQPError,
  321. )
  322. mocker.patch('time.sleep', side_effect=db_error)
  323. with pytest.raises(DatabaseError):
  324. rabbit_transport.consume()
  325. assert 'AMQP connection error. Reconnecting...' in caplog.text
  326. def test_consume_ok(rabbit_transport, mocker):
  327. consumer_generator = (v for v in [(1, None, None)])
  328. mocker.patch.object(
  329. RabbitMQTransport,
  330. '_get_consumer_rmq_objects',
  331. return_value=(None, None, consumer_generator),
  332. )
  333. mocker.patch.object(
  334. RabbitMQTransport,
  335. '_consume_message',
  336. db_error,
  337. )
  338. with pytest.raises(DatabaseError):
  339. rabbit_transport.consume()
  340. def test_consume_message_ack(mocker, caplog):
  341. caplog.set_level(logging.INFO)
  342. consumer_mock = mocker.patch('dj_cqrs.controller.consumer.consume')
  343. PublicRabbitMQTransport.consume_message(
  344. mocker.MagicMock(),
  345. mocker.MagicMock(),
  346. None,
  347. '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},'
  348. '"instance_pk":1, "previous_data":{}, "correlation_id":"abc",'
  349. '"expires":"2100-01-01T00:00:00+00:00", "retries":1}',
  350. mocker.MagicMock(),
  351. )
  352. assert consumer_mock.call_count == 1
  353. payload = consumer_mock.call_args[0][0]
  354. assert payload.signal_type == 'signal'
  355. assert payload.cqrs_id == 'cqrs_id'
  356. assert payload.instance_data == {}
  357. assert payload.previous_data == {}
  358. assert payload.pk == 1
  359. assert payload.correlation_id == 'abc'
  360. assert payload.expires == datetime(2100, 1, 1, tzinfo=timezone.utc)
  361. assert payload.retries == 1
  362. assert 'CQRS is received: pk = 1 (cqrs_id), correlation_id = abc.' in caplog.text
  363. assert 'CQRS is applied: pk = 1 (cqrs_id), correlation_id = abc.' in caplog.text
  364. def test_consume_message_nack(mocker, caplog):
  365. caplog.set_level(logging.INFO)
  366. mocker.patch('dj_cqrs.controller.consumer.consume', return_value=None)
  367. PublicRabbitMQTransport.consume_message(
  368. mocker.MagicMock(),
  369. mocker.MagicMock(),
  370. None,
  371. '{"signal_type":"signal","cqrs_id":"basic","instance_data":{},'
  372. '"instance_pk":1,"previous_data":null,'
  373. '"expires":"2100-01-01T00:00:00+00:00", "retries":0}',
  374. mocker.MagicMock(),
  375. )
  376. assert 'CQRS is received: pk = 1 (basic), correlation_id = None.' in caplog.text
  377. assert 'CQRS is failed: pk = 1 (basic), correlation_id = None, retries = 0.' in caplog.text
  378. def test_consume_message_nack_deprecated_structure(mocker, caplog):
  379. caplog.set_level(logging.INFO)
  380. consumer_mock = mocker.patch('dj_cqrs.controller.consumer.consume', return_value=None)
  381. PublicRabbitMQTransport.consume_message(
  382. mocker.MagicMock(),
  383. mocker.MagicMock(),
  384. None,
  385. '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{}}',
  386. mocker.MagicMock(),
  387. )
  388. assert consumer_mock.call_count == 0
  389. assert "CQRS couldn't proceed, instance_pk isn't found in body" in caplog.text
  390. def test_consume_message_expired(mocker, caplog):
  391. caplog.set_level(logging.INFO)
  392. channel = mocker.MagicMock()
  393. PublicRabbitMQTransport.consume_message(
  394. channel,
  395. mocker.MagicMock(),
  396. None,
  397. '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},'
  398. '"instance_pk":1,"previous_data":null,'
  399. '"expires":"2000-01-01T00:00:00+00:00", "retries":0}',
  400. mocker.MagicMock(),
  401. )
  402. assert channel.basic_nack.call_count == 1
  403. assert 'CQRS is received: pk = 1 (cqrs_id)' in caplog.text
  404. assert 'CQRS is added to dead letter queue: pk = 1 (cqrs_id)' in caplog.text
  405. def test_consume_message_json_parsing_error(mocker, caplog):
  406. PublicRabbitMQTransport.consume_message(
  407. mocker.MagicMock(),
  408. mocker.MagicMock(),
  409. None,
  410. '{bad_payload:',
  411. mocker.MagicMock(),
  412. )
  413. assert ': {bad_payload:.' in caplog.text
  414. def test_consume_message_package_structure_error(mocker, caplog):
  415. PublicRabbitMQTransport.consume_message(
  416. mocker.MagicMock(),
  417. mocker.MagicMock(),
  418. None,
  419. 'inv{"pk":"1"}',
  420. mocker.MagicMock(),
  421. )
  422. assert """CQRS couldn't be parsed: inv{"pk":"1"}""" in caplog.text
  423. def test_fail_message_with_retry(mocker):
  424. payload = TransportPayload(SignalType.SAVE, 'basic', {'id': 1}, 1)
  425. delay_queue = DelayQueue()
  426. PublicRabbitMQTransport.fail_message(mocker.MagicMock(), 100, payload, None, delay_queue)
  427. assert delay_queue.qsize() == 1
  428. delay_message = delay_queue.get()
  429. assert delay_message.delivery_tag == 100
  430. assert delay_message.payload is payload
  431. def test_message_without_retry_dead_letter(settings, mocker, caplog):
  432. settings.CQRS['replica']['CQRS_MAX_RETRIES'] = 1
  433. produce_message = mocker.patch(
  434. 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport._produce_message',
  435. )
  436. channel = mocker.MagicMock()
  437. payload = TransportPayload(
  438. SignalType.SAVE,
  439. 'basic',
  440. {'id': 1},
  441. 1,
  442. correlation_id='abc',
  443. retries=2,
  444. )
  445. delay_queue = DelayQueue()
  446. PublicRabbitMQTransport.fail_message(channel, 1, payload, None, delay_queue)
  447. assert delay_queue.qsize() == 0
  448. assert channel.basic_nack.call_count == 1
  449. assert produce_message.call_count == 1
  450. produce_payload = produce_message.call_args[0][2]
  451. assert produce_payload is payload
  452. assert getattr(produce_message, 'is_dead_letter', False)
  453. assert 'CQRS is failed: pk = 1 (basic), correlation_id = abc, retries = 2.' in caplog.text
  454. assert 'CQRS is added to dead letter queue: pk = 1 (basic), correlation_id = abc' in caplog.text
  455. def test_fail_message_invalid_model(mocker, caplog):
  456. nack = mocker.patch(
  457. 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport._nack',
  458. )
  459. payload = TransportPayload(SignalType.SAVE, 'not_existing', {'id': 1}, 1)
  460. delay_queue = DelayQueue()
  461. delivery_tag = 101
  462. PublicRabbitMQTransport.fail_message(
  463. mocker.MagicMock(),
  464. delivery_tag,
  465. payload,
  466. None,
  467. delay_queue,
  468. )
  469. assert delay_queue.qsize() == 0
  470. assert nack.call_count == 1
  471. assert nack.call_args[0][1] == delivery_tag
  472. assert 'Model for cqrs_id not_existing is not found.' in caplog.text
  473. def test_get_produced_message_routing_key_dead_letter(settings):
  474. settings.CQRS['replica']['dead_letter_queue'] = 'dead_letter_replica'
  475. payload = TransportPayload(SignalType.SYNC, 'CQRS_ID', {}, None)
  476. payload.is_dead_letter = True
  477. routing_key = PublicRabbitMQTransport.get_produced_message_routing_key(payload)
  478. assert routing_key == 'cqrs.dead_letter_replica.CQRS_ID'
  479. def test_get_produced_message_routing_key_requeue(settings):
  480. settings.CQRS['queue'] = 'replica'
  481. payload = TransportPayload(SignalType.SAVE, 'CQRS_ID', {}, None)
  482. payload.is_requeue = True
  483. routing_key = PublicRabbitMQTransport.get_produced_message_routing_key(payload)
  484. assert routing_key == 'cqrs.replica.CQRS_ID'
  485. def test_process_delay_messages(mocker, caplog):
  486. channel = mocker.MagicMock()
  487. produce = mocker.patch('dj_cqrs.transport.rabbit_mq.RabbitMQTransport.produce')
  488. payload = TransportPayload(SignalType.SAVE, 'CQRS_ID', {'id': 1}, 1)
  489. delay_queue = DelayQueue()
  490. delay_queue.put(
  491. DelayMessage(delivery_tag=1, payload=payload, eta=datetime.now(tz=timezone.utc)),
  492. )
  493. PublicRabbitMQTransport.process_delay_messages(channel, delay_queue)
  494. assert delay_queue.qsize() == 0
  495. assert channel.basic_nack.call_count == 1
  496. assert produce.call_count == 1
  497. produce_payload = produce.call_args[0][0]
  498. assert produce_payload is payload
  499. assert produce_payload.retries == 1
  500. assert getattr(produce_payload, 'is_requeue', False)
  501. assert 'CQRS is requeued: pk = 1 (CQRS_ID)' in caplog.text
  502. def test_delay_message_with_requeue(mocker, caplog):
  503. channel = mocker.MagicMock()
  504. requeue_message = mocker.patch(
  505. 'dj_cqrs.transport.rabbit_mq.RabbitMQTransport._requeue_message',
  506. )
  507. delay_messages = []
  508. for delay in (2, 1, 3):
  509. payload = TransportPayload(SignalType.SAVE, 'CQRS_ID', {'id': delay}, delay)
  510. eta = datetime.now(tz=timezone.utc) + timedelta(hours=delay)
  511. delay_message = DelayMessage(delivery_tag=delay, payload=payload, eta=eta)
  512. delay_messages.append(delay_message)
  513. delay_queue = DelayQueue(max_size=3)
  514. for delay_message in delay_messages:
  515. delay_queue.put(delay_message)
  516. exceeding_delay = 0
  517. exceeding_payload = TransportPayload(SignalType.SAVE, 'CQRS_ID', {'id': 4}, 4)
  518. PublicRabbitMQTransport.delay_message(
  519. channel,
  520. 4,
  521. exceeding_payload,
  522. exceeding_delay,
  523. delay_queue,
  524. )
  525. assert delay_queue.qsize() == 3
  526. assert delay_queue.get().payload is exceeding_payload
  527. assert 'CQRS is delayed: pk = 4 (CQRS_ID), correlation_id = None, delay = 0 sec' in caplog.text
  528. assert requeue_message.call_count == 1
  529. requeue_payload = requeue_message.call_args[0][2]
  530. min_eta_delay_message = sorted(delay_messages, key=lambda x: x.eta)[0]
  531. assert requeue_payload is min_eta_delay_message.payload