12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713 |
- # Copyright © 2023 Ingram Micro Inc. All rights reserved.
- import pytest
- from django.conf import settings
- from django.db.models import CharField, IntegerField, QuerySet
- from django.utils.timezone import now
- from dj_cqrs.constants import SignalType
- from dj_cqrs.dataclasses import TransportPayload
- from dj_cqrs.metas import ReplicaMeta
- from dj_cqrs.mixins import RawReplicaMixin
- from tests.dj.transport import TransportStub
- from tests.dj_replica import models
- from tests.utils import db_error
class ReplicaMetaTest(ReplicaMeta):
    """Test helper that exposes ``ReplicaMeta._check_cqrs_mapping`` under a public name."""

    @classmethod
    def check_cqrs_mapping(cls, model_cls):
        """Run the private mapping check on ``model_cls`` and hand back whatever it returns."""
        outcome = cls._check_cqrs_mapping(model_cls)
        return outcome
def test_cqrs_fields_non_existing_field(mocker):
    """The mapping check rejects this ``CQRS_MAPPING`` with the 'not correctly set' error.

    Args:
        mocker: fixture used to build a fake ``_meta`` exposing the two declared
            fields as ``concrete_fields`` and ``char_field`` as the PK name.
    """
    with pytest.raises(AssertionError) as e:

        class Cls:
            # Spelled ``CQRS_ID`` (previously the typo ``CQRD_ID``) to match the attribute
            # name used elsewhere in this module (``CQRSMetaModel.CQRS_ID``,
            # ``RawReplicaMixin.CQRS_ID``).
            CQRS_ID = 'ID'
            CQRS_MAPPING = {
                'chr_field': 'char_field',
                'integer_field': 'int_field',
            }

            char_field = CharField(max_length=100, primary_key=True)
            int_field = IntegerField()

            _meta = mocker.MagicMock(concrete_fields=(char_field, int_field), private_fields=())
            _meta.pk.name = 'char_field'

        ReplicaMetaTest.check_cqrs_mapping(Cls)

    # The message can only be inspected once the ``raises`` context has exited.
    assert str(e.value) == 'CQRS_MAPPING field is not correctly set for model Cls.'
def test_cqrs_fields_id_is_not_included(mocker):
    """A mapping that leaves out the PK field is rejected with the 'PK is not in' error.

    Args:
        mocker: fixture used to build a fake ``_meta`` whose ``pk.name`` is
            ``char_field``, a field the mapping below does not reference.
    """
    with pytest.raises(AssertionError) as e:

        class Cls:
            # Spelled ``CQRS_ID`` (previously the typo ``CQRD_ID``) to match the attribute
            # name used elsewhere in this module (``CQRSMetaModel.CQRS_ID``,
            # ``RawReplicaMixin.CQRS_ID``).
            CQRS_ID = 'ID'
            CQRS_MAPPING = {
                'integer_field': 'int_field',
            }

            char_field = CharField(max_length=100, primary_key=True)
            int_field = IntegerField()

            _meta = mocker.MagicMock(concrete_fields=(char_field, int_field), private_fields=())
            _meta.pk.name = 'char_field'

        ReplicaMetaTest.check_cqrs_mapping(Cls)

    # The message can only be inspected once the ``raises`` context has exited.
    assert str(e.value) == 'PK is not in CQRS_MAPPING for model Cls.'
def test_cqrs_fields_duplicates(mocker):
    """Two mapping keys pointing at the same model field are rejected as duplicates.

    Args:
        mocker: fixture used to build a fake ``_meta`` exposing the two declared
            fields as ``concrete_fields`` and ``char_field`` as the PK name.
    """
    with pytest.raises(AssertionError) as e:

        class Cls:
            # Spelled ``CQRS_ID`` (previously the typo ``CQRD_ID``) to match the attribute
            # name used elsewhere in this module (``CQRSMetaModel.CQRS_ID``,
            # ``RawReplicaMixin.CQRS_ID``).
            CQRS_ID = 'ID'
            CQRS_MAPPING = {
                'integer_field': 'char_field',
                'char_field': 'char_field',
            }

            char_field = CharField(max_length=100, primary_key=True)
            int_field = IntegerField()

            _meta = mocker.MagicMock(concrete_fields=(char_field, int_field), private_fields=())
            _meta.pk.name = 'char_field'

        ReplicaMetaTest.check_cqrs_mapping(Cls)

    # The message can only be inspected once the ``raises`` context has exited.
    assert str(e.value) == 'Duplicate names in CQRS_MAPPING field for model Cls.'
