test_delay.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. from datetime import datetime, timedelta, timezone
  3. from queue import Full
  4. import pytest
  5. from dj_cqrs.delay import DelayMessage, DelayQueue
  6. def test_delay_message(mocker):
  7. fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
  8. eta = fake_now + timedelta(seconds=10)
  9. mocker.patch('django.utils.timezone.now', return_value=fake_now)
  10. delay_message = DelayMessage(1, {'test': 'data'}, eta)
  11. assert delay_message.delivery_tag == 1
  12. assert delay_message.payload == {'test': 'data'}
  13. expected_eta = datetime(2020, 1, 1, second=10, tzinfo=timezone.utc)
  14. assert delay_message.eta == expected_eta
  15. def test_delay_queue_put():
  16. fake_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
  17. delay_message = DelayMessage(1, {'test': 'data'}, fake_now)
  18. delay_queue = DelayQueue()
  19. delay_queue.put(delay_message)
  20. assert delay_queue.qsize() == 1
  21. result_message = delay_queue.get()
  22. assert result_message is delay_message
  23. def test_delay_queue_put_same_eta():
  24. eta = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
  25. delay_messages = [DelayMessage(delivery_tag, None, eta) for delivery_tag in range(10)]
  26. delay_queue = DelayQueue()
  27. for delay_message in delay_messages:
  28. delay_queue.put(delay_message)
  29. assert delay_queue.qsize() == 10
  30. assert delay_queue.get()
  31. def test_delay_queue_put_full():
  32. eta = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
  33. delay_queue = DelayQueue(max_size=1)
  34. delay_queue.put(
  35. DelayMessage(1, None, eta),
  36. )
  37. with pytest.raises(Full):
  38. delay_queue.put(
  39. DelayMessage(2, None, eta),
  40. )
  41. assert delay_queue.qsize() == 1
  42. assert delay_queue.get().delivery_tag == 1
  43. def test_delay_queue_get_ready(mocker):
  44. fake_put_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
  45. mocker.patch('django.utils.timezone.now', return_value=fake_put_now)
  46. delay_queue = DelayQueue()
  47. delay_messages = []
  48. for delay in (1, 0, 3600, 2):
  49. eta = fake_put_now + timedelta(seconds=delay)
  50. delay_message = DelayMessage(None, None, eta)
  51. delay_queue.put(delay_message)
  52. delay_messages.append(delay_message)
  53. mocker.stopall()
  54. fake_get_ready_now = datetime(2020, 1, 1, second=3, tzinfo=timezone.utc)
  55. mocker.patch('django.utils.timezone.now', return_value=fake_get_ready_now)
  56. ready_messages = list(delay_queue.get_ready())
  57. assert len(ready_messages) == 3
  58. sorted_expected = sorted(delay_messages, key=lambda x: x.eta)
  59. expected_not_ready = sorted_expected.pop()
  60. for expected, result in zip(sorted_expected, ready_messages):
  61. assert expected is result
  62. assert delay_queue.qsize() == 1
  63. result_message = delay_queue.get()
  64. assert result_message is expected_not_ready
  65. def test_delay_queue_invalid_max_size():
  66. with pytest.raises(AssertionError) as e:
  67. DelayQueue(max_size=0)
  68. assert e.value.args[0] == 'Delay queue max_size should be positive integer.'