cqrs_diff_replica.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import sys
  3. import ujson
  4. from django.conf import settings
  5. from django.core.management.base import BaseCommand, CommandError
  6. from dj_cqrs.registries import ReplicaRegistry
  7. class Command(BaseCommand):
  8. help = 'Diff of CQRS replica models from master diff stream.'
  9. @classmethod
  10. def deserialize_in(cls, package_line):
  11. return dict(ujson.loads(package_line))
  12. @classmethod
  13. def serialize_out(cls, ids):
  14. return ujson.dumps(ids)
  15. def handle(self, *args, **options):
  16. with sys.stdin as f:
  17. first_line = f.readline()
  18. model = self._get_model(first_line)
  19. self.stdout.write('{0},{1}'.format(first_line.strip(), settings.CQRS.get('queue')))
  20. for package_line in f:
  21. master_data = self.deserialize_in(package_line)
  22. qs = (
  23. model._default_manager.filter(
  24. pk__in=master_data.keys(),
  25. )
  26. .order_by()
  27. .only('pk', 'cqrs_revision')
  28. )
  29. replica_data = {instance.pk: instance.cqrs_revision for instance in qs}
  30. diff_ids = set()
  31. for pk, cqrs_revision in master_data.items():
  32. if replica_data.get(pk, -1) != cqrs_revision:
  33. diff_ids.add(pk)
  34. if diff_ids:
  35. self.stdout.write(self.serialize_out(list(diff_ids)))
  36. self.stderr.write('PK to resync: {0}'.format(str(diff_ids)))
  37. @staticmethod
  38. def _get_model(first_line):
  39. cqrs_id = first_line.split(',')[0]
  40. model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
  41. if not model:
  42. raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
  43. return model