123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- # Copyright © 2023 Ingram Micro Inc. All rights reserved.
- import copy
- import logging
- from contextlib import ExitStack
- from django.conf import settings
- from django.db import Error, close_old_connections, transaction
- from dj_cqrs.constants import SignalType
- from dj_cqrs.logger import log_timed_out_queries
- from dj_cqrs.registries import ReplicaRegistry
- from dj_cqrs.utils import apply_query_timeouts
- logger = logging.getLogger('django-cqrs')
- def consume(payload):
- """Consumer controller.
- :param dj_cqrs.dataclasses.TransportPayload payload: Consumed payload from master service.
- """
- payload = copy.deepcopy(payload)
- return route_signal_to_replica_model(
- payload.signal_type,
- payload.cqrs_id,
- payload.instance_data,
- previous_data=payload.previous_data,
- meta=payload.meta,
- queue=payload.queue,
- )
- def route_signal_to_replica_model(
- signal_type,
- cqrs_id,
- instance_data,
- previous_data=None,
- meta=None,
- queue=None,
- ):
- """Routes signal to model method to create/update/delete replica instance.
- :param dj_cqrs.constants.SignalType signal_type: Consumed signal type.
- :param str cqrs_id: Replica model CQRS unique identifier.
- :param dict instance_data: Master model data.
- :param dict or None previous_data: Previous model data for changed tracked fields, if exists.
- :param dict or None meta: Payload metadata, if exists.
- :param str or None queue: Synced queue.
- """
- if signal_type not in (SignalType.DELETE, SignalType.SAVE, SignalType.SYNC):
- logger.error('Bad signal type "{0}" for CQRS_ID "{1}".'.format(signal_type, cqrs_id))
- return
- model_cls = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
- if not model_cls:
- return
- this_queue = settings.CQRS['queue']
- if signal_type == SignalType.SYNC and model_cls.CQRS_ONLY_DIRECT_SYNCS and queue != this_queue:
- return True
- db_is_needed = not model_cls.CQRS_NO_DB_OPERATIONS
- if db_is_needed:
- close_old_connections()
- is_meta_supported = model_cls.CQRS_META
- try:
- if db_is_needed:
- apply_query_timeouts(model_cls)
- with transaction.atomic(savepoint=False) if db_is_needed else ExitStack():
- if signal_type == SignalType.DELETE:
- if is_meta_supported:
- return model_cls.cqrs_delete(instance_data, meta=meta)
- return model_cls.cqrs_delete(instance_data)
- f_kw = {'previous_data': previous_data}
- if is_meta_supported:
- f_kw['meta'] = meta
- if signal_type == SignalType.SAVE:
- return model_cls.cqrs_save(instance_data, **f_kw)
- if signal_type == SignalType.SYNC:
- f_kw['sync'] = True
- return model_cls.cqrs_save(instance_data, **f_kw)
- except Error as e:
- pk_name = getattr(model_cls._meta.pk, 'name', 'id')
- pk_value = instance_data.get(pk_name)
- cqrs_revision = instance_data.get('cqrs_revision')
- logger.error(
- '{0}\nCQRS {1} error: pk = {2}, cqrs_revision = {3} ({4}).'.format(
- str(e),
- signal_type,
- pk_value,
- cqrs_revision,
- model_cls.CQRS_ID,
- ),
- )
- log_timed_out_queries(e, model_cls)
|