123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- # Copyright © 2023 Ingram Micro Inc. All rights reserved.
- import pytest
- from django.core.management import CommandError, call_command
- from django.db import connections
- from dj_cqrs.constants import SignalType
- from tests.dj_master.models import Author
- from tests.utils import db_error
- COMMAND_NAME = 'cqrs_sync'
- def test_no_cqrs_id():
- with pytest.raises(CommandError):
- call_command(COMMAND_NAME)
- def test_bad_cqrs_id():
- with pytest.raises(CommandError) as e:
- call_command(COMMAND_NAME, '--cqrs-id=invalid')
- assert 'Wrong CQRS ID: invalid!' in str(e)
- def test_empty_filter_arg(capsys):
- call_command(COMMAND_NAME, '--cqrs-id=author')
- captured = capsys.readouterr()
- assert 'No objects found for filter!' in captured.out
- def test_unparseable_filter():
- with pytest.raises(CommandError) as e:
- call_command(COMMAND_NAME, '--cqrs-id=author', '-f={arg}')
- assert 'Bad filter kwargs!' in str(e)
- def test_non_kwargs_filter():
- with pytest.raises(CommandError) as e:
- call_command(COMMAND_NAME, '--cqrs-id=author', '-f=[1]')
- assert 'Bad filter kwargs!' in str(e)
- def test_bad_kwargs_filter():
- with pytest.raises(CommandError) as e:
- call_command(COMMAND_NAME, '--cqrs-id=author', '-f={"field": "value"}')
- assert 'Bad filter kwargs! Cannot resolve keyword' in str(e)
- @pytest.mark.django_db
- def test_empty_filter(capsys):
- call_command(COMMAND_NAME, '--cqrs-id=author', '-f={"id": 1}')
- captured = capsys.readouterr()
- assert 'No objects found for filter!' in captured.out
- @pytest.mark.django_db(transaction=True)
- def test_no_queue(mocker, capsys):
- Author.objects.create(id=1, name='author')
- publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
- call_command(COMMAND_NAME, '--cqrs-id=author', '-f={"id": 1}')
- publisher_mock.assert_called_once()
- payload = publisher_mock.call_args[0][0]
- assert payload.queue is None
- assert payload.signal_type is SignalType.SYNC
- captured = capsys.readouterr()
- assert 'Done!\n1 instance(s) synced.\n1 instance(s) processed.' in captured.out
- @pytest.mark.django_db(transaction=True)
- def test_queue_is_set(mocker, capsys):
- Author.objects.create(id=1, name='author')
- publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
- call_command(COMMAND_NAME, '--cqrs-id=author', '-f={"id": 1}', '--queue=replica')
- publisher_mock.assert_called_once()
- payload = publisher_mock.call_args[0][0]
- assert payload.queue == 'replica'
- assert payload.signal_type is SignalType.SYNC
- captured = capsys.readouterr()
- assert 'Done!\n1 instance(s) synced.\n1 instance(s) processed.' in captured.out
- @pytest.mark.django_db(transaction=True)
- def test_several_synced(mocker, capsys):
- for i in range(1, 3):
- Author.objects.create(id=i, name='author')
- publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
- call_command(COMMAND_NAME, '--cqrs-id=author', '-f={"id__in": [1, 2]}')
- assert publisher_mock.call_count == 2
- captured = capsys.readouterr()
- assert 'Done!\n2 instance(s) synced.\n2 instance(s) processed.' in captured.out
- @pytest.mark.django_db
- def test_error(capsys, mocker):
- Author.objects.create(id=2, name='2')
- mocker.patch('tests.dj_master.models.Author.cqrs_sync', side_effect=db_error)
- call_command(COMMAND_NAME, '--cqrs-id=author', '-f={}') # noqa: P103
- captured = capsys.readouterr()
- assert 'Sync record failed for pk=2' in captured.out
- assert '1 instance(s) processed.' in captured.out
- assert '0 instance(s) synced.' in captured.out
- @pytest.mark.django_db
- def test_progress(capsys):
- Author.objects.create(id=2, name='2')
- call_command(COMMAND_NAME, '--cqrs-id=author', '--progress', '-f={}', '--batch=2') # noqa: P103
- captured = capsys.readouterr()
- assert 'Processing 1 records with batch size 2' in captured.out
- assert '1 of 1 processed - 100% with rate' in captured.out
- @pytest.mark.django_db(transaction=True)
- def test_reconnect_on_unusable_connection(capsys, mocker):
- for i in range(1, 11):
- Author.objects.create(id=i, name='author')
- is_usable_mock = mocker.patch.object(
- connections['default'],
- 'is_usable',
- side_effect=[True, False, True],
- )
- call_command(COMMAND_NAME, '--cqrs-id=author', '--progress', '-f={}', '--batch=5') # noqa: P103
- assert is_usable_mock.call_count == 2
- captured = capsys.readouterr()
- assert 'Done!\n10 instance(s) synced.\n10 instance(s) processed.' in captured.out
|