test_kombu.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import logging
  3. from importlib import import_module, reload
  4. import pytest
  5. import ujson
  6. from kombu.exceptions import KombuError
  7. from dj_cqrs.constants import (
  8. DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
  9. DEFAULT_MASTER_MESSAGE_TTL,
  10. SignalType,
  11. )
  12. from dj_cqrs.dataclasses import TransportPayload
  13. from dj_cqrs.registries import ReplicaRegistry
  14. from dj_cqrs.transport.kombu import KombuTransport, _KombuConsumer
  15. class PublicKombuTransport(KombuTransport):
  16. @classmethod
  17. def get_common_settings(cls):
  18. return cls._get_common_settings()
  19. @classmethod
  20. def get_consumer_settings(cls):
  21. return cls._get_consumer_settings()
  22. @classmethod
  23. def consume_message(cls, *args):
  24. return cls._consume_message(*args)
  25. @classmethod
  26. def produce_message(cls, *args):
  27. return cls._produce_message(*args)
  28. @classmethod
  29. def create_exchange(cls, *args):
  30. return cls._create_exchange(*args)
  31. def test_default_settings(settings):
  32. settings.CQRS = {
  33. 'transport': 'dj_cqrs.transport.kombu.KombuTransport',
  34. }
  35. s = PublicKombuTransport.get_common_settings()
  36. assert s[0] == 'amqp://localhost'
  37. assert s[1] == 'cqrs'
  38. def test_non_default_settings(settings, caplog):
  39. settings.CQRS = {
  40. 'url': 'redis://localhost:6379',
  41. 'exchange': 'exchange',
  42. }
  43. s = PublicKombuTransport.get_common_settings()
  44. assert s[0] == 'redis://localhost:6379'
  45. assert s[1] == 'exchange'
  46. def test_consumer_default_settings():
  47. s = PublicKombuTransport.get_consumer_settings()
  48. assert s[1] == 10
  49. def test_consumer_non_default_settings(settings):
  50. settings.CQRS = {
  51. 'transport': 'dj_cqrs.transport.kombu.KombuTransport',
  52. 'queue': 'q',
  53. 'consumer_prefetch_count': 2,
  54. }
  55. s = PublicKombuTransport.get_consumer_settings()
  56. assert s[0] == 'q'
  57. assert s[1] == 2
  58. @pytest.fixture
  59. def kombu_transport(settings):
  60. settings.CQRS = {
  61. 'transport': 'dj_cqrs.transport.kombu.KombuTransport',
  62. 'queue': 'replica',
  63. 'master': {
  64. 'CQRS_AUTO_UPDATE_FIELDS': DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
  65. 'CQRS_MESSAGE_TTL': DEFAULT_MASTER_MESSAGE_TTL,
  66. 'correlation_function': None,
  67. },
  68. }
  69. module = reload(import_module('dj_cqrs.transport'))
  70. yield module.current_transport
  71. def kombu_error(*args, **kwargs):
  72. raise KombuError()
  73. def test_produce_connection_error(kombu_transport, mocker, caplog):
  74. mocker.patch.object(KombuTransport, '_get_producer_kombu_objects', side_effect=kombu_error)
  75. kombu_transport.produce(
  76. TransportPayload(
  77. SignalType.SAVE,
  78. 'CQRS_ID',
  79. {'id': 1},
  80. 1,
  81. ),
  82. )
  83. assert "CQRS couldn't be published: pk = 1 (CQRS_ID)." in caplog.text
  84. def test_produce_publish_error(kombu_transport, mocker, caplog):
  85. mocker.patch.object(
  86. KombuTransport,
  87. '_get_producer_kombu_objects',
  88. return_value=(mocker.MagicMock(), None),
  89. )
  90. mocker.patch.object(KombuTransport, '_produce_message', side_effect=kombu_error)
  91. kombu_transport.produce(
  92. TransportPayload(
  93. SignalType.SAVE,
  94. 'CQRS_ID',
  95. {'id': 1},
  96. 1,
  97. ),
  98. )
  99. assert "CQRS couldn't be published: pk = 1 (CQRS_ID)." in caplog.text
  100. def test_produce_ok(kombu_transport, mocker, caplog):
  101. caplog.set_level(logging.INFO)
  102. mocker.patch.object(
  103. KombuTransport,
  104. '_get_producer_kombu_objects',
  105. return_value=(mocker.MagicMock(), None),
  106. )
  107. mocker.patch.object(KombuTransport, '_produce_message', return_value=True)
  108. kombu_transport.produce(
  109. TransportPayload(
  110. SignalType.SAVE,
  111. 'CQRS_ID',
  112. {'id': 1},
  113. 1,
  114. ),
  115. )
  116. assert 'CQRS is published: pk = 1 (CQRS_ID)' in caplog.text
  117. def test_produce_message_ok(mocker):
  118. channel = mocker.MagicMock()
  119. payload = TransportPayload(
  120. SignalType.SAVE,
  121. 'cqrs_id',
  122. {},
  123. 'id',
  124. previous_data={'e': 'f'},
  125. )
  126. exchange = PublicKombuTransport.create_exchange('exchange')
  127. PublicKombuTransport.produce_message(channel, exchange, payload)
  128. assert channel.basic_publish.call_count == 1
  129. prepare_message_args = channel.prepare_message.call_args[0]
  130. basic_publish_kwargs = channel.basic_publish.call_args[1]
  131. assert ujson.loads(prepare_message_args[0]) == {
  132. 'signal_type': SignalType.SAVE,
  133. 'cqrs_id': 'cqrs_id',
  134. 'instance_data': {},
  135. 'instance_pk': 'id',
  136. 'previous_data': {'e': 'f'},
  137. 'correlation_id': None,
  138. 'expires': None,
  139. 'retries': 0,
  140. 'meta': None,
  141. }
  142. assert prepare_message_args[2] == 'text/plain'
  143. assert prepare_message_args[5]['delivery_mode'] == 2
  144. assert basic_publish_kwargs['exchange'] == 'exchange'
  145. assert basic_publish_kwargs['mandatory']
  146. assert basic_publish_kwargs['routing_key'] == 'cqrs_id'
  147. def test_produce_sync_message_no_queue(mocker):
  148. channel = mocker.MagicMock()
  149. payload = TransportPayload(SignalType.SYNC, 'cqrs_id', {}, None)
  150. exchange = PublicKombuTransport.create_exchange('exchange')
  151. PublicKombuTransport.produce_message(channel, exchange, payload)
  152. prepare_message_args = channel.prepare_message.call_args[0]
  153. basic_publish_kwargs = channel.basic_publish.call_args[1]
  154. assert ujson.loads(prepare_message_args[0]) == {
  155. 'signal_type': SignalType.SYNC,
  156. 'cqrs_id': 'cqrs_id',
  157. 'instance_data': {},
  158. 'instance_pk': None,
  159. 'previous_data': None,
  160. 'correlation_id': None,
  161. 'expires': None,
  162. 'retries': 0,
  163. 'meta': None,
  164. }
  165. assert basic_publish_kwargs['routing_key'] == 'cqrs_id'
  166. def test_produce_sync_message_queue(mocker):
  167. channel = mocker.MagicMock()
  168. payload = TransportPayload(SignalType.SYNC, 'cqrs_id', {}, 'id', 'queue')
  169. exchange = PublicKombuTransport.create_exchange('exchange')
  170. PublicKombuTransport.produce_message(channel, exchange, payload)
  171. prepare_message_args = channel.prepare_message.call_args[0]
  172. basic_publish_kwargs = channel.basic_publish.call_args[1]
  173. assert ujson.loads(prepare_message_args[0]) == {
  174. 'signal_type': SignalType.SYNC,
  175. 'cqrs_id': 'cqrs_id',
  176. 'instance_data': {},
  177. 'instance_pk': 'id',
  178. 'previous_data': None,
  179. 'correlation_id': None,
  180. 'expires': None,
  181. 'retries': 0,
  182. 'meta': None,
  183. }
  184. assert basic_publish_kwargs['routing_key'] == 'cqrs.queue.cqrs_id'
  185. def test_consume_message_ack(mocker, caplog):
  186. caplog.set_level(logging.INFO)
  187. consumer_mock = mocker.patch('dj_cqrs.controller.consumer.consume')
  188. message_mock = mocker.MagicMock()
  189. PublicKombuTransport.consume_message(
  190. '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},'
  191. '"instance_pk":1, "previous_data":{}, "correlation_id":"zyx",'
  192. '"expires":"2100-01-01T00:00:00+00:00", "retries":1}',
  193. message_mock,
  194. )
  195. assert consumer_mock.call_count == 1
  196. assert message_mock.ack.call_count == 1
  197. payload = consumer_mock.call_args[0][0]
  198. assert payload.signal_type == 'signal'
  199. assert payload.cqrs_id == 'cqrs_id'
  200. assert payload.instance_data == {}
  201. assert payload.previous_data == {}
  202. assert payload.pk == 1
  203. assert payload.correlation_id == 'zyx'
  204. assert payload.expires is None
  205. assert payload.retries == 0
  206. assert 'CQRS is received: pk = 1 (cqrs_id), correlation_id = zyx.' in caplog.text
  207. assert 'CQRS is applied: pk = 1 (cqrs_id), correlation_id = zyx.' in caplog.text
  208. def test_consume_message_ack_deprecated_structure(mocker, caplog):
  209. caplog.set_level(logging.INFO)
  210. consumer_mock = mocker.patch('dj_cqrs.controller.consumer.consume')
  211. PublicKombuTransport.consume_message(
  212. '{"signal_type":"signal","cqrs_id":"cqrs_id",' '"instance_data":{},"previous_data":null}',
  213. mocker.MagicMock(),
  214. )
  215. assert consumer_mock.call_count == 0
  216. assert "CQRS couldn't proceed, instance_pk isn't found in body" in caplog.text
  217. def test_consume_message_nack(mocker, caplog):
  218. caplog.set_level(logging.INFO)
  219. mocker.patch('dj_cqrs.controller.consumer.consume', return_value=None)
  220. message_mock = mocker.MagicMock()
  221. PublicKombuTransport.consume_message(
  222. '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{},'
  223. '"instance_pk":1,"previous_data":null}',
  224. message_mock,
  225. )
  226. assert message_mock.reject.call_count == 1
  227. assert 'CQRS is received: pk = 1 (cqrs_id)' in caplog.text
  228. assert 'CQRS is denied: pk = 1 (cqrs_id)' in caplog.text
  229. def test_consume_message_nack_deprecated_structure(mocker, caplog):
  230. caplog.set_level(logging.INFO)
  231. mocker.patch('dj_cqrs.controller.consumer.consume', return_value=None)
  232. PublicKombuTransport.consume_message(
  233. '{"signal_type":"signal","cqrs_id":"cqrs_id","instance_data":{}}',
  234. mocker.MagicMock(),
  235. )
  236. assert 'CQRS is received: pk = 1 (cqrs_id)' not in caplog.text
  237. assert 'CQRS is denied: pk = 1 (cqrs_id)' not in caplog.text
  238. def test_consume_message_json_parsing_error(mocker, caplog):
  239. PublicKombuTransport.consume_message(
  240. '{bad_payload:',
  241. mocker.MagicMock(),
  242. )
  243. assert ': {bad_payload:.' in caplog.text
  244. def test_consume_message_package_structure_error(mocker, caplog):
  245. PublicKombuTransport.consume_message(
  246. 'inv{"pk":"1"}',
  247. mocker.MagicMock(),
  248. )
  249. assert """CQRS couldn't be parsed: inv{"pk":"1"}""" in caplog.text
  250. def test_consumer_queues(mocker):
  251. mocker.patch('dj_cqrs.transport.kombu.Connection')
  252. def callback(body, message):
  253. pass
  254. c = _KombuConsumer('amqp://localhost', 'cqrs', 'cqrs_queue', 2, callback)
  255. assert len(c.queues) == len(ReplicaRegistry.models) * 2
  256. def test_consumer_consumers(mocker):
  257. mocker.patch('dj_cqrs.transport.kombu.Connection')
  258. def callback(body, message):
  259. pass
  260. c = _KombuConsumer('amqp://localhost', 'cqrs', 'cqrs_queue', 2, callback)
  261. consumers = c.get_consumers(mocker.MagicMock, None)
  262. assert len(consumers) == 1
  263. consumer = consumers[0]
  264. assert consumer.queues == c.queues
  265. assert consumer.callbacks[0] == callback
  266. assert consumer.prefetch_count == 2
  267. def test_consumer_run(mocker):
  268. mocker.patch('dj_cqrs.transport.kombu.Connection')
  269. mocked_run = mocker.patch.object(_KombuConsumer, 'run')
  270. PublicKombuTransport.consume()
  271. mocked_run.assert_called_once()