!!! warning
Expiration, retrying and 'dead letters' queueing supported in
`RabbitMQTransport` only (**on** by default).
django-cqrs since version 1.11 provides mechanism for reliable message delivery.
Name | Default | Description |
---|---|---|
CQRS_MESSAGE_TTL | 86400 | Limits message lifetime in seconds, then it will be moved to 'dead letters' queue. |
# settings.py
CQRS = {
...
'master': {
'CQRS_MESSAGE_TTL': 86400, # 1 day
},
}
Message assumed as failed when a consumer raises an exception or returns negative boolean value (False, None, etc).
# models.py
class Example(ReplicaMixin, models.Model):
CQRS_ID = 'example'
...
@classmethod
def cqrs_create(cls, sync, mapped_data, previous_data=None, meta=None):
raise Exception("Some issue during create") # exception could be caught at should_retry_cqrs() method
def cqrs_update(self, sync, mapped_data, previous_data=None, meta=None):
return None # returning negative boolean for retrying
Name | Default | Description |
---|---|---|
CQRS_MAX_RETRIES | 30 | Maximum number of retry attempts. Infinite if None, 0 to disable retries. |
CQRS_RETRY_DELAY | 2 | Constant delay in seconds between message failure and requeueing. |
CQRS_DELAY_QUEUE_MAX_SIZE | 1000 | Maximum number of delayed messages per worker. Infinite if None. |
# settings.py
CQRS = {
...
'replica': {
'CQRS_MAX_RETRIES': 30, # attempts
'CQRS_RETRY_DELAY': 2, # seconds
'CQRS_DELAY_QUEUE_MAX_SIZE': 1000,
},
}
The dj_cqrs.mixins.ReplicaMixin
allows to take full control on retrying.
# models.py
class Example(ReplicaMixin, models.Model):
CQRS_ID = 'example'
...
@classmethod
def get_cqrs_retry_delay(cls, current_retry=0):
# Linear delay growth
return (current_retry + 1) * 60
@classmethod
def should_retry_cqrs(cls, current_retry, exception=None):
# Retry 10 times or until we have troubles with database
return (
current_retry < 10
or isinstance(exception, django.db.OperationalError)
)
Expired or failed messages which should not be retried are moved to 'dead letters' queue.
Name | Default | Description |
---|---|---|
dead_letter_queue | \'dead_letter_\' + queue | Queue name for dead letters. |
dead_message_ttl | 864000 | Expiration seconds. Infinite if None. |
# settings.py
CQRS = {
...
'queue': 'example',
'replica': {
...
'dead_letter_queue': 'dead_letter_example', # generated from CQRS.queue
'dead_message_ttl': 864000, # 10 days
},
}
Dumps all dead letters to stdout.
$ python manage.py cqrs_dead_letters dump
{"signal_type":"SAVE","cqrs_id":"example","instance_data":{"id":1,"cqrs_revision":0,"cqrs_updated":"2021-04-30 11:50:05.164341+00:00"},"previous_data":null,"instance_pk":135,"correlation_id":null,"retries":30,"expires":"2021-05-01T11:50:00+00:00"}
Retry all dead letters. Message body retries and expires fields are downgraded.
$ python manage.py cqrs_dead_letters retry
Total dead letters: 1
Retrying: 1/1
{"signal_type":"SAVE","cqrs_id":"example","instance_data":{"id":1,"cqrs_revision":0,"cqrs_updated":"2021-04-30 11:50:05.164341+00:00"},"previous_data":null,"instance_pk":135,"correlation_id":null,"retries":0,"expires":"2021-05-02T12:30:00+00:00"}
Removes all dead letters.
$ python manage.py cqrs_dead_letters purge
Total dead letters: 1
Purged