cqrs_consume.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. # Copyright © 2022 Ingram Micro Inc. All rights reserved.
  2. import logging
  3. import signal
  4. import threading
  5. from pathlib import Path
  6. from django.core.management.base import BaseCommand, CommandError
  7. from watchfiles import watch
  8. from watchfiles.filters import PythonFilter
  9. from watchfiles.run import start_process
  10. from dj_cqrs.registries import ReplicaRegistry
  11. logger = logging.getLogger('django-cqrs')
  12. def consume(**kwargs):
  13. import django
  14. django.setup()
  15. from dj_cqrs.transport import current_transport
  16. try:
  17. current_transport.consume(**kwargs)
  18. except KeyboardInterrupt:
  19. pass
  20. def _display_path(path):
  21. try:
  22. return f'"{path.relative_to(Path.cwd())}"'
  23. except ValueError: # pragma: no cover
  24. return f'"{path}"'
  25. class WorkersManager:
  26. def __init__(
  27. self,
  28. consume_kwargs,
  29. workers=1,
  30. reload=False,
  31. ignore_paths=None,
  32. sigint_timeout=5,
  33. sigkill_timeout=1,
  34. ):
  35. self.pool = []
  36. self.workers = workers
  37. self.reload = reload
  38. self.consume_kwargs = consume_kwargs
  39. self.stop_event = threading.Event()
  40. self.sigint_timeout = sigint_timeout
  41. self.sigkill_timeout = sigkill_timeout
  42. if self.reload:
  43. self.watch_filter = PythonFilter(ignore_paths=ignore_paths)
  44. self.watcher = watch(
  45. Path.cwd(),
  46. watch_filter=self.watch_filter,
  47. stop_event=self.stop_event,
  48. yield_on_timeout=True,
  49. )
  50. def handle_signal(self, *args, **kwargs):
  51. self.stop_event.set()
  52. def run(self):
  53. for sig in [signal.SIGINT, signal.SIGTERM]:
  54. signal.signal(sig, self.handle_signal)
  55. if self.reload:
  56. signal.signal(signal.SIGHUP, self.restart)
  57. self.start()
  58. if self.reload:
  59. for files_changed in self:
  60. if files_changed:
  61. logger.warning(
  62. 'Detected changes in %s. Reloading...',
  63. ', '.join(map(_display_path, files_changed)),
  64. )
  65. self.restart()
  66. else:
  67. self.stop_event.wait()
  68. self.terminate()
  69. def start(self):
  70. for _ in range(self.workers):
  71. process = start_process(
  72. consume,
  73. 'function',
  74. (),
  75. self.consume_kwargs,
  76. )
  77. self.pool.append(process)
  78. logger.info(f'Consumer process with pid {process.pid} started')
  79. def terminate(self, *args, **kwargs):
  80. while self.pool:
  81. process = self.pool.pop()
  82. process.stop(sigint_timeout=self.sigint_timeout, sigkill_timeout=self.sigkill_timeout)
  83. logger.info(f'Consumer process with pid {process.pid} stopped.')
  84. def restart(self, *args, **kwargs):
  85. self.terminate()
  86. self.start()
  87. def __iter__(self):
  88. return self
  89. def __next__(self):
  90. changes = next(self.watcher)
  91. if changes:
  92. return list({Path(c[1]) for c in changes})
  93. return None
  94. class Command(BaseCommand):
  95. help = 'Starts CQRS worker, which consumes messages from message queue.'
  96. def add_arguments(self, parser):
  97. parser.add_argument(
  98. '--workers',
  99. '-w',
  100. help='Number of workers',
  101. type=int,
  102. default=1,
  103. )
  104. parser.add_argument(
  105. '--cqrs-id',
  106. '-cid',
  107. nargs='*',
  108. type=str,
  109. help='Choose model(s) by CQRS_ID for consuming',
  110. )
  111. parser.add_argument(
  112. '--reload',
  113. '-r',
  114. help=('Enable reload signal SIGHUP and autoreload ' 'on file changes'),
  115. action='store_true',
  116. default=False,
  117. )
  118. parser.add_argument(
  119. '--ignore-paths',
  120. nargs='?',
  121. type=str,
  122. help=(
  123. 'Specify directories to ignore, '
  124. 'to ignore multiple paths use a comma as separator, '
  125. 'e.g. "env" or "env,node_modules"'
  126. ),
  127. )
  128. parser.add_argument(
  129. '--sigint-timeout',
  130. nargs='?',
  131. type=int,
  132. default=5,
  133. help='How long to wait for the sigint timeout before sending sigkill.',
  134. )
  135. parser.add_argument(
  136. '--sigkill-timeout',
  137. nargs='?',
  138. type=int,
  139. default=1,
  140. help='How long to wait for the sigkill timeout before issuing a timeout exception.',
  141. )
  142. def handle(
  143. self,
  144. *args,
  145. workers=1,
  146. cqrs_id=None,
  147. reload=False,
  148. ignore_paths=None,
  149. sigint_timeout=5,
  150. sigkill_timeout=1,
  151. **options,
  152. ):
  153. paths_to_ignore = None
  154. if ignore_paths:
  155. paths_to_ignore = [Path(p).resolve() for p in ignore_paths.split(',')]
  156. workers_manager = WorkersManager(
  157. workers=workers,
  158. consume_kwargs=self.get_consume_kwargs(cqrs_id),
  159. reload=reload,
  160. ignore_paths=paths_to_ignore,
  161. sigint_timeout=sigint_timeout,
  162. sigkill_timeout=sigkill_timeout,
  163. )
  164. workers_manager.run()
  165. def get_consume_kwargs(self, ids_list):
  166. consume_kwargs = {}
  167. if ids_list:
  168. cqrs_ids = set()
  169. for cqrs_id in ids_list:
  170. model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
  171. if not model:
  172. raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
  173. cqrs_ids.add(cqrs_id)
  174. consume_kwargs['cqrs_ids'] = cqrs_ids
  175. return consume_kwargs