Introduction

RabbitMQ consumer cancel and nack loops occur when messages are continuously redelivered without successful processing, creating an infinite cycle of failures. This happens when consumers reject messages with nack (negative acknowledgment) or throw exceptions before acknowledgment, causing RabbitMQ to redeliver the message to another consumer or the same consumer on restart. The loop consumes resources, blocks queue processing, and can cascade into broker instability.
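The mechanics of the loop can be seen without a broker. The sketch below is an illustrative simulation, not pika code: it models nack with requeue: true, where a message whose handler always fails goes straight back on the queue and is delivered again indefinitely unless something caps it.

```python
from collections import deque

def run_consumer(queue, handler, max_deliveries=5):
    """Deliver until the queue drains; nack(requeue=True) puts a failed
    message back, so a poison message loops until the cap stops it."""
    deliveries = 0
    while queue and deliveries < max_deliveries:
        body = queue.popleft()
        deliveries += 1
        try:
            handler(body)       # success would act as the ack
        except Exception:
            queue.append(body)  # requeue=True semantics: back it goes
    return deliveries

def always_fails(body):
    raise ValueError("bad payload")

queue = deque(["poison"])
attempts = run_consumer(queue, always_fails)
# the delivery cap is the only reason this terminates; the message is still queued
```

Every fix in this article amounts to breaking that cycle: stop requeueing, cap retries, or divert the message elsewhere.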

Symptoms

  • Queue messages_ready count stays constant or grows despite active consumers
  • Consumer logs show same message_id or correlation_id repeatedly
  • redelivered: true flag set on consumed messages
  • Consumers disconnect with CANCEL or channel exceptions
  • Queue consumer_count fluctuates as consumers disconnect and reconnect
  • Issue appears after deploy that changed message format, consumer logic, or queue configuration

Common Causes

  • Consumer throws an exception before sending an acknowledgment (the broker requeues the message when the channel closes)
  • requeue: true on nack/reject causes infinite redelivery of poison messages
  • Message processing time exceeds the consumer timeout, causing a channel close
  • Queue has no dead letter exchange (DLX) configured for failed messages
  • Consumer acknowledgment mode set to AUTO while processing can fail intermittently
  • Multiple consumers with different processing capabilities (some always fail)

Step-by-Step Fix

### 1. Identify poison messages by redelivery count

Track messages that have been redelivered multiple times:

```bash
# RabbitMQ CLI: check queue stats
rabbitmqctl list_queues name messages_ready messages_unacknowledged consumers

# Peek at messages (requires the rabbitmq_management plugin; note that this
# consumes and requeues them, which itself marks them redelivered)
curl -u guest:guest -X POST \
  "http://localhost:15672/api/queues/%2F/<queue-name>/get" \
  -H "content-type: application/json" \
  -d '{"count": 10, "ackmode": "ack_requeue_true", "encoding": "auto", "truncate": 50000}'

# Look for messages with redelivered: true appearing repeatedly
```

In application code, log redelivery count:

```python
# Python/pika example
def callback(ch, method, properties, body):
    headers = properties.headers or {}
    # x-death is only present once the message has been dead-lettered
    redelivery_count = headers.get('x-death', [{}])[0].get('count', 0)

    if redelivery_count > 3:
        logger.error(f"Poison message after {redelivery_count} deliveries: "
                     f"{properties.message_id}")
        # Route to dead letter or alert

    # Process message...
```

### 2. Check consumer acknowledgment mode

Verify how acknowledgments are configured:

```python
# WRONG: auto ack with exception-prone processing
channel.basic_consume(queue='my-queue',
                      on_message_callback=callback,
                      auto_ack=True)

# CORRECT: manual acknowledgment with error handling
channel.basic_consume(queue='my-queue',
                      on_message_callback=callback,
                      auto_ack=False)

def callback(ch, method, properties, body):
    try:
        process_message(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        # requeue=False routes the message to the DLX instead of looping
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
```

Acknowledgment modes:

  • AUTO (auto_ack=True): message is acked on delivery, before processing (data loss risk)
  • MANUAL (auto_ack=False): application controls ack/nack (recommended)
  • NONE: no acknowledgment at all (fire-and-forget scenarios only)

### 3. Configure dead letter exchange for failed messages

Dead letter exchanges (DLX) capture failed messages and prevent loops:

```bash
# Declare queue with DLX
rabbitmqadmin declare queue name=my-queue durable=true \
  arguments='{"x-dead-letter-exchange": "dlx.exchange", "x-dead-letter-routing-key": "failed"}'

# Declare dead letter queue
rabbitmqadmin declare queue name=dead-letter-queue durable=true

# Bind DLX to dead letter queue
rabbitmqadmin declare binding source=dlx.exchange destination=dead-letter-queue routing_key=failed
```

Queue arguments for DLX:

```python
# Python/pika example
channel.queue_declare(
    queue='my-queue',
    durable=True,
    arguments={
        'x-dead-letter-exchange': 'dlx.exchange',
        'x-dead-letter-routing-key': 'failed',
        'x-message-ttl': 86400000,  # 24 hours before dead-lettering
        'x-max-length': 10000       # oldest messages dead-lettered past this size
    }
)
```
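On the consuming side of the dead-letter queue, the broker's x-death header records where each message came from and how many times it died. A small helper (the function name is illustrative) to read it:

```python
def death_count(headers, queue_name):
    """Count how many times a message was dead-lettered from queue_name.

    RabbitMQ appends an entry to the 'x-death' header array per
    (queue, reason) pair; each entry carries a cumulative 'count'.
    """
    for entry in (headers or {}).get('x-death', []):
        if entry.get('queue') == queue_name:
            return entry.get('count', 0)
    return 0

# Example payload shaped like the broker stamps it after two rejections
headers = {'x-death': [{'queue': 'my-queue', 'reason': 'rejected', 'count': 2}]}
```

This is useful for alerting on repeat offenders before deciding whether to replay or discard them.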

### 4. Set message TTL and queue length limits

Prevent unbounded queue growth from poison messages:

```bash
# Set per-queue message TTL (time before a message expires)
rabbitmqctl set_policy TTL ".*" '{"message-ttl": 86400000}' --apply-to queues

# Set max queue length
rabbitmqctl set_policy max-length "my-queue" '{"max-length": 10000}' --apply-to queues
```

TTL configuration options:

```python
queue_arguments = {
    'x-message-ttl': 3600000,        # 1 hour - message expires
    'x-expires': 7200000,            # 2 hours - queue auto-deleted if unused
    'x-max-length': 10000,           # max messages before oldest dropped
    'x-max-length-bytes': 104857600  # 100 MB max queue size
}
```

### 5. Implement consumer-side retry limits

Track retry count in message headers and reject after threshold:

```python
import pika

def callback(ch, method, properties, body):
    # Get retry count from headers
    retry_count = 0
    if properties.headers and 'x-retry-count' in properties.headers:
        retry_count = properties.headers['x-retry-count']

    max_retries = 3

    if retry_count >= max_retries:
        logger.error(f"Max retries ({max_retries}) exceeded for message "
                     f"{properties.message_id}")
        # Route to dead letter without requeue
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        return

    try:
        process_message(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        logger.warning(f"Retry {retry_count + 1}/{max_retries}: {e}")

        # Republish with incremented retry count (exchange and routing key
        # live on the delivery method, not on BasicProperties)
        new_headers = properties.headers.copy() if properties.headers else {}
        new_headers['x-retry-count'] = retry_count + 1

        ch.basic_publish(
            exchange=method.exchange,
            routing_key=method.routing_key,
            body=body,
            properties=pika.BasicProperties(
                headers=new_headers,
                delivery_mode=2  # persistent
            )
        )

        # Acknowledge the original to remove it from the queue
        ch.basic_ack(delivery_tag=method.delivery_tag)
```

### 6. Check consumer timeout configuration

Consumer timeout can cause channel close before processing completes:

```bash
# RabbitMQ server configuration (rabbitmq.conf)
# Default consumer timeout is 30 minutes (1800000 ms)

# For long-running message processing, increase the timeout
consumer_timeout = 3600000  # 1 hour
```

Application-side timeout settings:

```python
# Python/pika connection parameters
connection_params = pika.ConnectionParameters(
    host='rabbitmq',
    heartbeat=600,                  # 10-minute heartbeat interval
    blocked_connection_timeout=300  # give up if the broker blocks the connection
)
```

```yaml
# Java Spring AMQP (application.yml)
spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual
        concurrency: 3
        max-concurrency: 10
        prefetch: 1
        retry:
          enabled: true
          initial-interval: 1000
          max-attempts: 3
          max-interval: 10000
```

### 7. Monitor consumer health and queue metrics

Set up monitoring for early detection:

```bash
# Prometheus metrics (via rabbitmq_prometheus plugin)
# Key expressions to alert on:

# Queue depth growing
rabbitmq_queue_messages_ready > 1000

# Consumers disconnecting (delta, not rate: consumer count is a gauge)
delta(rabbitmq_queue_consumers[5m]) < 0

# Message redelivery rate
rate(rabbitmq_queue_messages_redelivered_total[5m]) > 10

# Consumer cancel rate
rate(rabbitmq_queue_consumer_cancel_rate[5m]) > 0.5
```

Grafana dashboard panels:

  • Queue depth over time (ready, unacknowledged, total)
  • Consumer count and cancel rate
  • Message publish vs. consume rate
  • Redelivery percentage
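The same thresholds can be checked in code against the management API's per-queue payload. The function below is a sketch: field names (messages_ready, consumers, message_stats.deliver_get, message_stats.redeliver) follow the rabbitmq_management JSON, and the thresholds are arbitrary placeholders.

```python
def queue_alerts(stats, ready_threshold=1000, redelivery_ratio=0.25):
    """Derive alert flags from one queue's stats dict, as returned by
    GET /api/queues/<vhost>/<name> on the management API."""
    alerts = []
    if stats.get('messages_ready', 0) > ready_threshold:
        alerts.append('backlog')
    if stats.get('consumers', 0) == 0:
        alerts.append('no_consumers')
    msg_stats = stats.get('message_stats', {})
    delivered = msg_stats.get('deliver_get', 0)
    redelivered = msg_stats.get('redeliver', 0)
    # A high redeliver/deliver ratio is the loop signature
    if delivered and redelivered / delivered > redelivery_ratio:
        alerts.append('redelivery_loop')
    return alerts

sample = {'messages_ready': 5000, 'consumers': 2,
          'message_stats': {'deliver_get': 100, 'redeliver': 90}}
```

A cron job or sidecar polling this every minute catches loops well before the queue depth alarm fires.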

### 8. Check for queue type and quorum queue behavior

Quorum queues handle failures differently than classic queues:

```bash
# Check queue type
rabbitmqctl list_queues name type

# Classic queue: redelivers to any available consumer
# Quorum queue: redelivers to same consumer on restart

# For quorum queues, configure consumer restart strategy
```

Quorum queue considerations:

```python
# Quorum queue declaration (in pika the type is set via queue arguments)
channel.queue_declare(
    queue='my-quorum-queue',
    durable=True,
    arguments={'x-queue-type': 'quorum'}  # or 'stream' for streaming use cases
)

# Quorum queues always persist and replicate:
# better for critical messages, but redelivery behavior differs
```
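Quorum queues can also cap redeliveries natively: each redelivery is stamped into the x-delivery-count header, and an x-delivery-limit argument dead-letters the message once the limit is exceeded, removing the need for application-side retry counting. A sketch (the limit and exchange name are placeholders):

```python
def quorum_queue_args(delivery_limit=5, dlx='dlx.exchange'):
    """Arguments for a quorum queue with a broker-enforced redelivery cap."""
    return {
        'x-queue-type': 'quorum',
        'x-delivery-limit': delivery_limit,  # dead-letter after this many deliveries
        'x-dead-letter-exchange': dlx,
    }

# Usage:
# channel.queue_declare(queue='my-quorum-queue', durable=True,
#                       arguments=quorum_queue_args())
args = quorum_queue_args()
```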

### 9. Implement circuit breaker pattern for consumers

Prevent cascade failures when downstream is unavailable:

```python
import requests
from circuitbreaker import circuit, CircuitBreakerError

@circuit(failure_threshold=5, recovery_timeout=60)
def process_message(body):
    # Call downstream service
    response = requests.post('http://downstream-service/process', json=body)
    response.raise_for_status()

def callback(ch, method, properties, body):
    try:
        process_message(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except CircuitBreakerError as e:
        logger.warning(f"Circuit open, rejecting message: {e}")
        # Do not requeue - the circuit is open for a reason
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    except Exception as e:
        logger.error(f"Processing failed: {e}")
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
```

### 10. Analyze consumer logs for root cause

Correlate message IDs across consumer instances:

```bash
# Grep for a specific message ID across all consumer logs
grep "message_id=abc123" /var/log/consumer/*.log

# Look for exception patterns
grep -E "Exception|Error|Timeout" /var/log/consumer/app.log | tail -50

# Check for consumer lifecycle events
grep -E "connected|disconnected|CANCEL|basic_consume" /var/log/consumer/app.log
```

Common log patterns indicating a loop:

  • The same correlation_id appearing every few seconds
  • basic_nack followed immediately by basic_deliver with the same tag
  • Channel exceptions with PRECONDITION_FAILED (ack mismatch)
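Spotting repeating IDs is easy to script. The helper below assumes log lines carry a correlation_id=<value> field; adjust the regex to your actual log format:

```python
import re
from collections import Counter

def looping_ids(log_lines, threshold=3):
    """Return correlation_ids seen more than `threshold` times --
    a likely loop when the repeats are only seconds apart."""
    ids = re.findall(r'correlation_id=(\S+)', '\n'.join(log_lines))
    return sorted(cid for cid, n in Counter(ids).items() if n > threshold)

lines = ['ts=1 correlation_id=abc123 nack'] * 5 + ['ts=2 correlation_id=ok42 ack']
```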

Prevention

  • Always configure dead letter exchanges for production queues
  • Set x-message-ttl based on message freshness requirements
  • Implement idempotent message processing where possible
  • Log message ID and redelivery count for debugging
  • Use manual acknowledgment mode with proper error handling
  • Set requeue: false after retry threshold exceeded
  • Monitor messages_ready and consumer lag as leading indicators

Related Failure Modes

  • **Consumer timeout**: channel closed before processing completes
  • **Queue memory alarm**: broker stops accepting publishes when memory is high
  • **Connection blocked**: flow control due to resource exhaustion
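The idempotent-processing guideline above can be sketched as a dedup check keyed on message_id. In production the set would be a Redis key (SETNX) or a unique database constraint, and the publisher must supply a stable message ID; the in-memory set here is only for illustration.

```python
processed = set()  # stand-in for Redis/DB; must outlive the process in real use

def handle_once(message_id, body, process):
    """Process a message at most once: a redelivered duplicate is skipped,
    so redelivery becomes harmless instead of repeating side effects."""
    if message_id in processed:
        return 'duplicate'
    process(body)                # only mark as done after success
    processed.add(message_id)
    return 'processed'

calls = []
first = handle_once('m1', 'payload', calls.append)
second = handle_once('m1', 'payload', calls.append)  # simulated redelivery
```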