123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- import logging
- from django.conf import settings
- from django.db import OperationalError, transaction
- from dj_cqrs.constants import (
- DB_VENDOR_MYSQL,
- DB_VENDOR_PG,
- MYSQL_TIMEOUT_ERROR_CODE,
- PG_TIMEOUT_FLAG,
- SUPPORTED_TIMEOUT_DB_VENDORS,
- )
- def install_last_query_capturer(model_cls):
- conn = _connection(model_cls)
- if not _get_last_query_capturer(conn):
- conn.execute_wrappers.append(_LastQueryCaptureWrapper())
- def log_timed_out_queries(error, model_cls): # pragma: no cover
- log_q = bool(settings.CQRS['replica'].get('CQRS_LOG_TIMED_OUT_QUERIES', False))
- if not (log_q and isinstance(error, OperationalError) and error.args):
- return
- conn = _connection(model_cls)
- conn_vendor = getattr(conn, 'vendor', '')
- if conn_vendor not in SUPPORTED_TIMEOUT_DB_VENDORS:
- return
- e_arg = error.args[0]
- is_timeout_error = bool(
- (conn_vendor == DB_VENDOR_MYSQL and e_arg == MYSQL_TIMEOUT_ERROR_CODE)
- or (conn_vendor == DB_VENDOR_PG and isinstance(e_arg, str) and PG_TIMEOUT_FLAG in e_arg)
- )
- if is_timeout_error:
- query = getattr(_get_last_query_capturer(conn), 'query', None)
- if query:
- logger_name = settings.CQRS['replica'].get('CQRS_QUERY_LOGGER', '') or 'django-cqrs'
- logger = logging.getLogger(logger_name)
- logger.error('Timed out query:\n%s', query)
- class _LastQueryCaptureWrapper:
- def __init__(self):
- self.query = None
- def __call__(self, execute, sql, params, many, context):
- try:
- execute(sql, params, many, context)
- finally:
- self.query = sql
- def _get_last_query_capturer(conn):
- return next((w for w in conn.execute_wrappers if isinstance(w, _LastQueryCaptureWrapper)), None)
- def _connection(model_cls):
- return transaction.get_connection(using=model_cls._default_manager.db)
|