test_dataclasses.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. from datetime import datetime, timezone
  3. from dj_cqrs.constants import SignalType
  4. from dj_cqrs.dataclasses import TransportPayload
  def test_transport_payload_infinite_expires():
      """A message carrying an explicit ``'expires': None`` yields a payload whose ``expires`` is ``None``."""
      message = {
          'signal_type': SignalType.SYNC,
          'cqrs_id': 'cqrs_id',
          'instance_data': {},
          'instance_pk': 'id',
          'expires': None,
      }

      transport_payload = TransportPayload.from_message(message)

      assert transport_payload.expires is None
  def test_transport_payload_without_expires(mocker, settings):
      """A message with no ``'expires'`` key gets an expiration derived from the TTL.

      ``django.utils.timezone.now`` is patched to a fixed instant and
      ``CQRS_MESSAGE_TTL`` is set to 10, so the payload is expected to expire
      exactly 10 seconds after that instant.
      """
      fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
      mocker.patch('django.utils.timezone.now', return_value=fake_now)

      # Override the TTL through ``mocker.patch.dict`` rather than assigning into
      # the nested dict directly: an in-place mutation of ``settings.CQRS`` is
      # not reverted by the ``settings`` fixture and would leak into later tests.
      # ``patch.dict`` restores the original mapping content on teardown.
      mocker.patch.dict(settings.CQRS['master'], {'CQRS_MESSAGE_TTL': 10})

      expected_expires = datetime(2020, 1, 1, second=10, tzinfo=timezone.utc)

      payload = TransportPayload.from_message(
          {
              'signal_type': SignalType.SYNC,
              'cqrs_id': 'cqrs_id',
              'instance_data': {},
              'instance_pk': 'id',
          },
      )

      assert payload.expires == expected_expires