@pytest.mark.django_db
def test_create_simple():
    """A complete payload given to ``cqrs_save`` yields a stored ``BasicFieldsModelRef``."""
    payload = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
        'bool_field': False,
        'date_field': None,
        'datetime_field': now(),
        'float_field': 1.25,
    }

    saved = models.BasicFieldsModelRef.cqrs_save(payload)
    assert isinstance(saved, models.BasicFieldsModelRef)

    # Re-read from the database so the checks below see stored values.
    saved.refresh_from_db()
    assert saved.char_field == 'text'
    assert saved.float_field == 1.25
@pytest.mark.django_db
def test_create_simple_excessive_data():
    """A payload carrying an additional ``unexpected_field`` key still returns an instance."""
    payload = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
        'unexpected_field': 'value',
    }

    saved = models.BasicFieldsModelRef.cqrs_save(payload)

    assert isinstance(saved, models.BasicFieldsModelRef)
@pytest.mark.django_db
def test_create_simple_insufficient_data(caplog):
    """A payload with only ``int_field`` and the ``cqrs_*`` keys logs the missing-fields error."""
    sparse_payload = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
    }

    models.BasicFieldsModelRef.cqrs_save(sparse_payload)

    expected_message = 'Not all required CQRS fields are provided in data (basic).'
    assert expected_message in caplog.text
@pytest.mark.django_db
def test_create_mapped(caplog):
    """Payload values end up on the replica's ``id`` and ``name`` attributes."""
    payload = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
    }

    replica = models.MappedFieldsModelRef.cqrs_save(payload)
    assert isinstance(replica, models.MappedFieldsModelRef)

    # Re-read from the database so the checks below see stored values.
    replica.refresh_from_db()
    assert replica.id == 1
    assert replica.name == 'text'
@pytest.mark.django_db
def test_create_mapped_bad_mapping(caplog):
    """Saving through ``BadMappingModelRef`` logs the bad master-replica mapping message."""
    payload = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
    }

    models.BadMappingModelRef.cqrs_save(payload)

    expected_message = 'Bad master-replica mapping for invalid_field (basic_3).'
    assert expected_message in caplog.text
@pytest.mark.django_db
def test_create_db_error(mocker, caplog):
    """With ``objects.create`` patched to ``db_error``, the save logs a CQRS create error."""
    manager = models.BasicFieldsModelRef.objects
    mocker.patch.object(manager, 'create', side_effect=db_error)

    payload = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
    }
    models.BasicFieldsModelRef.cqrs_save(payload)

    expected_message = 'CQRS create error: pk = 1 (basic).'
    assert expected_message in caplog.text
@pytest.mark.django_db
def test_update_ok():
    """Saving revision 1 over an existing revision-0 row stores the new field values."""
    existing_row = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
    }
    models.BasicFieldsModelRef.objects.create(**existing_row)

    update_payload = {
        'int_field': 1,
        'cqrs_revision': 1,
        'cqrs_updated': now(),
        'char_field': 'new_text',
        'float_field': 1.30,
    }
    updated = models.BasicFieldsModelRef.cqrs_save(update_payload)
    assert isinstance(updated, models.BasicFieldsModelRef)

    # Re-read from the database so the checks below see stored values.
    updated.refresh_from_db()
    assert updated.int_field == 1
    assert updated.char_field == 'new_text'
    assert updated.float_field == 1.30
@pytest.mark.django_db
def test_update_db_error(mocker, caplog):
    """With ``save`` patched to ``db_error``, updating an existing row logs an update error."""
    existing_row = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
    }
    models.BasicFieldsModelRef.objects.create(**existing_row)

    # Patch only after the row exists, so the initial insert is not affected.
    mocker.patch.object(models.BasicFieldsModelRef, 'save', side_effect=db_error)

    update_payload = {
        'int_field': 1,
        'cqrs_revision': 1,
        'cqrs_updated': now(),
        'char_field': 'text',
    }
    models.BasicFieldsModelRef.cqrs_save(update_payload)

    expected_message = 'CQRS update error: pk = 1, cqrs_revision = 1 (basic).'
    assert expected_message in caplog.text
