_validation.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import logging
  3. from inspect import getfullargspec, isfunction
  4. from django.utils.module_loading import import_string
  5. from dj_cqrs.constants import (
  6. DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
  7. DEFAULT_MASTER_MESSAGE_TTL,
  8. DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
  9. DEFAULT_REPLICA_MAX_RETRIES,
  10. DEFAULT_REPLICA_RETRY_DELAY,
  11. )
  12. from dj_cqrs.registries import MasterRegistry, ReplicaRegistry
  13. from dj_cqrs.transport import BaseTransport
  14. logger = logging.getLogger('django-cqrs')
  15. def validate_settings(settings):
  16. is_master = bool(MasterRegistry.models)
  17. is_replica = bool(ReplicaRegistry.models)
  18. if (not is_master) and (not is_replica): # pragma: no cover
  19. return
  20. assert hasattr(settings, 'CQRS'), 'CQRS configuration must be set in Django project settings.'
  21. cqrs_settings = settings.CQRS
  22. assert isinstance(cqrs_settings, dict), 'CQRS configuration must be dict.'
  23. _validate_transport(cqrs_settings)
  24. if is_master or ('master' in cqrs_settings):
  25. _validate_master(cqrs_settings)
  26. if is_replica or ('replica' in cqrs_settings):
  27. _validate_replica(cqrs_settings)
  28. def _validate_transport(cqrs_settings):
  29. transport_cls_location = cqrs_settings.get('transport')
  30. if not transport_cls_location:
  31. raise AssertionError('CQRS transport is not set.')
  32. transport = import_string(transport_cls_location)
  33. if not issubclass(transport, BaseTransport):
  34. raise AssertionError(
  35. 'CQRS transport must be inherited from `dj_cqrs.transport.BaseTransport`.',
  36. )
  37. def _validate_master(cqrs_settings):
  38. default_master_settings = {
  39. 'master': {
  40. 'CQRS_AUTO_UPDATE_FIELDS': DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
  41. 'CQRS_MESSAGE_TTL': DEFAULT_MASTER_MESSAGE_TTL,
  42. 'correlation_function': None,
  43. 'meta_function': None,
  44. },
  45. }
  46. if 'master' not in cqrs_settings:
  47. cqrs_settings.update(default_master_settings)
  48. return
  49. master_settings = cqrs_settings['master']
  50. assert isinstance(master_settings, dict), 'CQRS master configuration must be dict.'
  51. _validate_master_auto_update_fields(master_settings)
  52. _validate_master_message_ttl(master_settings)
  53. _validate_master_correlation_func(master_settings)
  54. _validate_master_meta_func(master_settings)
  55. def _validate_master_auto_update_fields(master_settings):
  56. if 'CQRS_AUTO_UPDATE_FIELDS' in master_settings:
  57. assert isinstance(
  58. master_settings['CQRS_AUTO_UPDATE_FIELDS'],
  59. bool,
  60. ), 'CQRS master CQRS_AUTO_UPDATE_FIELDS must be bool.'
  61. else:
  62. master_settings['CQRS_AUTO_UPDATE_FIELDS'] = DEFAULT_MASTER_AUTO_UPDATE_FIELDS
  63. def _validate_master_message_ttl(master_settings):
  64. if 'CQRS_MESSAGE_TTL' in master_settings:
  65. min_message_ttl = 1
  66. message_ttl = master_settings['CQRS_MESSAGE_TTL']
  67. if (message_ttl is not None) and (
  68. not isinstance(message_ttl, int) or message_ttl < min_message_ttl
  69. ):
  70. # No error is raised for backward compatibility
  71. # TODO: raise error in 2.0.0
  72. logger.warning(
  73. 'Settings CQRS_MESSAGE_TTL=%s is invalid, using default %s.',
  74. message_ttl,
  75. DEFAULT_MASTER_MESSAGE_TTL,
  76. )
  77. master_settings['CQRS_MESSAGE_TTL'] = DEFAULT_MASTER_MESSAGE_TTL
  78. else:
  79. master_settings['CQRS_MESSAGE_TTL'] = DEFAULT_MASTER_MESSAGE_TTL
  80. def _validate_master_correlation_func(master_settings):
  81. correlation_func = master_settings.get('correlation_function')
  82. if not correlation_func:
  83. master_settings['correlation_function'] = None
  84. elif not callable(correlation_func):
  85. raise AssertionError('CQRS master correlation_function must be callable.')
  86. def _validate_master_meta_func(master_settings):
  87. meta_func = master_settings.get('meta_function')
  88. if not meta_func:
  89. master_settings['meta_function'] = None
  90. return
  91. if isinstance(meta_func, str):
  92. try:
  93. meta_func = import_string(meta_func)
  94. except ImportError:
  95. raise AssertionError('CQRS master meta_function import error.')
  96. if not isfunction(meta_func):
  97. raise AssertionError('CQRS master meta_function must be function.')
  98. r = getfullargspec(meta_func)
  99. if not r.varkw:
  100. raise AssertionError('CQRS master meta_function must support **kwargs.')
  101. master_settings['meta_function'] = meta_func
  102. def _validate_replica(cqrs_settings):
  103. queue = cqrs_settings.get('queue')
  104. assert queue, 'CQRS queue is not set.'
  105. assert isinstance(queue, str), 'CQRS queue must be string.'
  106. default_replica_settings = {
  107. 'replica': {
  108. 'CQRS_MAX_RETRIES': DEFAULT_REPLICA_MAX_RETRIES,
  109. 'CQRS_RETRY_DELAY': DEFAULT_REPLICA_RETRY_DELAY,
  110. 'delay_queue_max_size': DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
  111. },
  112. }
  113. if 'replica' not in cqrs_settings:
  114. cqrs_settings.update(default_replica_settings)
  115. return
  116. replica_settings = cqrs_settings['replica']
  117. assert isinstance(replica_settings, dict), 'CQRS replica configuration must be dict.'
  118. _validate_replica_max_retries(replica_settings)
  119. _validate_replica_retry_delay(replica_settings)
  120. _validate_replica_delay_queue_max_size(replica_settings)
  121. def _validate_replica_max_retries(replica_settings):
  122. if 'CQRS_MAX_RETRIES' in replica_settings:
  123. min_retries = 0
  124. max_retries = replica_settings['CQRS_MAX_RETRIES']
  125. if (max_retries is not None) and (
  126. not isinstance(max_retries, int) or max_retries < min_retries
  127. ):
  128. # No error is raised for backward compatibility
  129. # TODO: raise error in 2.0.0
  130. logger.warning(
  131. 'Replica setting CQRS_MAX_RETRIES=%s is invalid, using default %s.',
  132. max_retries,
  133. DEFAULT_REPLICA_MAX_RETRIES,
  134. )
  135. replica_settings['CQRS_MAX_RETRIES'] = DEFAULT_REPLICA_MAX_RETRIES
  136. else:
  137. replica_settings['CQRS_MAX_RETRIES'] = DEFAULT_REPLICA_MAX_RETRIES
  138. def _validate_replica_retry_delay(replica_settings):
  139. min_retry_delay = 0
  140. retry_delay = replica_settings.get('CQRS_RETRY_DELAY')
  141. if 'CQRS_RETRY_DELAY' not in replica_settings:
  142. replica_settings['CQRS_RETRY_DELAY'] = DEFAULT_REPLICA_RETRY_DELAY
  143. elif not isinstance(retry_delay, int) or retry_delay < min_retry_delay:
  144. # No error is raised for backward compatibility
  145. # TODO: raise error in 2.0.0
  146. logger.warning(
  147. 'Replica setting CQRS_RETRY_DELAY=%s is invalid, using default %s.',
  148. retry_delay,
  149. DEFAULT_REPLICA_RETRY_DELAY,
  150. )
  151. replica_settings['CQRS_RETRY_DELAY'] = DEFAULT_REPLICA_RETRY_DELAY
  152. def _validate_replica_delay_queue_max_size(replica_settings):
  153. min_qsize = 0
  154. max_qsize = replica_settings.get('delay_queue_max_size')
  155. if 'delay_queue_max_size' not in replica_settings:
  156. max_qsize = DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE
  157. elif (max_qsize is not None) and (not isinstance(max_qsize, int) or max_qsize <= min_qsize):
  158. # No error is raised for backward compatibility
  159. # TODO: raise error in 2.0.0
  160. logger.warning(
  161. 'Settings delay_queue_max_size=%s is invalid, using default %s.',
  162. max_qsize,
  163. DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
  164. )
  165. max_qsize = DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE
  166. replica_settings['delay_queue_max_size'] = max_qsize