123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100 |
- # Copyright © 2023 Ingram Micro Inc. All rights reserved.
- from datetime import datetime, timedelta, timezone
- from queue import Full
- import pytest
- from dj_cqrs.delay import DelayMessage, DelayQueue
def test_delay_message(mocker):
    """DelayMessage exposes the delivery tag, payload and ETA it was built with."""
    frozen_now = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
    payload = {'test': 'data'}
    target_eta = frozen_now + timedelta(seconds=10)
    # NOTE(review): the ETA is passed in explicitly, so it is unclear from this
    # test whether DelayMessage consults the clock at all - confirm the patch is needed.
    mocker.patch('django.utils.timezone.now', return_value=frozen_now)

    message = DelayMessage(1, payload, target_eta)

    assert message.delivery_tag == 1
    assert message.payload == {'test': 'data'}
    # Ten seconds past the frozen "now".
    assert message.eta == datetime(2020, 1, 1, second=10, tzinfo=timezone.utc)
def test_delay_queue_put():
    """A single message put on a fresh queue is counted and handed back by get()."""
    moment = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
    message = DelayMessage(1, {'test': 'data'}, moment)
    pending = DelayQueue()

    pending.put(message)

    assert pending.qsize() == 1
    # Identity, not equality: the queue must return the very same object.
    assert pending.get() is message
def test_delay_queue_put_same_eta():
    """Ten messages sharing one ETA can all be queued, and get() still yields a message."""
    shared_eta = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
    messages = [DelayMessage(tag, None, shared_eta) for tag in range(10)]
    pending = DelayQueue()

    for message in messages:
        pending.put(message)

    assert pending.qsize() == 10
    # Only truthiness is checked: which of the equal-ETA messages comes out
    # first is not pinned by this test.
    assert pending.get()
def test_delay_queue_put_full():
    """put() on a queue already holding max_size messages raises queue.Full."""
    moment = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
    bounded = DelayQueue(max_size=1)
    bounded.put(DelayMessage(1, None, moment))

    with pytest.raises(Full):
        bounded.put(DelayMessage(2, None, moment))

    # Checked after the `with` block so these run once the exception was raised:
    # the rejected message must not have displaced or joined the first one.
    assert bounded.qsize() == 1
    assert bounded.get().delivery_tag == 1
def test_delay_queue_get_ready(mocker):
    """get_ready() yields the messages whose ETA has passed, earliest first.

    Four messages are queued with ETAs 1s, 0s, 3600s and 2s after the put time;
    the clock is then moved to +3s, so three are due and one must stay queued.
    """
    put_time = datetime(2020, 1, 1, second=0, tzinfo=timezone.utc)
    mocker.patch('django.utils.timezone.now', return_value=put_time)

    pending = DelayQueue()
    queued = []
    # Offsets are deliberately out of order so the queue has to sort by ETA.
    for offset in (1, 0, 3600, 2):
        message = DelayMessage(None, None, put_time + timedelta(seconds=offset))
        pending.put(message)
        queued.append(message)

    # Drop the first patch, then re-freeze the clock three seconds later.
    mocker.stopall()
    read_time = datetime(2020, 1, 1, second=3, tzinfo=timezone.utc)
    mocker.patch('django.utils.timezone.now', return_value=read_time)

    ready = list(pending.get_ready())
    assert len(ready) == 3

    # The latest ETA (+3600s) is the only message expected to remain queued.
    *expected_ready, expected_waiting = sorted(queued, key=lambda m: m.eta)
    for want, got in zip(expected_ready, ready):
        assert want is got

    assert pending.qsize() == 1
    assert pending.get() is expected_waiting
def test_delay_queue_invalid_max_size():
    """DelayQueue(max_size=0) fails with an AssertionError carrying a fixed message."""
    with pytest.raises(AssertionError) as exc_info:
        DelayQueue(max_size=0)

    # Inspected after the `with` block, once pytest has captured the exception.
    message = exc_info.value.args[0]
    assert message == 'Delay queue max_size should be positive integer.'
|