| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 | #  Copyright © 2023 Ingram Micro Inc. All rights reserved.import datetimeimport sysimport timeimport ujsonfrom django.core.exceptions import FieldErrorfrom django.core.management.base import BaseCommand, CommandErrorfrom django.db import close_old_connections, connectionsfrom dj_cqrs.management.utils import batch_qsfrom dj_cqrs.registries import MasterRegistryDEFAULT_BATCH = 10000DEFAULT_PROGRESS = Falseclass Command(BaseCommand):    help = 'Filter synchronization of certain CQRS model rows over transport to replicas.'    def add_arguments(self, parser):        parser.add_argument(            '--cqrs-id',            '-cid',            help='CQRS_ID of the master model',            type=str,            required=True,        )        parser.add_argument(            '--filter',            '-f',            help='Filter kwargs',            type=str,            default=None,        )        parser.add_argument(            '--queue',            '-q',            help='Name of the specific replica queue',            type=str,            default=None,        )        parser.add_argument(            '--batch',            '-b',            help='Batch size',            type=int,            default=DEFAULT_BATCH,        )        parser.add_argument(            '--progress',            '-p',            help='Display progress',            action='store_true',        )    def handle(self, *args, **options):        model = self._get_model(options)        progress = self._get_progress(options)        batch_size = self._get_batch_size(options)        qs = self._prepare_qs(model, options)        db_count = qs.count()        if db_count == 0:            print('No objects found for filter!')            return        counter, success_counter = 0, 0        if progress:            print('Processing {0} records with batch size {1}'.format(db_count, batch_size))        for qs_ in batch_qs(model.relate_cqrs_serialization(qs), batch_size=batch_size):            ts = time.time()            cs = counter            # check if must reconnect            if not connections[qs_.db].is_usable():                connections[qs_.db].connect()            for instance in qs_:                counter += 1                try:                    instance.cqrs_sync(queue=options['queue'])                    success_counter += 1                except Exception as e:                    print(                        '\nSync record failed for pk={0}: {1}: {2}'.format(                            instance.pk,                            type(e).__name__,                            str(e),                        ),                    )                    close_old_connections()            if progress:                rate = (counter - cs) / (time.time() - ts)                percent = 100 * counter / db_count                eta = datetime.timedelta(seconds=int((db_count - counter) / rate))                sys.stdout.write(                    '\r{0} of {1} processed - {2}% with '                    'rate {3:.1f} rps, to go {4} ...{5:20}'.format(                        counter,                        db_count,                        int(percent),                        rate,                        str(eta),                        ' ',                    ),                )                sys.stdout.flush()        print(            'Done!\n{0} instance(s) synced.\n{1} instance(s) processed.'.format(                success_counter,                counter,            ),        )    @staticmethod    def _prepare_qs(model, options):        qs = model._default_manager.none()        if options['filter']:            try:                kwargs = ujson.loads(options['filter'])                if not isinstance(kwargs, dict):                    raise ValueError            except ValueError:                raise CommandError('Bad filter kwargs!')            try:                qs = model._default_manager.filter(**kwargs).order_by()            except FieldError as e:                raise CommandError('Bad filter kwargs! {0}'.format(str(e)))        return qs    @staticmethod    def _get_model(options):        cqrs_id = options['cqrs_id']        model = MasterRegistry.get_model_by_cqrs_id(cqrs_id)        if not model:            raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))        return model    @staticmethod    def _get_batch_size(options):        return options.get('batch', DEFAULT_BATCH)    @staticmethod    def _get_progress(options):        return bool(options.get('progress', DEFAULT_PROGRESS))
 |