cqrs_diff_sync.py 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import sys
  3. from django.core.management.base import BaseCommand, CommandError
  4. from dj_cqrs.constants import NO_QUEUE
  5. from dj_cqrs.management.commands.cqrs_sync import (
  6. DEFAULT_BATCH,
  7. DEFAULT_PROGRESS,
  8. Command as SyncCommand,
  9. )
  10. from dj_cqrs.registries import MasterRegistry
  11. class Command(BaseCommand):
  12. help = 'Diff synchronizer from CQRS replica stream.'
  13. def add_arguments(self, parser):
  14. parser.add_argument(
  15. '--batch',
  16. '-b',
  17. help='Batch size',
  18. type=int,
  19. default=DEFAULT_BATCH,
  20. )
  21. parser.add_argument(
  22. '--progress',
  23. '-p',
  24. help='Display progress',
  25. action='store_true',
  26. )
  27. def handle(self, *args, **options):
  28. progress = self._get_progress(options)
  29. batch_size = self._get_batch_size(options)
  30. with sys.stdin as f:
  31. first_line = f.readline().strip()
  32. model = self._get_model(first_line)
  33. queue = self._get_queue(first_line)
  34. for pks_line in f:
  35. sync_kwargs = {
  36. 'cqrs_id': model.CQRS_ID,
  37. 'filter': '{{"id__in": {0}}}'.format(pks_line.strip()),
  38. 'progress': progress,
  39. 'batch': batch_size,
  40. }
  41. if queue:
  42. sync_kwargs['queue'] = queue
  43. SyncCommand().handle(**sync_kwargs)
  44. @staticmethod
  45. def _get_model(first_line):
  46. cqrs_id = first_line.split(',')[0]
  47. model = MasterRegistry.get_model_by_cqrs_id(cqrs_id)
  48. if not model:
  49. raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
  50. return model
  51. @staticmethod
  52. def _get_queue(first_line):
  53. queue = first_line.split(',')[-1]
  54. if queue != NO_QUEUE:
  55. return queue
  56. @staticmethod
  57. def _get_batch_size(options):
  58. return options.get('batch', DEFAULT_BATCH)
  59. @staticmethod
  60. def _get_progress(options):
  61. return bool(options.get('progress', DEFAULT_PROGRESS))