What's Actually Happening

RabbitMQ queue message count continuously increasing. Messages are not being consumed fast enough, leading to backlog.

The Error You'll See

```bash $ rabbitmqctl list_queues name messages

myqueue 150000 # Growing continuously ```

Management UI warning:

bash
Queue 'myqueue' has 50000 messages
Consumer utilization: 20%

Memory warning:

bash
memory alarm on node rabbit@server
Queue 'myqueue' is growing without bound

Consumer lag:

bash
Publish rate: 1000 msg/s
Consume rate: 500 msg/s  # Cannot keep up

Why This Happens

  1. 1.Consumer too slow - Processing time per message too high
  2. 2.Too few consumers - Not enough consumer instances
  3. 3.Consumer crashed - Consumers stopped processing
  4. 4.Prefetch too high - Messages stuck with slow consumer
  5. 5.Network issues - Consumer connection problems
  6. 6.Queue TTL issues - Messages not expiring

Step 1: Check Queue Status

```bash # List all queues: rabbitmqctl list_queues name messages consumers

# Detailed queue info: rabbitmqctl list_queues name messages consumers message_stats

# Check queue via API: curl -u guest:guest http://localhost:15672/api/queues

# Check specific queue: curl -u guest:guest http://localhost:15672/api/queues/%2F/myqueue

# Monitor queue depth: watch -n 5 'rabbitmqctl list_queues name messages'

# Check queue growth: rabbitmqctl list_queues name message_stats.publish_details.rate

# Check consumer count: rabbitmqctl list_queues name consumers ```

Step 2: Check Consumer Status

```bash # Check connected consumers: rabbitmqctl list_connections

# Check consumer details: curl -u guest:guest http://localhost:15672/api/consumers

# Check channel details: rabbitmqctl list_channels

# Check consumer tags: rabbitmqctl list_queues name consumer_tags

# Check consumer utilization: rabbitmqctl list_queues name consumer_utilisation

# Check if consumers active: curl -u guest:guest http://localhost:15672/api/queues/%2F/myqueue | jq .consumers

# Monitor consumer changes: rabbitmqctl list_connections | grep -c "running" ```

Step 3: Check Message Rates

```bash # Check publish rate: rabbitmqctl list_queues name message_stats.publish_details.rate

# Check deliver rate: rabbitmqctl list_queues name message_stats.deliver_get_details.rate

# Check ack rate: rabbitmqctl list_queues name message_stats.ack_details.rate

# Check unacked messages: rabbitmqctl list_queues name messages_unacknowledged

# Check via API: curl -u guest:guest http://localhost:15672/api/queues/%2F/myqueue | \ jq '{publish: .message_stats.publish_details.rate, deliver: .message_stats.deliver_get_details.rate}'

# Calculate backlog growth: # If publish > deliver, backlog grows

# Monitor rates: watch -n 5 'curl -s -u guest:guest http://localhost:15672/api/queues/%2F/myqueue | jq ".message_stats"' ```

Step 4: Optimize Prefetch Settings

```bash # Check current prefetch: rabbitmqctl list_channels prefetch_count

# Prefetch affects message distribution: # Too high: slow consumer blocks messages # Too low: inefficient batching

# Optimal prefetch: 10-50 typically

# Set prefetch in consumer code: # Python (pika): channel.basic_qos(prefetch_count=10)

# Java: channel.basicQos(10);

# Node.js (amqplib): channel.prefetch(10);

# Go (amqp): channel.Qos(10, 0, false)

# For multiple consumers, use lower prefetch: # 3 consumers, prefetch 10 = max 30 unacked ```

Step 5: Add More Consumers

```bash # Scale consumer instances:

# For Kubernetes: kubectl scale deployment consumer --replicas=5

# For Docker: docker-compose up -d --scale consumer=5

# For systemd services: systemctl start consumer@1 systemctl start consumer@2

# Verify consumers connected: rabbitmqctl list_queues name consumers

# Check consumer distribution: # Each consumer should get fair share of messages

# Ensure queue is shared (not exclusive): rabbitmqctl list_queues name exclusive # Exclusive queue = single consumer only ```

Step 6: Optimize Consumer Performance

```python # Increase processing speed:

# 1. Batch processing: def process_batch(ch, method, properties, body): # Process multiple messages batch.append(body) if len(batch) >= BATCH_SIZE: process_batch_data(batch) batch.clear() # Acknowledge all at once ch.basic_ack(delivery_tag=method.delivery_tag, multiple=True)

# 2. Async processing: import asyncio

async def process_message(body): # Async processing await process_data(body)

def callback(ch, method, properties, body): asyncio.run(process_message(body)) ch.basic_ack(delivery_tag=method.delivery_tag)

# 3. Parallel consumers: from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)

def callback(ch, method, properties, body): executor.submit(process_and_ack, ch, method, body)

# 4. Connection pooling: # Use multiple connections for parallel processing ```

Step 7: Configure Queue Limits

