123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- # Copyright © 2023 Ingram Micro Inc. All rights reserved.
- import ujson
- from django.core.exceptions import FieldError
- from django.core.management.base import BaseCommand, CommandError
- from django.utils.timezone import now
- from dj_cqrs.management.utils import batch_qs
- from dj_cqrs.registries import MasterRegistry
- class Command(BaseCommand):
- help = 'Streaming diff of CQRS models from master service.'
- @classmethod
- def serialize_package(cls, package):
- return ujson.dumps(package)
- def add_arguments(self, parser):
- parser.add_argument(
- '--cqrs-id',
- '-cid',
- help='CQRS_ID of the master model',
- type=str,
- required=True,
- )
- parser.add_argument(
- '--filter',
- '-f',
- help='Filter kwargs',
- type=str,
- default=None,
- )
- parser.add_argument(
- '--batch',
- '-b',
- help='Batch size',
- type=int,
- default=10000,
- )
- def handle(self, *args, **options):
- model = self._get_model(options)
- batch_size = self._get_batch_size(options)
- qs = model._default_manager.all().order_by()
- if options['filter']:
- try:
- kwargs = ujson.loads(options['filter'])
- if not isinstance(kwargs, dict):
- raise ValueError
- except ValueError:
- raise CommandError('Bad filter kwargs!')
- try:
- qs = qs.filter(**kwargs)
- except FieldError as e:
- raise CommandError('Bad filter kwargs! {0}'.format(str(e)))
- if not qs.exists():
- self.stderr.write('No objects found for filter!')
- return
- current_dt = now()
- self.stdout.write('{0},{1}'.format(model.CQRS_ID, str(current_dt)))
- for bqs in batch_qs(qs, batch_size=batch_size):
- package = [
- [instance.pk, instance.cqrs_revision]
- for instance in bqs
- if instance.is_sync_instance()
- ]
- self.stdout.write(self.serialize_package(package))
- @staticmethod
- def _get_model(options):
- cqrs_id = options['cqrs_id']
- model = MasterRegistry.get_model_by_cqrs_id(cqrs_id)
- if not model:
- raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
- return model
- @staticmethod
- def _get_batch_size(options):
- return options['batch']
|