## Introduction
RabbitMQ consumer lag occurs when messages are published to a queue faster than its consumers can process them, causing unbounded queue growth, rising memory usage, and eventual broker instability. Lag has two components: queue depth (messages ready for delivery) and unacknowledged messages (delivered but not yet acked). When lag grows unchecked, RabbitMQ hits its memory watermark, blocks publishers, and may page messages to disk or crash, causing severe performance degradation. Fixing it requires understanding queue metrics, consumer throughput analysis, prefetch configuration, parallel consumer scaling, message acknowledgment patterns, dead letter handling, and flow control mechanisms. This guide provides production-proven troubleshooting for RabbitMQ consumer lag scenarios across standalone brokers, clusters, and federated architectures.
## Symptoms
- RabbitMQ management UI shows growing queue depth (Ready messages increasing)
- Consumer lag metrics show unacknowledged messages growing
- Queue memory usage increases continuously
- Publishers blocked due to flow control
- Consumers show high processing latency or timeouts
- Messages paged to disk (visible in the queue's memory details in the management UI)
- Broker memory hits the high watermark (default 0.4 = 40% of RAM)
- Consumer applications show OutOfMemoryError or processing timeouts
- Dead letter queue filling up with rejected messages
## Common Causes
- Consumer processing slower than producer rate (throughput mismatch)
- Prefetch count too high, overwhelming consumers
- Insufficient consumer instances or concurrency
- Consumer processing logic has performance bottleneck (database, API calls)
- Messages not acknowledged (missing ack/nack)
- Consumer crashed while processing, messages requeued indefinitely
- Long-running messages blocking prefetch slot
- Queue has single consumer but needs parallel processing
- Memory limits causing flow control
- Dead letter handling not configured, failed messages requeued
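Before tuning anything, it helps to confirm the throughput mismatch numerically. The helper below is a hypothetical sketch: the publish and deliver rates would come from `message_stats.publish_details.rate` and `message_stats.deliver_get_details.rate` in the management API response, and the sample numbers are illustrative only.

```python
def drain_time_seconds(ready, publish_rate, deliver_rate):
    """Estimate how long the current backlog takes to clear at current rates.

    Returns None when the queue is not draining (deliver rate does not
    exceed publish rate) -- the classic throughput mismatch.
    """
    net_rate = deliver_rate - publish_rate  # messages removed per second
    if net_rate <= 0:
        return None
    return ready / net_rate

# Hypothetical numbers: 45k ready messages, 100 msg/s in, 250 msg/s out
print(drain_time_seconds(45000, publish_rate=100.0, deliver_rate=250.0))  # 300.0
# Publish rate above deliver rate: backlog grows forever
print(drain_time_seconds(45000, publish_rate=300.0, deliver_rate=250.0))  # None
```

If this returns `None` or an unacceptably long drain time, scaling consumers (step 3) or fixing per-message latency (step 5) is the real remedy; nothing else will empty the queue.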
## Step-by-Step Fix
### 1. Confirm consumer lag diagnosis
Check queue metrics in management UI:
```bash
# Use the RabbitMQ Management API
# Enable the management plugin first
rabbitmq-plugins enable rabbitmq_management

# Get queue metrics
curl -u guest:guest http://localhost:15672/api/queues/%2F/my-queue

# Key metrics in the response:
# {
#   "messages": 50000,                # Total messages
#   "messages_ready": 45000,          # Ready to be delivered
#   "messages_unacknowledged": 5000,  # Delivered but not acked
#   "consumers": 2,                   # Active consumers
#   "memory": 524288000,              # Memory used (bytes)
#   "garbage_collection": {           # Erlang GC stats for the queue process
#     "minor_gcs": 500
#   }
# }

# Check all queues for lag
curl -u guest:guest http://localhost:15672/api/queues \
  | jq '.[] | {name: .name, messages: .messages, consumers: .consumers, memory: .memory}'

# Check consumer details
curl -u guest:guest http://localhost:15672/api/consumers \
  | jq '.[] | {queue: .queue.name, channel: .channel_details.number, prefetch: .prefetch_count}'
```
Use rabbitmqadmin CLI:
```bash
# Download rabbitmqadmin (served by the management plugin)
curl -O http://localhost:15672/cli/rabbitmqadmin
chmod +x rabbitmqadmin

# List queues with message counts
./rabbitmqadmin list queues name messages messages_ready messages_unacknowledged consumers

# List consumers
./rabbitmqadmin list consumers queue prefetch_count

# Check channel details
./rabbitmqadmin list channels name prefetch_count consumer_count messages_unacknowledged
```
Monitor lag in real-time:
```bash
# Watch queue depth
watch -n 1 'curl -s -u guest:guest http://localhost:15672/api/queues/%2F/my-queue | jq ".messages_ready"'

# Watch consumer count and unacked messages
watch -n 1 'curl -s -u guest:guest http://localhost:15672/api/queues/%2F/my-queue \
  | jq "{ready: .messages_ready, unack: .messages_unacknowledged, consumers: .consumers}"'

# Check whether resource alarms are blocking publishers
curl -s -u guest:guest http://localhost:15672/api/nodes \
  | jq '.[] | {node: .name, mem_alarm: .mem_alarm, disk_alarm: .disk_free_alarm}'
```
### 2. Tune prefetch count
Prefetch controls how many messages are delivered before waiting for acks:
```python
# WRONG: no prefetch limit -- the broker pushes messages as fast as it can
channel = connection.channel()
channel.basic_consume(queue='my-queue', on_message_callback=process_message)
channel.start_consuming()

# CORRECT: set a prefetch count
channel = connection.channel()
channel.basic_qos(prefetch_count=10)  # At most 10 unacked messages in flight
channel.basic_consume(queue='my-queue', on_message_callback=process_message)
channel.start_consuming()

# Prefetch guidelines:
# - prefetch_count=1:   process one at a time (slow but safe)
# - prefetch_count=10:  good for most workloads
# - prefetch_count=100: high throughput, risk of overwhelming the consumer
# - prefetch_count=0:   unlimited (dangerous!)
```
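A common rule of thumb for a starting prefetch value is to keep enough messages in flight to cover the broker round trip while one message is being processed. The helper below is a rough heuristic sketch, not an official formula; `suggested_prefetch` and its parameters are hypothetical names.

```python
import math

def suggested_prefetch(processing_ms: float, rtt_ms: float, safety: float = 2.0) -> int:
    """Ballpark prefetch: messages the broker can deliver during one round
    trip, plus the one being processed, times a safety factor for jitter."""
    per_trip = rtt_ms / processing_ms  # messages processed during one round trip
    return max(1, math.ceil((per_trip + 1) * safety))

# Slow consumer (50ms/message) on a fast LAN (5ms RTT): small prefetch suffices
print(suggested_prefetch(processing_ms=50, rtt_ms=5))   # 3
# Fast consumer (5ms/message) over a 50ms link: needs a much larger window
print(suggested_prefetch(processing_ms=5, rtt_ms=50))   # 22
```

The intuition matches the guidelines above: the slower the consumer relative to the network, the closer to `prefetch_count=1` you can safely run; measured throughput testing should always confirm the final value.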
Java Spring AMQP:
```java
@Configuration
public class RabbitConfig {

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory =
                new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);

        // Prefetch configuration
        factory.setPrefetchCount(10);          // Max unacked per consumer
        factory.setConcurrentConsumers(3);     // Initial consumers
        factory.setMaxConcurrentConsumers(10); // Scale up to 10

        // Ack mode
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO); // Ack after successful listener return
        // Or manual: AcknowledgeMode.MANUAL

        return factory;
    }
}
```
### 3. Scale consumer concurrency
Add more consumers to process in parallel:
```python
# Single consumer (slow)
channel.basic_consume(queue='my-queue', on_message_callback=callback)

# Multiple consumers in parallel. Note: a pika connection/channel is NOT
# thread-safe, so each consumer thread gets its own connection.
import threading

import pika

def run_consumer(consumer_id):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.basic_qos(prefetch_count=10)

    def callback(ch, method, properties, body):
        process_message(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    channel.basic_consume(queue='my-queue', on_message_callback=callback)
    channel.start_consuming()

# Start multiple consumer threads
for i in range(5):
    threading.Thread(target=run_consumer, args=(i,), daemon=True).start()
```
Spring Boot consumer scaling:
```yaml
# application.yml
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 10
        concurrency: 3        # Initial consumers
        max-concurrency: 10   # Scale up to 10
        acknowledge-mode: auto
        retry:
          enabled: true
          initial-interval: 1000
          max-attempts: 3
          max-interval: 10000
```
Kubernetes consumer autoscaling:
```yaml
# KEDA scaler for RabbitMQ
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: rabbitmq-consumer-scaler
spec:
  scaleTargetRef:
    name: consumer-deployment
  minReplicaCount: 2
  maxReplicaCount: 20
  triggers:
    - type: rabbitmq
      metadata:
        # Full AMQP URI; in production, supply credentials via a
        # TriggerAuthentication secret instead of inlining them
        host: amqp://guest:guest@rabbitmq.default.svc.cluster.local:5672/
        queueName: my-queue
        queueLength: "100"  # Scale to keep ~100 messages per pod
        protocol: amqp
```
### 4. Fix message acknowledgment
Ensure messages are properly acknowledged:
```python
# WRONG: message never acknowledged (redelivered when the channel closes)
def process_message(ch, method, properties, body):
    do_work(body)
    # Missing: ch.basic_ack()!

# CORRECT: acknowledge after successful processing
def process_message(ch, method, properties, body):
    try:
        do_work(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # Reject the message (routes to the DLQ if configured, otherwise discarded)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        logger.error(f"Message processing failed: {e}")

# Manual ack with result verification
def process_message(ch, method, properties, body):
    try:
        result = do_work(body)
        # Verify the result before acking
        if result.success:
            ch.basic_ack(delivery_tag=method.delivery_tag)
        else:
            # Send to the DLQ
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    except Exception:
        # On error, reject without requeue
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        raise
```
Dead letter queue configuration:
```python
# Declare the queue with dead-letter routing
channel.queue_declare(
    queue='my-queue',
    arguments={
        'x-dead-letter-exchange': 'dlx-exchange',
        'x-dead-letter-routing-key': 'my-queue-dlq',
        'x-message-ttl': 86400000,  # 24 hours
    }
)

# Declare the DLQ and bind it to the dead-letter exchange
channel.exchange_declare(exchange='dlx-exchange', exchange_type='direct')
channel.queue_declare(queue='my-queue-dlq', durable=True)
channel.queue_bind(
    queue='my-queue-dlq',
    exchange='dlx-exchange',
    routing_key='my-queue-dlq'
)
```
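When a message is dead-lettered, RabbitMQ records the history in an `x-death` header (a list of entries with `queue`, `reason`, and `count` fields). A consumer can read it to see how many times a message has already bounced, without any external storage. A minimal sketch (the sample header dict is illustrative):

```python
def death_count(headers, queue_name):
    """Return how many times a message has been dead-lettered from
    `queue_name`, based on RabbitMQ's x-death header (0 if never)."""
    for entry in (headers or {}).get('x-death', []):
        if entry.get('queue') == queue_name:
            return int(entry.get('count', 0))
    return 0

# Shape of the header RabbitMQ attaches (simplified sample)
sample_headers = {'x-death': [{'queue': 'my-queue', 'reason': 'rejected', 'count': 2}]}
print(death_count(sample_headers, 'my-queue'))  # 2
print(death_count({}, 'my-queue'))              # 0
```

In a pika callback this would be called as `death_count(properties.headers, 'my-queue')`, giving a retry counter for free when the DLQ routes messages back for reprocessing.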
### 5. Analyze consumer throughput
Identify processing bottlenecks:
```python
# Add timing and metrics to the consumer
import time

from prometheus_client import Counter, Histogram

# Metrics
messages_processed = Counter('rabbitmq_messages_processed_total', 'Total messages processed')
message_processing_time = Histogram('rabbitmq_message_processing_seconds', 'Message processing time')
message_errors = Counter('rabbitmq_message_errors_total', 'Message processing errors')

def process_message(ch, method, properties, body):
    start_time = time.time()
    try:
        with message_processing_time.time():
            do_work(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
        messages_processed.inc()
    except Exception as e:
        message_errors.inc()
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        logger.error(f"Error: {e}", exc_info=True)
    finally:
        elapsed = time.time() - start_time
        if elapsed > 5.0:  # Log slow messages
            logger.warning(f"Slow message processing: {elapsed:.2f}s")
```
Database bottleneck detection:
```python
# If the consumer is database-bound, time each stage
def do_work(body):
    # Time waiting for a connection from the pool
    start = time.time()
    conn = get_db_connection()
    db_wait_time = time.time() - start
    if db_wait_time > 1.0:
        logger.warning(f"Database connection took {db_wait_time:.2f}s")

    # Time query execution
    start = time.time()
    result = execute_query(conn, body)
    query_time = time.time() - start
    if query_time > 2.0:
        logger.warning(f"Query took {query_time:.2f}s")
    return result

# If the database is the bottleneck:
# 1. Increase the connection pool size
# 2. Optimize slow queries
# 3. Add database read replicas
# 4. Batch database operations
```
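Option 4 above (batching) usually gives the biggest win for database-bound consumers: one multi-row statement replaces N round trips. A runnable sketch using sqlite3 as a stand-in database; the `events` table and JSON payload shape are hypothetical:

```python
import json
import sqlite3

def bulk_insert(conn, bodies):
    """Insert all message bodies in one executemany call and one
    transaction, instead of one INSERT (and one commit) per message."""
    rows = [(json.loads(b)['id'], b) for b in bodies]
    with conn:  # single transaction for the whole batch
        conn.executemany('INSERT INTO events (id, payload) VALUES (?, ?)', rows)

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE events (id INTEGER PRIMARY KEY, payload TEXT)')
bulk_insert(conn, ['{"id": 1}', '{"id": 2}', '{"id": 3}'])
print(conn.execute('SELECT COUNT(*) FROM events').fetchone()[0])  # 3
```

The same pattern applies to any driver that offers `executemany`, `COPY`, or bulk APIs; it pairs naturally with the batch consumer in step 8.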
### 6. Handle memory pressure
Configure RabbitMQ memory limits:
```bash
# Check current memory usage
rabbitmqctl status | grep -A10 "Memory"

# Memory high watermark (default 40% of RAM);
# when exceeded, publishers are blocked

# Configure the limit in /etc/rabbitmq/rabbitmq.conf
# Default: 40% of available RAM
vm_memory_high_watermark.relative = 0.4

# Or an absolute value
# vm_memory_high_watermark.absolute = 2GB

# When memory hits the watermark:
# 1. Publishers are blocked (flow control)
# 2. Messages may be paged to disk
# 3. GC runs more aggressively

# Check for blocked or flow-controlled connections
rabbitmqctl list_connections name state

# If publishers are blocked:
# - Scale consumers
# - Reduce the producer rate
# - Increase the memory limit (if RAM is available)
```
Disk space for message paging:
```bash
# Check the disk free limit (default is a low 50MB; raise it in production)
rabbitmqctl status | grep -A5 "Disk"

# Configure in rabbitmq.conf
# When free disk falls below this, publishers are blocked
disk_free_limit.absolute = 2GB

# Or relative to total RAM
# disk_free_limit.relative = 1.0  # Block when free disk < total RAM

# Monitor disk usage
watch 'df -h /var/lib/rabbitmq'

# If the disk fills:
# - Delete or expire old messages
# - Scale consumers
# - Add disk space
```
### 7. Handle poison messages
Messages that always fail block the queue:
```python
# WRONG: requeue failed messages forever (poison pill)
def process_message(ch, method, properties, body):
    try:
        do_work(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        # This requeues the message -- it will be delivered again immediately!
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

# CORRECT: track failures and send repeat offenders to the DLQ
from redis import Redis

redis_client = Redis()

def process_message(ch, method, properties, body):
    message_id = properties.message_id
    retry_key = f"message_retry:{message_id}"

    # Get the retry count
    retry_count = int(redis_client.get(retry_key) or 0)

    try:
        do_work(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
        redis_client.delete(retry_key)  # Clear the retry count
    except Exception:
        retry_count += 1
        if retry_count >= 3:
            # Max retries exceeded, send to the DLQ
            logger.error(f"Message {message_id} failed {retry_count} times, sending to DLQ")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
            redis_client.delete(retry_key)
        else:
            # Increment the retry count and requeue
            redis_client.setex(retry_key, 3600, retry_count)  # Expire after 1 hour
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
```
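If Redis is not available, the same retry-tracking idea can run in process memory. Counts are lost on consumer restart, which is acceptable for best-effort poison handling. The `RetryTracker` class below is a hypothetical sketch, not part of any library:

```python
import time

class RetryTracker:
    """In-process failure counter with a TTL, mirroring the Redis pattern
    above. Counts do not survive restarts and are per consumer instance."""

    def __init__(self, max_retries=3, ttl_seconds=3600):
        self.max_retries = max_retries
        self.ttl = ttl_seconds
        self._failures = {}  # message_id -> (count, first_failure_time)

    def record_failure(self, message_id):
        """Record one failure; return True when the message should be
        dead-lettered instead of requeued."""
        now = time.time()
        count, first = self._failures.get(message_id, (0, now))
        if now - first > self.ttl:  # stale entry: start counting over
            count, first = 0, now
        count += 1
        self._failures[message_id] = (count, first)
        return count >= self.max_retries

    def clear(self, message_id):
        """Forget a message after it finally succeeds."""
        self._failures.pop(message_id, None)

tracker = RetryTracker(max_retries=3)
print(tracker.record_failure('msg-1'))  # False (1st failure, requeue)
print(tracker.record_failure('msg-1'))  # False (2nd failure, requeue)
print(tracker.record_failure('msg-1'))  # True  (3rd failure -> DLQ)
```

The trade-off versus Redis: no external dependency, but a poison message can be retried up to `max_retries` times on *each* consumer instance rather than globally.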
Dead letter queue consumer:
```python
# Process DLQ messages
def process_dlq_message(ch, method, properties, body):
    # Log for manual inspection
    logger.error(f"DLQ message: {body}")

    # Optionally: send to an error store for manual processing
    # Optionally: attempt recovery with different logic

    # Always ack DLQ messages to prevent an infinite loop
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Subscribe to the DLQ
dlq_channel.basic_consume(
    queue='my-queue-dlq',
    on_message_callback=process_dlq_message
)
```
### 8. Implement batch processing
Process multiple messages efficiently:
```python
# Batch processing for high throughput
import time

import pika

class BatchConsumer:
    def __init__(self, connection, queue, batch_size=100, batch_timeout=5.0):
        self.connection = connection
        self.queue = queue
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout

    def consume(self):
        channel = self.connection.channel()
        channel.basic_qos(prefetch_count=self.batch_size)

        while True:
            batch = []
            start_time = time.time()

            # Collect a batch, up to batch_size or batch_timeout
            while len(batch) < self.batch_size:
                remaining = self.batch_timeout - (time.time() - start_time)
                if remaining <= 0:
                    break
                method, properties, body = channel.basic_get(queue=self.queue, auto_ack=False)
                if method:
                    batch.append((method, properties, body))
                else:
                    time.sleep(0.1)  # No messages, wait

            if not batch:
                continue

            # Process the batch
            try:
                self.process_batch(batch)
                # Ack all messages
                for method, _, _ in batch:
                    channel.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                logger.error(f"Batch processing failed: {e}")
                # Requeue all messages
                for method, _, _ in batch:
                    channel.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

    def process_batch(self, batch):
        # Process all messages at once (database batch insert, etc.)
        bodies = [body for _, _, body in batch]
        bulk_insert(bodies)
```
### 9. Monitor and alert on lag
Set up comprehensive monitoring:
```python
# Prometheus metrics exporter for RabbitMQ
import logging
import time

import requests
from prometheus_client import start_http_server, Gauge

logger = logging.getLogger(__name__)

queue_depth = Gauge('rabbitmq_queue_depth', 'Queue depth (ready messages)', ['queue'])
queue_consumers = Gauge('rabbitmq_queue_consumers', 'Number of consumers', ['queue'])
queue_memory = Gauge('rabbitmq_queue_memory_bytes', 'Queue memory usage', ['queue'])
consumer_lag = Gauge('rabbitmq_consumer_lag', 'Consumer lag (unacked messages)', ['queue'])

def collect_metrics():
    while True:
        try:
            response = requests.get('http://localhost:15672/api/queues',
                                    auth=('guest', 'guest'))
            for queue in response.json():
                queue_depth.labels(queue=queue['name']).set(queue['messages_ready'])
                queue_consumers.labels(queue=queue['name']).set(queue['consumers'])
                queue_memory.labels(queue=queue['name']).set(queue['memory'])
                consumer_lag.labels(queue=queue['name']).set(queue['messages_unacknowledged'])
            time.sleep(10)
        except Exception as e:
            logger.error(f"Metrics collection failed: {e}")
            time.sleep(30)

# Start the metrics server
start_http_server(8000)
```
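The payload handling in the exporter can be kept testable without a live broker by isolating it in a pure function. A small sketch (`lag_snapshot` is a hypothetical helper; the field names match the `/api/queues` response shown in step 1):

```python
def lag_snapshot(queues_json):
    """Flatten the management API /api/queues payload into the four
    values the exporter publishes, keyed by queue name."""
    return {
        q['name']: {
            'ready': q['messages_ready'],
            'unacked': q['messages_unacknowledged'],
            'consumers': q['consumers'],
            'memory': q['memory'],
        }
        for q in queues_json
    }

# Sample payload with the numbers from step 1
sample = [{'name': 'my-queue', 'messages_ready': 45000,
           'messages_unacknowledged': 5000, 'consumers': 2,
           'memory': 524288000}]
print(lag_snapshot(sample)['my-queue']['ready'])  # 45000
```

The collection loop then just iterates over `lag_snapshot(response.json())` and sets the gauges, while the parsing logic has its own unit tests.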
Prometheus alerting rules:
```yaml
# alerting_rules.yml
groups:
  - name: rabbitmq_lag
    rules:
      - alert: RabbitMQQueueLagHigh
        expr: rabbitmq_queue_depth > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "RabbitMQ queue {{ $labels.queue }} has high lag"
          description: "Queue depth is {{ $value }} messages"

      - alert: RabbitMQQueueLagCritical
        expr: rabbitmq_queue_depth > 100000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ queue {{ $labels.queue }} has critical lag"
          description: "Queue depth is {{ $value }} messages - consumers cannot keep up"

      - alert: RabbitMQNoConsumers
        expr: rabbitmq_queue_consumers == 0
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "RabbitMQ queue {{ $labels.queue }} has no consumers"
          description: "Queue has no active consumers - messages will accumulate"

      - alert: RabbitMQMemoryHigh
        expr: rabbitmq_queue_memory_bytes > 1073741824  # 1GB
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "RabbitMQ queue {{ $labels.queue }} memory high"
          description: "Queue memory usage is {{ $value | humanize }} bytes"
```
### 10. Handle queue backlog recovery
Clear existing backlog:
```bash
# Option 1: Purge the queue (loses all messages)
./rabbitmqadmin purge queue name=my-queue

# Option 2: Add temporary consumers to drain the queue
# Deploy consumer scaling (see step 3)

# Option 3: Temporarily reduce message TTL via a policy
rabbitmqctl set_policy temporary-ttl "^my-queue$" '{"message-ttl":60000}' --apply-to queues
# Messages expire after 1 minute

# Option 4: Export messages and reimport later
# For critical messages that can't be lost

# Check queue status during recovery
watch './rabbitmqadmin list queues name messages messages_ready messages_unacknowledged'
```
Prevention for future backlogs:
```python
# Queue declaration with limits
channel.queue_declare(
    queue='my-queue',
    durable=True,
    arguments={
        'x-max-length': 100000,            # Max messages before overflow
        'x-max-length-bytes': 1073741824,  # Max 1GB
        'x-overflow': 'reject-publish',    # Reject new messages when full
        # Or: 'x-overflow': 'drop-head'    # Drop oldest messages
        'x-message-ttl': 86400000,         # 24 hour TTL
    }
)
```
## Prevention
- Set prefetch count to limit unacknowledged messages per consumer
- Scale consumers based on queue depth (autoscaling)
- Configure dead letter queues for failed messages
- Set message TTL to prevent indefinite queuing
- Monitor queue depth, consumer count, and memory usage
- Implement batch processing for high-throughput scenarios
- Track retry counts to handle poison messages
- Set queue length limits to prevent unbounded growth
- Test consumer throughput against expected producer rates
- Document consumer scaling procedures for incident response
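The throughput test in the last point reduces to simple capacity arithmetic: measure the sustained per-consumer processing rate, then size the consumer fleet against the peak publish rate with headroom for bursts and restarts. A hypothetical sketch of that calculation:

```python
import math

def consumers_needed(publish_rate, per_consumer_rate, headroom=1.5):
    """Minimum consumer count to keep up with a publish rate.

    headroom > 1 covers bursts, redeliveries, and rolling restarts;
    the ceiling ensures we never round down below the required rate.
    """
    return math.ceil(publish_rate * headroom / per_consumer_rate)

# Hypothetical: producers peak at 500 msg/s, one consumer sustains 120 msg/s
print(consumers_needed(publish_rate=500, per_consumer_rate=120))  # 7
```

If the result exceeds what a single queue's consumers can realistically deliver, that is the signal to shard across multiple queues or revisit per-message latency before lag appears in production.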
## Related Errors
- **broker_disk_free_limit**: Disk space below threshold
- **channel_closed**: Consumer channel closed unexpectedly
- **consumer_cancelled**: Consumer cancelled by broker
- **memory_high_watermark_reached**: Memory limit hit, publishers blocked
- **connection_blocked**: Connection blocked due to flow control