```bash # Set max queue length: rabbitmqctl set_policy max-length "^myqueue$" \ '{"max-length":100000}' --apply-to queues

# Set max length in bytes: rabbitmqctl set_policy max-length-bytes "^myqueue$" \ '{"max-length-bytes":"1GB"}' --apply-to queues

# Set message TTL: rabbitmqctl set_policy message-ttl "^myqueue$" \ '{"message-ttl":86400000}' --apply-to queues # 24 hours

# Combine policies: rabbitmqctl set_policy queue-limits "^myqueue$" \ '{"max-length":100000,"message-ttl":86400000}' --apply-to queues

# Configure via API: curl -u guest:guest -X PUT http://localhost:15672/api/policies/%2F/max-length \ -H "Content-Type: application/json" \ -d '{"pattern":"^myqueue$","definition":{"max-length":100000},"apply-to":"queues"}'

# Check policy applied: rabbitmqctl list_policies ```

Step 8: Check for Poison Messages

```bash # Messages causing consumer crashes:

# Set up dead letter exchange: rabbitmqctl set_policy dlx "^myqueue$" \ '{"dead-letter-exchange":"dlx","dead-letter-routing-key":"failed"}' \ --apply-to queues

# Create dead letter queue: rabbitmqadmin declare queue name=failed_messages durable=true

# Check dead letter queue: rabbitmqctl list_queues name messages | grep failed

# Handle failed messages: # 1. Requeue with delay # 2. Send to error queue # 3. Log and discard

# Set up retry with delay: # Use x-delayed-message plugin or delayed exchange ```

Step 9: Monitor RabbitMQ Metrics

```bash # Enable management plugin: rabbitmq-plugins enable rabbitmq_management

# Check metrics via API: curl -u guest:guest http://localhost:15672/api/overview | jq .

# Key metrics to monitor: # - queue_messages (backlog) # - queue_messages_ready # - queue_messages_unacknowledged # - message_stats.publish_details.rate # - message_stats.deliver_get_details.rate # - consumer_utilisation

# Prometheus integration: # Enable prometheus plugin: rabbitmq-plugins enable rabbitmq_prometheus

# Metrics at: curl http://localhost:15692/metrics

# Grafana dashboard: # Import RabbitMQ dashboard # Set alerts for queue depth

# Alert thresholds: # Warning: queue depth > 10000 # Critical: queue depth > 50000 ```

Step 10: RabbitMQ Verification Script

```bash # Create verification script: cat << 'EOF' > /usr/local/bin/check-rabbitmq-queue.sh #!/bin/bash

QUEUE=${1:-""}

echo "=== RabbitMQ Status ===" rabbitmqctl status | head -20

echo "" echo "=== Queue Overview ===" rabbitmqctl list_queues name messages consumers message_stats

echo "" echo "=== Connection Count ===" rabbitmqctl list_connections | grep -c "running"

echo "" echo "=== Consumer Count ===" rabbitmqctl list_queues name consumers

echo "" echo "=== Message Rates ===" curl -s -u guest:guest http://localhost:15672/api/queues | \ jq '.[] | {name: .name, messages: .messages, consumers: .consumers, publish_rate: .message_stats.publish_details.rate, deliver_rate: .message_stats.deliver_get_details.rate}'

echo "" echo "=== Memory Usage ===" rabbitmqctl memory

echo "" echo "=== Policies ===" rabbitmqctl list_policies

if [ -n "$QUEUE" ]; then echo "" echo "=== Details for $QUEUE ===" curl -s -u guest:guest "http://localhost:15672/api/queues/%2F/$QUEUE" | jq . fi

echo "" echo "=== Backlog Growth Check ===" echo "Wait 10 seconds..." MSG1=$(rabbitmqctl list_queues name messages | grep "^myqueue" | awk '{print $2}') sleep 10 MSG2=$(rabbitmqctl list_queues name messages | grep "^myqueue" | awk '{print $2}') DIFF=$((MSG2 - MSG1)) echo "Messages changed by: $DIFF" if [ $DIFF -gt 0 ]; then echo "Backlog is GROWING" elif [ $DIFF -lt 0 ]; then echo "Backlog is SHRINKING" else echo "Backlog is STABLE" fi EOF

chmod +x /usr/local/bin/check-rabbitmq-queue.sh

# Usage: /usr/local/bin/check-rabbitmq-queue.sh myqueue

# Monitor: watch -n 10 /usr/local/bin/check-rabbitmq-queue.sh ```

RabbitMQ Queue Checklist

CheckCommandExpected
Queue depthlist_queuesStable or decreasing
Consumerslist_queues consumersAdequate count
Publish ratemessage_statsMatches consume rate
Prefetchlist_channels10-50 typically
Consumer utilizationconsumer_utilisation> 70%
Policieslist_policiesLimits configured

Verify the Fix

```bash # After fixing queue backlog

# 1. Check queue depth rabbitmqctl list_queues name messages // Depth decreasing

# 2. Check consumers rabbitmqctl list_queues name consumers // Adequate consumers active

# 3. Monitor rates curl -u guest:guest http://localhost:15672/api/queues/%2F/myqueue | jq .message_stats // Publish <= Deliver rate

# 4. Check consumer utilization // > 80%

# 5. Watch for 5 minutes watch -n 10 'rabbitmqctl list_queues name messages' // Backlog shrinking

# 6. Check for stability # No consumer crashes // Stable processing ```

  • [Fix RabbitMQ Queue Stuck](/articles/fix-rabbitmq-queue-stuck)
  • [Fix Kafka Consumer Lag High](/articles/fix-kafka-consumer-lag-high)
  • [Fix RabbitMQ Connection Refused](/articles/fix-rabbitmq-connection-refused)