What's Actually Happening
Your Redis Stream consumer group is lagging. Messages pile up in the pending entries list (PEL) because consumers process them more slowly than they arrive, or they stay assigned to inactive consumers and are never processed.
The Error You'll See
Pending messages growing:
```bash
$ redis-cli XPENDING mystream mygroup
1) (integer) 15000          # 15,000 pending messages!
2) "1704067200000-0"        # First pending ID
3) "1704153600000-0"        # Last pending ID
4) 1) 1) "consumer-1"
      2) (integer) 5000
   2) 1) "consumer-2"
      2) (integer) 5000
   3) 1) "consumer-3"       # This consumer is idle!
      2) (integer) 5000
```
Consumer not claiming messages:
```bash
$ redis-cli XINFO CONSUMERS mystream mygroup
1) 1) name
   2) "consumer-1"
   3) pending
   4) (integer) 5000
   5) idle
   6) (integer) 3600000     # Idle for 1 hour!
```
Processing falling behind:
```bash
$ redis-cli XINFO STREAM mystream
1) length
2) (integer) 100000         # Stream has 100K messages
3) groups
4) (integer) 1
5) last-generated-id
6) "1704153600000-0"
7) first-entry
8) 1) "1704067200000-0"
   2) 1) "field"
      2) "value"

# Consumer group is far behind last-generated-id
```
Why This Happens
1. Slow consumer processing: consumers can't keep up with the message rate
2. Consumer crash without ACK: messages stay pending on a dead consumer
3. Idle consumers holding messages: consumers are not processing their pending messages
4. No message reclamation: pending messages are never reclaimed from idle consumers
5. Wrong consumer configuration: too few consumers for the message volume
6. Blocking read timeout: the XREADGROUP block timeout is too short
7. Batch size too small: reading one message at a time
8. Network latency: high latency between the application and Redis
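Causes 2 to 4 share one signature: a consumer that still owns pending entries but has not talked to Redis for a long time. The sketch below filters for that pattern. It assumes the consumer list is already a list of dicts with `name`, `pending`, and `idle` keys (the shape redis-py's `xinfo_consumers()` returns); the helper name and the sample data are hypothetical.

```python
from typing import Any, Dict, Iterable, List


def stalled_consumers(consumers: Iterable[Dict[str, Any]],
                      idle_threshold_ms: int = 60_000) -> List[str]:
    """Return names of consumers that hold pending entries but have been idle too long."""
    stalled = []
    for info in consumers:
        if info["pending"] > 0 and info["idle"] > idle_threshold_ms:
            stalled.append(info["name"])
    return stalled


# Hypothetical sample shaped like an XINFO CONSUMERS reply
sample = [
    {"name": "consumer-1", "pending": 5000, "idle": 3_600_000},  # stuck for an hour
    {"name": "consumer-2", "pending": 5000, "idle": 120},        # busy but alive
    {"name": "consumer-3", "pending": 0, "idle": 900_000},       # idle, holds nothing
]
print(stalled_consumers(sample))
```

A consumer that is idle with zero pending entries is harmless, which is why the filter checks both fields rather than idle time alone.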
Step 1: Analyze Stream and Consumer Group Status
```bash
# Get stream info:
redis-cli XINFO STREAM mystream

# Output analysis:
# length: Total messages in stream
# groups: Number of consumer groups
# last-generated-id: Most recent message ID
# first-entry: Oldest message still in stream

# Get consumer group info:
redis-cli XINFO GROUPS mystream

# Output:
# name: Group name
# consumers: Number of consumers
# pending: Number of pending (delivered but unacknowledged) messages
# last-delivered-id: Last ID delivered to group
# lag: Entries not yet delivered to the group (Redis 7.0+)

# Get consumer info:
redis-cli XINFO CONSUMERS mystream mygroup

# Output for each consumer:
# name: Consumer name
# pending: Number of pending messages
# idle: Time since last interaction (ms)

# Get pending messages summary:
redis-cli XPENDING mystream mygroup

# Output:
# 1) Total pending count
# 2) First pending ID
# 3) Last pending ID
# 4) Consumer breakdown

# Get detailed pending entries:
redis-cli XPENDING mystream mygroup - + 10

# Output shows:
# ID, consumer, idle time, delivery count

# Compare stream length, pending count, and undelivered entries:
STREAM_LEN=$(redis-cli XLEN mystream)
PENDING=$(redis-cli XPENDING mystream mygroup | head -1)
# Stream length minus pending is not the lag: pending entries were already delivered.
# Read "lag" from XINFO GROUPS instead (first group listed; empty on servers before 7.0)
LAG=$(redis-cli XINFO GROUPS mystream | awk '$0=="lag"{getline; print; exit}')
echo "Stream: $STREAM_LEN, Pending: $PENDING"
echo "Lag: ${LAG:-unknown} messages"

# Monitor in real-time:
watch -n 1 'redis-cli XPENDING mystream mygroup | head -1'
```
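Pending entries and undelivered entries are different quantities: pending entries were delivered to a consumer but not acknowledged, while undelivered entries have not been handed to the group at all. The sketch below separates the two from one XINFO GROUPS entry. It assumes the entry is a dict such as one element of redis-py's `xinfo_groups()` result, and that the `lag` field is present only on Redis 7.0 or later (it can also be nil when the server cannot compute it). The function name and sample values are hypothetical.

```python
from typing import Any, Dict, Mapping, Optional


def group_backlog(group_info: Mapping[str, Any]) -> Dict[str, Optional[int]]:
    """Split a consumer group's backlog into unacknowledged and undelivered parts."""
    pending = int(group_info.get("pending", 0))  # delivered, not yet XACKed
    lag = group_info.get("lag")                  # not yet delivered; may be missing or None
    undelivered = int(lag) if lag is not None else None
    total = pending + undelivered if undelivered is not None else None
    return {"pending": pending, "undelivered": undelivered, "total_backlog": total}


# Hypothetical sample shaped like one XINFO GROUPS entry
sample_group = {"name": "mygroup", "consumers": 3, "pending": 15000, "lag": 85000}
print(group_backlog(sample_group))
```

When `undelivered` comes back as `None`, the server did not report a lag value, so only the pending count can be trusted.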
Step 2: Identify Idle Consumers and Claim Messages
```bash
# Find idle consumers (piped redis-cli output puts every field and value on its own line):
redis-cli XINFO CONSUMERS mystream mygroup | \
  awk '$0=="name"{getline name} $0=="idle"{getline idle; if (idle+0 > 60000) print name, idle}'

# Output: consumers idle > 60 seconds

# Claim messages from idle consumer:
# XCLAIM stream group consumer min-idle-time ID [ID...]
# (0-1 0-2 0-3 are placeholders: use the IDs reported by XPENDING)

redis-cli XCLAIM mystream mygroup active-consumer 60000 0-1 0-2 0-3

# Claims the listed messages if they have been idle > 60 seconds

# Claim with XAUTOCLAIM (Redis 6.2+):
# XAUTOCLAIM stream group consumer min-idle-time start [COUNT count] [JUSTID]

redis-cli XAUTOCLAIM mystream mygroup active-consumer 60000 0 COUNT 100

# Returns:
# 1) Next start ID for further claiming (0-0 when the scan is complete)
# 2) List of claimed messages
# 3) IDs that no longer exist in the stream (Redis 7.0+)

# Script to reclaim all messages from idle consumers:
#!/bin/bash
STREAM="mystream"
GROUP="mygroup"
TARGET="active-worker"   # Consumer that takes over the messages
MIN_IDLE=60000           # 60 seconds
BATCH_SIZE=100

# XAUTOCLAIM scans the whole pending list, so one cursor loop covers every
# idle consumer; JUSTID keeps the reply to a cursor followed by message IDs
START="0-0"
while true; do
  RESULT=$(redis-cli XAUTOCLAIM "$STREAM" "$GROUP" "$TARGET" \
    "$MIN_IDLE" "$START" COUNT "$BATCH_SIZE" JUSTID)

  START=$(echo "$RESULT" | head -1)
  echo "Claimed batch, next start: $START"

  # A cursor of 0-0 means the whole pending list has been scanned
  if [ -z "$START" ] || [ "$START" = "0-0" ]; then
    break
  fi
done

# Delete idle consumer after claiming.
# Deleting a consumer discards whatever is still pending for it,
# so check first that its pending count is 0:
redis-cli XGROUP DELCONSUMER mystream mygroup idle-consumer
```
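The cursor handling is the part of XAUTOCLAIM that is easiest to get wrong: a page can come back with few or no entries while the cursor is still non-zero, so the loop should stop on the `0-0` cursor rather than on an empty page. The sketch below isolates that loop. `claim_page` is a hypothetical callable that takes a cursor and returns `(next_cursor, entries)`; the page data is made up for illustration.

```python
from typing import Any, Callable, Dict, List, Tuple

ClaimPage = Callable[[str], Tuple[str, List[Any]]]


def claim_all(claim_page: ClaimPage, max_pages: int = 1000) -> List[Any]:
    """Follow the XAUTOCLAIM cursor until it wraps around to 0-0."""
    claimed: List[Any] = []
    cursor = "0-0"
    for _ in range(max_pages):      # hard stop in case the cursor never wraps
        cursor, entries = claim_page(cursor)
        claimed.extend(entries)     # an empty page is fine, keep following the cursor
        if cursor == "0-0":
            break
    return claimed


# Hypothetical stand-in for the server: two pages, then the cursor wraps to 0-0
pages: Dict[str, Tuple[str, List[str]]] = {
    "0-0": ("1704067200005-0", ["1704067200001-0", "1704067200002-0"]),
    "1704067200005-0": ("0-0", ["1704067200007-0"]),
}
print(claim_all(lambda cursor: pages[cursor]))
```

With redis-py 4.x or later and a client created with `decode_responses=True` (both assumptions), `claim_page` could wrap `client.xautoclaim("mystream", "mygroup", "active-worker", 60000, start_id=cursor, count=100)` and return its first two elements.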
Step 3: Implement Proper Message Claiming in Code
```python
# Python implementation with redis-py (xautoclaim needs redis-py 4.x+ and Redis 6.2+)
import threading
import time

import redis


class StreamConsumer:
    def __init__(self, stream_name, group_name, consumer_name,
                 idle_timeout=60000, claim_interval=30000):
        self.redis = redis.Redis(host='localhost', port=6379)
        self.stream = stream_name
        self.group = group_name
        self.consumer = consumer_name
        self.idle_timeout = idle_timeout      # ms
        self.claim_interval = claim_interval  # ms

    def process_message(self, message_id, message_data):
        """Process a single message - override this"""
        print(f"Processing {message_id}: {message_data}")
        time.sleep(0.1)  # Simulate work
        return True

    def read_messages(self, count=10, block=5000):
        """Read new messages from stream"""
        messages = self.redis.xreadgroup(
            groupname=self.group,
            consumername=self.consumer,
            streams={self.stream: '>'},  # '>' means new messages only
            count=count,
            block=block
        )
        return messages or []

    def read_pending_messages(self, count=10):
        """Read own pending messages"""
        messages = self.redis.xreadgroup(
            groupname=self.group,
            consumername=self.consumer,
            streams={self.stream: '0'},  # '0' means this consumer's pending messages
            count=count
        )
        # With ID '0' the reply lists the stream even when nothing is pending,
        # so drop streams whose message list is empty
        return [(stream, msgs) for stream, msgs in (messages or []) if msgs]

    def claim_idle_messages(self, count=100):
        """Claim messages from idle consumers"""
        # Use XAUTOCLAIM for efficiency
        result = self.redis.xautoclaim(
            self.stream, self.group, self.consumer,
            min_idle_time=self.idle_timeout, start_id='0-0', count=count
        )
        # result[0] is the next cursor, result[1] is the list of claimed messages
        return result[1] if result and len(result) > 1 else []

    def ack_message(self, message_id):
        """Acknowledge processed message"""
        self.redis.xack(self.stream, self.group, message_id)

    def run(self):
        """Main consumer loop"""
        # Start claim thread
        claim_thread = threading.Thread(target=self._claim_loop, daemon=True)
        claim_thread.start()

        while True:
            # First, process own pending messages (this includes claimed ones)
            pending = self.read_pending_messages(count=10)
            if pending:
                for stream, messages in pending:
                    for msg_id, msg_data in messages:
                        if self.process_message(msg_id, msg_data):
                            self.ack_message(msg_id)
                continue  # Process all pending before new

            # Then read new messages
            messages = self.read_messages(count=10)
            if messages:
                for stream, msgs in messages:
                    for msg_id, msg_data in msgs:
                        if self.process_message(msg_id, msg_data):
                            self.ack_message(msg_id)

    def _claim_loop(self):
        """Periodically claim idle messages"""
        while True:
            time.sleep(self.claim_interval / 1000)

            try:
                claimed = self.claim_idle_messages(count=100)
                if claimed:
                    # Claimed messages join this consumer's pending list, so the
                    # main loop handles them; processing them here as well
                    # would run every claimed message twice
                    print(f"Claimed {len(claimed)} idle messages")
            except Exception as e:
                print(f"Claim error: {e}")


# Usage
consumer = StreamConsumer('mystream', 'mygroup', 'worker-1')
consumer.run()
```
Step 4: Configure Optimal Batch Sizes and Timeouts
```bash
# Calculate optimal configuration:

# 1. Measure message processing time (date +%s%N needs GNU date):
START=$(date +%s%N)
redis-cli XRANGE mystream - + COUNT 1   # Stand-in: run your real message handler here
END=$(date +%s%N)
PROC_TIME=$(( (END - START) / 1000000 ))
[ "$PROC_TIME" -lt 1 ] && PROC_TIME=1   # Avoid dividing by zero for sub-millisecond work
echo "Processing time: ${PROC_TIME}ms"

# 2. Calculate messages per second per consumer:
MSG_PER_SEC=$(( 1000 / PROC_TIME ))
[ "$MSG_PER_SEC" -lt 1 ] && MSG_PER_SEC=1   # Handlers slower than 1s round down to 0
echo "Messages/sec per consumer: $MSG_PER_SEC"

# 3. Determine needed consumers:
TARGET_RATE=10000   # messages per second
NEEDED_CONSUMERS=$(( TARGET_RATE / MSG_PER_SEC + 1 ))
echo "Needed consumers: $NEEDED_CONSUMERS"

# 4. Configure batch size:
# Higher batch = better throughput, higher latency
# Rule: batch_size = min(100, max(10, msg_per_sec / 10))

BATCH_SIZE=50

# 5. Configure block timeout:
# Should be > expected message arrival interval
# But not so long that claiming opportunities are missed
BLOCK_TIMEOUT=5000   # 5 seconds

# Redis configuration:
# Increase output buffer for large batches
redis-cli CONFIG SET client-output-buffer-limit "normal 0 0 0"
# Keep a bound on replica buffers so a stalled replica cannot exhaust memory
redis-cli CONFIG SET client-output-buffer-limit "replica 256mb 64mb 60"

# Optimize stream (trim old messages):
# Keep last 100,000 messages
redis-cli XTRIM mystream MAXLEN ~ 100000

# Or use XADD with MAXLEN:
redis-cli XADD mystream MAXLEN ~ 100000 '*' field value
```
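The sizing arithmetic is easier to reuse as functions than as shell integer math, which truncates at every step. The sketch below restates it in Python under two assumptions: processing time is the only bottleneck, and consumers scale linearly. The function names and the `headroom` parameter are hypothetical; the batch-size rule is the one quoted in the comments above.

```python
import math


def consumers_needed(target_rate_per_sec: float, avg_processing_ms: float,
                     headroom: float = 0.0) -> int:
    """Consumers required to sustain the target rate, with optional spare capacity."""
    if target_rate_per_sec <= 0 or avg_processing_ms <= 0:
        raise ValueError("rate and processing time must be positive")
    per_consumer_rate = 1000.0 / avg_processing_ms  # messages/sec one consumer handles
    return math.ceil(target_rate_per_sec * (1 + headroom) / per_consumer_rate)


def batch_size_for(per_consumer_rate: float) -> int:
    """Rule of thumb from above: min(100, max(10, msg_per_sec / 10))."""
    return int(min(100, max(10, per_consumer_rate / 10)))


# Example: 10,000 msg/s target, 100 ms per message, 50% spare capacity
print(consumers_needed(10_000, 100, headroom=0.5))
print(batch_size_for(1000.0 / 100))
```

Rounding up with `math.ceil` avoids the unconditional `+ 1` of the shell version, which over-provisions by one consumer whenever the division is exact.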
```python
# Optimal consumer configuration
import redis


class OptimizedStreamConsumer:
    def __init__(self, stream_name, group_name, consumer_name):
        self.redis = redis.Redis(
            host='localhost',
            port=6379,
            socket_timeout=10,
            socket_connect_timeout=5,
            retry_on_timeout=True,
            health_check_interval=30
        )
        self.stream = stream_name
        self.group = group_name
        self.consumer = consumer_name

        # Configuration based on message processing time
        self.batch_size = 50
        self.block_timeout = 5000
        self.idle_timeout = 60000
        self.claim_batch_size = 100
        self.claim_interval = 30000

        # Metrics
        self.processed = 0
        self.claimed = 0
        self.errors = 0

    def read_batch(self):
        """Read batch of messages efficiently"""
        # Call XREADGROUP directly: inside a pipeline (MULTI/EXEC) the BLOCK
        # option has no effect, so an empty stream would be polled in a tight loop
        messages = self.redis.xreadgroup(
            groupname=self.group,
            consumername=self.consumer,
            streams={self.stream: '>'},
            count=self.batch_size,
            block=self.block_timeout
        )
        return messages or []

    def ack_batch(self, message_ids):
        """Acknowledge multiple messages"""
        if not message_ids:
            return

        # XACK accepts several IDs, so one command acknowledges the whole batch
        self.redis.xack(self.stream, self.group, *message_ids)

    def process_batch(self, messages):
        """Process batch of messages"""
        processed_ids = []

        for stream, msgs in messages:
            for msg_id, msg_data in msgs:
                try:
                    # Process message
                    if self._process_single(msg_id, msg_data):
                        processed_ids.append(msg_id)
                        self.processed += 1
                except Exception as e:
                    self.errors += 1
                    print(f"Error processing {msg_id}: {e}")

        # Ack all processed at once
        self.ack_batch(processed_ids)

        return len(processed_ids)

    def _process_single(self, msg_id, msg_data):
        """Override for actual processing"""
        # Your processing logic here
        return True
```
Step 5: Monitor and Alert on Lag
```bash
# Monitoring script:
#!/bin/bash

STREAM="mystream"
GROUP="mygroup"
ALERT_THRESHOLD=1000
REDIS_HOST="localhost"
PROM_FILE="/var/lib/node_exporter/textfile_collector/redis_stream.prom"

# Get metrics
PENDING=$(redis-cli -h $REDIS_HOST XPENDING $STREAM $GROUP 2>/dev/null | head -1)
STREAM_LEN=$(redis-cli -h $REDIS_HOST XLEN $STREAM)
CONSUMERS=$(redis-cli -h $REDIS_HOST XINFO CONSUMERS $STREAM $GROUP 2>/dev/null)
# Undelivered entries: "lag" in XINFO GROUPS (Redis 7.0+, first group listed)
LAG=$(redis-cli -h $REDIS_HOST XINFO GROUPS $STREAM 2>/dev/null | \
  awk '$0=="lag"{getline; print; exit}')

# Alert on pending count
if [ -n "$PENDING" ] && [ "$PENDING" -gt "$ALERT_THRESHOLD" ]; then
  echo "ALERT: Pending messages ($PENDING) exceeds threshold ($ALERT_THRESHOLD)"

  # Find idle consumers (piped redis-cli output has one field or value per line)
  echo "$CONSUMERS" | awk '
    $0=="name"    {getline name}
    $0=="pending" {getline pending}
    $0=="idle"    {
      getline idle
      if (idle+0 > 60000) {
        print "Consumer " name " has " pending " pending messages, idle for " idle "ms"
      }
    }
  '
fi

# Export to Prometheus format:
cat << EOF > "$PROM_FILE"
redis_stream_pending_messages{stream="$STREAM",group="$GROUP"} ${PENDING:-0}
redis_stream_length{stream="$STREAM"} ${STREAM_LEN:-0}
EOF
# Only export lag when the server reported it
if [ -n "$LAG" ]; then
  echo "redis_stream_lag_messages{stream=\"$STREAM\",group=\"$GROUP\"} $LAG" >> "$PROM_FILE"
fi
```
```python
# Python monitoring with alerts
import time

import redis
from prometheus_client import Gauge, start_http_server


class StreamMonitor:
    def __init__(self, stream_name, group_name, alert_threshold=1000):
        # decode_responses=True so names arrive as str and work as label values
        self.redis = redis.Redis(decode_responses=True)
        self.stream = stream_name
        self.group = group_name
        self.alert_threshold = alert_threshold

        # Prometheus metrics
        self.pending_gauge = Gauge(
            'redis_stream_pending_messages',
            'Pending messages in stream',
            ['stream', 'group']
        )
        self.lag_gauge = Gauge(
            'redis_stream_lag_messages',
            'Lag between stream and consumer group',
            ['stream', 'group']
        )
        self.consumer_idle_gauge = Gauge(
            'redis_stream_consumer_idle_seconds',
            'Consumer idle time',
            ['stream', 'group', 'consumer']
        )

    def collect_metrics(self):
        # Get pending count (redis-py returns the XPENDING summary as a dict)
        pending_info = self.redis.xpending(self.stream, self.group)
        pending_count = pending_info['pending'] if pending_info else 0

        # Get lag: entries not yet delivered to the group (Redis 7.0+).
        # Stream length minus pending is not the lag, because pending
        # entries have already been delivered
        lag = None
        for group in self.redis.xinfo_groups(self.stream):
            if group['name'] == self.group:
                lag = group.get('lag')

        # Update metrics
        self.pending_gauge.labels(self.stream, self.group).set(pending_count)
        if lag is not None:
            self.lag_gauge.labels(self.stream, self.group).set(lag)

        # Check threshold
        if pending_count > self.alert_threshold:
            self.send_alert(pending_count)

        # Get consumer info
        consumers = self.redis.xinfo_consumers(self.stream, self.group)
        for consumer in consumers:
            name = consumer['name']
            idle_ms = consumer['idle']
            self.consumer_idle_gauge.labels(
                self.stream, self.group, name
            ).set(idle_ms / 1000)

            if idle_ms > 60000:  # Idle > 60s
                self.send_consumer_alert(name, idle_ms)

    def send_alert(self, pending_count):
        print(f"ALERT: {pending_count} pending messages in {self.stream}")
        # Send to alerting system (PagerDuty, Slack, etc.)

    def send_consumer_alert(self, consumer, idle_ms):
        print(f"ALERT: Consumer {consumer} idle for {idle_ms}ms")

    def run(self):
        start_http_server(8000)
        while True:
            self.collect_metrics()
            time.sleep(10)


# Usage
monitor = StreamMonitor('mystream', 'mygroup')
monitor.run()
```
Step 6: Handle Consumer Failover Gracefully
```python
import os
import signal
import time
from threading import Event

import redis


class GracefulConsumer:
    def __init__(self, stream_name, group_name):
        self.redis = redis.Redis()
        self.stream = stream_name
        self.group = group_name
        self.consumer = f"worker-{os.getpid()}"
        self.shutdown_event = Event()

        # Register signal handlers
        signal.signal(signal.SIGTERM, self.handle_shutdown)
        signal.signal(signal.SIGINT, self.handle_shutdown)

    def handle_shutdown(self, signum, frame):
        print(f"Received signal {signum}, shutting down gracefully...")
        self.shutdown_event.set()

    def _process_message(self, msg_data):
        """Override for actual processing"""
        pass

    def process_with_shutdown_check(self, messages):
        """Process messages with ability to shutdown mid-batch"""
        processed = []
        for msg_id, msg_data in messages:
            if self.shutdown_event.is_set():
                # Don't ack, let another consumer claim
                print("Shutdown requested, leaving messages pending")
                break

            try:
                self._process_message(msg_data)
                processed.append(msg_id)
            except Exception as e:
                print(f"Error: {e}")

        return processed

    def run(self):
        print(f"Starting consumer {self.consumer}")

        # Create consumer group if not exists
        try:
            self.redis.xgroup_create(self.stream, self.group, id='0', mkstream=True)
        except redis.ResponseError as e:
            if "BUSYGROUP" not in str(e):
                raise

        while not self.shutdown_event.is_set():
            try:
                # Read messages
                messages = self.redis.xreadgroup(
                    groupname=self.group,
                    consumername=self.consumer,
                    streams={self.stream: '>'},
                    count=10,
                    block=1000
                )

                if messages:
                    for stream, msgs in messages:
                        processed = self.process_with_shutdown_check(msgs)

                        # Ack processed
                        if processed:
                            self.redis.xack(self.stream, self.group, *processed)

            except redis.ConnectionError:
                print("Redis connection lost, retrying...")
                time.sleep(5)
            except Exception as e:
                print(f"Error: {e}")
                time.sleep(1)

        # Cleanup: XGROUP DELCONSUMER discards the consumer's pending entries,
        # so delete the consumer only when nothing is left pending for it
        print("Cleaning up...")
        remaining = self.redis.xpending_range(
            self.stream, self.group, min='-', max='+', count=1,
            consumername=self.consumer
        )
        if remaining:
            print("Pending messages remain, keeping consumer so they can be claimed")
        else:
            self.redis.xgroup_delconsumer(self.stream, self.group, self.consumer)
        print("Shutdown complete")


# Kubernetes deployment with graceful shutdown:
# spec:
#   terminationGracePeriodSeconds: 30
#   containers:
#   - name: consumer
#     lifecycle:
#       preStop:
#         exec:
#           command: ["/bin/sh", "-c", "kill -SIGTERM 1"]
```
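One detail matters during failover: XGROUP DELCONSUMER removes a consumer together with its pending entries, so messages left unacknowledged at shutdown can no longer be claimed once the consumer is deleted. The sketch below guards the deletion. It assumes a redis-py style client exposing `xinfo_consumers()` and `xgroup_delconsumer()`; the helper name is hypothetical.

```python
from typing import Any


def remove_consumer_if_drained(client: Any, stream: str, group: str,
                               consumer: str) -> bool:
    """Delete the consumer only when it owns no pending entries.

    Returns True when the consumer was deleted, False when it was kept
    (pending entries remain) or was not found in the group.
    """
    for info in client.xinfo_consumers(stream, group):
        name = info["name"]
        if isinstance(name, bytes):  # clients without decode_responses return bytes
            name = name.decode()
        if name != consumer:
            continue
        if info["pending"] > 0:
            return False             # leave it so another consumer can claim the entries
        client.xgroup_delconsumer(stream, group, consumer)
        return True
    return False
```

A periodic cleanup job could call this for consumers that belonged to terminated processes, after their entries have been claimed and acknowledged elsewhere.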
Step 7: Optimize Redis Configuration
```bash
# Redis configuration for high-throughput streams (redis.conf directives):

# Memory management
maxmemory 4gb
maxmemory-policy noeviction

# Persistence (adjust based on requirements)
# For high performance, use RDB only
save 900 1
save 300 10
save 60 10000

# Or AOF for durability
appendonly yes
appendfsync everysec

# Network
tcp-backlog 511
timeout 0
tcp-keepalive 300

# Client limits
maxclients 10000

# Slow log (for debugging)
slowlog-log-slower-than 10000
slowlog-max-len 128

# Stream specific optimizations
# Use MAXLEN when adding to prevent unbounded growth
# XADD stream MAXLEN ~ 100000 * field value

# Runtime settings (shell commands, not redis.conf directives):

# Configure output buffers
redis-cli CONFIG SET client-output-buffer-limit "normal 0 0 0"
redis-cli CONFIG SET client-output-buffer-limit "replica 256mb 64mb 60"

# Enable latency monitoring
redis-cli CONFIG SET latency-monitor-threshold 100

# Check Redis stats
redis-cli INFO stats | grep -E "total_commands|instantaneous"

# Check memory usage
redis-cli INFO memory | grep -E "used_memory|used_memory_rss"

# Monitor slow queries
redis-cli SLOWLOG GET 10
```
Step 8: Implement Backpressure
```python
import time

import redis


class BackpressureConsumer:
    def __init__(self, stream_name, group_name, max_pending=1000):
        self.redis = redis.Redis()
        self.stream = stream_name
        self.group = group_name
        self.max_pending = max_pending

    def should_read_more(self):
        """Check if we should read more messages"""
        # redis-py returns the XPENDING summary as a dict
        pending_info = self.redis.xpending(self.stream, self.group)
        pending_count = pending_info['pending'] if pending_info else 0

        return pending_count < self.max_pending

    def _process_message(self, msg_data):
        """Override for message processing"""
        pass

    def run(self):
        while True:
            # Check backpressure
            if not self.should_read_more():
                print("Too many pending, waiting...")
                time.sleep(1)
                continue

            # Read messages
            messages = self.redis.xreadgroup(
                groupname=self.group,
                consumername='worker',
                streams={self.stream: '>'},
                count=10,
                block=5000
            )

            if messages:
                for stream, msgs in messages:
                    for msg_id, msg_data in msgs:
                        # Process
                        self._process_message(msg_data)

                        # Ack
                        self.redis.xack(self.stream, self.group, msg_id)


# Rate limiting with streams
class RateLimitedConsumer:
    def __init__(self, stream_name, group_name, rate_limit=100):
        self.redis = redis.Redis()
        self.stream = stream_name
        self.group = group_name
        self.rate_limit = rate_limit  # messages per second
        self.last_time = time.time()
        self.processed = 0

    def _process_message(self, msg_data):
        """Override for message processing"""
        pass

    def run(self):
        while True:
            current_time = time.time()
            elapsed = current_time - self.last_time

            # Start a new one-second window
            if elapsed >= 1.0:
                self.last_time = current_time
                self.processed = 0
                elapsed = 0.0

            # Budget for this window is used up: sleep until it ends
            if self.processed >= self.rate_limit:
                sleep_time = 1.0 - elapsed
                time.sleep(max(0, sleep_time))
                continue

            # Read message
            messages = self.redis.xreadgroup(
                groupname=self.group,
                consumername='worker',
                streams={self.stream: '>'},
                count=1,
                block=100
            )

            if messages:
                for stream, msgs in messages:
                    for msg_id, msg_data in msgs:
                        self._process_message(msg_data)
                        self.redis.xack(self.stream, self.group, msg_id)
                        self.processed += 1
```
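A fixed one-second window can let through up to twice the limit in a short burst around a window boundary. A token bucket is a common alternative that smooths this out; it is a different technique from the window counter used above, named here as an option rather than as the article's method. The sketch below is a minimal version with an injectable clock so it can be tested without sleeping; the class and parameter names are hypothetical.

```python
import time
from typing import Callable


class TokenBucket:
    """Allow `rate` operations per second with bursts of up to `capacity`."""

    def __init__(self, rate: float, capacity: float,
                 clock: Callable[[], float] = time.monotonic):
        self.rate = float(rate)
        self.capacity = float(capacity)
        self.clock = clock
        self.tokens = float(capacity)  # start with a full bucket
        self.updated = clock()

    def try_acquire(self, n: float = 1) -> bool:
        """Take n tokens if available; return False without blocking otherwise."""
        now = self.clock()
        # Refill in proportion to elapsed time, never beyond capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= n:
            self.tokens -= n
            return True
        return False


# Demo with a hand-driven clock instead of real time
fake_now = [0.0]
bucket = TokenBucket(rate=2, capacity=2, clock=lambda: fake_now[0])
print([bucket.try_acquire() for _ in range(3)])  # third call finds the bucket empty
```

A consumer loop would call `try_acquire()` before each XREADGROUP and sleep briefly when it returns `False`.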
Step 9: Scale Consumers Dynamically
```python
import threading
import time

import redis


class DynamicScaler:
    def __init__(self, stream_name, group_name,
                 min_consumers=1, max_consumers=10,
                 scale_up_threshold=5000, scale_down_threshold=500):
        # decode_responses=True so consumer names come back as str
        self.redis = redis.Redis(decode_responses=True)
        self.stream = stream_name
        self.group = group_name
        self.min_consumers = min_consumers
        self.max_consumers = max_consumers
        self.scale_up_threshold = scale_up_threshold
        self.scale_down_threshold = scale_down_threshold

        self.consumer_processes = {}  # consumer_id -> (thread, stop_event)
        self.next_consumer_id = 0

    def get_pending_count(self):
        # redis-py returns the XPENDING summary as a dict
        pending = self.redis.xpending(self.stream, self.group)
        return pending['pending'] if pending else 0

    def get_active_consumers(self):
        consumers = self.redis.xinfo_consumers(self.stream, self.group)
        active = [c['name'] for c in consumers if c['idle'] < 60000]
        return len(active)

    def scale_up(self):
        if len(self.consumer_processes) >= self.max_consumers:
            return

        consumer_id = f"consumer-{self.next_consumer_id}"
        self.next_consumer_id += 1

        # Start new consumer (could be subprocess, thread, or k8s pod)
        self.consumer_processes[consumer_id] = self.start_consumer(consumer_id)

        print(f"Scaled up: started {consumer_id}")

    def scale_down(self):
        if len(self.consumer_processes) <= self.min_consumers:
            return

        # Find least busy consumer among the ones this scaler started
        consumers = [
            c for c in self.redis.xinfo_consumers(self.stream, self.group)
            if c['name'] in self.consumer_processes
        ]
        if consumers:
            to_remove = min(consumers, key=lambda c: c['pending'])['name']

            # Stop consumer
            self.stop_consumer(to_remove)
            del self.consumer_processes[to_remove]

            print(f"Scaled down: stopped {to_remove}")

    def start_consumer(self, consumer_id):
        """Override to start consumer process"""
        stop_event = threading.Event()
        thread = threading.Thread(
            target=self._run_consumer,
            args=(consumer_id, stop_event),
            daemon=True
        )
        thread.start()
        return thread, stop_event

    def stop_consumer(self, consumer_id):
        """Override to stop consumer process"""
        # Set flag for consumer to stop; it exits after its current read returns
        _, stop_event = self.consumer_processes[consumer_id]
        stop_event.set()

    def _run_consumer(self, consumer_id, stop_event):
        """Consumer run loop"""
        while not stop_event.is_set():
            messages = self.redis.xreadgroup(
                groupname=self.group,
                consumername=consumer_id,
                streams={self.stream: '>'},
                count=10,
                block=5000
            )

            if messages:
                for stream, msgs in messages:
                    for msg_id, msg_data in msgs:
                        self._process_message(msg_data)
                        self.redis.xack(self.stream, self.group, msg_id)

    def _process_message(self, msg_data):
        """Override for message processing"""
        pass

    def run(self):
        # Start the minimum number of consumers before watching the backlog
        while len(self.consumer_processes) < self.min_consumers:
            self.scale_up()

        while True:
            pending = self.get_pending_count()
            active = self.get_active_consumers()

            print(f"Pending: {pending}, Active consumers: {active}")

            if pending > self.scale_up_threshold and active < self.max_consumers:
                self.scale_up()
            elif pending < self.scale_down_threshold and active > self.min_consumers:
                self.scale_down()

            time.sleep(10)


# Usage
scaler = DynamicScaler(
    stream_name='mystream',
    group_name='mygroup',
    min_consumers=2,
    max_consumers=10
)
scaler.run()
```
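The scaling rule itself can be separated from Redis and threads, which makes it easy to unit test. The sketch below is a pure function under the same thresholds as above: the gap between the two thresholds acts as a dead band so the consumer count does not flap. The function name is hypothetical, and stepping by one consumer per evaluation is an assumption, not something the article prescribes.

```python
def desired_consumers(pending: int, current: int,
                      min_consumers: int = 1, max_consumers: int = 10,
                      scale_up_threshold: int = 5000,
                      scale_down_threshold: int = 500) -> int:
    """Return the consumer count to aim for, moving at most one step per call."""
    if pending > scale_up_threshold:
        target = current + 1
    elif pending < scale_down_threshold:
        target = current - 1
    else:
        target = current  # inside the dead band: hold steady
    # Clamp to the configured bounds
    return max(min_consumers, min(max_consumers, target))


print(desired_consumers(pending=6000, current=3))
print(desired_consumers(pending=100, current=3, min_consumers=2))
```

The scaler loop would compare the returned value with the current count and start or stop one consumer accordingly.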
Step 10: Production Best Practices
```bash
# Complete production setup script:

# 1. Create the stream and consumer group
#    (cap the stream length with MAXLEN on XADD, see Step 4)
redis-cli XGROUP CREATE mystream mygroup 0 MKSTREAM

# 2. Set up monitoring
cat << 'EOF' > /etc/redis/stream-monitor.conf
[program:redis-stream-monitor]
command=/usr/local/bin/stream-monitor.sh
autostart=true
autorestart=true
stderr_logfile=/var/log/stream-monitor.err.log
stdout_logfile=/var/log/stream-monitor.out.log
EOF

# 3. Create systemd service for consumer
cat << 'EOF' > /etc/systemd/system/stream-consumer.service
[Unit]
Description=Redis Stream Consumer
After=network.target redis.service

[Service]
Type=simple
User=app
Group=app
ExecStart=/usr/bin/python3 /opt/consumer/main.py
Restart=always
RestartSec=5
TimeoutStopSec=30
KillSignal=SIGTERM

[Install]
WantedBy=multi-user.target
EOF

systemctl daemon-reload
systemctl enable stream-consumer
systemctl start stream-consumer

# 4. Set up log rotation
cat << 'EOF' > /etc/logrotate.d/redis-stream
/var/log/stream-*.log {
    daily
    rotate 7
    compress
    delaycompress
    missingok
    notifempty
    create 0644 app app
}
EOF

# 5. Backup script
cat << 'EOF' > /usr/local/bin/backup-stream.sh
#!/bin/bash
BACKUP_DIR="/backup/redis-streams"
STREAM="mystream"
DATE=$(date +%Y%m%d_%H%M%S)

mkdir -p "$BACKUP_DIR"

# Export stream data
redis-cli XRANGE "$STREAM" - + > "$BACKUP_DIR/${STREAM}_${DATE}.txt"

# Keep only last 7 days
find "$BACKUP_DIR" -name "*.txt" -mtime +7 -delete
EOF

chmod +x /usr/local/bin/backup-stream.sh

# 6. Add to crontab
# 0 2 * * * /usr/local/bin/backup-stream.sh
```
Redis Stream Consumer Group Checklist
| Check | Command | Expected |
|---|---|---|
| Pending count | XPENDING | Below threshold |
| Consumer idle | XINFO CONSUMERS | < 60 seconds |
| Active consumers | XINFO GROUPS | Sufficient count |
| Processing rate | Monitor | Matches input rate |
| Claiming working | XAUTOCLAIM | Reclaiming idle |
| Backpressure | Custom script | Applied correctly |
Verify the Fix
```bash
# After implementing fixes:

# 1. Check pending messages stable
redis-cli XPENDING mystream mygroup | head -1
# Output: Stable count, not growing

# 2. Check consumer idle times
redis-cli XINFO CONSUMERS mystream mygroup
# Output: All consumers have low idle time

# 3. Verify claiming works
redis-cli XAUTOCLAIM mystream mygroup active 60000 0 COUNT 10
# Output: Idle messages claimed

# 4. Check lag decreasing
STREAM_LEN=$(redis-cli XLEN mystream)
PENDING=$(redis-cli XPENDING mystream mygroup | head -1)
echo "Stream: $STREAM_LEN, Pending: $PENDING"
# Output: Pending much less than stream length

# 5. Monitor processing rate
watch -n 1 'redis-cli XPENDING mystream mygroup | head -1'
# Output: Count stable or decreasing

# 6. Check metrics endpoint
curl http://localhost:8000/metrics | grep redis_stream
# Output: All metrics healthy

# Compare before/after:
# Before: 15000 pending, consumers idle, lag growing
# After: Stable pending, active consumers, lag under control
```
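"Stable or decreasing" is easier to verify from a series of samples than from a single reading. The sketch below classifies a list of pending counts taken at a fixed interval, for example the values printed by the `watch` command. The function name, the `tolerance` parameter, and the sample values are hypothetical; comparing only the first and last sample is a deliberate simplification.

```python
from typing import Sequence


def pending_trend(samples: Sequence[int], tolerance: int = 0) -> str:
    """Classify pending-count samples as growing, shrinking, stable, or unknown."""
    if len(samples) < 2:
        return "unknown"  # one reading says nothing about direction
    change = samples[-1] - samples[0]
    if change > tolerance:
        return "growing"
    if change < -tolerance:
        return "shrinking"
    return "stable"


# Hypothetical readings before and after a fix
print(pending_trend([15000, 15200, 15900]))
print(pending_trend([15000, 9000, 400]))
print(pending_trend([120, 118, 121], tolerance=5))
```

A non-zero `tolerance` keeps normal jitter in a healthy group from being reported as growth.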
Related Issues
- [Fix Redis Memory Limit Exceeded](/articles/fix-redis-memory-limit-exceeded)
- [Fix Redis Connection Refused](/articles/fix-redis-connection-refused)
- [Fix Redis Persistence Failure](/articles/fix-redis-persistence-failure)