1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- # Copyright © 2023 Ingram Micro Inc. All rights reserved.
- import sys
- from django.core.management.base import BaseCommand, CommandError
- from dj_cqrs.constants import NO_QUEUE
- from dj_cqrs.management.commands.cqrs_sync import (
- DEFAULT_BATCH,
- DEFAULT_PROGRESS,
- Command as SyncCommand,
- )
- from dj_cqrs.registries import MasterRegistry
- class Command(BaseCommand):
- help = 'Diff synchronizer from CQRS replica stream.'
- def add_arguments(self, parser):
- parser.add_argument(
- '--batch',
- '-b',
- help='Batch size',
- type=int,
- default=DEFAULT_BATCH,
- )
- parser.add_argument(
- '--progress',
- '-p',
- help='Display progress',
- action='store_true',
- )
- def handle(self, *args, **options):
- progress = self._get_progress(options)
- batch_size = self._get_batch_size(options)
- with sys.stdin as f:
- first_line = f.readline().strip()
- model = self._get_model(first_line)
- queue = self._get_queue(first_line)
- for pks_line in f:
- sync_kwargs = {
- 'cqrs_id': model.CQRS_ID,
- 'filter': '{{"id__in": {0}}}'.format(pks_line.strip()),
- 'progress': progress,
- 'batch': batch_size,
- }
- if queue:
- sync_kwargs['queue'] = queue
- SyncCommand().handle(**sync_kwargs)
- @staticmethod
- def _get_model(first_line):
- cqrs_id = first_line.split(',')[0]
- model = MasterRegistry.get_model_by_cqrs_id(cqrs_id)
- if not model:
- raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
- return model
- @staticmethod
- def _get_queue(first_line):
- queue = first_line.split(',')[-1]
- if queue != NO_QUEUE:
- return queue
- @staticmethod
- def _get_batch_size(options):
- return options.get('batch', DEFAULT_BATCH)
- @staticmethod
- def _get_progress(options):
- return bool(options.get('progress', DEFAULT_PROGRESS))
|