cqrs_sync.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import datetime
  3. import sys
  4. import time
  5. import ujson
  6. from django.core.exceptions import FieldError
  7. from django.core.management.base import BaseCommand, CommandError
  8. from django.db import close_old_connections, connections
  9. from dj_cqrs.management.utils import batch_qs
  10. from dj_cqrs.registries import MasterRegistry
  11. DEFAULT_BATCH = 10000
  12. DEFAULT_PROGRESS = False
  13. class Command(BaseCommand):
  14. help = 'Filter synchronization of certain CQRS model rows over transport to replicas.'
  15. def add_arguments(self, parser):
  16. parser.add_argument(
  17. '--cqrs-id',
  18. '-cid',
  19. help='CQRS_ID of the master model',
  20. type=str,
  21. required=True,
  22. )
  23. parser.add_argument(
  24. '--filter',
  25. '-f',
  26. help='Filter kwargs',
  27. type=str,
  28. default=None,
  29. )
  30. parser.add_argument(
  31. '--queue',
  32. '-q',
  33. help='Name of the specific replica queue',
  34. type=str,
  35. default=None,
  36. )
  37. parser.add_argument(
  38. '--batch',
  39. '-b',
  40. help='Batch size',
  41. type=int,
  42. default=DEFAULT_BATCH,
  43. )
  44. parser.add_argument(
  45. '--progress',
  46. '-p',
  47. help='Display progress',
  48. action='store_true',
  49. )
  50. def handle(self, *args, **options):
  51. model = self._get_model(options)
  52. progress = self._get_progress(options)
  53. batch_size = self._get_batch_size(options)
  54. qs = self._prepare_qs(model, options)
  55. db_count = qs.count()
  56. if db_count == 0:
  57. print('No objects found for filter!')
  58. return
  59. counter, success_counter = 0, 0
  60. if progress:
  61. print('Processing {0} records with batch size {1}'.format(db_count, batch_size))
  62. for qs_ in batch_qs(model.relate_cqrs_serialization(qs), batch_size=batch_size):
  63. ts = time.time()
  64. cs = counter
  65. # check if must reconnect
  66. if not connections[qs_.db].is_usable():
  67. connections[qs_.db].connect()
  68. for instance in qs_:
  69. counter += 1
  70. try:
  71. instance.cqrs_sync(queue=options['queue'])
  72. success_counter += 1
  73. except Exception as e:
  74. print(
  75. '\nSync record failed for pk={0}: {1}: {2}'.format(
  76. instance.pk,
  77. type(e).__name__,
  78. str(e),
  79. ),
  80. )
  81. close_old_connections()
  82. if progress:
  83. rate = (counter - cs) / (time.time() - ts)
  84. percent = 100 * counter / db_count
  85. eta = datetime.timedelta(seconds=int((db_count - counter) / rate))
  86. sys.stdout.write(
  87. '\r{0} of {1} processed - {2}% with '
  88. 'rate {3:.1f} rps, to go {4} ...{5:20}'.format(
  89. counter,
  90. db_count,
  91. int(percent),
  92. rate,
  93. str(eta),
  94. ' ',
  95. ),
  96. )
  97. sys.stdout.flush()
  98. print(
  99. 'Done!\n{0} instance(s) synced.\n{1} instance(s) processed.'.format(
  100. success_counter,
  101. counter,
  102. ),
  103. )
  104. @staticmethod
  105. def _prepare_qs(model, options):
  106. qs = model._default_manager.none()
  107. if options['filter']:
  108. try:
  109. kwargs = ujson.loads(options['filter'])
  110. if not isinstance(kwargs, dict):
  111. raise ValueError
  112. except ValueError:
  113. raise CommandError('Bad filter kwargs!')
  114. try:
  115. qs = model._default_manager.filter(**kwargs).order_by()
  116. except FieldError as e:
  117. raise CommandError('Bad filter kwargs! {0}'.format(str(e)))
  118. return qs
  119. @staticmethod
  120. def _get_model(options):
  121. cqrs_id = options['cqrs_id']
  122. model = MasterRegistry.get_model_by_cqrs_id(cqrs_id)
  123. if not model:
  124. raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
  125. return model
  126. @staticmethod
  127. def _get_batch_size(options):
  128. return options.get('batch', DEFAULT_BATCH)
  129. @staticmethod
  130. def _get_progress(options):
  131. return bool(options.get('progress', DEFAULT_PROGRESS))