utils.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import logging
  3. from datetime import date, datetime, timedelta
  4. from uuid import UUID
  5. from django.conf import settings
  6. from django.db import transaction
  7. from django.utils import timezone
  8. from dj_cqrs.constants import DB_VENDOR_PG, SUPPORTED_TIMEOUT_DB_VENDORS
  9. from dj_cqrs.logger import install_last_query_capturer
  10. logger = logging.getLogger('django-cqrs')
  11. def get_message_expiration_dt(message_ttl=None):
  12. """Calculates when message should expire.
  13. :param int or None message_ttl:
  14. :return: Expiration datetime or None if infinite
  15. :rtype: datetime.datetime or None
  16. """
  17. message_ttl = message_ttl or settings.CQRS['master']['CQRS_MESSAGE_TTL']
  18. if message_ttl is None:
  19. # Infinite
  20. return
  21. return timezone.now() + timedelta(seconds=message_ttl)
  22. def get_delay_queue_max_size():
  23. """Returns max allowed number of "waiting" messages in the delay queue.
  24. :return: Positive integer number or None if infinite
  25. :rtype: int
  26. """
  27. if 'replica' not in settings.CQRS:
  28. return None
  29. return settings.CQRS['replica']['delay_queue_max_size']
  30. def get_messages_prefetch_count_per_worker():
  31. """Returns max allowed number of unacked messages, that can be consumed by a single worker.
  32. :return: Positive integer number or 0 if infinite
  33. :rtype: int
  34. """
  35. delay_queue_max_size = get_delay_queue_max_size()
  36. if delay_queue_max_size is None:
  37. # Infinite
  38. return 0
  39. return delay_queue_max_size + 1
  40. def get_json_valid_value(value):
  41. return str(value) if isinstance(value, (date, datetime, UUID)) else value
  42. def apply_query_timeouts(model_cls): # pragma: no cover
  43. query_timeout = int(settings.CQRS['replica'].get('CQRS_QUERY_TIMEOUT', 0))
  44. if query_timeout <= 0:
  45. return
  46. model_db = model_cls._default_manager.db
  47. conn = transaction.get_connection(using=model_db)
  48. conn_vendor = getattr(conn, 'vendor', '')
  49. if conn_vendor not in SUPPORTED_TIMEOUT_DB_VENDORS:
  50. return
  51. if conn_vendor == DB_VENDOR_PG:
  52. statement = 'SET statement_timeout TO %s'
  53. else:
  54. statement = 'SET SESSION MAX_EXECUTION_TIME=%s'
  55. with conn.cursor() as cursor:
  56. cursor.execute(statement, params=(query_timeout,))
  57. install_last_query_capturer(model_cls)