123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- # Copyright © 2022 Ingram Micro Inc. All rights reserved.
- import logging
- import signal
- import threading
- from pathlib import Path
- from django.core.management.base import BaseCommand, CommandError
- from watchfiles import watch
- from watchfiles.filters import PythonFilter
- from watchfiles.run import start_process
- from dj_cqrs.registries import ReplicaRegistry
- logger = logging.getLogger('django-cqrs')
- def consume(**kwargs):
- import django
- django.setup()
- from dj_cqrs.transport import current_transport
- try:
- current_transport.consume(**kwargs)
- except KeyboardInterrupt:
- pass
- def _display_path(path):
- try:
- return f'"{path.relative_to(Path.cwd())}"'
- except ValueError: # pragma: no cover
- return f'"{path}"'
- class WorkersManager:
- def __init__(
- self,
- consume_kwargs,
- workers=1,
- reload=False,
- ignore_paths=None,
- sigint_timeout=5,
- sigkill_timeout=1,
- ):
- self.pool = []
- self.workers = workers
- self.reload = reload
- self.consume_kwargs = consume_kwargs
- self.stop_event = threading.Event()
- self.sigint_timeout = sigint_timeout
- self.sigkill_timeout = sigkill_timeout
- if self.reload:
- self.watch_filter = PythonFilter(ignore_paths=ignore_paths)
- self.watcher = watch(
- Path.cwd(),
- watch_filter=self.watch_filter,
- stop_event=self.stop_event,
- yield_on_timeout=True,
- )
- def handle_signal(self, *args, **kwargs):
- self.stop_event.set()
- def run(self):
- for sig in [signal.SIGINT, signal.SIGTERM]:
- signal.signal(sig, self.handle_signal)
- if self.reload:
- signal.signal(signal.SIGHUP, self.restart)
- self.start()
- if self.reload:
- for files_changed in self:
- if files_changed:
- logger.warning(
- 'Detected changes in %s. Reloading...',
- ', '.join(map(_display_path, files_changed)),
- )
- self.restart()
- else:
- self.stop_event.wait()
- self.terminate()
- def start(self):
- for _ in range(self.workers):
- process = start_process(
- consume,
- 'function',
- (),
- self.consume_kwargs,
- )
- self.pool.append(process)
- logger.info(f'Consumer process with pid {process.pid} started')
- def terminate(self, *args, **kwargs):
- while self.pool:
- process = self.pool.pop()
- process.stop(sigint_timeout=self.sigint_timeout, sigkill_timeout=self.sigkill_timeout)
- logger.info(f'Consumer process with pid {process.pid} stopped.')
- def restart(self, *args, **kwargs):
- self.terminate()
- self.start()
- def __iter__(self):
- return self
- def __next__(self):
- changes = next(self.watcher)
- if changes:
- return list({Path(c[1]) for c in changes})
- return None
- class Command(BaseCommand):
- help = 'Starts CQRS worker, which consumes messages from message queue.'
- def add_arguments(self, parser):
- parser.add_argument(
- '--workers',
- '-w',
- help='Number of workers',
- type=int,
- default=1,
- )
- parser.add_argument(
- '--cqrs-id',
- '-cid',
- nargs='*',
- type=str,
- help='Choose model(s) by CQRS_ID for consuming',
- )
- parser.add_argument(
- '--reload',
- '-r',
- help=('Enable reload signal SIGHUP and autoreload ' 'on file changes'),
- action='store_true',
- default=False,
- )
- parser.add_argument(
- '--ignore-paths',
- nargs='?',
- type=str,
- help=(
- 'Specify directories to ignore, '
- 'to ignore multiple paths use a comma as separator, '
- 'e.g. "env" or "env,node_modules"'
- ),
- )
- parser.add_argument(
- '--sigint-timeout',
- nargs='?',
- type=int,
- default=5,
- help='How long to wait for the sigint timeout before sending sigkill.',
- )
- parser.add_argument(
- '--sigkill-timeout',
- nargs='?',
- type=int,
- default=1,
- help='How long to wait for the sigkill timeout before issuing a timeout exception.',
- )
- def handle(
- self,
- *args,
- workers=1,
- cqrs_id=None,
- reload=False,
- ignore_paths=None,
- sigint_timeout=5,
- sigkill_timeout=1,
- **options,
- ):
- paths_to_ignore = None
- if ignore_paths:
- paths_to_ignore = [Path(p).resolve() for p in ignore_paths.split(',')]
- workers_manager = WorkersManager(
- workers=workers,
- consume_kwargs=self.get_consume_kwargs(cqrs_id),
- reload=reload,
- ignore_paths=paths_to_ignore,
- sigint_timeout=sigint_timeout,
- sigkill_timeout=sigkill_timeout,
- )
- workers_manager.run()
- def get_consume_kwargs(self, ids_list):
- consume_kwargs = {}
- if ids_list:
- cqrs_ids = set()
- for cqrs_id in ids_list:
- model = ReplicaRegistry.get_model_by_cqrs_id(cqrs_id)
- if not model:
- raise CommandError('Wrong CQRS ID: {0}!'.format(cqrs_id))
- cqrs_ids.add(cqrs_id)
- consume_kwargs['cqrs_ids'] = cqrs_ids
- return consume_kwargs
|