123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- # Copyright © 2023 Ingram Micro Inc. All rights reserved.
- import datetime
- import sys
- import time
- import ujson
- from django.core.exceptions import FieldError
- from django.core.management.base import BaseCommand, CommandError
- from django.db import close_old_connections, connections
- from dj_cqrs.management.utils import batch_qs
- from dj_cqrs.registries import MasterRegistry
- DEFAULT_BATCH = 10000
- DEFAULT_PROGRESS = False
- class Command(BaseCommand):
- help = 'Filter synchronization of certain CQRS model rows over transport to replicas.'
- def add_arguments(self, parser):
- parser.add_argument(
- '--cqrs-id',
- '-cid',
- help='CQRS_ID of the master model',
- type=str,
- required=True,
- )
- parser.add_argument(
- '--filter',
- '-f',
- help='Filter kwargs',
- type=str,
- default=None,
- )
- parser.add_argument(
- '--queue',
- '-q',
- help='Name of the specific replica queue',
- type=str,
- default=None,
- )
- parser.add_argument(
- '--batch',
- '-b',
- help='Batch size',
- type=int,
- default=DEFAULT_BATCH,
- )
- parser.add_argument(
- '--progress',
- '-p',
- help='Display progress',
- action='store_true',
- )
- def handle(self, *args, **options):
- model = self._get_model(options)
- progress = self._get_progress(options)
- batch_size = self._get_batch_size(options)
- qs = self._prepare_qs(model, options)
- db_count = qs.count()
- if db_count == 0:
- print('No objects found for filter!')
- return
- counter, success_counter = 0, 0
- if progress:
- print('Processing {0} records with batch size {1}'.format(db_count, batch_size))
- for qs_ in batch_qs(model.relate_cqrs_serialization(qs), batch_size=batch_size):
- ts = time.time()
- cs = counter
- # check if must reconnect
- if not connections[qs_.db].is_usable():
- connections[qs_.db].connect()
- for instance in qs_:
- counter += 1
- try:
- instance.cqrs_sync(queue=options['queue'])
- success_counter += 1
- except Exception as e:
- print(
- '\nSync record failed for pk={0}: {1}: {2}'.format(
- instance.pk,
- type(e).__name__,
- str(e),
- ),
- )
- close_old_connections()
- if progress:
- rate = (counter - cs) / (time.time() - ts)
- percent = 100 * counter / db_count
- eta = datetime.timedelta(seconds=int((db_count - counter) / rate))
- sys.stdout.write(
- '\r{0} of {1} processed - {2}% with '
- 'rate {3:.1f} rps, to go {4} ...{5:20}'.format(
- counter,
- db_count,
- int(percent),
- rate,
- str(eta),
- ' ',
- ),
- )
- sys.stdout.flush()
- print(
- 'Done!\n{0} instance(s) synced.\n{1} instance(s) processed.'.format(
- success_counter,
- counter,
- ),
- )
- @staticmethod
- def _prepare_qs(model, options):
- qs = model._default_manager.none()
- if options['filter']:
- try:
- kwargs = ujson.loads(options['filter'])
- if not isinstance(kwargs, dict):
- raise ValueError
- except ValueError:
- raise CommandError('Bad filter kwargs!')
- try:
- qs = model._default_manager.filter(**kwargs).order_by()
- except FieldError as e:
- raise CommandError('Bad filter kwargs! {0}'.format(str(e)))
- return qs
- @staticmethod
- def _get_model(options):
- cqrs_id = options['cqrs_id']
- model = MasterRegistry.get_model_by_cqrs_id(cqrs_id)
- if not model:
- raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
- return model
- @staticmethod
- def _get_batch_size(options):
- return options.get('batch', DEFAULT_BATCH)
- @staticmethod
- def _get_progress(options):
- return bool(options.get('progress', DEFAULT_PROGRESS))
|