@pytest.mark.django_db
def test_delete_ok():
    """``cqrs_delete`` for an existing row returns a truthy result and leaves the table empty."""
    timestamp = now()
    existing_row = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': timestamp,
        'char_field': 'text',
    }
    models.BasicFieldsModelRef.objects.create(**existing_row)

    delete_payload = {
        'id': 1,
        'cqrs_revision': 0,
        'cqrs_updated': timestamp,
    }
    deleted = models.BasicFieldsModelRef.cqrs_delete(delete_payload)

    assert deleted
    assert models.BasicFieldsModelRef.objects.count() == 0
@pytest.mark.django_db
def test_delete_non_existing_id():
    """``cqrs_delete`` on an empty table still reports a truthy result."""
    delete_payload = {
        'id': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
    }

    deleted = models.BasicFieldsModelRef.cqrs_delete(delete_payload)

    assert deleted
    assert models.BasicFieldsModelRef.objects.count() == 0
@pytest.mark.django_db
def test_delete_db_error(mocker, caplog):
    """With ``objects.filter`` patched to ``db_error``, delete is falsy and logs an error."""
    manager = models.BasicFieldsModelRef.objects
    mocker.patch.object(manager, 'filter', side_effect=db_error)

    delete_payload = {
        'id': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
    }
    deleted = models.BasicFieldsModelRef.cqrs_delete(delete_payload)

    assert not deleted
    assert 'CQRS delete error: pk = 1' in caplog.text
@pytest.mark.django_db
def test_save_bad_master_data_field_type(caplog):
    """Saving this payload through ``BadTypeModelRef`` logs a CQRS create error."""
    payload = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'datetime_field': now(),
    }

    models.BadTypeModelRef.cqrs_save(payload)

    expected_message = 'CQRS create error: pk = 1 (basic_1).'
    assert expected_message in caplog.text
@pytest.mark.django_db
def test_save_no_pk_in_master_data(caplog):
    """A save payload keyed by ``id`` instead of ``int_field`` logs the missing-PK message."""
    payload = {
        'id': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
    }

    models.BasicFieldsModelRef.cqrs_save(payload)

    expected_message = 'CQRS PK is not provided in data (basic).'
    assert expected_message in caplog.text
@pytest.mark.django_db
def test_save_no_cqrs_fields_in_master_data(caplog):
    """A save payload without ``cqrs_updated`` logs the missing-sync-fields message."""
    payload = {
        'int_field': 1,
        'cqrs_revision': 0,
        'char_field': 'text',
    }

    models.BasicFieldsModelRef.cqrs_save(payload)

    expected_message = 'CQRS sync fields are not provided in data (basic).'
    assert expected_message in caplog.text
@pytest.mark.django_db
def test_delete_no_id_in_master_data(caplog):
    """A delete payload without ``id`` is falsy and logs the missing-PK message."""
    delete_payload = {
        'cqrs_revision': 0,
        'cqrs_updated': now(),
    }

    deleted = models.BasicFieldsModelRef.cqrs_delete(delete_payload)

    assert not deleted
    expected_message = 'CQRS PK is not provided in data (basic).'
    assert expected_message in caplog.text
@pytest.mark.django_db
def test_delete_no_cqrs_fields_in_master_data(caplog):
    """A delete payload without ``cqrs_updated`` is falsy and logs the missing-sync message."""
    delete_payload = {
        'id': 1,
        'cqrs_revision': 0,
    }

    deleted = models.BasicFieldsModelRef.cqrs_delete(delete_payload)

    assert not deleted
    expected_message = 'CQRS sync fields are not provided in data (basic).'
    assert expected_message in caplog.text
