cqrs_bulk_dump.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. # Copyright © 2023 Ingram Micro Inc. All rights reserved.
  2. import datetime
  3. import os
  4. import sys
  5. import time
  6. import ujson
  7. from django.core.management.base import BaseCommand, CommandError
  8. from dj_cqrs.management.utils import batch_qs
  9. from dj_cqrs.registries import MasterRegistry
  10. class Command(BaseCommand):
  11. help = 'Bulk dump of a CQRS model from master service.'
  12. def add_arguments(self, parser):
  13. parser.add_argument(
  14. '--cqrs-id',
  15. '-c',
  16. help='CQRS_ID of the master model',
  17. type=str,
  18. required=True,
  19. )
  20. parser.add_argument(
  21. '--output',
  22. '-o',
  23. help='Output file for dumping (- for writing to stdout)',
  24. type=str,
  25. default=None,
  26. )
  27. parser.add_argument(
  28. '--batch',
  29. '-b',
  30. help='Batch size',
  31. type=int,
  32. default=10000,
  33. )
  34. parser.add_argument(
  35. '--progress',
  36. '-p',
  37. help='Display progress',
  38. action='store_true',
  39. )
  40. parser.add_argument(
  41. '--force',
  42. '-f',
  43. help='Override output file',
  44. action='store_true',
  45. )
  46. def handle(self, *args, **options):
  47. model = self._get_model(options)
  48. out_fname = self._get_output_filename(options)
  49. progress = self._get_progress(options)
  50. batch_size = self._get_batch_size(options)
  51. with sys.stdout if out_fname == '-' else open(out_fname, 'w') as f:
  52. f.write(model.CQRS_ID)
  53. counter, success_counter = 0, 0
  54. db_count = model._default_manager.count()
  55. if progress:
  56. print(
  57. 'Processing {0} records with batch size {1}'.format(db_count, batch_size),
  58. file=sys.stderr,
  59. )
  60. for qs in batch_qs(
  61. model.relate_cqrs_serialization(model._default_manager.order_by().all()),
  62. batch_size=batch_size,
  63. ):
  64. ts = time.time()
  65. cs = counter
  66. for instance in qs:
  67. counter += 1
  68. try:
  69. f.write(
  70. '\n' + ujson.dumps(instance.to_cqrs_dict()),
  71. )
  72. success_counter += 1
  73. except Exception as e:
  74. print(
  75. '\nDump record failed for pk={0}: {1}: {2}'.format(
  76. instance.pk,
  77. type(e).__name__,
  78. str(e),
  79. ),
  80. file=sys.stderr,
  81. )
  82. if progress:
  83. rate = (counter - cs) / (time.time() - ts)
  84. percent = 100 * counter / db_count
  85. eta = datetime.timedelta(seconds=int((db_count - counter) / rate))
  86. sys.stderr.write(
  87. '\r{0} of {1} processed - {2}% with '
  88. 'rate {3:.1f} rps, to go {4} ...{5:20}'.format(
  89. counter,
  90. db_count,
  91. int(percent),
  92. rate,
  93. str(eta),
  94. ' ',
  95. ),
  96. )
  97. sys.stderr.flush()
  98. print(
  99. 'Done!\n{0} instance(s) saved.\n{1} instance(s) processed.'.format(
  100. success_counter,
  101. counter,
  102. ),
  103. file=sys.stderr,
  104. )
  105. @staticmethod
  106. def _get_model(options):
  107. cqrs_id = options['cqrs_id']
  108. model = MasterRegistry.get_model_by_cqrs_id(cqrs_id)
  109. if not model:
  110. raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
  111. return model
  112. @staticmethod
  113. def _get_output_filename(options):
  114. f_name = options['output']
  115. if f_name is None:
  116. f_name = '{0}.dump'.format(options['cqrs_id'])
  117. if f_name != '-' and os.path.exists(f_name) and not (options['force']):
  118. raise CommandError('File {0} exists!'.format(f_name))
  119. return f_name
  120. @staticmethod
  121. def _get_progress(options):
  122. return bool(options['progress'])
  123. @staticmethod
  124. def _get_batch_size(options):
  125. return options['batch']