test_consume.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import threading
  3. from importlib import import_module, reload
  4. from pathlib import Path
  5. import pytest
  6. from django.core.management import CommandError, call_command
  7. from dj_cqrs.management.commands.cqrs_consume import WorkersManager, consume
  8. COMMAND_NAME = 'cqrs_consume'
  9. @pytest.fixture
  10. def reload_transport():
  11. reload(import_module('dj_cqrs.transport'))
  12. def test_no_arguments(mocker, reload_transport):
  13. mocked_worker = mocker.patch('dj_cqrs.management.commands.cqrs_consume.WorkersManager')
  14. call_command(COMMAND_NAME)
  15. mocked_worker.assert_called_once_with(
  16. consume_kwargs={},
  17. workers=1,
  18. reload=False,
  19. ignore_paths=None,
  20. sigint_timeout=5,
  21. sigkill_timeout=1,
  22. )
  23. def test_with_arguments(mocker, reload_transport):
  24. mocked_worker = mocker.patch('dj_cqrs.management.commands.cqrs_consume.WorkersManager')
  25. mocker.patch(
  26. 'dj_cqrs.management.commands.cqrs_consume.Path',
  27. side_effect=[
  28. mocker.MagicMock(resolve=mocker.MagicMock(return_value='/path1')),
  29. mocker.MagicMock(resolve=mocker.MagicMock(return_value='/path2')),
  30. ],
  31. )
  32. call_command(COMMAND_NAME, '--workers=2', '-r', '-cid=author', '--ignore-paths=path1,path2')
  33. mocked_worker.assert_called_once_with(
  34. consume_kwargs={'cqrs_ids': {'author'}},
  35. workers=2,
  36. reload=True,
  37. ignore_paths=['/path1', '/path2'],
  38. sigint_timeout=5,
  39. sigkill_timeout=1,
  40. )
  41. def test_several_cqrs_id(mocker, reload_transport):
  42. mocked_worker = mocker.patch('dj_cqrs.management.commands.cqrs_consume.WorkersManager')
  43. call_command(COMMAND_NAME, cqrs_id=['author', 'basic', 'author', 'no_db'])
  44. mocked_worker.assert_called_once_with(
  45. consume_kwargs={'cqrs_ids': {'author', 'basic', 'no_db'}},
  46. workers=1,
  47. reload=False,
  48. ignore_paths=None,
  49. sigint_timeout=5,
  50. sigkill_timeout=1,
  51. )
  52. def test_wrong_cqrs_id(reload_transport):
  53. with pytest.raises(CommandError) as e:
  54. call_command(COMMAND_NAME, cqrs_id=['author', 'random', 'no_db'])
  55. assert 'Wrong CQRS ID: random!' in str(e)
  56. def test_worker_manager_constructor_with_reload(mocker):
  57. mocked_flt_instance = mocker.MagicMock()
  58. mocked_pyfilter = mocker.patch(
  59. 'dj_cqrs.management.commands.cqrs_consume.PythonFilter',
  60. return_value=mocked_flt_instance,
  61. )
  62. mocked_watch = mocker.patch('dj_cqrs.management.commands.cqrs_consume.watch')
  63. worker = WorkersManager(
  64. {},
  65. reload=True,
  66. sigint_timeout=10,
  67. )
  68. assert worker.workers == 1
  69. assert worker.reload is True
  70. assert worker.sigint_timeout == 10
  71. assert worker.sigkill_timeout == 1
  72. assert isinstance(worker.stop_event, threading.Event)
  73. mocked_pyfilter.assert_called_once_with(ignore_paths=None)
  74. assert worker.watch_filter == mocked_flt_instance
  75. mocked_watch.assert_called_once_with(
  76. Path.cwd(),
  77. watch_filter=mocked_flt_instance,
  78. stop_event=worker.stop_event,
  79. yield_on_timeout=True,
  80. )
  81. def test_worker_manager_run_no_reload(mocker):
  82. mocked_start_process = mocker.patch('dj_cqrs.management.commands.cqrs_consume.start_process')
  83. worker = WorkersManager(
  84. {'cqrs_ids': {'author', 'basic', 'no_db'}},
  85. workers=2,
  86. )
  87. worker.stop_event.wait = mocker.MagicMock()
  88. worker.run()
  89. mocked_start_process.assert_called()
  90. def test_worker_manager_run_with_reload(mocker):
  91. mocker.patch.object(
  92. WorkersManager,
  93. '__next__',
  94. side_effect=[[Path.cwd() / Path('file1.py')], None],
  95. )
  96. mocked_start_process = mocker.patch('dj_cqrs.management.commands.cqrs_consume.start_process')
  97. worker = WorkersManager(
  98. {'cqrs_ids': {'author', 'basic', 'no_db'}},
  99. reload=True,
  100. )
  101. worker.stop_event.wait = mocker.MagicMock()
  102. worker.run()
  103. mocked_start_process.assert_called()
  104. def test_worker_manager_handle_signal():
  105. worker = WorkersManager({})
  106. worker.handle_signal()
  107. assert worker.stop_event.is_set()
  108. def test_worker_manager_iterator():
  109. worker = WorkersManager({})
  110. worker.watcher = iter([[(None, '/file1.py')], None])
  111. expected_result = [[Path('/file1.py')], None]
  112. result = []
  113. for file in worker:
  114. result.append(file)
  115. assert result == expected_result
  116. def test_consume(mocker):
  117. mocked_setup = mocker.patch('django.setup')
  118. mocked_consume = mocker.patch(
  119. 'dj_cqrs.transport.current_transport.consume',
  120. )
  121. consume_kwargs = {'cqrs_ids': {'author', 'basic', 'no_db'}}
  122. consume(**consume_kwargs)
  123. mocked_setup.assert_called_once()
  124. mocked_consume.assert_called_once_with(**consume_kwargs)