What's Actually Happening
Messages accumulate in a RabbitMQ queue, but consumers are not processing them. Queue depth grows continuously: consumers appear connected, yet the message count does not decrease.
The Error You'll See
Queue depth growing:
```bash
$ rabbitmqctl list_queues name messages consumers
my-queue 50000 2
# 50,000 messages, 2 consumers connected but not processing
```
Messages ready but not unacked:
```bash
$ rabbitmqctl list_queues name messages_ready messages_unacknowledged
my-queue 50000 0
# 50,000 ready, 0 unacked = consumers not receiving messages
```
Management UI shows:
Queue: my-queue
Messages: 50,000 ready, 0 unacked
Consumers: 2
State: running
Why This Happens
1. Consumer not consuming: the application is connected but never called basic.consume (the queue then reports 0 consumers)
2. Prefetch too low: each consumer receives only a few messages at a time
3. Channel closed: the consumer's channel was closed unexpectedly
4. Dead letter loop: rejected messages are routed through a dead letter exchange back into the same queue
5. Queue exclusive: the queue is locked to a different connection
6. Message rejection: consumers are rejecting or nacking messages
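The causes above tend to leave different fingerprints in the counters reported by `rabbitmqctl list_queues` and `list_consumers`. The following is a rough triage sketch, not a RabbitMQ feature: the function name `likely_cause` and its decision order are illustrative assumptions, and the result is only a first guess to narrow down which step to run next.

```python
def likely_cause(ready: int, unacked: int, consumers: int, prefetch: int) -> str:
    """Map queue counters to a first guess at the cause (heuristic only)."""
    if ready == 0 and unacked == 0:
        return "queue is empty: nothing is stuck"
    if consumers == 0:
        return "no consumer registered: check basic.consume and channel state"
    if unacked == 0:
        # Consumers exist but the broker has nothing in flight to them.
        return "consumers registered but idle: check channels, exclusivity, rejections"
    # A prefetch of 0 means "unlimited", so there is no window to fill.
    if prefetch > 0 and unacked >= prefetch * consumers:
        return "prefetch window full: consumers are slow or not acking"
    return "consumers are working: compare publish and ack rates"
```

With the numbers from the example output (50,000 ready, 0 unacked, 2 consumers), this returns the "registered but idle" branch, which points at Steps 4, 8, and 9.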
Step 1: Check Queue Status
```bash
# List queues with details
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged consumers

# Check queue arguments
rabbitmqctl list_queues name arguments

# Check queue policy
rabbitmqctl list_queues name policy

# Check queue state
rabbitmqctl list_queues name state

# For specific queue details:
rabbitmqctl eval 'rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, <<"my-queue">>)).'

# Check queue bindings
rabbitmqctl list_bindings source_name destination_name routing_key

# Management API:
curl -u guest:guest http://localhost:15672/api/queues/%2F/my-queue
```
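The same queue details can be pulled from the management API in a script. This is a minimal sketch using only the Python standard library. The helper names (`queue_url`, `fetch_queue`, `summarize_queue`) are hypothetical, and it assumes the management plugin is enabled on the host and port used in the curl example.

```python
import base64
import json
import urllib.parse
import urllib.request


def queue_url(base_url: str, vhost: str, queue: str) -> str:
    """Build the queue URL; the default vhost "/" must be encoded as %2F."""
    quote = urllib.parse.quote
    return f"{base_url}/api/queues/{quote(vhost, safe='')}/{quote(queue, safe='')}"


def fetch_queue(base_url: str, vhost: str, queue: str,
                user: str, password: str, timeout: float = 5.0) -> dict:
    """GET the queue object using HTTP basic auth (same as curl -u)."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request = urllib.request.Request(
        queue_url(base_url, vhost, queue),
        headers={"Authorization": f"Basic {token}"},
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


def summarize_queue(payload: dict) -> dict:
    """Keep only the counters this guide looks at; missing counters become 0."""
    keys = ("messages", "messages_ready", "messages_unacknowledged", "consumers")
    summary = {key: payload.get(key, 0) for key in keys}
    summary["state"] = payload.get("state", "unknown")
    return summary
```

For example, `summarize_queue(fetch_queue("http://localhost:15672", "/", "my-queue", "guest", "guest"))` would return the five fields as a small dictionary.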
Step 2: Check Consumer Connections
```bash
# List connections
rabbitmqctl list_connections user peer_host peer_port state channels

# List channels
rabbitmqctl list_channels pid connection number consumer_count

# List consumers for queue
rabbitmqctl list_consumers

# Output shows:
# queue_name channel_pid consumer_tag ack_required prefetch_count

# Check if consumers are actually consuming
# (calls internal functions; behavior may differ between RabbitMQ versions)
rabbitmqctl eval '{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, <<"my-queue">>)), rabbit_amqqueue:consumers(Q).'

# Management API:
curl -u guest:guest http://localhost:15672/api/queues/%2F/my-queue/bindings
curl -u guest:guest http://localhost:15672/api/consumers
```
Step 3: Check Prefetch Settings
```bash
# Check consumer prefetch
rabbitmqctl list_consumers

# Output includes prefetch_count:
# my-queue <channel.1> consumer-1 true 10
# Prefetch = 10 means consumer only gets 10 unacked messages at a time

# If prefetch too low and consumers slow:
# - Messages wait for ack before next is sent
# - Queue appears stuck

# Increase prefetch in consumer:
# Python (pika):
channel.basic_qos(prefetch_count=100)

# Java:
channel.basicQos(100);

# Node.js (amqplib):
channel.prefetch(100);

# Rule: prefetch = processing_rate * processing_time
# If 10 msg/sec, 1 sec processing: prefetch = 10
# If processing slow, increase prefetch

# Global prefetch (affects all consumers on channel):
channel.basicQos(100, true);   // Java
channel.prefetch(100, true);   // Node.js
```
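The sizing rule in the block above is simple arithmetic and can be written as a small helper. The function name `suggest_prefetch` and the `minimum` parameter are illustrative assumptions. Treat the result as a starting point to tune against real measurements, not a definitive value.

```python
import math


def suggest_prefetch(rate_per_sec: float, seconds_per_message: float,
                     minimum: int = 1) -> int:
    """prefetch = processing_rate * processing_time, rounded up.

    rate_per_sec: messages per second the consumer should sustain.
    seconds_per_message: how long one message takes to process.
    """
    return max(minimum, math.ceil(rate_per_sec * seconds_per_message))
```

With the article's numbers, `suggest_prefetch(10, 1)` gives 10, which can then be passed to `channel.basic_qos(prefetch_count=...)`.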
Step 4: Check Consumer Channel State
```bash
# Check channel details
rabbitmqctl list_channels pid connection consumer_count messages_unacknowledged prefetch_count

# Channel state is reported by the management API ("state" field):
# - running: normal
# - flow: throttled by flow control
# - idle: no activity
# - closing: being closed

# Check channel details via API:
curl -u guest:guest http://localhost:15672/api/channels

# Check specific channel:
curl -u guest:guest http://localhost:15672/api/channels/<channel-id>

# Check for channels that are not running:
curl -s -u guest:guest http://localhost:15672/api/channels | jq '.[] | select(.state != "running") | {name, state}'

# Check connection state:
rabbitmqctl list_connections name state | grep -v running
```
Step 5: Test Consumer Manually
```bash
# Consume messages manually to test queue

# Using rabbitmqadmin:
rabbitmqadmin get queue=my-queue count=10

# Using the management API (the message is requeued after the read):
curl -u guest:guest -H "content-type: application/json" \
  -d '{"count":1,"ackmode":"ack_requeue_true","encoding":"auto"}' \
  http://localhost:15672/api/queues/%2F/my-queue/get

# Using Python (pika); auto_ack=True removes the message from the queue:
python -c "
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
method, properties, body = channel.basic_get('my-queue', auto_ack=True)
print(f'Got message: {body}')
connection.close()
"

# If manual get works, consumer code is the issue
# If manual get fails, queue configuration is the issue
```
Step 6: Check Dead Letter Configuration
```bash
# Check queue arguments for dead letter
rabbitmqctl list_queues name arguments

# Look for:
# x-dead-letter-exchange: messages sent to this exchange on reject/nack
# x-dead-letter-routing-key: routing key for dead letter

# If dead letter exchange points back to same queue, creates loop!
# Messages rejected -> dead letter -> back to queue -> rejected again

# Check dead letter exchange:
rabbitmqctl list_exchanges name type

# Trace message flow:
# 1. Message in my-queue
# 2. Consumer rejects with requeue=false
# 3. Message goes to dead-letter-exchange
# 4. Check dead-letter-exchange bindings

# List bindings:
rabbitmqctl list_bindings

# If dead letter routing creates loop, fix:
# 1. Remove dead letter exchange from queue
# 2. Or point to different queue
# 3. Or handle rejections properly

# Remove dead letter config: queue arguments are fixed at declaration,
# so the queue must be deleted and recreated.
# WARNING: deleting the queue discards any messages still in it.
rabbitmqctl delete_queue my-queue
# Then recreate queue without dead letter config
```
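The loop check described above can be automated against the JSON returned by the management API (`/api/queues/...` for the queue, `/api/bindings` for the bindings). This is a simplified sketch: the function name `may_loop` is hypothetical, and it deliberately ignores routing keys and exchange types, so a `True` result means "worth inspecting", not "definitely a loop".

```python
def may_loop(queue: dict, bindings: list) -> bool:
    """Return True if the queue's dead letter exchange is bound back to the queue.

    queue: a queue object with "name" and "arguments" keys.
    bindings: binding objects with "source", "destination" and
              "destination_type" keys.
    """
    dlx = queue.get("arguments", {}).get("x-dead-letter-exchange")
    if dlx is None:
        return False  # no dead lettering configured on this queue
    for binding in bindings:
        if (binding.get("source") == dlx
                and binding.get("destination_type") == "queue"
                and binding.get("destination") == queue["name"]):
            return True
    return False
```

A queue whose dead letter exchange is only bound to a separate queue (for example a hypothetical `my-queue.dead`) returns `False`.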
Step 7: Check Message TTL
```bash
# Check queue TTL settings
rabbitmqctl list_queues name arguments

# Look for:
# x-message-ttl: messages expire after this time (ms)
# x-expires: queue expires if unused

# If x-message-ttl is set, messages may expire before consumed:
# Queue args: {<<"x-message-ttl">>, long, 1000}   # 1 second TTL

# Check expired messages:
# Expired messages are discarded (or dead-lettered if a dead letter
# exchange is configured) and no longer appear in the ready count

# Remove TTL or increase:
# Delete queue and recreate with higher/no TTL

# Check per-message TTL:
# Messages may have expiration header set
curl -u guest:guest http://localhost:15672/api/queues/%2F/my-queue/get \
  -H "content-type: application/json" \
  -d '{"count":1,"ackmode":"ack_requeue_true","encoding":"auto"}'
```
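When both a queue-level `x-message-ttl` and a per-message `expiration` property are present, RabbitMQ applies the lower of the two. The sketch below computes that effective value from the queue arguments and the `properties` object of a message returned by the `/get` call above. The function name `effective_ttl_ms` is an illustrative assumption.

```python
from typing import Optional


def effective_ttl_ms(queue_args: dict, message_props: dict) -> Optional[int]:
    """Return the TTL (ms) that applies to a message, or None if it never expires."""
    candidates = []
    if "x-message-ttl" in queue_args:
        candidates.append(int(queue_args["x-message-ttl"]))
    # The per-message expiration is carried as a string of milliseconds.
    expiration = message_props.get("expiration")
    if expiration not in (None, ""):
        candidates.append(int(expiration))
    return min(candidates) if candidates else None
```

If the returned value is shorter than the time a consumer needs to pick up and process a message, expiry rather than the consumer is the likely reason messages disappear.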
Step 8: Check Exclusive Queue
```bash
# Check if queue is exclusive
rabbitmqctl list_queues name exclusive

# An exclusive queue can only be used by the connection that declared it
# When that connection closes, the exclusive queue is deleted

# Check queue owner:
rabbitmqctl list_queues name exclusive owner_pid

# If queue exclusive and consumer on a different connection:
# - Queue locked to original connection
# - New consumer cannot consume

# Fix: Recreate queue as non-exclusive
# Delete queue (WARNING: discards any messages still in it):
rabbitmqctl delete_queue my-queue

# Recreate without exclusive flag:
# In consumer code:
channel.queue_declare(queue='my-queue', exclusive=False)
```
Step 9: Check Consumer Acknowledgment
```bash
# Check ack mode
rabbitmqctl list_consumers

# ack_required column:
# true: consumer must ack each message
# false: auto-ack, messages acked immediately

# If ack_required=true and consumer never acks:
# - Messages stay unacked
# - Prefetch limit reached
# - No new messages delivered

# Check unacked messages:
rabbitmqctl list_queues name messages_unacknowledged

# If high unacked count:
# 1. Consumer crashed without acking
# 2. Consumer processing very slow
# 3. Consumer not sending acks

# Fix consumer code:
# Python (pika):
def callback(ch, method, properties, body):
    process_message(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # Always ack

# Java:
channel.basicConsume("my-queue", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        processMessage(body);
        getChannel().basicAck(envelope.getDeliveryTag(), false);
    }
});
```
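The callbacks above ack only when processing succeeds. If `process_message` raises, the delivery stays unacked and counts against the prefetch limit. One way to guarantee that every delivery is settled is to wrap the handler, as in this sketch. The wrapper name `make_callback` is hypothetical, and nacking with `requeue=False` is an assumption about the desired failure policy (it drops the message or sends it to the dead letter exchange if one is configured).

```python
import logging

log = logging.getLogger("consumer")


def make_callback(process):
    """Wrap process(body) so each delivery is acked or nacked exactly once."""
    def callback(ch, method, properties, body):
        try:
            process(body)
        except Exception:
            # Settle the delivery so it does not block the prefetch window.
            log.exception("processing failed, rejecting without requeue")
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        else:
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback
```

With pika, the wrapped handler would be registered as `channel.basic_consume(queue='my-queue', on_message_callback=make_callback(process_message), auto_ack=False)`.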
Step 10: Monitor Queue and Consumer
```bash
# Create monitoring script
cat << 'EOF' > monitor_rabbitmq_queue.sh
#!/bin/bash
QUEUE=my-queue

echo "=== Queue Status ==="
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged consumers state | grep "$QUEUE"

echo ""
echo "=== Consumers ==="
rabbitmqctl list_consumers | grep "$QUEUE"

echo ""
echo "=== Connections ==="
rabbitmqctl list_connections user peer_host state

echo ""
echo "=== Channels ==="
rabbitmqctl list_channels connection consumer_count messages_unacknowledged prefetch_count

echo ""
echo "=== Memory Usage ==="
rabbitmqctl status | grep -i -A 5 "memory"
EOF

chmod +x monitor_rabbitmq_queue.sh

# Monitor via Management API:
watch -n 5 'curl -s -u guest:guest http://localhost:15672/api/queues/%2F/my-queue | jq "{messages, messages_ready, messages_unacknowledged, consumers}"'

# Prometheus metrics (rabbitmq_exporter):
# rabbitmq_queue_messages_ready
# rabbitmq_queue_messages_unacked
# rabbitmq_queue_consumers
```
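Watching the counters by eye works, but the "stuck" condition can also be expressed as a check over a few samples of the message count. This is a sketch under stated assumptions: the names `drain_rate` and `is_stuck` are hypothetical, and samples are `(timestamp_seconds, messages)` pairs collected by whatever polling method is in use.

```python
def drain_rate(samples: list) -> float:
    """Change in queue depth per second between the first and last sample.

    Negative means the queue is draining; zero or positive means it is not.
    """
    (t0, m0), (t1, m1) = samples[0], samples[-1]
    if t1 <= t0:
        raise ValueError("samples must span a positive time interval")
    return (m1 - m0) / (t1 - t0)


def is_stuck(samples: list, consumers: int) -> bool:
    """Consumers are attached, messages remain, and depth is not decreasing."""
    return consumers > 0 and samples[-1][1] > 0 and drain_rate(samples) >= 0
```

For example, a queue that goes from 50,000 to 49,000 messages in 10 seconds drains at 100 messages per second and is not reported as stuck.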
RabbitMQ Queue Consumption Checklist
| Check | Command | Expected |
|---|---|---|
| Consumers connected | list_consumers | Consumers listed |
| Prefetch reasonable | list_consumers | prefetch > 1 |
| Messages unacked | list_queues | Not stuck at prefetch |
| Consumer acking | consumer logs | Acks being sent |
| Dead letter config | list_queues args | No loop |
| TTL appropriate | list_queues args | TTL > processing time |
Verify the Fix
```bash
# After fixing consumer issues

# 1. Check consumers active
rabbitmqctl list_consumers
# Shows consumers for queue

# 2. Monitor message count decreasing
watch -n 5 'rabbitmqctl list_queues name messages | grep my-queue'
# Messages should decrease

# 3. Check unacked messages reasonable
rabbitmqctl list_queues name messages_unacknowledged | grep my-queue
# Should be <= prefetch * consumers

# 4. Verify consumer logs
# Should show messages being processed and acked

# 5. Test throughput
# Publish messages, verify consumption rate
rabbitmqadmin publish routing_key=my-queue payload="test"
# Should be consumed quickly
```
Related Issues
- [Fix RabbitMQ Queue Not Consuming](/articles/fix-rabbitmq-queue-not-consuming)
- [Fix RabbitMQ Memory High Watermark](/articles/fix-rabbitmq-memory-high-watermark)
- [Fix RabbitMQ Publisher Confirms Timeout](/articles/fix-rabbitmq-publisher-confirms-timeout)