What's Actually Happening
Your RabbitMQ consumer stops processing messages even though messages are in the queue. The consumer appears connected but doesn't acknowledge or process new messages, causing message backlog and delivery failures.
The Error You'll See
Consumer not processing:
```bash
$ rabbitmqctl list_queues name consumers messages_unacknowledged
Listing queues ...
myqueue	1	500
# 500 unacked messages held by a single consumer that is not processing
```
Queue growing:
```bash
$ rabbitmqctl list_queues name messages
myqueue	10000
# Messages piling up
```
Consumer channel blocked:
```bash
$ rabbitmqctl list_channels pid state
<channel.1.0>	flow
# Channel is in flow control (the broker is throttling it)
```
Why This Happens
1. Prefetch too high - consumer holds too many unacked messages
2. Slow consumer processing - each message takes too long to handle
3. Network partition - consumer disconnected but not yet detected
4. Memory alarm - broker has blocked publishers
5. Consumer exception - an application error stopped consumption
6. Channel closed - unexpected channel closure
7. Poison message - a message triggers an infinite retry loop
8. Deadlock in the app - consumer thread is blocked
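Several of these causes show up as the same symptom: unacked deliveries piling up. As a minimal accounting sketch (pure arithmetic, no broker involved), this is how `basic.qos` prefetch caps in-flight deliveries, and why a prefetch of 0 (unlimited in AMQP) lets a single stuck consumer absorb the entire queue:

```python
# Sketch of broker-side delivery accounting -- illustrative only.

def deliverable(ready: int, unacked: int, prefetch: int) -> int:
    """How many messages the broker may push to a consumer right now."""
    if prefetch == 0:  # 0 means unlimited in AMQP basic.qos
        return ready
    return max(0, min(ready, prefetch - unacked))

print(deliverable(ready=500, unacked=0, prefetch=0))    # 500: all in flight at once
print(deliverable(ready=500, unacked=10, prefetch=10))  # 0: at the cap, nothing more
```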
Step 1: Diagnose Queue and Consumer Status
```bash
# Check queue status:
rabbitmqctl list_queues name messages consumers messages_unacknowledged

# Check consumer details:
rabbitmqctl list_consumers
# Output columns:
# queue_name  consumer_tag  ack_required  prefetch_count  arguments

# Check channel state:
rabbitmqctl list_channels pid state messages_unacknowledged

# Check connections:
rabbitmqctl list_connections pid state channels

# Check node status:
rabbitmqctl status

# Or use the rabbitmq_management UI:
# navigate to the Queues tab and compare the "Ready" vs "Unacked" counts
```
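When you are watching several queues, it helps to script the triage. A sketch that parses `rabbitmqctl list_queues name messages consumers messages_unacknowledged` output and flags suspect queues; it assumes the default tab-separated format (verify on your version, as formatters differ), and the threshold is an arbitrary example:

```python
# Flag queues that look stuck from rabbitmqctl output.
# Assumes tab-separated columns: name, messages, consumers, unacked.

def find_stuck_queues(output: str, backlog_threshold: int = 1000):
    stuck = []
    for line in output.strip().splitlines():
        parts = line.split("\t")
        if len(parts) != 4:
            continue  # skip banner lines like "Listing queues ..."
        name, messages, consumers, unacked = parts[0], *map(int, parts[1:])
        if consumers == 0 and messages > 0:
            stuck.append((name, "no consumers"))
        elif unacked > 0 and messages > backlog_threshold:
            stuck.append((name, "consumers present but backlog growing"))
    return stuck

sample = "Listing queues ...\nmyqueue\t10000\t1\t500"
print(find_stuck_queues(sample))  # [('myqueue', 'consumers present but backlog growing')]
```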
Step 2: Check and Fix Prefetch Settings
```python
# Python pika consumer with an explicit prefetch:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# WRONG: prefetch_count=0 means unlimited -- the broker pushes every
# ready message to this consumer as unacked:
# channel.basic_qos(prefetch_count=0)

# CORRECT: cap in-flight messages at a reasonable number:
channel.basic_qos(prefetch_count=10)

def callback(ch, method, properties, body):
    try:
        process_message(body)  # your processing logic
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error: {e}")
        # Nack and requeue (or reject / dead-letter for poison messages)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

channel.basic_consume(queue='myqueue', on_message_callback=callback)
channel.start_consuming()
```
```java
// Java RabbitMQ consumer:
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// Set prefetch count: at most 10 unacked messages at a time
channel.basicQos(10);

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    try {
        processMessage(message);
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
    }
};

// autoAck=false: we ack manually in the callback above
channel.basicConsume("myqueue", false, deliverCallback, consumerTag -> {});
```
Step 3: Handle Unacked Messages
```bash
# Check unacked messages:
rabbitmqctl list_queues name messages_unacknowledged

# Option 1: close the stuck consumer's connection; its unacked messages
# are requeued automatically:
rabbitmqctl close_connection "<connection_pid>" "Consumer stuck"

# Option 2: purge the queue (removes ALL ready messages -- use carefully):
rabbitmqctl purge_queue myqueue

# Option 3: delete and recreate the queue:
rabbitmqctl delete_queue myqueue
# then let the application redeclare it

# Application-level handling:
# 1. Restart the consumer application
# 2. Its unacked messages are requeued automatically when the channel closes
```
Step 4: Fix Consumer Timeouts
```python
# Add heartbeat and timeouts:
import pika

# Blocking connection with heartbeat:
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='localhost',
        heartbeat=600,                  # seconds (10 minutes)
        blocked_connection_timeout=300  # seconds (5 minutes)
    )
)

# For long-running processing, prefer an asynchronous connection so
# heartbeats keep flowing while work is in progress:
def on_open(connection):
    connection.channel(on_open_callback=on_channel_open)

def on_channel_open(channel):
    channel.basic_qos(prefetch_count=10)
    channel.basic_consume(queue='myqueue', on_message_callback=callback)

parameters = pika.ConnectionParameters(
    host='localhost',
    heartbeat=600,
    blocked_connection_timeout=300
)
connection = pika.SelectConnection(parameters, on_open_callback=on_open)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    connection.close()
    # Restart the ioloop so the close handshake completes cleanly
    connection.ioloop.start()
```
Step 5: Handle Poison Messages
```python
# Retry with a counter header, then route to a dead letter queue:
def callback(ch, method, properties, body):
    max_retries = 3
    headers = properties.headers or {}  # headers may be None on first delivery
    retry_count = headers.get('x-retry-count', 0)

    try:
        process_message(body)  # your processing logic
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception:
        if retry_count < max_retries:
            # Republish with an incremented retry count
            headers['x-retry-count'] = retry_count + 1
            properties.headers = headers
            ch.basic_publish(
                exchange='',
                routing_key='myqueue',
                body=body,
                properties=properties
            )
        else:
            # Retries exhausted: send to the dead letter queue
            ch.basic_publish(
                exchange='',
                routing_key='myqueue.dlq',
                body=body,
                properties=properties
            )
        # Ack the original delivery either way so it is not redelivered
        ch.basic_ack(delivery_tag=method.delivery_tag)
```
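The retry bookkeeping is easier to test if it is isolated as a pure function. A sketch, using the same `x-retry-count` header convention as above (an application convention, not a broker feature) and the example queue names `myqueue`/`myqueue.dlq`:

```python
def next_route(headers, max_retries: int = 3):
    """Return (routing_key, new_headers) for a message whose processing failed."""
    headers = dict(headers or {})            # headers may be None on first delivery
    retries = headers.get("x-retry-count", 0)
    if retries < max_retries:
        headers["x-retry-count"] = retries + 1
        return "myqueue", headers            # republish for another attempt
    return "myqueue.dlq", headers            # retries exhausted: park in the DLQ

print(next_route(None))                      # ('myqueue', {'x-retry-count': 1})
print(next_route({"x-retry-count": 3}))      # ('myqueue.dlq', {'x-retry-count': 3})
```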
```bash
# Create the dead letter queue from the command line (rabbitmqadmin ships
# with the management plugin; the queue can equally be declared by the app):
rabbitmqadmin declare queue name=myqueue.dlq durable=true
```
Step 6: Monitor Consumer Health
```bash
# Monitoring script:
cat << 'EOF' > /usr/local/bin/monitor-rabbitmq.sh
#!/bin/bash
echo "=== RabbitMQ Consumer Health ==="

echo -e "\nQueue Status:"
rabbitmqctl list_queues name messages consumers messages_unacknowledged

echo -e "\nUnacked per Consumer:"
rabbitmqctl list_consumers

echo -e "\nChannel State:"
rabbitmqctl list_channels pid state messages_unacknowledged

echo -e "\nBlocked Connections:"
rabbitmqctl list_connections pid state | grep -v "running"

echo -e "\nMemory Alarm:"
rabbitmqctl status | grep -A 5 "memory_alarm"

echo -e "\nDisk Alarm:"
rabbitmqctl status | grep -A 5 "disk_free_alarm"
EOF
chmod +x /usr/local/bin/monitor-rabbitmq.sh

# Check a specific queue with table formatting:
rabbitmqctl list_queues name messages consumers --formatter pretty_table
```
Step 7: Fix Memory and Disk Alarms
```bash
# Check memory usage:
rabbitmqctl status | grep -A 20 "Memory"

# Check the memory watermark:
rabbitmqctl eval 'vm_memory_monitor:get_memory_limit().'

# Raise the memory limit (fraction of total RAM):
rabbitmqctl set_vm_memory_high_watermark 0.6

# Or in rabbitmq.conf:
# vm_memory_high_watermark.relative = 0.6
# vm_memory_high_watermark.absolute = 4GB

# Check disk space:
rabbitmqctl status | grep -A 5 "disk_free"

# Set the disk free limit:
rabbitmqctl set_disk_free_limit 5GB

# Or in rabbitmq.conf:
# disk_free_limit.absolute = 5GB

# Alarms clear automatically once usage drops back below the limits;
# raising the watermark or freeing resources is how you clear them.
```
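Before raising the relative watermark, it is worth checking what absolute limit it implies on your hardware. Plain arithmetic (not a RabbitMQ API), as a sketch:

```python
# Convert a relative vm_memory_high_watermark into an absolute byte limit.

def relative_watermark_bytes(total_ram_bytes: int, relative: float) -> int:
    """Absolute memory limit implied by vm_memory_high_watermark.relative."""
    return int(total_ram_bytes * relative)

GiB = 1024 ** 3
# On a 16 GiB host, a 0.6 watermark allows roughly 9.6 GiB:
print(relative_watermark_bytes(16 * GiB, 0.6) // GiB)  # 9 (whole GiB, rounded down)
```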
Step 8: Implement Consumer Recovery
```python
# Robust consumer with auto-recovery:
import logging
import time

import pika

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RobustConsumer:
    def __init__(self, host, queue):
        self.host = host
        self.queue = queue
        self.connection = None
        self.channel = None
        self.closing = False

    def connect(self):
        while not self.closing:
            try:
                self.connection = pika.BlockingConnection(
                    pika.ConnectionParameters(
                        host=self.host,
                        heartbeat=600,
                        blocked_connection_timeout=300
                    )
                )
                self.channel = self.connection.channel()
                self.channel.basic_qos(prefetch_count=10)
                self.channel.basic_consume(
                    queue=self.queue,
                    on_message_callback=self.on_message
                )
                logger.info("Connected and consuming")
                self.channel.start_consuming()
            except pika.exceptions.AMQPConnectionError as e:
                logger.error(f"Connection error: {e}, retrying in 5s")
                time.sleep(5)
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                time.sleep(5)

    def on_message(self, ch, method, properties, body):
        try:
            self.process_message(body)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except Exception as e:
            logger.error(f"Processing error: {e}")
            ch.basic_nack(
                delivery_tag=method.delivery_tag,
                requeue=True
            )

    def process_message(self, body):
        # Your processing logic
        logger.info(f"Processing: {body[:50]}")
        time.sleep(0.1)  # Simulate work

    def close(self):
        self.closing = True
        if self.connection:
            self.connection.close()

# Usage:
consumer = RobustConsumer('localhost', 'myqueue')
try:
    consumer.connect()
finally:
    consumer.close()
```
Step 9: Configure for High Throughput
```bash
# RabbitMQ configuration for throughput:

# Increase file descriptors via systemd first:
# [Service]
# LimitNOFILE=65536

# In /etc/rabbitmq/rabbitmq.conf (values in milliseconds;
# put comments on their own lines):
# consumer_timeout = 1800000
# channel_tick_interval = 5000

# Erlang VM tuning in /etc/rabbitmq/advanced.config
# (key names vary by version -- check the docs for your release):
# [
#   {rabbit, [
#     {default_consumer_prefetch, {false, 10}}
#   ]},
#   {kernel, [
#     {net_ticktime, 120}
#   ]}
# ].

# Config file changes require a node restart
# (stop_app/start_app does not reload rabbitmq.conf):
systemctl restart rabbitmq-server
```
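Prefetch is the main throughput lever on the consumer side. A common sizing heuristic (a rule of thumb to tune against your own measurements, not a RabbitMQ formula) is to keep enough messages in flight to cover one ack round trip, with some headroom:

```python
import math

def suggested_prefetch(msgs_per_sec: float, round_trip_sec: float,
                       factor: float = 2.0) -> int:
    """Heuristic: in-flight messages should cover the ack round trip, with headroom."""
    return max(1, math.ceil(msgs_per_sec * round_trip_sec * factor))

# A consumer handling 100 msg/s with a 50 ms broker round trip:
print(suggested_prefetch(100, 0.05))  # 10
```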
Step 10: Production Best Practices
```python
# Complete production consumer configuration:
import json
import logging
from typing import Callable

import pika

class ProductionConsumer:
    def __init__(self, config: dict):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.connection = None
        self.channel = None

    def connect(self):
        credentials = pika.PlainCredentials(
            self.config['username'],
            self.config['password']
        )
        parameters = pika.ConnectionParameters(
            host=self.config['host'],
            port=self.config['port'],
            virtual_host=self.config['vhost'],
            credentials=credentials,
            heartbeat=self.config.get('heartbeat', 600),
            blocked_connection_timeout=self.config.get('blocked_timeout', 300)
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

        # Set prefetch
        self.channel.basic_qos(
            prefetch_count=self.config.get('prefetch', 10)
        )

    def consume(self, queue: str, callback: Callable):
        def wrapper(ch, method, properties, body):
            try:
                message = json.loads(body)
                callback(message)
                ch.basic_ack(delivery_tag=method.delivery_tag)
                self.logger.info(f"Processed message: {message.get('id')}")
            except json.JSONDecodeError as e:
                # Malformed payloads can never succeed: ack to drop them
                self.logger.error(f"Invalid JSON: {e}")
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                self.logger.error(f"Processing error: {e}")
                ch.basic_nack(
                    delivery_tag=method.delivery_tag,
                    requeue=self.config.get('requeue_on_error', False)
                )

        self.channel.basic_consume(
            queue=queue,
            on_message_callback=wrapper
        )
        self.logger.info(f"Consuming from {queue}")
        self.channel.start_consuming()

# Usage:
config = {
    'host': 'localhost',
    'port': 5672,
    'username': 'guest',
    'password': 'guest',
    'vhost': '/',
    'prefetch': 10,
    'heartbeat': 600,
    'requeue_on_error': False
}

consumer = ProductionConsumer(config)
consumer.connect()

def process_message(message):
    print(f"Processing: {message}")

consumer.consume('myqueue', process_message)
```
RabbitMQ Consumer Troubleshooting Checklist
| Check | Command | Expected |
|---|---|---|
| Queue messages | list_queues | Not growing |
| Unacked count | list_queues | Below prefetch |
| Consumer active | list_consumers | Consumer listed |
| Channel state | list_channels | Not blocked |
| Memory alarm | status | No alarm |
| Prefetch set | list_consumers | Prefetch > 0 |
Verify the Fix
```bash
# After fixing the consumer:

# 1. Unacked count should be decreasing:
rabbitmqctl list_queues name messages_unacknowledged

# 2. Consumer should be registered:
rabbitmqctl list_consumers

# 3. Watch the queue drain:
watch 'rabbitmqctl list_queues name messages'

# 4. No blocked channels:
rabbitmqctl list_channels pid state
# Expect: all channels in the "running" state

# Before: 500 unacked, consumer stuck
# After: ~10 unacked (the prefetch cap), messages flowing
```
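Two depth samples taken a known interval apart are enough to estimate how long the backlog will take to drain. A small helper (plain arithmetic; the sample values are examples):

```python
def drain_eta_sec(depth_then: int, depth_now: int, interval_sec: float):
    """Estimate seconds until the queue empties, from two depth samples."""
    rate = (depth_then - depth_now) / interval_sec  # messages/sec being drained
    if rate <= 0:
        return None  # not draining -- the consumer is still stuck
    return depth_now / rate

# 10000 -> 9000 messages over one minute:
print(drain_eta_sec(10000, 9000, 60))  # ~540 seconds to empty
```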
Related Issues
- [Fix RabbitMQ Connection Refused](/articles/fix-rabbitmq-connection-refused)
- [Fix Kafka Consumer Lag](/articles/fix-kafka-consumer-lag)
- [Fix ActiveMQ Slow Consumer](/articles/fix-activemq-slow-consumer)