What's Actually Happening
RabbitMQ queue message count continuously increasing. Messages are not being consumed fast enough, leading to backlog.
The Error You'll See
```bash $ rabbitmqctl list_queues name messages
myqueue 150000 # Growing continuously ```
Management UI warning:
Queue 'myqueue' has 50000 messages
Consumer utilization: 20%Memory warning:
memory alarm on node rabbit@server
Queue 'myqueue' is growing without boundConsumer lag:
Publish rate: 1000 msg/s
Consume rate: 500 msg/s # Cannot keep upWhy This Happens
- 1.Consumer too slow - Processing time per message too high
- 2.Too few consumers - Not enough consumer instances
- 3.Consumer crashed - Consumers stopped processing
- 4.Prefetch too high - Messages stuck with slow consumer
- 5.Network issues - Consumer connection problems
- 6.Queue TTL issues - Messages not expiring
Step 1: Check Queue Status
```bash # List all queues: rabbitmqctl list_queues name messages consumers
# Detailed queue info: rabbitmqctl list_queues name messages consumers message_stats
# Check queue via API: curl -u guest:guest http://localhost:15672/api/queues
# Check specific queue: curl -u guest:guest http://localhost:15672/api/queues/%2F/myqueue
# Monitor queue depth: watch -n 5 'rabbitmqctl list_queues name messages'
# Check queue growth: rabbitmqctl list_queues name message_stats.publish_details.rate
# Check consumer count: rabbitmqctl list_queues name consumers ```
Step 2: Check Consumer Status
```bash # Check connected consumers: rabbitmqctl list_connections
# Check consumer details: curl -u guest:guest http://localhost:15672/api/consumers
# Check channel details: rabbitmqctl list_channels
# Check consumer tags: rabbitmqctl list_queues name consumer_tags
# Check consumer utilization: rabbitmqctl list_queues name consumer_utilisation
# Check if consumers active: curl -u guest:guest http://localhost:15672/api/queues/%2F/myqueue | jq .consumers
# Monitor consumer changes: rabbitmqctl list_connections | grep -c "running" ```
Step 3: Check Message Rates
```bash # Check publish rate: rabbitmqctl list_queues name message_stats.publish_details.rate
# Check deliver rate: rabbitmqctl list_queues name message_stats.deliver_get_details.rate
# Check ack rate: rabbitmqctl list_queues name message_stats.ack_details.rate
# Check unacked messages: rabbitmqctl list_queues name messages_unacknowledged
# Check via API: curl -u guest:guest http://localhost:15672/api/queues/%2F/myqueue | \ jq '{publish: .message_stats.publish_details.rate, deliver: .message_stats.deliver_get_details.rate}'
# Calculate backlog growth: # If publish > deliver, backlog grows
# Monitor rates: watch -n 5 'curl -s -u guest:guest http://localhost:15672/api/queues/%2F/myqueue | jq ".message_stats"' ```
Step 4: Optimize Prefetch Settings
```bash # Check current prefetch: rabbitmqctl list_channels prefetch_count
# Prefetch affects message distribution: # Too high: slow consumer blocks messages # Too low: inefficient batching
# Optimal prefetch: 10-50 typically
# Set prefetch in consumer code: # Python (pika): channel.basic_qos(prefetch_count=10)
# Java: channel.basicQos(10);
# Node.js (amqplib): channel.prefetch(10);
# Go (amqp): channel.Qos(10, 0, false)
# For multiple consumers, use lower prefetch: # 3 consumers, prefetch 10 = max 30 unacked ```
Step 5: Add More Consumers
```bash # Scale consumer instances:
# For Kubernetes: kubectl scale deployment consumer --replicas=5
# For Docker: docker-compose up -d --scale consumer=5
# For systemd services: systemctl start consumer@1 systemctl start consumer@2
# Verify consumers connected: rabbitmqctl list_queues name consumers
# Check consumer distribution: # Each consumer should get fair share of messages
# Ensure queue is shared (not exclusive): rabbitmqctl list_queues name exclusive # Exclusive queue = single consumer only ```
Step 6: Optimize Consumer Performance
```python # Increase processing speed:
# 1. Batch processing: def process_batch(ch, method, properties, body): # Process multiple messages batch.append(body) if len(batch) >= BATCH_SIZE: process_batch_data(batch) batch.clear() # Acknowledge all at once ch.basic_ack(delivery_tag=method.delivery_tag, multiple=True)
# 2. Async processing: import asyncio
async def process_message(body): # Async processing await process_data(body)
def callback(ch, method, properties, body): asyncio.run(process_message(body)) ch.basic_ack(delivery_tag=method.delivery_tag)
# 3. Parallel consumers: from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=10)
def callback(ch, method, properties, body): executor.submit(process_and_ack, ch, method, body)
# 4. Connection pooling: # Use multiple connections for parallel processing ```
Step 7: Configure Queue Limits
```bash # Set max queue length: rabbitmqctl set_policy max-length "^myqueue$" \ '{"max-length":100000}' --apply-to queues
# Set max length in bytes: rabbitmqctl set_policy max-length-bytes "^myqueue$" \ '{"max-length-bytes":"1GB"}' --apply-to queues
# Set message TTL: rabbitmqctl set_policy message-ttl "^myqueue$" \ '{"message-ttl":86400000}' --apply-to queues # 24 hours
# Combine policies: rabbitmqctl set_policy queue-limits "^myqueue$" \ '{"max-length":100000,"message-ttl":86400000}' --apply-to queues
# Configure via API: curl -u guest:guest -X PUT http://localhost:15672/api/policies/%2F/max-length \ -H "Content-Type: application/json" \ -d '{"pattern":"^myqueue$","definition":{"max-length":100000},"apply-to":"queues"}'
# Check policy applied: rabbitmqctl list_policies ```
Step 8: Check for Poison Messages
```bash # Messages causing consumer crashes:
# Set up dead letter exchange: rabbitmqctl set_policy dlx "^myqueue$" \ '{"dead-letter-exchange":"dlx","dead-letter-routing-key":"failed"}' \ --apply-to queues
# Create dead letter queue: rabbitmqadmin declare queue name=failed_messages durable=true
# Check dead letter queue: rabbitmqctl list_queues name messages | grep failed
# Handle failed messages: # 1. Requeue with delay # 2. Send to error queue # 3. Log and discard
# Set up retry with delay: # Use x-delayed-message plugin or delayed exchange ```
Step 9: Monitor RabbitMQ Metrics
```bash # Enable management plugin: rabbitmq-plugins enable rabbitmq_management
# Check metrics via API: curl -u guest:guest http://localhost:15672/api/overview | jq .
# Key metrics to monitor: # - queue_messages (backlog) # - queue_messages_ready # - queue_messages_unacknowledged # - message_stats.publish_details.rate # - message_stats.deliver_get_details.rate # - consumer_utilisation
# Prometheus integration: # Enable prometheus plugin: rabbitmq-plugins enable rabbitmq_prometheus
# Metrics at: curl http://localhost:15692/metrics
# Grafana dashboard: # Import RabbitMQ dashboard # Set alerts for queue depth
# Alert thresholds: # Warning: queue depth > 10000 # Critical: queue depth > 50000 ```
Step 10: RabbitMQ Verification Script
```bash # Create verification script: cat << 'EOF' > /usr/local/bin/check-rabbitmq-queue.sh #!/bin/bash
QUEUE=${1:-""}
echo "=== RabbitMQ Status ===" rabbitmqctl status | head -20
echo "" echo "=== Queue Overview ===" rabbitmqctl list_queues name messages consumers message_stats
echo "" echo "=== Connection Count ===" rabbitmqctl list_connections | grep -c "running"
echo "" echo "=== Consumer Count ===" rabbitmqctl list_queues name consumers
echo "" echo "=== Message Rates ===" curl -s -u guest:guest http://localhost:15672/api/queues | \ jq '.[] | {name: .name, messages: .messages, consumers: .consumers, publish_rate: .message_stats.publish_details.rate, deliver_rate: .message_stats.deliver_get_details.rate}'
echo "" echo "=== Memory Usage ===" rabbitmqctl memory
echo "" echo "=== Policies ===" rabbitmqctl list_policies
if [ -n "$QUEUE" ]; then echo "" echo "=== Details for $QUEUE ===" curl -s -u guest:guest "http://localhost:15672/api/queues/%2F/$QUEUE" | jq . fi
echo "" echo "=== Backlog Growth Check ===" echo "Wait 10 seconds..." MSG1=$(rabbitmqctl list_queues name messages | grep "^myqueue" | awk '{print $2}') sleep 10 MSG2=$(rabbitmqctl list_queues name messages | grep "^myqueue" | awk '{print $2}') DIFF=$((MSG2 - MSG1)) echo "Messages changed by: $DIFF" if [ $DIFF -gt 0 ]; then echo "Backlog is GROWING" elif [ $DIFF -lt 0 ]; then echo "Backlog is SHRINKING" else echo "Backlog is STABLE" fi EOF
chmod +x /usr/local/bin/check-rabbitmq-queue.sh
# Usage: /usr/local/bin/check-rabbitmq-queue.sh myqueue
# Monitor: watch -n 10 /usr/local/bin/check-rabbitmq-queue.sh ```
RabbitMQ Queue Checklist
| Check | Command | Expected |
|---|---|---|
| Queue depth | list_queues | Stable or decreasing |
| Consumers | list_queues consumers | Adequate count |
| Publish rate | message_stats | Matches consume rate |
| Prefetch | list_channels | 10-50 typically |
| Consumer utilization | consumer_utilisation | > 70% |
| Policies | list_policies | Limits configured |
Verify the Fix
```bash # After fixing queue backlog
# 1. Check queue depth rabbitmqctl list_queues name messages // Depth decreasing
# 2. Check consumers rabbitmqctl list_queues name consumers // Adequate consumers active
# 3. Monitor rates curl -u guest:guest http://localhost:15672/api/queues/%2F/myqueue | jq .message_stats // Publish <= Deliver rate
# 4. Check consumer utilization // > 80%
# 5. Watch for 5 minutes watch -n 10 'rabbitmqctl list_queues name messages' // Backlog shrinking
# 6. Check for stability # No consumer crashes // Stable processing ```
Related Issues
- [Fix RabbitMQ Queue Stuck](/articles/fix-rabbitmq-queue-stuck)
- [Fix Kafka Consumer Lag High](/articles/fix-kafka-consumer-lag-high)
- [Fix RabbitMQ Connection Refused](/articles/fix-rabbitmq-connection-refused)