test_sync.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import pytest
  3. from django.core.management import CommandError, call_command
  4. from django.db import connections
  5. from dj_cqrs.constants import SignalType
  6. from tests.dj_master.models import Author
  7. from tests.utils import db_error
  8. COMMAND_NAME = 'cqrs_sync'
  9. def test_no_cqrs_id():
  10. with pytest.raises(CommandError):
  11. call_command(COMMAND_NAME)
  12. def test_bad_cqrs_id():
  13. with pytest.raises(CommandError) as e:
  14. call_command(COMMAND_NAME, '--cqrs-id=invalid')
  15. assert 'Wrong CQRS ID: invalid!' in str(e)
  16. def test_empty_filter_arg(capsys):
  17. call_command(COMMAND_NAME, '--cqrs-id=author')
  18. captured = capsys.readouterr()
  19. assert 'No objects found for filter!' in captured.out
  20. def test_unparseable_filter():
  21. with pytest.raises(CommandError) as e:
  22. call_command(COMMAND_NAME, '--cqrs-id=author', '-f={arg}')
  23. assert 'Bad filter kwargs!' in str(e)
  24. def test_non_kwargs_filter():
  25. with pytest.raises(CommandError) as e:
  26. call_command(COMMAND_NAME, '--cqrs-id=author', '-f=[1]')
  27. assert 'Bad filter kwargs!' in str(e)
  28. def test_bad_kwargs_filter():
  29. with pytest.raises(CommandError) as e:
  30. call_command(COMMAND_NAME, '--cqrs-id=author', '-f={"field": "value"}')
  31. assert 'Bad filter kwargs! Cannot resolve keyword' in str(e)
  32. @pytest.mark.django_db
  33. def test_empty_filter(capsys):
  34. call_command(COMMAND_NAME, '--cqrs-id=author', '-f={"id": 1}')
  35. captured = capsys.readouterr()
  36. assert 'No objects found for filter!' in captured.out
  37. @pytest.mark.django_db(transaction=True)
  38. def test_no_queue(mocker, capsys):
  39. Author.objects.create(id=1, name='author')
  40. publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
  41. call_command(COMMAND_NAME, '--cqrs-id=author', '-f={"id": 1}')
  42. publisher_mock.assert_called_once()
  43. payload = publisher_mock.call_args[0][0]
  44. assert payload.queue is None
  45. assert payload.signal_type is SignalType.SYNC
  46. captured = capsys.readouterr()
  47. assert 'Done!\n1 instance(s) synced.\n1 instance(s) processed.' in captured.out
  48. @pytest.mark.django_db(transaction=True)
  49. def test_queue_is_set(mocker, capsys):
  50. Author.objects.create(id=1, name='author')
  51. publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
  52. call_command(COMMAND_NAME, '--cqrs-id=author', '-f={"id": 1}', '--queue=replica')
  53. publisher_mock.assert_called_once()
  54. payload = publisher_mock.call_args[0][0]
  55. assert payload.queue == 'replica'
  56. assert payload.signal_type is SignalType.SYNC
  57. captured = capsys.readouterr()
  58. assert 'Done!\n1 instance(s) synced.\n1 instance(s) processed.' in captured.out
  59. @pytest.mark.django_db(transaction=True)
  60. def test_several_synced(mocker, capsys):
  61. for i in range(1, 3):
  62. Author.objects.create(id=i, name='author')
  63. publisher_mock = mocker.patch('dj_cqrs.controller.producer.produce')
  64. call_command(COMMAND_NAME, '--cqrs-id=author', '-f={"id__in": [1, 2]}')
  65. assert publisher_mock.call_count == 2
  66. captured = capsys.readouterr()
  67. assert 'Done!\n2 instance(s) synced.\n2 instance(s) processed.' in captured.out
  68. @pytest.mark.django_db
  69. def test_error(capsys, mocker):
  70. Author.objects.create(id=2, name='2')
  71. mocker.patch('tests.dj_master.models.Author.cqrs_sync', side_effect=db_error)
  72. call_command(COMMAND_NAME, '--cqrs-id=author', '-f={}') # noqa: P103
  73. captured = capsys.readouterr()
  74. assert 'Sync record failed for pk=2' in captured.out
  75. assert '1 instance(s) processed.' in captured.out
  76. assert '0 instance(s) synced.' in captured.out
  77. @pytest.mark.django_db
  78. def test_progress(capsys):
  79. Author.objects.create(id=2, name='2')
  80. call_command(COMMAND_NAME, '--cqrs-id=author', '--progress', '-f={}', '--batch=2') # noqa: P103
  81. captured = capsys.readouterr()
  82. assert 'Processing 1 records with batch size 2' in captured.out
  83. assert '1 of 1 processed - 100% with rate' in captured.out
  84. @pytest.mark.django_db(transaction=True)
  85. def test_reconnect_on_unusable_connection(capsys, mocker):
  86. for i in range(1, 11):
  87. Author.objects.create(id=i, name='author')
  88. is_usable_mock = mocker.patch.object(
  89. connections['default'],
  90. 'is_usable',
  91. side_effect=[True, False, True],
  92. )
  93. call_command(COMMAND_NAME, '--cqrs-id=author', '--progress', '-f={}', '--batch=5') # noqa: P103
  94. assert is_usable_mock.call_count == 2
  95. captured = capsys.readouterr()
  96. assert 'Done!\n10 instance(s) synced.\n10 instance(s) processed.' in captured.out