123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- # Copyright © 2023 Ingram Micro Inc. All rights reserved.
- import threading
- from importlib import import_module, reload
- from pathlib import Path
- import pytest
- from django.core.management import CommandError, call_command
- from dj_cqrs.management.commands.cqrs_consume import WorkersManager, consume
- COMMAND_NAME = 'cqrs_consume'
- @pytest.fixture
- def reload_transport():
- reload(import_module('dj_cqrs.transport'))
- def test_no_arguments(mocker, reload_transport):
- mocked_worker = mocker.patch('dj_cqrs.management.commands.cqrs_consume.WorkersManager')
- call_command(COMMAND_NAME)
- mocked_worker.assert_called_once_with(
- consume_kwargs={},
- workers=1,
- reload=False,
- ignore_paths=None,
- sigint_timeout=5,
- sigkill_timeout=1,
- )
- def test_with_arguments(mocker, reload_transport):
- mocked_worker = mocker.patch('dj_cqrs.management.commands.cqrs_consume.WorkersManager')
- mocker.patch(
- 'dj_cqrs.management.commands.cqrs_consume.Path',
- side_effect=[
- mocker.MagicMock(resolve=mocker.MagicMock(return_value='/path1')),
- mocker.MagicMock(resolve=mocker.MagicMock(return_value='/path2')),
- ],
- )
- call_command(COMMAND_NAME, '--workers=2', '-r', '-cid=author', '--ignore-paths=path1,path2')
- mocked_worker.assert_called_once_with(
- consume_kwargs={'cqrs_ids': {'author'}},
- workers=2,
- reload=True,
- ignore_paths=['/path1', '/path2'],
- sigint_timeout=5,
- sigkill_timeout=1,
- )
- def test_several_cqrs_id(mocker, reload_transport):
- mocked_worker = mocker.patch('dj_cqrs.management.commands.cqrs_consume.WorkersManager')
- call_command(COMMAND_NAME, cqrs_id=['author', 'basic', 'author', 'no_db'])
- mocked_worker.assert_called_once_with(
- consume_kwargs={'cqrs_ids': {'author', 'basic', 'no_db'}},
- workers=1,
- reload=False,
- ignore_paths=None,
- sigint_timeout=5,
- sigkill_timeout=1,
- )
- def test_wrong_cqrs_id(reload_transport):
- with pytest.raises(CommandError) as e:
- call_command(COMMAND_NAME, cqrs_id=['author', 'random', 'no_db'])
- assert 'Wrong CQRS ID: random!' in str(e)
- def test_worker_manager_constructor_with_reload(mocker):
- mocked_flt_instance = mocker.MagicMock()
- mocked_pyfilter = mocker.patch(
- 'dj_cqrs.management.commands.cqrs_consume.PythonFilter',
- return_value=mocked_flt_instance,
- )
- mocked_watch = mocker.patch('dj_cqrs.management.commands.cqrs_consume.watch')
- worker = WorkersManager(
- {},
- reload=True,
- sigint_timeout=10,
- )
- assert worker.workers == 1
- assert worker.reload is True
- assert worker.sigint_timeout == 10
- assert worker.sigkill_timeout == 1
- assert isinstance(worker.stop_event, threading.Event)
- mocked_pyfilter.assert_called_once_with(ignore_paths=None)
- assert worker.watch_filter == mocked_flt_instance
- mocked_watch.assert_called_once_with(
- Path.cwd(),
- watch_filter=mocked_flt_instance,
- stop_event=worker.stop_event,
- yield_on_timeout=True,
- )
- def test_worker_manager_run_no_reload(mocker):
- mocked_start_process = mocker.patch('dj_cqrs.management.commands.cqrs_consume.start_process')
- worker = WorkersManager(
- {'cqrs_ids': {'author', 'basic', 'no_db'}},
- workers=2,
- )
- worker.stop_event.wait = mocker.MagicMock()
- worker.run()
- mocked_start_process.assert_called()
- def test_worker_manager_run_with_reload(mocker):
- mocker.patch.object(
- WorkersManager,
- '__next__',
- side_effect=[[Path.cwd() / Path('file1.py')], None],
- )
- mocked_start_process = mocker.patch('dj_cqrs.management.commands.cqrs_consume.start_process')
- worker = WorkersManager(
- {'cqrs_ids': {'author', 'basic', 'no_db'}},
- reload=True,
- )
- worker.stop_event.wait = mocker.MagicMock()
- worker.run()
- mocked_start_process.assert_called()
- def test_worker_manager_handle_signal():
- worker = WorkersManager({})
- worker.handle_signal()
- assert worker.stop_event.is_set()
- def test_worker_manager_iterator():
- worker = WorkersManager({})
- worker.watcher = iter([[(None, '/file1.py')], None])
- expected_result = [[Path('/file1.py')], None]
- result = []
- for file in worker:
- result.append(file)
- assert result == expected_result
- def test_consume(mocker):
- mocked_setup = mocker.patch('django.setup')
- mocked_consume = mocker.patch(
- 'dj_cqrs.transport.current_transport.consume',
- )
- consume_kwargs = {'cqrs_ids': {'author', 'basic', 'no_db'}}
- consume(**consume_kwargs)
- mocked_setup.assert_called_once()
- mocked_consume.assert_called_once_with(**consume_kwargs)
|