# Copyright © 2023 Ingram Micro Inc. All rights reserved.
import argparse
import os
import sys

import ujson
from django.core.management.base import BaseCommand, CommandError
from django.db import DatabaseError, transaction

from dj_cqrs.registries import ReplicaRegistry
  8. class Command(BaseCommand):
  9. help = 'Bulk load of a CQRS model to a replica service.'
  10. def add_arguments(self, parser):
  11. parser.add_argument(
  12. '--input',
  13. '-i',
  14. help='Input file for loading (- for reading from stdin)',
  15. type=str,
  16. required=True,
  17. )
  18. parser.add_argument(
  19. '--clear',
  20. '-c',
  21. help='Delete existing models',
  22. type=bool,
  23. required=False,
  24. default=False,
  25. )
  26. parser.add_argument(
  27. '--batch',
  28. '-b',
  29. help='Batch size',
  30. type=int,
  31. default=10000,
  32. )
  33. def handle(self, *args, **options):
  34. batch_size = self._get_batch_size(options)
  35. f_name = options['input']
  36. if f_name != '-' and not os.path.exists(f_name):
  37. raise CommandError("File {0} doesn't exist!".format(f_name))
  38. with sys.stdin if f_name == '-' else open(f_name, 'r') as f:
  39. try:
  40. cqrs_id = next(f).strip()
  41. except StopIteration:
  42. cqrs_id = None
  43. if not cqrs_id:
  44. raise CommandError('File {0} is empty!'.format(f_name))
  45. model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
  46. if not model:
  47. raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
  48. with transaction.atomic():
  49. if options['clear']:
  50. try:
  51. model._default_manager.all().delete()
  52. except DatabaseError:
  53. raise CommandError('Delete operation fails!')
  54. self._process(f, model, batch_size)
  55. @classmethod
  56. def _process(cls, stream, model, batch_size):
  57. success_counter = 0
  58. line_number = 2
  59. while True:
  60. with transaction.atomic():
  61. try:
  62. for _ in range(0, batch_size):
  63. line = stream.readline()
  64. success = cls._process_line(line_number, line, model)
  65. success_counter += int(bool(success))
  66. line_number += 1
  67. except EOFError:
  68. break
  69. print('Done!\n{0} instance(s) loaded.'.format(success_counter), file=sys.stderr)
  70. @staticmethod
  71. def _process_line(line_number, line, model):
  72. if not line:
  73. raise EOFError
  74. try:
  75. try:
  76. master_data = ujson.loads(line.strip())
  77. except ValueError:
  78. print(
  79. "Dump file can't be parsed: line {0}!".format(line_number),
  80. file=sys.stderr,
  81. )
  82. return False
  83. instance = model.cqrs_save(master_data)
  84. if not instance:
  85. print(
  86. "Instance can't be saved: line {0}!".format(line_number),
  87. file=sys.stderr,
  88. )
  89. else:
  90. return True
  91. except Exception as e:
  92. print(
  93. 'Unexpected error: line {0}! {1}'.format(line_number, str(e)),
  94. file=sys.stderr,
  95. )
  96. return False
  97. @staticmethod
  98. def _get_batch_size(options):
  99. return options['batch']