cqrs_deleted_diff_replica.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
# Copyright © 2023 Ingram Micro Inc. All rights reserved.
import ujson
from django.core.exceptions import FieldError, ValidationError
from django.core.management.base import BaseCommand, CommandError
from django.utils.timezone import now

from dj_cqrs.management.utils import batch_qs
from dj_cqrs.registries import ReplicaRegistry
  8. class Command(BaseCommand):
  9. help = 'Streaming diff of CQRS model pks from replica service to check for deleted objects.'
  10. @classmethod
  11. def serialize_package(cls, package):
  12. return ujson.dumps(package)
  13. def add_arguments(self, parser):
  14. parser.add_argument(
  15. '--cqrs-id',
  16. '-cid',
  17. help='CQRS_ID of the replica model',
  18. type=str,
  19. required=True,
  20. )
  21. parser.add_argument(
  22. '--filter',
  23. '-f',
  24. help='Filter kwargs',
  25. type=str,
  26. default=None,
  27. )
  28. parser.add_argument(
  29. '--batch',
  30. '-b',
  31. help='Batch size',
  32. type=int,
  33. default=10000,
  34. )
  35. def handle(self, *args, **options):
  36. model = self._get_model(options)
  37. batch_size = self._get_batch_size(options)
  38. qs = model._default_manager.values().order_by()
  39. if options['filter']:
  40. try:
  41. kwargs = ujson.loads(options['filter'])
  42. if not isinstance(kwargs, dict):
  43. raise ValueError
  44. except ValueError:
  45. raise CommandError('Bad filter kwargs!')
  46. try:
  47. qs = qs.filter(**kwargs)
  48. except FieldError as e:
  49. raise CommandError('Bad filter kwargs! {0}'.format(str(e)))
  50. if not qs.exists():
  51. self.stderr.write('No objects found for filter!')
  52. return
  53. current_dt = now()
  54. self.stdout.write('{0},{1}'.format(model.CQRS_ID, str(current_dt)))
  55. for bqs in batch_qs(qs.values_list('pk', flat=True), batch_size=batch_size):
  56. self.stdout.write(self.serialize_package(list(bqs)))
  57. @staticmethod
  58. def _get_model(options):
  59. cqrs_id = options['cqrs_id']
  60. model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
  61. if not model:
  62. raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
  63. return model
  64. @staticmethod
  65. def _get_batch_size(options):
  66. return options['batch']