cqrs_sync.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import datetime
  3. import sys
  4. import time
  5. import ujson
  6. from django.core.exceptions import FieldError
  7. from django.core.management.base import BaseCommand, CommandError
  8. from django.db import close_old_connections, connections
  9. from dj_cqrs.management.utils import batch_qs
  10. from dj_cqrs.registries import MasterRegistry
# Batch size used when ``--batch`` is not supplied on the command line.
DEFAULT_BATCH = 10000
# Progress reporting stays off unless ``--progress`` is passed.
DEFAULT_PROGRESS = False
  13. class Command(BaseCommand):
  14. help = 'Filter synchronization of certain CQRS model rows over transport to replicas.'
  15. def add_arguments(self, parser):
  16. parser.add_argument(
  17. '--cqrs-id',
  18. '-cid',
  19. help='CQRS_ID of the master model',
  20. type=str,
  21. required=True,
  22. )
  23. parser.add_argument(
  24. '--filter',
  25. '-f',
  26. help='Filter kwargs',
  27. type=str,
  28. default=None,
  29. )
  30. parser.add_argument(
  31. '--queue',
  32. '-q',
  33. help='Name of the specific replica queue',
  34. type=str,
  35. default=None,
  36. )
  37. parser.add_argument(
  38. '--batch',
  39. '-b',
  40. help='Batch size',
  41. type=int,
  42. default=DEFAULT_BATCH,
  43. )
  44. parser.add_argument(
  45. '--progress',
  46. '-p',
  47. help='Display progress',
  48. action='store_true',
  49. )
  50. def handle(self, *args, **options):
  51. model = self._get_model(options)
  52. progress = self._get_progress(options)
  53. batch_size = self._get_batch_size(options)
  54. qs = self._prepare_qs(model, options)
  55. db_count = qs.count()
  56. if db_count == 0:
  57. print('No objects found for filter!')
  58. return
  59. counter, success_counter = 0, 0
  60. if progress:
  61. print('Processing {0} records with batch size {1}'.format(db_count, batch_size))
  62. for qs_ in batch_qs(model.relate_cqrs_serialization(qs), batch_size=batch_size):
  63. ts = time.time()
  64. cs = counter
  65. # check if must reconnect
  66. if not connections[qs_.db].is_usable():
  67. connections[qs_.db].connect()
  68. for instance in qs_:
  69. counter += 1
  70. try:
  71. instance.cqrs_sync(queue=options['queue'])
  72. success_counter += 1
  73. except Exception as e:
  74. print(
  75. '\nSync record failed for pk={0}: {1}: {2}'.format(
  76. instance.pk,
  77. type(e).__name__,
  78. str(e),
  79. ),
  80. )
  81. close_old_connections()
  82. if progress:
  83. rate = (counter - cs) / (time.time() - ts)
  84. percent = 100 * counter / db_count
  85. eta = datetime.timedelta(seconds=int((db_count - counter) / rate))
  86. sys.stdout.write(
  87. '\r{0} of {1} processed - {2}% with '
  88. 'rate {3:.1f} rps, to go {4} ...{5:20}'.format(
  89. counter,
  90. db_count,
  91. int(percent),
  92. rate,
  93. str(eta),
  94. ' ',
  95. ),
  96. )
  97. sys.stdout.flush()
  98. print(
  99. 'Done!\n{0} instance(s) synced.\n{1} instance(s) processed.'.format(
  100. success_counter,
  101. counter,
  102. ),
  103. )
  104. @staticmethod
  105. def _prepare_qs(model, options):
  106. if hasattr(model, 'objects_all'): # wxl 2023-8-29
  107. qs = model.objects_all.none()
  108. else:
  109. qs = model._default_manager.none()
  110. if options['filter']:
  111. try:
  112. kwargs = ujson.loads(options['filter'])
  113. if not isinstance(kwargs, dict):
  114. raise ValueError
  115. except ValueError:
  116. raise CommandError('Bad filter kwargs!')
  117. try:
  118. if hasattr(model, 'objects_all'): # wxl 2023-8-29
  119. qs = model.objects_all.filter(**kwargs).order_by()
  120. else:
  121. qs = model._default_manager.filter(**kwargs).order_by()
  122. except FieldError as e:
  123. raise CommandError('Bad filter kwargs! {0}'.format(str(e)))
  124. return qs
  125. @staticmethod
  126. def _get_model(options):
  127. cqrs_id = options['cqrs_id']
  128. model = MasterRegistry.get_model_by_cqrs_id(cqrs_id)
  129. if not model:
  130. raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
  131. return model
  132. @staticmethod
  133. def _get_batch_size(options):
  134. return options.get('batch', DEFAULT_BATCH)
  135. @staticmethod
  136. def _get_progress(options):
  137. return bool(options.get('progress', DEFAULT_PROGRESS))