What's Actually Happening

A NATS JetStream consumer accumulates pending messages faster than it processes them. The backlog grows over time, delaying processing and pushing the stream toward its storage limits.

The Error You'll See

High pending count:

```bash
$ nats consumer info ORDERS my-consumer

Consumer: my-consumer
Pending: 150000
Ack Pending: 5000
Redelivered: 200
Waiting: 1
```

Slow processing:

```bash
$ nats consumer report ORDERS

Consumer      Pending   Ack Pending   Redelivered
my-consumer   150000    5000          200
# Pending growing continuously
```

Store exceeded:

```bash
ERROR: Maximum store of 1GB exceeded for stream ORDERS
```

Why This Happens

  1. Slow processing - the consumer cannot keep up with the incoming message rate
  2. Missing acks - messages are not acknowledged, so they stay ack-pending until redelivered
  3. Consumer down - no consumer process is running to receive or pull messages
  4. Ack wait too short - messages are redelivered before processing completes
  5. Max ack pending reached - once unacknowledged messages hit max_ack_pending, the server stops delivering new ones
  6. Processing errors - the consumer fails and messages are retried repeatedly

Step 1: Check Consumer Status

```bash
# List all consumers:
nats consumer ls ORDERS

# Get consumer info:
nats consumer info ORDERS my-consumer

# Key fields:
# Pending: messages not yet delivered to this consumer
# Ack Pending: delivered but not yet acknowledged
# Redelivered: messages delivered more than once (ack wait expired or Nak)

# View consumer report:
nats consumer report ORDERS

# Check stream info:
nats stream info ORDERS

# Stream state shows:
# Messages: total messages stored
# Bytes: total storage used
# First/Last Sequence: range of stored messages
```
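To check these fields from a script or health probe instead of by eye, the `--json` form of `nats consumer info` can be decoded directly. The Go sketch below assumes the output carries top-level `name`, `num_pending`, `num_ack_pending`, `num_redelivered` and `num_waiting` fields, as JetStream's consumer info JSON does; `consumerState` and `summarize` are hypothetical names, and the sample JSON is invented to mirror the example output above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// consumerState is the subset of `nats consumer info --json` output used to
// diagnose a backlog. Field names assume the JetStream consumer info JSON.
type consumerState struct {
	Name           string `json:"name"`
	NumPending     uint64 `json:"num_pending"`     // not yet delivered
	NumAckPending  int    `json:"num_ack_pending"` // delivered, not yet acked
	NumRedelivered int    `json:"num_redelivered"` // delivered more than once
	NumWaiting     int    `json:"num_waiting"`     // outstanding pull requests
}

// summarize decodes consumer info JSON and formats the backlog fields.
func summarize(raw []byte) (consumerState, string, error) {
	var s consumerState
	if err := json.Unmarshal(raw, &s); err != nil {
		return s, "", err
	}
	line := fmt.Sprintf("%s: pending=%d ack_pending=%d redelivered=%d waiting=%d",
		s.Name, s.NumPending, s.NumAckPending, s.NumRedelivered, s.NumWaiting)
	return s, line, nil
}

func main() {
	// In practice raw would come from: nats consumer info ORDERS my-consumer --json
	raw := []byte(`{"name":"my-consumer","num_pending":150000,"num_ack_pending":5000,"num_redelivered":200,"num_waiting":1}`)
	_, line, err := summarize(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(line) // my-consumer: pending=150000 ack_pending=5000 redelivered=200 waiting=1
}
```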

Step 2: Check Consumer Configuration

```bash
# View consumer configuration:
nats consumer info ORDERS my-consumer --json

# Key settings in the "config" object:
# "ack_policy": "explicit"     each message must be acked
# "ack_wait": 30000000000      nanoseconds (30s) before redelivery
# "max_ack_pending": 1000      max unacked messages before delivery pauses
# "max_deliver": 10            max delivery attempts per message
# "deliver_policy": "all"      start from the first message in the stream

# If ack_wait is too short, messages are redelivered mid-processing:
nats consumer update ORDERS my-consumer --ack-wait 60s

# Increase max ack pending:
nats consumer update ORDERS my-consumer --max-pending 5000

# Check the deliver policy (it cannot be changed on an existing consumer):
nats consumer info ORDERS my-consumer --json | jq '.config.deliver_policy'

# Create an optimized consumer (delete the old one first, or use a new name):
nats consumer add ORDERS my-consumer \
  --deliver all \
  --ack explicit \
  --ack-wait 60s \
  --max-pending 5000 \
  --replay instant
```

Step 3: Check Ack Handling

```go
// Messages must be acknowledged.

// Correct ack handling: with nats.ManualAck() the library does not ack for you.
sub, _ := js.Subscribe("ORDERS", func(msg *nats.Msg) {
	processMessage(msg)
	msg.Ack()
}, nats.Durable("my-consumer"), nats.ManualAck())

// Common issues:

// 1. Forgetting to ack: the message stays ack-pending until ack_wait
// expires, then is redelivered.
js.Subscribe("ORDERS", func(msg *nats.Msg) {
	processMessage(msg)
	// Missing: msg.Ack()
}, nats.ManualAck())

// 2. Acking from a goroutine without ManualAck: the library auto-acks when
// the callback returns, before the goroutine finishes, so a crash loses it.
js.Subscribe("ORDERS", func(msg *nats.Msg) {
	go func() {
		processMessage(msg)
		msg.Ack() // too late: already auto-acked
	}()
})

// 3. Returning on error without Nak: redelivery waits for ack_wait.
js.Subscribe("ORDERS", func(msg *nats.Msg) {
	if err := processMessage(msg); err != nil {
		// Missing: msg.Nak()
		return
	}
	msg.Ack()
}, nats.ManualAck())

// Proper error handling:
js.Subscribe("ORDERS", func(msg *nats.Msg) {
	if err := processMessage(msg); err != nil {
		msg.Nak() // negative ack: request redelivery now
		return
	}
	msg.Ack()
}, nats.ManualAck())
```

Step 4: Check Processing Rate

```bash
# Monitor JetStream usage across the server (requires system account access):
nats server report jetstream

# Check message stats:
nats stream info ORDERS --json | jq '.state.messages'

# Measure the backlog trend: sample the consumer's pending count 60 seconds apart
nats consumer info ORDERS my-consumer --json | jq '.num_pending'
sleep 60
nats consumer info ORDERS my-consumer --json | jq '.num_pending'

# If the producer rate exceeds the consumer rate, the backlog grows

# Check consumer lag:
nats consumer report ORDERS

# Lag = pending messages; growing lag = consumer too slow

# Scale consumers:
# Option 1: Run multiple consumer instances
# Option 2: Queue group for parallel processing (all members share one durable;
#   an existing consumer created without a deliver group cannot be joined as a queue)
js.QueueSubscribe("ORDERS", "worker-group", handler, nats.Durable("worker-group"), nats.ManualAck())

# Option 3: Increase per-instance capacity
# Optimize processing code, use parallelism
```

Step 5: Check Consumer Health

```bash
# Check if the consumer process is running:
ps aux | grep nats-consumer

# List client connections (requires system account access):
nats server report connections

# Look for the consumer's connection via the monitoring endpoint
# (only works if the client sets a connection name):
curl -s http://localhost:8222/connz | jq '.connections[] | select(.name == "my-consumer")'

# Show name, idle time, and subscription count per connection:
curl -s http://localhost:8222/connz | jq '.connections[] | {name, idle, subscriptions}'

# If the consumer is down, restart the application:
systemctl restart my-nats-consumer

# Follow its logs (systemd):
journalctl -u my-nats-consumer -f

# Check for errors in consumer logs:
grep -iE "error|exception" /var/log/my-consumer.log
```
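The name-based lookup only finds your application if it sets a connection name (in nats.go, via the `nats.Name(...)` connect option). The Go sketch below performs the same check against the `/connz` JSON, assuming the `connections[]` entries carry `cid`, `name`, `subscriptions` and `idle` fields; `findConnections` is a hypothetical helper and the sample document is trimmed and invented.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// connz is the subset of the /connz response used here.
type connz struct {
	Connections []struct {
		CID           uint64 `json:"cid"`
		Name          string `json:"name"`
		Subscriptions int    `json:"subscriptions"`
		Idle          string `json:"idle"`
	} `json:"connections"`
}

// findConnections returns a summary for each connection with the given name.
func findConnections(raw []byte, name string) ([]string, error) {
	var c connz
	if err := json.Unmarshal(raw, &c); err != nil {
		return nil, err
	}
	var found []string
	for _, conn := range c.Connections {
		if conn.Name == name {
			found = append(found, fmt.Sprintf("cid=%d subs=%d idle=%s", conn.CID, conn.Subscriptions, conn.Idle))
		}
	}
	return found, nil
}

func main() {
	raw := []byte(`{"connections":[
		{"cid":5,"name":"my-consumer","subscriptions":2,"idle":"3s"},
		{"cid":7,"name":"producer","subscriptions":0,"idle":"0s"}]}`)
	found, err := findConnections(raw, "my-consumer")
	if err != nil {
		panic(err)
	}
	if len(found) == 0 {
		fmt.Println("my-consumer is not connected")
		return
	}
	fmt.Println(found) // [cid=5 subs=2 idle=3s]
}
```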

Step 6: Adjust Ack Wait Time

```bash
# If messages take a long time to process, increase ack_wait.

# Measure processing time first (add timing metrics to the consumer code).

# Update the consumer:
nats consumer update ORDERS my-consumer --ack-wait 120s

# Or set it in code (Go):
sub, _ := js.Subscribe("ORDERS", handler, nats.Durable("my-consumer"), nats.AckWait(2*time.Minute), nats.ManualAck())

# Rule: ack_wait > max processing time + margin
# Example: processing takes 30s, set ack_wait to 60s

# Progress acks for long operations (Go); each InProgress() resets the ack_wait timer:
js.Subscribe("ORDERS", func(msg *nats.Msg) {
  for i := 0; i < 10; i++ {
    processChunk(i)
    msg.InProgress() // tell NATS we're still working
  }
  msg.Ack()
}, nats.ManualAck())
```

Step 7: Handle Redelivery

```bash
# Check redelivery count:
nats consumer info ORDERS my-consumer | grep Redelivered

# High redelivery indicates:
# 1. Processing failures
# 2. Ack wait too short
# 3. Consumer crashes

# Set a max deliver limit:
nats consumer update ORDERS my-consumer --max-deliver 5

# After max_deliver attempts, the message stops being redelivered to this
# consumer and the server publishes an advisory on:
# $JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.ORDERS.my-consumer

# JetStream has no built-in dead letter queue. A common pattern is to
# capture these advisories in a stream of their own:
nats stream add ORDERS_DLQ --subjects '$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.ORDERS.*'

# Or set the limit when creating the consumer:
nats consumer add ORDERS my-consumer \
  --deliver all \
  --max-deliver 5

# Check the DLQ:
nats stream info ORDERS_DLQ
```
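A max-deliveries advisory identifies the failed message by its stream sequence rather than carrying the payload, so a dead-letter handler has to look the original up (for example with natscli's `nats stream get ORDERS <seq>`). The Go sketch below decodes such an advisory, assuming the `type`, `stream`, `consumer`, `stream_seq` and `deliveries` fields of the `io.nats.jetstream.advisory.v1.max_deliver` schema; `parseMaxDeliver` is a hypothetical helper and the sample payload is invented.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// maxDeliverAdvisory holds the advisory fields used here.
type maxDeliverAdvisory struct {
	Type       string `json:"type"`
	Stream     string `json:"stream"`
	Consumer   string `json:"consumer"`
	StreamSeq  uint64 `json:"stream_seq"`
	Deliveries uint64 `json:"deliveries"`
}

// parseMaxDeliver decodes an advisory payload and returns the CLI command
// that would fetch the original message from its stream.
func parseMaxDeliver(raw []byte) (maxDeliverAdvisory, string, error) {
	var a maxDeliverAdvisory
	if err := json.Unmarshal(raw, &a); err != nil {
		return a, "", err
	}
	return a, fmt.Sprintf("nats stream get %s %d", a.Stream, a.StreamSeq), nil
}

func main() {
	raw := []byte(`{"type":"io.nats.jetstream.advisory.v1.max_deliver","stream":"ORDERS","consumer":"my-consumer","stream_seq":4821,"deliveries":5}`)
	a, cmd, err := parseMaxDeliver(raw)
	if err != nil {
		panic(err)
	}
	// Prints: ORDERS/my-consumer gave up after 5 deliveries; inspect with: nats stream get ORDERS 4821
	fmt.Printf("%s/%s gave up after %d deliveries; inspect with: %s\n", a.Stream, a.Consumer, a.Deliveries, cmd)
}
```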

Step 8: Purge Old Messages

```bash
# If the backlog is too large, purge old messages.

# Purge ALL messages in the stream (affects every consumer):
nats stream purge ORDERS

# Skip the backlog for one consumer: the deliver policy cannot be updated,
# so recreate the consumer starting from the last message:
nats consumer rm ORDERS my-consumer
nats consumer add ORDERS my-consumer --deliver last --ack explicit

# Purge all but the most recent messages:
nats stream purge ORDERS --keep 10000   # keep the last 10k

# To remove messages by age, set a max age limit on the stream (below)

# Set stream limits:
nats stream update ORDERS \
  --max-msgs 1000000 \
  --max-bytes 10GB \
  --max-age 168h

# The stream auto-trims to these limits

# Check stream state:
nats stream info ORDERS
```

Step 9: Monitor JetStream

```bash
# Create a monitoring script:
cat << 'EOF' > /usr/local/bin/monitor-nats.sh
#!/bin/bash

echo "=== Stream Status ==="
nats stream report

echo ""
echo "=== Consumer Status ==="
nats consumer report ORDERS

echo ""
echo "=== High Pending Consumers ==="
nats consumer ls ORDERS --names | while read -r consumer; do
  pending=$(nats consumer info ORDERS "$consumer" --json | jq '.num_pending')
  if [ "$pending" -gt 1000 ]; then
    echo "WARNING: $consumer has $pending pending messages"
  fi
done

echo ""
echo "=== Server Stats ==="
nats server info
EOF

chmod +x /usr/local/bin/monitor-nats.sh

# The NATS server does not serve Prometheus metrics itself. Enable the
# monitoring port in the server config:
# http_port: 8222
# and query its JSON endpoints, e.g. JetStream stats including consumers:
curl -s 'http://nats:8222/jsz?consumers=true'

# For Prometheus, run prometheus-nats-exporter against that port
# (exact metric names depend on the exporter version)

# Key values to track: stream message count, consumer pending, consumer ack pending

# Alert for high pending (replace the metric name with your exporter's):
- alert: NATSConsumerPending
  expr: nats_consumer_pending > 10000
  for: 5m
  labels:
    severity: warning
  annotations:
    summary: "NATS consumer has high pending messages"
```
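The `for: 5m` clause means the alert fires only when the condition holds continuously, so a short burst does not page anyone. The same idea helps in a home-grown check such as the script above, which warns on a single sample. A hypothetical Go sketch: `sustainedAbove` receives `num_pending` samples taken at a fixed interval and reports whether the last `n` all exceed the threshold (five one-minute samples roughly match `for: 5m`). The sample values are invented.

```go
package main

import "fmt"

// sustainedAbove reports whether the last n samples all exceed threshold.
// It returns false when fewer than n samples are available.
func sustainedAbove(samples []uint64, threshold uint64, n int) bool {
	if n <= 0 || len(samples) < n {
		return false
	}
	for _, v := range samples[len(samples)-n:] {
		if v <= threshold {
			return false
		}
	}
	return true
}

func main() {
	// Invented one-minute samples of num_pending.
	burst := []uint64{800, 12000, 900, 700, 650, 600}
	backlog := []uint64{9000, 11000, 12500, 14000, 15800, 17100}
	fmt.Println(sustainedAbove(burst, 10000, 5))   // false
	fmt.Println(sustainedAbove(backlog, 10000, 5)) // true
}
```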

Step 10: Optimize Consumer Performance

```bash
# Increase max ack pending for parallel processing:
nats consumer update ORDERS my-consumer --max-pending 10000

# Use a pull consumer for batch processing:
nats consumer add ORDERS pull-consumer \
  --pull \
  --deliver all \
  --max-pending 10000

# Pull batches in code (Go):
sub, _ := js.PullSubscribe("ORDERS", "pull-consumer")
msgs, _ := sub.Fetch(100, nats.MaxWait(5*time.Second))
for _, msg := range msgs {
  processMessage(msg)
  msg.Ack()
}

# Ack a whole batch at once: on a consumer created with --ack all, acking a
# message also acks every earlier one, so ack only the last message:
if len(msgs) > 0 {
  msgs[len(msgs)-1].Ack()
}

# Parallel processing with a queue group: one subscription per worker,
# kept open while the process runs (unsubscribe on shutdown):
var subs []*nats.Subscription
for i := 0; i < 10; i++ {
  sub, _ := js.QueueSubscribe("ORDERS", "workers", handler, nats.ManualAck())
  subs = append(subs, sub)
}

# Check throughput:
nats consumer report ORDERS   # monitor the pending trend
```

NATS JetStream Pending Checklist

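| Check | Command | Expected |
|---|---|---|
| Pending count | `nats consumer info` | Stable or declining |
| Ack pending | `nats consumer info` | Low |
| Consumer running | `ps` / process check | Active |
| Ack handling | Application code | Each message acked |
| Ack wait | Consumer config | Greater than processing time |
| Max pending | Consumer config | Adequate |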

Verify the Fix

```bash
# After fixing consumer issues:

# 1. Check pending is decreasing
nats consumer report ORDERS
# Pending declining or stable

# 2. Verify ack handling
# Check application logs: each message acked

# 3. Test processing rate: sample the pending count 60 seconds apart
nats consumer info ORDERS my-consumer --json | jq '.num_pending'
sleep 60
nats consumer info ORDERS my-consumer --json | jq '.num_pending'
# Second value no higher than the first (consumer keeps up with the producer)

# 4. Check redeliveries
nats consumer info ORDERS my-consumer
# Redelivered count low

# 5. Monitor under load: send a test message
nats pub ORDERS "test message"
nats consumer report ORDERS
# Processing normally

# 6. Check stream health
nats stream info ORDERS
# Within limits
```

  • [Fix NATS JetStream Storage Limit](/articles/fix-nats-jetstream-storage-limit)
  • [Fix NATS Subscription Permission Denied](/articles/fix-nats-subscription-permission-denied)
  • [Fix Kafka Consumer Lag Growing](/articles/fix-kafka-consumer-lag-growing)