@pytest.mark.django_db(transaction=True)
def test_update_before_create_is_over(caplog):
    """An update that lands first wins; the late ``create_instance`` fails and is logged."""
    create_data = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
    }
    update_data = {
        'int_field': 1,
        'cqrs_revision': 1,
        'cqrs_updated': now(),
        'char_field': 'new_text',
    }

    # Deliberately apply the update before the create it logically follows.
    from_update = models.BasicFieldsModelRef.cqrs_save(update_data)
    from_create = models.BasicFieldsModelRef.cqrs.create_instance(create_data)

    from_update.refresh_from_db()
    assert from_update.cqrs_revision == 1
    assert from_update.char_field == 'new_text'
    assert not from_create

    # The database error text differs per backend, so key the expectation by engine.
    create_error = 'CQRS create error: pk = 1 (basic).\n'
    expected_log_by_engine = {
        'sqlite': (
            'UNIQUE constraint failed: dj_replica_basicfieldsmodelref.int_field\n'
            + create_error
        ),
        'postgres': (
            'duplicate key value violates unique constraint "dj_replica_basicfieldsmodelref_pkey"\n'
            'DETAIL: Key (int_field)=(1) already exists.\n\n'
            + create_error
        ),
        'mysql': (
            '(1062, "Duplicate entry \'1\' for key \'dj_replica_basicfieldsmodelref.PRIMARY\'")\n'
            + create_error
        ),
    }
    assert expected_log_by_engine[settings.DB_ENGINE] in caplog.text
@pytest.mark.django_db(transaction=True)
def test_wrong_update_order(caplog):
    """Applying revision 2 and then revision 1 keeps revision 2 and logs the wrong order."""
    existing_row = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
    }
    models.BasicFieldsModelRef.objects.create(**existing_row)

    update_data_1 = {
        'int_field': 1,
        'cqrs_revision': 1,
        'cqrs_updated': now(),
        'char_field': 'new_text_1',
    }
    update_data_2 = {
        'int_field': 1,
        'cqrs_revision': 2,
        'cqrs_updated': now(),
        'char_field': 'new_text_2',
    }

    # Deliver the higher revision first, then the stale one.
    first_applied = models.BasicFieldsModelRef.cqrs_save(update_data_2)
    second_applied = models.BasicFieldsModelRef.cqrs_save(update_data_1)

    first_applied.refresh_from_db()
    assert first_applied.cqrs_revision == 2
    assert first_applied.char_field == 'new_text_2'
    assert second_applied

    expected_message = (
        'Wrong CQRS sync order: pk = 1, cqrs_revision = new 1 / existing 2 (basic).'
    )
    assert expected_message in caplog.text
@pytest.mark.django_db(transaction=True)
def test_de_duplication(caplog):
    """Saving the same update twice returns matching instances and logs the duplicate."""
    existing_row = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
    }
    models.BasicFieldsModelRef.objects.create(**existing_row)

    update_data = {
        'int_field': 1,
        'cqrs_revision': 1,
        'cqrs_updated': now(),
        'char_field': 'new_text',
    }
    first_result = models.BasicFieldsModelRef.cqrs_save(update_data)
    second_result = models.BasicFieldsModelRef.cqrs_save(update_data)

    assert first_result.cqrs_revision == 1
    assert first_result.char_field == 'new_text'

    assert second_result.cqrs_revision == 1
    assert second_result.char_field == 'new_text'

    expected_message = 'Received duplicate CQRS data: pk = 1, cqrs_revision = 1 (basic).'
    assert expected_message in caplog.text
@pytest.mark.django_db(transaction=True)
def test_create_before_delete_is_over(caplog):
    """A re-create arriving before the pending delete is logged; the delete still succeeds."""
    # This can happen, very rarely, when IDs on the master are not auto-incremented
    # and are therefore not unique over time.
    # The resulting inconsistency is expected.
    existing_row = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
    }
    models.BasicFieldsModelRef.objects.create(**existing_row)

    delete_data = {
        'id': 1,
        'cqrs_revision': 1,
        'cqrs_updated': now(),
    }
    new_create_data = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'other',
    }

    # The new create is delivered ahead of the delete for the old row.
    models.BasicFieldsModelRef.cqrs_save(new_create_data)
    deleted = models.BasicFieldsModelRef.cqrs_delete(delete_data)

    duplicate_message = 'Received duplicate CQRS data: pk = 1, cqrs_revision = 0 (basic).'
    race_message = 'CQRS potential creation race condition: pk = 1 (basic).'
    assert duplicate_message in caplog.text
    assert race_message in caplog.text
    assert deleted
