test_diff_replica.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import sys
  3. from io import StringIO
  4. import pytest
  5. from django.conf import settings
  6. from django.core.management import CommandError, call_command
  7. from django.utils.timezone import now
  8. from tests.dj_master.models import Author
  9. from tests.dj_replica.models import AuthorRef
# Name of the Django management command under test (passed to ``call_command``).
COMMAND_NAME = 'cqrs_diff_replica'
  11. def test_bad_cqrs_id(mocker):
  12. mocker.patch.object(sys, 'stdin', StringIO('invalid,datetime\n'))
  13. with pytest.raises(CommandError) as e:
  14. call_command(COMMAND_NAME)
  15. assert 'Wrong CQRS ID: invalid!' in str(e)
  16. @pytest.mark.django_db
  17. def test_no_rows(mocker, capsys):
  18. mocker.patch.object(sys, 'stdin', StringIO('author,datetime\n'))
  19. call_command(COMMAND_NAME)
  20. captured = capsys.readouterr()
  21. assert not captured.err
  22. assert 'author,datetime,replica\n' == captured.out
  23. def master_replica_pipe(capsys, mocker, *args):
  24. call_command('cqrs_diff_master', '--cqrs-id=author', *args)
  25. captured = capsys.readouterr()
  26. mocker.patch.object(sys, 'stdin', StringIO(captured.out))
  27. call_command(COMMAND_NAME)
  28. return capsys.readouterr()
  29. @pytest.mark.django_db
  30. def test_first_row(capsys, mocker):
  31. Author.objects.create(name='author', id=1)
  32. captured = master_replica_pipe(capsys, mocker)
  33. first_row = captured.out.split('\n')[0]
  34. assert '{0},'.format(Author.CQRS_ID) in first_row
  35. assert settings.CQRS['queue'] in first_row
  36. @pytest.mark.django_db
  37. def test_all_synced(capsys, mocker):
  38. mocker.patch('dj_cqrs.controller.producer.produce')
  39. Author.objects.create(id=1, name='name')
  40. AuthorRef.objects.create(id=1, name='name', cqrs_revision=0, cqrs_updated=now())
  41. captured = master_replica_pipe(capsys, mocker)
  42. assert not captured.err
  43. @pytest.mark.django_db
  44. def test_sync_for_all(mocker, capsys):
  45. for i in range(3):
  46. Author.objects.create(name=str(i), id=i)
  47. captured = master_replica_pipe(capsys, mocker)
  48. for err_text in ('PK to resync', '0', '1', '2'):
  49. assert err_text in captured.err
  50. second_row = captured.out.split('\n')[1]
  51. assert '[0,1,2]' in second_row
  52. @pytest.mark.django_db
  53. def test_partial_sync(mocker, capsys):
  54. mocker.patch('dj_cqrs.controller.producer.produce')
  55. Author.objects.create(id=1, name='name')
  56. AuthorRef.objects.create(id=1, name='name', cqrs_revision=0, cqrs_updated=now())
  57. Author.objects.create(id=2, name='2')
  58. captured = master_replica_pipe(capsys, mocker)
  59. assert 'PK to resync' in captured.err
  60. assert '2' in captured.err
  61. second_row = captured.out.split('\n')[1]
  62. assert '[2]' in second_row
  63. @pytest.mark.django_db
  64. def test_sync_batch(mocker, capsys):
  65. mocker.patch('dj_cqrs.controller.producer.produce')
  66. for i in range(3):
  67. Author.objects.create(id=i, name=str(i))
  68. AuthorRef.objects.create(id=1, name='name', cqrs_revision=0, cqrs_updated=now())
  69. captured = master_replica_pipe(capsys, mocker, '--batch=1')
  70. for err_text in ('PK to resync', '0', '2'):
  71. assert err_text in captured.err
  72. second_row = captured.out.split('\n')[1]
  73. assert '[0]' in second_row
  74. third_row = captured.out.split('\n')[2]
  75. assert '[2]' in third_row