123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- # Copyright © 2023 Ingram Micro Inc. All rights reserved.
- from datetime import (
- date,
- datetime,
- timedelta,
- timezone,
- )
- from unittest.mock import patch
- from uuid import UUID
- import pytest
- from dj_cqrs.utils import (
- apply_query_timeouts,
- get_delay_queue_max_size,
- get_json_valid_value,
- get_message_expiration_dt,
- get_messages_prefetch_count_per_worker,
- )
- from tests.dj_replica import models
- def test_get_message_expiration_dt_fixed(mocker, settings):
- settings.CQRS['master']['CQRS_MESSAGE_TTL'] = 3600
- fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
- mocker.patch('django.utils.timezone.now', return_value=fake_now)
- result = get_message_expiration_dt()
- expected_result = fake_now + timedelta(seconds=3600)
- assert result == expected_result
- def test_get_message_expiration_dt_fixed_from_parameter(mocker, settings):
- fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
- mocker.patch('django.utils.timezone.now', return_value=fake_now)
- result = get_message_expiration_dt(message_ttl=2200)
- expected_result = fake_now + timedelta(seconds=2200)
- assert result == expected_result
- def test_get_message_expiration_dt_infinite(mocker, settings):
- settings.CQRS['master']['CQRS_MESSAGE_TTL'] = None
- fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
- mocker.patch('django.utils.timezone.now', return_value=fake_now)
- result = get_message_expiration_dt()
- assert result is None
- def test_get_delay_queue_max_size_master(settings):
- del settings.CQRS['replica']
- assert get_delay_queue_max_size() is None
- def test_get_delay_queue_max_size_replica(settings):
- settings.CQRS['replica']['delay_queue_max_size'] = 4
- assert get_delay_queue_max_size() == 4
- def test_get_messaged_prefetch_count_per_worker_no_delay_queue(settings):
- settings.CQRS['replica']['delay_queue_max_size'] = None
- assert get_messages_prefetch_count_per_worker() == 0
- def test_get_messaged_prefetch_count_per_worker_with_delay_queue(settings):
- settings.CQRS['replica']['delay_queue_max_size'] = 4
- assert get_messages_prefetch_count_per_worker() == 5
- @pytest.mark.parametrize(
- 'value,result',
- (
- (None, None),
- (1, 1),
- (datetime(2022, 1, 1, second=0, tzinfo=timezone.utc), '2022-01-01 00:00:00+00:00'),
- (date(2022, 2, 1), '2022-02-01'),
- (UUID('0419d87b-d477-44e4-82c4-310f56faa3c7'), '0419d87b-d477-44e4-82c4-310f56faa3c7'),
- ('abc', 'abc'),
- ),
- )
- def test_get_json_valid_value(value, result):
- assert get_json_valid_value(value) == result
- @pytest.mark.django_db
- @pytest.mark.parametrize(
- 'engine, p_count',
- [
- ('sqlite', 0),
- ('postgres', 1),
- ('mysql', 1),
- ],
- )
- def test_apply_query_timeouts(settings, engine, p_count):
- if settings.DB_ENGINE != engine:
- return
- settings.CQRS['replica']['CQRS_QUERY_TIMEOUT'] = 1
- with patch('dj_cqrs.utils.install_last_query_capturer') as p:
- assert apply_query_timeouts(models.BasicFieldsModelRef) is None
- assert p.call_count == p_count
|