cqrs_dead_letters.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import ujson
  3. from django.core.management.base import BaseCommand, CommandError
  4. from dj_cqrs.constants import DEFAULT_MASTER_MESSAGE_TTL
  5. from dj_cqrs.dataclasses import TransportPayload
  6. from dj_cqrs.registries import ReplicaRegistry
  7. from dj_cqrs.transport import current_transport
  8. from dj_cqrs.transport.rabbit_mq import RabbitMQTransport
  9. from dj_cqrs.utils import get_message_expiration_dt
  10. class RabbitMQTransportService(RabbitMQTransport):
  11. @classmethod
  12. def get_consumer_settings(cls):
  13. return cls._get_consumer_settings()
  14. @classmethod
  15. def get_common_settings(cls):
  16. return cls._get_common_settings()
  17. @classmethod
  18. def create_connection(cls, host, port, creds, exchange):
  19. return cls._create_connection(host, port, creds, exchange)
  20. @classmethod
  21. def declare_queue(cls, channel, queue_name):
  22. return channel.queue_declare(queue_name, durable=True, exclusive=False)
  23. @classmethod
  24. def nack(cls, channel, delivery_tag, payload=None):
  25. return cls._nack(channel, delivery_tag, payload)
  26. class Command(BaseCommand):
  27. help = 'CQRS dead letters queue management commands'
  28. def add_arguments(self, parser):
  29. command = parser.add_subparsers(dest='command')
  30. command.required = True
  31. command.add_parser('retry', help='Retry all dead letters.')
  32. command.add_parser('dump', help='Dumps all dead letter to stdout.')
  33. command.add_parser('purge', help='Removes all dead letters.')
  34. def handle(self, *args, **options):
  35. self.check_transport()
  36. channel, connection = self.init_broker()
  37. queue_name, dead_letter_queue_name, *_ = RabbitMQTransportService.get_consumer_settings()
  38. dead_letters_queue = RabbitMQTransportService.declare_queue(
  39. channel,
  40. dead_letter_queue_name,
  41. )
  42. dead_letters_count = dead_letters_queue.method.message_count
  43. consumer_generator = channel.consume(
  44. queue=dead_letter_queue_name,
  45. auto_ack=False,
  46. exclusive=False,
  47. )
  48. command = options['command']
  49. if command == 'retry':
  50. self.handle_retry(channel, consumer_generator, dead_letters_count)
  51. elif command == 'dump':
  52. self.handle_dump(consumer_generator, dead_letters_count)
  53. elif command == 'purge':
  54. self.handle_purge(channel, dead_letter_queue_name, dead_letters_count)
  55. if not connection.is_closed:
  56. connection.close()
  57. def check_transport(self):
  58. if not issubclass(current_transport, RabbitMQTransport):
  59. raise CommandError('Dead letters commands available only for RabbitMQTransport.')
  60. def init_broker(self):
  61. host, port, creds, exchange = RabbitMQTransportService.get_common_settings()
  62. connection, channel = RabbitMQTransportService.create_connection(
  63. host,
  64. port,
  65. creds,
  66. exchange,
  67. )
  68. queue_name, dead_letter_queue_name, *_ = RabbitMQTransportService.get_consumer_settings()
  69. RabbitMQTransportService.declare_queue(channel, queue_name)
  70. RabbitMQTransportService.declare_queue(channel, dead_letter_queue_name)
  71. for cqrs_id, _ in ReplicaRegistry.models.items():
  72. channel.queue_bind(exchange=exchange, queue=queue_name, routing_key=cqrs_id)
  73. # Every service must have specific SYNC or requeue routes
  74. channel.queue_bind(
  75. exchange=exchange,
  76. queue=queue_name,
  77. routing_key='cqrs.{0}.{1}'.format(queue_name, cqrs_id),
  78. )
  79. return channel, connection
  80. def handle_retry(self, channel, consumer_generator, dead_letters_count):
  81. self.stdout.write('Total dead letters: {0}'.format(dead_letters_count))
  82. for i in range(1, dead_letters_count + 1):
  83. self.stdout.write('Retrying: {0}/{1}'.format(i, dead_letters_count))
  84. method_frame, properties, body = next(consumer_generator)
  85. dct = ujson.loads(body)
  86. dct['retries'] = 0
  87. if dct.get('expires'):
  88. # Message could expire already
  89. expires = get_message_expiration_dt(DEFAULT_MASTER_MESSAGE_TTL)
  90. dct['expires'] = expires.replace(microsecond=0).isoformat()
  91. payload = TransportPayload.from_message(dct)
  92. payload.is_requeue = True
  93. RabbitMQTransportService.produce(payload)
  94. message = ujson.dumps(dct)
  95. self.stdout.write(message)
  96. RabbitMQTransportService.nack(channel, method_frame.delivery_tag)
  97. def handle_dump(self, consumer_generator, dead_letters_count):
  98. for _ in range(1, dead_letters_count + 1):
  99. *_, body = next(consumer_generator)
  100. self.stdout.write(body.decode('utf-8'))
  101. def handle_purge(self, channel, dead_letter_queue_name, dead_letter_count):
  102. self.stdout.write('Total dead letters: {0}'.format(dead_letter_count))
  103. if dead_letter_count > 0:
  104. channel.queue_purge(dead_letter_queue_name)
  105. self.stdout.write('Purged')