@pytest.mark.django_db
def test_updates_were_lost(caplog):
    """Jumping from revision 0 straight to revision 5 logs four lost or filtered packages."""
    existing_row = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
    }
    models.BasicFieldsModelRef.objects.create(**existing_row)

    skipping_update = {
        'int_field': 1,
        'cqrs_revision': 5,
        'cqrs_updated': now(),
        'char_field': 'text1',
    }
    models.BasicFieldsModelRef.cqrs_save(skipping_update)

    expected_message = 'Lost or filtered out 4 CQRS packages: pk = 1, cqrs_revision = 5 (basic)'
    assert expected_message in caplog.text
@pytest.mark.django_db()
def test_tracked_fields_mapped(mocker):
    """``previous_data`` reaches ``cqrs_update`` keyed by ``name`` rather than ``char_field``."""
    update_spy = mocker.patch.object(models.MappedFieldsModelRef, 'cqrs_update')

    first_payload = {
        'int_field': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
        'char_field': 'text',
    }
    second_payload = {
        'int_field': 1,
        'cqrs_revision': 1,
        'cqrs_updated': now(),
        'char_field': 'new_text',
    }
    previous = {'char_field': 'text'}

    models.MappedFieldsModelRef.cqrs_save(first_payload)
    models.MappedFieldsModelRef.cqrs_save(second_payload, previous_data=previous)

    assert update_spy.call_count == 1

    # ``call_args`` is an (args, kwargs) pair; only the keyword arguments matter here.
    call_kwargs = update_spy.call_args[1]
    assert 'previous_data' in call_kwargs
    assert call_kwargs['previous_data'] == {'name': 'text'}
@pytest.mark.django_db
def test_select_for_update_lock(mocker):
    """Saving through ``LockModelRef`` calls ``QuerySet.select_for_update`` exactly once."""
    # Build the stand-in queryset before patching, as it is the patch's return value.
    plain_queryset = models.LockModelRef.objects.all()
    lock_spy = mocker.patch.object(
        QuerySet,
        'select_for_update',
        return_value=plain_queryset,
    )

    payload = {
        'id': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
    }
    saved = models.LockModelRef.cqrs_save(payload)

    assert saved.id == 1
    lock_spy.assert_called_once()
@pytest.mark.django_db
def test_nodb(mocker):
    """Both ``cqrs_save`` and ``cqrs_delete`` on ``NoDBModelRef`` raise ``NotImplementedError``."""
    operations = (models.NoDBModelRef.cqrs_save, models.NoDBModelRef.cqrs_delete)

    for operation in operations:
        with pytest.raises(NotImplementedError):
            operation(None)
@pytest.mark.parametrize(
    'cqrs_max_retries, current_retry, expected_result',
    [
        (5, 0, True),
        (5, 5, False),
        (-1, 0, False),
        (0, 0, False),  # Disabled
        (None, 10000, True),  # Infinite
    ],
)
def test_should_retry_cqrs(settings, cqrs_max_retries, current_retry, expected_result):
    """``should_retry_cqrs`` returns the expected bool for each ``CQRS_MAX_RETRIES`` value."""
    # NOTE(review): this mutates the nested settings dict in place; confirm that the
    # ``settings`` fixture (or a conftest hook) restores it between tests.
    replica_settings = settings.CQRS['replica']
    replica_settings['CQRS_MAX_RETRIES'] = cqrs_max_retries

    # Identity check on purpose: the outcome must be exactly ``True`` or ``False``.
    assert models.BasicFieldsModelRef.should_retry_cqrs(current_retry) is expected_result
@pytest.mark.parametrize('retry_delay', (0, 5))
@pytest.mark.parametrize('current_retry', (0, 1))
def test_get_cqrs_retry_delay(settings, retry_delay, current_retry):
    """``get_cqrs_retry_delay`` returns the configured delay for each ``current_retry`` tried.

    Args:
        settings: fixture whose ``CQRS['replica']`` dict receives the delay under test.
        retry_delay: value written to ``CQRS_RETRY_DELAY`` and expected back.
        current_retry: retry counter passed to the method.
    """
    # NOTE(review): this mutates the nested settings dict in place; confirm that the
    # ``settings`` fixture (or a conftest hook) restores it between tests.
    settings.CQRS['replica']['CQRS_RETRY_DELAY'] = retry_delay

    result = models.BasicFieldsModelRef.get_cqrs_retry_delay(current_retry=current_retry)

    # Compare by value. ``is`` on integers only passes because CPython caches small
    # ints, so it would break for larger delays or on other interpreters.
    assert result == retry_delay
