delay.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. from queue import Full, PriorityQueue
  3. from django.utils import timezone
  4. class DelayMessage:
  5. """Delay message.
  6. :param delivery_tag: The server-assigned and channel-specific delivery tag.
  7. :type delivery_tag: int
  8. :param payload: Transport payload.
  9. :type payload: dj_cqrs.dataclasses.TransportPayload
  10. :param eta: Time after which the message should be requeued.
  11. :type eta: datetime.datetime
  12. """
  13. def __init__(self, delivery_tag, payload, eta):
  14. self.delivery_tag = delivery_tag
  15. self.payload = payload
  16. self.eta = eta
  17. class DelayQueue:
  18. """Queue for delay messages."""
  19. def __init__(self, max_size=None):
  20. if max_size is not None:
  21. assert max_size > 0, 'Delay queue max_size should be positive integer.'
  22. self._max_size = max_size
  23. self._queue = PriorityQueue()
  24. def get(self):
  25. """
  26. :rtype: DelayMessage
  27. """
  28. *_, delay_message = self._queue.get()
  29. return delay_message
  30. def get_ready(self):
  31. """Returns messages with expired ETA.
  32. :return: delayed messages generator
  33. :rtype: typing.Generator[DelayMessage]
  34. """
  35. while self.qsize():
  36. delay_message = self.get()
  37. if delay_message.eta > timezone.now():
  38. # Queue is ordered by message ETA.
  39. # Remaining messages should wait longer, we don't check them.
  40. self.put(delay_message)
  41. break
  42. yield delay_message
  43. def put(self, delay_message):
  44. """Adds message to queue.
  45. :param delay_message: DelayMessage instance.
  46. :type delay_message: DelayMessage
  47. """
  48. assert isinstance(delay_message, DelayMessage)
  49. if self.full():
  50. raise Full('Delay queue is full')
  51. self._queue.put(
  52. (
  53. delay_message.eta.timestamp(),
  54. delay_message.delivery_tag,
  55. delay_message,
  56. ),
  57. )
  58. def qsize(self):
  59. return self._queue.qsize()
  60. def full(self):
  61. return self._max_size is not None and self.qsize() >= self._max_size