# cqrs_diff_master.py
#
# Copyright © 2023 Ingram Micro Inc. All rights reserved.

import ujson
from django.core.exceptions import FieldError, ValidationError
from django.core.management.base import BaseCommand, CommandError
from django.utils.timezone import now

from dj_cqrs.management.utils import batch_qs
from dj_cqrs.registries import MasterRegistry


  8. class Command(BaseCommand):
  9. help = 'Streaming diff of CQRS models from master service.'
  10. @classmethod
  11. def serialize_package(cls, package):
  12. return ujson.dumps(package)
  13. def add_arguments(self, parser):
  14. parser.add_argument(
  15. '--cqrs-id',
  16. '-cid',
  17. help='CQRS_ID of the master model',
  18. type=str,
  19. required=True,
  20. )
  21. parser.add_argument(
  22. '--filter',
  23. '-f',
  24. help='Filter kwargs',
  25. type=str,
  26. default=None,
  27. )
  28. parser.add_argument(
  29. '--batch',
  30. '-b',
  31. help='Batch size',
  32. type=int,
  33. default=10000,
  34. )
  35. def handle(self, *args, **options):
  36. model = self._get_model(options)
  37. batch_size = self._get_batch_size(options)
  38. qs = model._default_manager.all().order_by()
  39. if options['filter']:
  40. try:
  41. kwargs = ujson.loads(options['filter'])
  42. if not isinstance(kwargs, dict):
  43. raise ValueError
  44. except ValueError:
  45. raise CommandError('Bad filter kwargs!')
  46. try:
  47. qs = qs.filter(**kwargs)
  48. except FieldError as e:
  49. raise CommandError('Bad filter kwargs! {0}'.format(str(e)))
  50. if not qs.exists():
  51. self.stderr.write('No objects found for filter!')
  52. return
  53. current_dt = now()
  54. self.stdout.write('{0},{1}'.format(model.CQRS_ID, str(current_dt)))
  55. for bqs in batch_qs(qs, batch_size=batch_size):
  56. package = [
  57. [instance.pk, instance.cqrs_revision]
  58. for instance in bqs
  59. if instance.is_sync_instance()
  60. ]
  61. self.stdout.write(self.serialize_package(package))
  62. @staticmethod
  63. def _get_model(options):
  64. cqrs_id = options['cqrs_id']
  65. model = MasterRegistry.get_model_by_cqrs_id(cqrs_id)
  66. if not model:
  67. raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
  68. return model
  69. @staticmethod
  70. def _get_batch_size(options):
  71. return options['batch']