123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- # Copyright © 2023 Ingram Micro Inc. All rights reserved.
- import logging
- from inspect import getfullargspec, isfunction
- from django.utils.module_loading import import_string
- from dj_cqrs.constants import (
- DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
- DEFAULT_MASTER_MESSAGE_TTL,
- DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
- DEFAULT_REPLICA_MAX_RETRIES,
- DEFAULT_REPLICA_RETRY_DELAY,
- )
- from dj_cqrs.registries import MasterRegistry, ReplicaRegistry
- from dj_cqrs.transport import BaseTransport
- logger = logging.getLogger('django-cqrs')
- def validate_settings(settings):
- is_master = bool(MasterRegistry.models)
- is_replica = bool(ReplicaRegistry.models)
- if (not is_master) and (not is_replica): # pragma: no cover
- return
- assert hasattr(settings, 'CQRS'), 'CQRS configuration must be set in Django project settings.'
- cqrs_settings = settings.CQRS
- assert isinstance(cqrs_settings, dict), 'CQRS configuration must be dict.'
- _validate_transport(cqrs_settings)
- if is_master or ('master' in cqrs_settings):
- _validate_master(cqrs_settings)
- if is_replica or ('replica' in cqrs_settings):
- _validate_replica(cqrs_settings)
- def _validate_transport(cqrs_settings):
- transport_cls_location = cqrs_settings.get('transport')
- if not transport_cls_location:
- raise AssertionError('CQRS transport is not set.')
- transport = import_string(transport_cls_location)
- if not issubclass(transport, BaseTransport):
- raise AssertionError(
- 'CQRS transport must be inherited from `dj_cqrs.transport.BaseTransport`.',
- )
- def _validate_master(cqrs_settings):
- default_master_settings = {
- 'master': {
- 'CQRS_AUTO_UPDATE_FIELDS': DEFAULT_MASTER_AUTO_UPDATE_FIELDS,
- 'CQRS_MESSAGE_TTL': DEFAULT_MASTER_MESSAGE_TTL,
- 'correlation_function': None,
- 'meta_function': None,
- },
- }
- if 'master' not in cqrs_settings:
- cqrs_settings.update(default_master_settings)
- return
- master_settings = cqrs_settings['master']
- assert isinstance(master_settings, dict), 'CQRS master configuration must be dict.'
- _validate_master_auto_update_fields(master_settings)
- _validate_master_message_ttl(master_settings)
- _validate_master_correlation_func(master_settings)
- _validate_master_meta_func(master_settings)
- def _validate_master_auto_update_fields(master_settings):
- if 'CQRS_AUTO_UPDATE_FIELDS' in master_settings:
- assert isinstance(
- master_settings['CQRS_AUTO_UPDATE_FIELDS'],
- bool,
- ), 'CQRS master CQRS_AUTO_UPDATE_FIELDS must be bool.'
- else:
- master_settings['CQRS_AUTO_UPDATE_FIELDS'] = DEFAULT_MASTER_AUTO_UPDATE_FIELDS
- def _validate_master_message_ttl(master_settings):
- if 'CQRS_MESSAGE_TTL' in master_settings:
- min_message_ttl = 1
- message_ttl = master_settings['CQRS_MESSAGE_TTL']
- if (message_ttl is not None) and (
- not isinstance(message_ttl, int) or message_ttl < min_message_ttl
- ):
- # No error is raised for backward compatibility
- # TODO: raise error in 2.0.0
- logger.warning(
- 'Settings CQRS_MESSAGE_TTL=%s is invalid, using default %s.',
- message_ttl,
- DEFAULT_MASTER_MESSAGE_TTL,
- )
- master_settings['CQRS_MESSAGE_TTL'] = DEFAULT_MASTER_MESSAGE_TTL
- else:
- master_settings['CQRS_MESSAGE_TTL'] = DEFAULT_MASTER_MESSAGE_TTL
- def _validate_master_correlation_func(master_settings):
- correlation_func = master_settings.get('correlation_function')
- if not correlation_func:
- master_settings['correlation_function'] = None
- elif not callable(correlation_func):
- raise AssertionError('CQRS master correlation_function must be callable.')
- def _validate_master_meta_func(master_settings):
- meta_func = master_settings.get('meta_function')
- if not meta_func:
- master_settings['meta_function'] = None
- return
- if isinstance(meta_func, str):
- try:
- meta_func = import_string(meta_func)
- except ImportError:
- raise AssertionError('CQRS master meta_function import error.')
- if not isfunction(meta_func):
- raise AssertionError('CQRS master meta_function must be function.')
- r = getfullargspec(meta_func)
- if not r.varkw:
- raise AssertionError('CQRS master meta_function must support **kwargs.')
- master_settings['meta_function'] = meta_func
- def _validate_replica(cqrs_settings):
- queue = cqrs_settings.get('queue')
- assert queue, 'CQRS queue is not set.'
- assert isinstance(queue, str), 'CQRS queue must be string.'
- default_replica_settings = {
- 'replica': {
- 'CQRS_MAX_RETRIES': DEFAULT_REPLICA_MAX_RETRIES,
- 'CQRS_RETRY_DELAY': DEFAULT_REPLICA_RETRY_DELAY,
- 'delay_queue_max_size': DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
- },
- }
- if 'replica' not in cqrs_settings:
- cqrs_settings.update(default_replica_settings)
- return
- replica_settings = cqrs_settings['replica']
- assert isinstance(replica_settings, dict), 'CQRS replica configuration must be dict.'
- _validate_replica_max_retries(replica_settings)
- _validate_replica_retry_delay(replica_settings)
- _validate_replica_delay_queue_max_size(replica_settings)
- def _validate_replica_max_retries(replica_settings):
- if 'CQRS_MAX_RETRIES' in replica_settings:
- min_retries = 0
- max_retries = replica_settings['CQRS_MAX_RETRIES']
- if (max_retries is not None) and (
- not isinstance(max_retries, int) or max_retries < min_retries
- ):
- # No error is raised for backward compatibility
- # TODO: raise error in 2.0.0
- logger.warning(
- 'Replica setting CQRS_MAX_RETRIES=%s is invalid, using default %s.',
- max_retries,
- DEFAULT_REPLICA_MAX_RETRIES,
- )
- replica_settings['CQRS_MAX_RETRIES'] = DEFAULT_REPLICA_MAX_RETRIES
- else:
- replica_settings['CQRS_MAX_RETRIES'] = DEFAULT_REPLICA_MAX_RETRIES
- def _validate_replica_retry_delay(replica_settings):
- min_retry_delay = 0
- retry_delay = replica_settings.get('CQRS_RETRY_DELAY')
- if 'CQRS_RETRY_DELAY' not in replica_settings:
- replica_settings['CQRS_RETRY_DELAY'] = DEFAULT_REPLICA_RETRY_DELAY
- elif not isinstance(retry_delay, int) or retry_delay < min_retry_delay:
- # No error is raised for backward compatibility
- # TODO: raise error in 2.0.0
- logger.warning(
- 'Replica setting CQRS_RETRY_DELAY=%s is invalid, using default %s.',
- retry_delay,
- DEFAULT_REPLICA_RETRY_DELAY,
- )
- replica_settings['CQRS_RETRY_DELAY'] = DEFAULT_REPLICA_RETRY_DELAY
- def _validate_replica_delay_queue_max_size(replica_settings):
- min_qsize = 0
- max_qsize = replica_settings.get('delay_queue_max_size')
- if 'delay_queue_max_size' not in replica_settings:
- max_qsize = DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE
- elif (max_qsize is not None) and (not isinstance(max_qsize, int) or max_qsize <= min_qsize):
- # No error is raised for backward compatibility
- # TODO: raise error in 2.0.0
- logger.warning(
- 'Settings delay_queue_max_size=%s is invalid, using default %s.',
- max_qsize,
- DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE,
- )
- max_qsize = DEFAULT_REPLICA_DELAY_QUEUE_MAX_SIZE
- replica_settings['delay_queue_max_size'] = max_qsize
|