test_utils.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. from datetime import (
  3. date,
  4. datetime,
  5. timedelta,
  6. timezone,
  7. )
  8. from unittest.mock import patch
  9. from uuid import UUID
  10. import pytest
  11. from dj_cqrs.utils import (
  12. apply_query_timeouts,
  13. get_delay_queue_max_size,
  14. get_json_valid_value,
  15. get_message_expiration_dt,
  16. get_messages_prefetch_count_per_worker,
  17. )
  18. from tests.dj_replica import models
  19. def test_get_message_expiration_dt_fixed(mocker, settings):
  20. settings.CQRS['master']['CQRS_MESSAGE_TTL'] = 3600
  21. fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
  22. mocker.patch('django.utils.timezone.now', return_value=fake_now)
  23. result = get_message_expiration_dt()
  24. expected_result = fake_now + timedelta(seconds=3600)
  25. assert result == expected_result
  26. def test_get_message_expiration_dt_fixed_from_parameter(mocker, settings):
  27. fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
  28. mocker.patch('django.utils.timezone.now', return_value=fake_now)
  29. result = get_message_expiration_dt(message_ttl=2200)
  30. expected_result = fake_now + timedelta(seconds=2200)
  31. assert result == expected_result
  32. def test_get_message_expiration_dt_infinite(mocker, settings):
  33. settings.CQRS['master']['CQRS_MESSAGE_TTL'] = None
  34. fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
  35. mocker.patch('django.utils.timezone.now', return_value=fake_now)
  36. result = get_message_expiration_dt()
  37. assert result is None
  38. def test_get_delay_queue_max_size_master(settings):
  39. del settings.CQRS['replica']
  40. assert get_delay_queue_max_size() is None
  41. def test_get_delay_queue_max_size_replica(settings):
  42. settings.CQRS['replica']['delay_queue_max_size'] = 4
  43. assert get_delay_queue_max_size() == 4
  44. def test_get_messaged_prefetch_count_per_worker_no_delay_queue(settings):
  45. settings.CQRS['replica']['delay_queue_max_size'] = None
  46. assert get_messages_prefetch_count_per_worker() == 0
  47. def test_get_messaged_prefetch_count_per_worker_with_delay_queue(settings):
  48. settings.CQRS['replica']['delay_queue_max_size'] = 4
  49. assert get_messages_prefetch_count_per_worker() == 5
  50. @pytest.mark.parametrize(
  51. 'value,result',
  52. (
  53. (None, None),
  54. (1, 1),
  55. (datetime(2022, 1, 1, second=0, tzinfo=timezone.utc), '2022-01-01 00:00:00+00:00'),
  56. (date(2022, 2, 1), '2022-02-01'),
  57. (UUID('0419d87b-d477-44e4-82c4-310f56faa3c7'), '0419d87b-d477-44e4-82c4-310f56faa3c7'),
  58. ('abc', 'abc'),
  59. ),
  60. )
  61. def test_get_json_valid_value(value, result):
  62. assert get_json_valid_value(value) == result
  63. @pytest.mark.django_db
  64. @pytest.mark.parametrize(
  65. 'engine, p_count',
  66. [
  67. ('sqlite', 0),
  68. ('postgres', 1),
  69. ('mysql', 1),
  70. ],
  71. )
  72. def test_apply_query_timeouts(settings, engine, p_count):
  73. if settings.DB_ENGINE != engine:
  74. return
  75. settings.CQRS['replica']['CQRS_QUERY_TIMEOUT'] = 1
  76. with patch('dj_cqrs.utils.install_last_query_capturer') as p:
  77. assert apply_query_timeouts(models.BasicFieldsModelRef) is None
  78. assert p.call_count == p_count