@pytest.mark.django_db(transaction=True)
def test_support_for_meta_create():
    """Consuming a SAVE payload for ``CQRSMetaModel`` hands back the ``meta`` that was sent."""
    instance_data = {
        'id': 1,
        'cqrs_revision': 0,
        'cqrs_updated': now(),
    }
    payload = TransportPayload(
        SignalType.SAVE,
        models.CQRSMetaModel.CQRS_ID,
        instance_data,
        1,
        meta={'Hello': 'world'},
    )

    consumed_meta = TransportStub.consume(payload)

    assert consumed_meta == {'Hello': 'world'}
@pytest.mark.django_db(transaction=True)
def test_support_for_meta_update():
    """Consuming a SYNC payload for an existing row returns the expected four-item tuple."""
    models.CQRSMetaModel.objects.create(id=2, cqrs_revision=0, cqrs_updated=now())

    cqrs_updated = now()
    payload = TransportPayload(
        SignalType.SYNC,
        models.CQRSMetaModel.CQRS_ID,
        {
            'id': 2,
            'cqrs_revision': 1,
            'cqrs_updated': cqrs_updated,
        },
        2,
        meta=[1, 2, 3],
    )

    consumed = TransportStub.consume(payload)

    # The expected data dict is built separately from the one handed to the payload.
    expected = (
        True,
        {'id': 2, 'cqrs_revision': 1, 'cqrs_updated': cqrs_updated},
        None,
        [1, 2, 3],
    )
    assert consumed == expected
@pytest.mark.django_db(transaction=True)
def test_support_for_meta_delete():
    """Consuming a DELETE payload returns the sent ``meta`` and removes the existing row."""
    models.CQRSMetaModel.objects.create(id=3, cqrs_revision=0, cqrs_updated=now())

    instance_data = {
        'id': 3,
        'cqrs_revision': 1,
        'cqrs_updated': now(),
    }
    payload = TransportPayload(
        SignalType.DELETE,
        models.CQRSMetaModel.CQRS_ID,
        instance_data,
        3,
        meta={1: 2, 2: {}},
    )

    consumed_meta = TransportStub.consume(payload)

    assert consumed_meta == {1: 2, 2: {}}
    assert not models.CQRSMetaModel.objects.exists()
def test_raw_replica_mixin():
    """``RawReplicaMixin`` exposes the expected class defaults and unimplemented hooks."""
    assert RawReplicaMixin.CQRS_ID is None
    assert RawReplicaMixin.CQRS_NO_DB_OPERATIONS is True
    assert RawReplicaMixin.CQRS_META is False

    # Neither hook is implemented on the bare mixin.
    for hook in (RawReplicaMixin.cqrs_save, RawReplicaMixin.cqrs_delete):
        with pytest.raises(NotImplementedError):
            hook(None)
@pytest.mark.parametrize('cqrs_id', ('document1', 'document2'))
def test_support_for_non_models_save(cqrs_id):
    """A SAVE payload for ``cqrs_id`` is consumed into the id, the data and save kwargs."""
    data = {
        'slug': 'test',
        'cqrs_revision': 0,
        'cqrs_updated': now(),
    }
    meta = {'Hello': 'world'}
    payload = TransportPayload(SignalType.SAVE, cqrs_id, data, 'test', meta=meta)

    received_id, received_data, received_kwargs = TransportStub.consume(payload)

    assert received_id == cqrs_id
    assert received_data == data
    assert received_kwargs == {'previous_data': None, 'meta': meta}
@pytest.mark.parametrize('cqrs_id', ('document1', 'document2'))
def test_support_for_non_models_delete(cqrs_id):
    """A DELETE payload for ``cqrs_id`` is consumed into the id, the data and delete kwargs."""
    data = {
        'slug': 'test',
        'cqrs_revision': 1,
        'cqrs_updated': now(),
    }
    meta = {'other': 1}
    payload = TransportPayload(SignalType.DELETE, cqrs_id, data, 'test', meta=meta)

    received_id, received_data, received_kwargs = TransportStub.consume(payload)

    assert received_id == cqrs_id
    assert received_data == data
    assert received_kwargs == {'meta': meta}
|