transport.py 1.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import os
  3. from dj_cqrs.controller import consumer
  4. from dj_cqrs.transport.base import BaseTransport
  5. from dj_cqrs.transport.kombu import KombuTransport
  6. from dj_cqrs.transport.rabbit_mq import RabbitMQTransport
  7. class TransportStub(BaseTransport):
  8. @staticmethod
  9. def produce(payload):
  10. TransportStub.consume(payload)
  11. @staticmethod
  12. def consume(payload=None):
  13. if payload:
  14. return consumer.consume(payload)
  15. class RabbitMQTransportWithEvents(RabbitMQTransport):
  16. @staticmethod
  17. def log_consumed(payload):
  18. from tests.dj_replica.models import Event
  19. Event.objects.create(
  20. pid=os.getpid(),
  21. cqrs_id=payload.cqrs_id,
  22. cqrs_revision=int(payload.instance_data['cqrs_revision']),
  23. )
  24. class KombuTransportWithEvents(KombuTransport):
  25. @staticmethod
  26. def log_consumed(payload):
  27. from tests.dj_replica.models import Event
  28. Event.objects.create(
  29. pid=os.getpid(),
  30. cqrs_id=payload.cqrs_id,
  31. cqrs_revision=int(payload.instance_data['cqrs_revision']),
  32. )