What's Actually Happening
Kafka consumer lag is continuously increasing. Consumers cannot keep up with message production rate, leading to delayed processing.
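Lag is a simple per-partition difference: LOG-END-OFFSET minus CURRENT-OFFSET. A minimal sketch that recomputes it with awk over saved describe output (the file path and numbers are hypothetical):

```shell
# Save a (hypothetical) kafka-consumer-groups --describe output:
cat <<'EOF' > /tmp/lag-sample.txt
GROUP   TOPIC   PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
mygroup mytopic 0         1000           100000         99000
mygroup mytopic 1         2000           150000         148000
EOF

# Recompute lag per partition from the two offset columns and total it:
awk 'NR>1 { lag = $5 - $4; print $2, "partition", $3, "lag:", lag; total += lag }
     END  { print "total lag:", total }' /tmp/lag-sample.txt
```

For this sample the script prints a total lag of 247000, matching the sum of the LAG column.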
The Error You'll See
Consumer lag monitoring:
```bash
$ kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group mygroup

GROUP    TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
mygroup  mytopic  0          1000            100000          99000
mygroup  mytopic  1          2000            150000          148000
```
Monitoring alert:
Alert: Kafka consumer lag > 100000 for group mygroup
Current lag: 250000

Processing delay: messages produced at time T are being processed at T+30 minutes.

Why This Happens
1. Consumer too slow - per-message processing time is too high
2. Too few consumers - not enough consumers in the group
3. Partition imbalance - partitions unevenly distributed across consumers
4. Consumer rebalances - frequent rebalancing stalls processing
5. Downstream bottleneck - a slow database or API
6. Resource limits - CPU, memory, or network constraints
Step 1: Check Consumer Lag Status
```bash
# Check consumer group lag:
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup

# List all consumer groups:
kafka-consumer-groups --bootstrap-server kafka:9092 --list

# Check all groups with lag:
for group in $(kafka-consumer-groups --bootstrap-server kafka:9092 --list); do
  echo "=== $group ==="
  kafka-consumer-groups --bootstrap-server kafka:9092 \
    --describe --group "$group" | grep -v PARTITION
done

# Monitor lag in real time:
watch -n 5 'kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group mygroup'

# Calculate total lag (LAG is the 6th column of the full describe output):
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup | \
  awk 'NR>1 {sum+=$6} END {print "Total lag:", sum}'
```
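A single lag reading does not tell you whether consumers are catching up; comparing two snapshots does. A sketch over hypothetical per-partition lag values (partition, lag) saved a minute apart:

```shell
# Two hypothetical lag snapshots, one minute apart:
cat <<'EOF' > /tmp/lag-t0.txt
0 99000
1 148000
EOF
cat <<'EOF' > /tmp/lag-t1.txt
0 99500
1 147000
EOF

# Print partition, old lag, new lag, delta (positive delta = falling behind):
awk 'NR==FNR { t0[$1] = $2; next }
     { printf "partition %s: %d -> %d (delta %d)\n", $1, t0[$1], $2, $2 - t0[$1] }' \
  /tmp/lag-t0.txt /tmp/lag-t1.txt
```

Here partition 0 is falling behind (delta +500) while partition 1 is catching up (delta -1000), which would also point at a partition imbalance.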
Step 2: Check Topic Partitions
```bash
# Check topic partitions:
kafka-topics --bootstrap-server kafka:9092 \
  --describe --topic mytopic

# Output shows:
# Topic: mytopic  PartitionCount: 3  ReplicationFactor: 1

# Check partition count vs consumer count:
# Rule: consumers <= partitions; extra consumers will be idle

# Increase partitions (if needed):
kafka-topics --bootstrap-server kafka:9092 \
  --alter --topic mytopic --partitions 6

# Note: partitions can never be decreased!
# Note: adding partitions changes key-to-partition mapping for keyed topics

# Check partition leadership:
kafka-topics --bootstrap-server kafka:9092 \
  --describe --topic mytopic | grep Leader

# Check for under-replicated partitions:
kafka-topics --bootstrap-server kafka:9092 \
  --describe --topic mytopic --under-replicated-partitions
```
Step 3: Check Consumer Group Members
```bash
# Check consumer group members:
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup --members --verbose

# Output shows:
# CONSUMER-ID      HOST       PARTITIONS
# consumer-1-uuid  /10.0.0.1  0,1
# consumer-2-uuid  /10.0.0.2  2,3

# Count active consumers (skip the header line):
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup --members | tail -n +2 | wc -l

# Check partition assignment:
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup --members --verbose | \
  awk 'NR>1 {print $1, $NF}' | sort

# Verify all partitions are assigned (row count should equal partition count):
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup | tail -n +2 | wc -l
```
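One cross-check the commands above enable: the number of assigned partitions should equal the topic's partition count. A sketch over saved, hypothetical assignment output:

```shell
# Hypothetical saved assignment listing (one row per assigned partition):
cat <<'EOF' > /tmp/assignments.txt
PARTITION CONSUMER-ID
0 consumer-1-uuid
1 consumer-1-uuid
2 consumer-2-uuid
EOF

TOPIC_PARTITIONS=4   # assumed value from kafka-topics --describe
ASSIGNED=$(awk 'NR>1' /tmp/assignments.txt | wc -l)

# Flag any partition nobody is consuming from:
if [ "$ASSIGNED" -lt "$TOPIC_PARTITIONS" ]; then
  echo "WARNING: $((TOPIC_PARTITIONS - ASSIGNED)) partition(s) unassigned"
fi
```

An unassigned partition accumulates lag indefinitely, so this check is worth automating.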
Step 4: Check Consumer Performance
```bash
# Check consumer logs for processing time:
grep "processing time" /var/log/consumer.log | tail -100

# Check consumer metrics via JMX:
# kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*

# Key metrics:
# - records-consumed-rate
# - bytes-consumed-rate
# - records-lag-max
# - fetch-rate

# Check consumer configuration (in consumer code or properties):
# fetch.min.bytes
# fetch.max.wait.ms
# max.poll.records
# max.partition.fetch.bytes

# Test consumer throughput: produce test messages, then measure consumption rate
kafka-producer-perf-test --topic test \
  --num-records 100000 --record-size 1000 \
  --throughput -1 --producer-props bootstrap.servers=kafka:9092

kafka-consumer-perf-test --topic test \
  --messages 100000 --bootstrap-server kafka:9092
```
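Once you know the consume and produce rates, you can estimate how long catch-up will take: consumers only gain on the backlog at the difference between the two rates. A back-of-envelope sketch (all numbers hypothetical):

```shell
# Hypothetical inputs:
LAG=250000          # messages behind (total lag)
CONSUME_RATE=5000   # msgs/sec, e.g. from records-consumed-rate
PRODUCE_RATE=3000   # msgs/sec at the producers

# Time to zero lag = lag / (consume rate - produce rate):
if [ "$CONSUME_RATE" -gt "$PRODUCE_RATE" ]; then
  SECS=$((LAG / (CONSUME_RATE - PRODUCE_RATE)))
  echo "Estimated catch-up time: ${SECS}s"
else
  echo "Lag will keep growing: consumers are slower than producers"
fi
```

With these numbers the backlog drains at 2000 msgs/sec, so catch-up takes about 125 seconds; if the consume rate is at or below the produce rate, no amount of waiting helps.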
Step 5: Optimize Consumer Configuration
```properties
# Consumer configuration for high throughput
# (comments must be on their own lines in a .properties file)

# Increase batch size: 1MB minimum per fetch, 50MB max per fetch
fetch.min.bytes=1048576
fetch.max.bytes=52428800

# Increase records per poll (default: 500)
max.poll.records=10000

# Increase poll interval if processing is slow (10 minutes)
max.poll.interval.ms=600000

# Max wait for fetch.min.bytes to accumulate
fetch.max.wait.ms=500

# Session timeout for stability (30s) and heartbeat interval (10s)
session.timeout.ms=30000
heartbeat.interval.ms=10000

# Receive buffer (1MB)
receive.buffer.bytes=1048576

# The same settings in Java consumer code:
# Properties props = new Properties();
# props.put("max.poll.records", "10000");
# props.put("fetch.min.bytes", "1048576");
```
Step 6: Scale Consumers
```bash
# Add more consumers to the group (same consumer group ID):
# java -jar consumer.jar --group mygroup

# Kubernetes - scale the deployment:
kubectl scale deployment consumer --replicas=6

# Ensure partitions >= consumers! Extra consumers will be idle.

# Check consumer count against partition count:
PARTITIONS=$(kafka-topics --bootstrap-server kafka:9092 \
  --describe --topic mytopic | grep -o 'PartitionCount: [0-9]*' | awk '{print $2}')
CONSUMERS=$(kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup --members | tail -n +2 | wc -l)
echo "Partitions: $PARTITIONS, Consumers: $CONSUMERS"

# If consumers > partitions, remove consumers or add partitions
```
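The comparison above reduces to simple arithmetic; with hypothetical counts:

```shell
# Hypothetical counts from the checks above:
PARTITIONS=6
CONSUMERS=8

# Consumers beyond the partition count receive no assignments:
IDLE=$((CONSUMERS - PARTITIONS))
if [ "$IDLE" -gt 0 ]; then
  echo "$IDLE consumer(s) idle: add partitions or remove consumers"
fi
```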
Step 7: Fix Rebalancing Issues
```bash
# Frequent rebalances cause lag.

# Check rebalance events:
grep "Revoking previously assigned partitions" /var/log/consumer.log

# The consumer must send a heartbeat within session.timeout.ms
# and must call poll() within max.poll.interval.ms.

# Increase timeouts (consumer properties):
# session.timeout.ms=60000        (60 seconds)
# max.poll.interval.ms=600000     (10 minutes)

# Use static membership (Kafka 2.3+) so short restarts don't trigger
# a rebalance. Give each consumer a stable instance ID:
# group.instance.id=consumer-1
# session.timeout.ms=300000
```
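A related rule of thumb: heartbeat.interval.ms should be at most one third of session.timeout.ms, so a consumer can miss a couple of heartbeats before being evicted. Checking that with example values:

```shell
# Example values (session timeout from above, heartbeat from Step 5):
SESSION_TIMEOUT_MS=60000
HEARTBEAT_INTERVAL_MS=10000

# Heartbeat must fit at least three times into the session timeout:
if [ "$HEARTBEAT_INTERVAL_MS" -le $((SESSION_TIMEOUT_MS / 3)) ]; then
  echo "heartbeat/session ratio OK"
else
  echo "heartbeat.interval.ms too close to session.timeout.ms"
fi
```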
Step 8: Fix Downstream Bottlenecks
```bash
# Check what's slowing processing:

# Database latency - monitor query times:
grep "query time" /var/log/consumer.log

# API call latency - check API response times

# Process records concurrently instead of one at a time.
# Synchronous (slow):
# while (true) {
#   records = consumer.poll(Duration.ofMillis(100));
#   for (record : records) {
#     process(record);   // SLOW
#   }
# }

# Concurrent:
# while (true) {
#   records = consumer.poll(Duration.ofMillis(100));
#   CompletableFuture<?>[] futures = records.stream()
#       .map(r -> CompletableFuture.runAsync(() -> process(r)))
#       .toArray(CompletableFuture[]::new);
#   CompletableFuture.allOf(futures).join();
# }

# Use batch inserts instead of single-row inserts:
# INSERT INTO table VALUES (...), (...), (...);

# Use connection pooling for database connections
```
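Why batching helps can be seen with a rough cost model: every statement pays a fixed round-trip latency, so batching N rows amortizes it. All numbers below are hypothetical:

```shell
# Assumed costs:
ROUND_TRIP_MS=5   # fixed network round trip per statement
ROW_COST_MS=1     # server-side cost per row
ROWS=1000
BATCH=100         # rows per batched INSERT

# Single-row inserts pay the round trip 1000 times; batched, only 10 times:
SINGLE_MS=$((ROWS * (ROUND_TRIP_MS + ROW_COST_MS)))
BATCHED_MS=$(( (ROWS / BATCH) * ROUND_TRIP_MS + ROWS * ROW_COST_MS ))
echo "single inserts: ${SINGLE_MS}ms, batched: ${BATCHED_MS}ms"
```

Under these assumptions the batched path is roughly 6x faster (6000ms vs 1050ms), which is often the difference between a consumer that keeps up and one that lags.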
Step 9: Monitor Consumer Metrics
```bash
# Enable JMX metrics on the consumer JVM:
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=9999 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false"

# Key consumer metrics to monitor:
# records-lag-max        - maximum lag across partitions
# records-consumed-rate  - records consumed per second
# bytes-consumed-rate    - bytes consumed per second
# commit-rate            - offset commit rate
# join-rate              - consumer group join rate (rebalances)
# sync-rate              - consumer group sync rate

# Prometheus JMX exporter:
# java -javaagent:jmx_prometheus_javaagent.jar=7071:config.yaml -jar consumer.jar

# Grafana: import a Kafka consumer dashboard and watch the lag trend over time

# Set up alerts:
# - lag > 10000 for 5 minutes
# - lag increasing for 10 minutes
```
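An alert like "lag > 10000 for 5 minutes" can be evaluated from periodic samples: fire only if no sample in the window dipped below the threshold. A sketch with hypothetical samples:

```shell
THRESHOLD=10000

# Five hypothetical 1-minute lag samples:
cat <<'EOF' > /tmp/lag-samples.txt
12000
15000
18000
21000
25000
EOF

# Count samples at or below the threshold; zero means the lag never recovered:
BELOW=$(awk -v t="$THRESHOLD" '$1 <= t' /tmp/lag-samples.txt | wc -l)
if [ "$BELOW" -eq 0 ]; then
  echo "ALERT: lag above $THRESHOLD for the whole window"
fi
```

Requiring the whole window to breach the threshold avoids paging on short spikes that the consumers absorb on their own.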
Step 10: Kafka Consumer Lag Verification Script
```bash
# Create verification script:
cat << 'EOF' > /usr/local/bin/check-kafka-lag.sh
#!/bin/bash
BROKER=${1:-"kafka:9092"}
GROUP=${2:-""}

echo "=== Consumer Groups ==="
kafka-consumer-groups --bootstrap-server "$BROKER" --list

echo ""
echo "=== Groups with High Lag ==="
for g in $(kafka-consumer-groups --bootstrap-server "$BROKER" --list); do
  # LAG is the 6th column of the describe output
  LAG=$(kafka-consumer-groups --bootstrap-server "$BROKER" \
    --describe --group "$g" 2>/dev/null | \
    awk 'NR>1 {sum+=$6} END {print sum+0}')
  if [ "$LAG" -gt 1000 ]; then
    echo "Group: $g, Total Lag: $LAG"
  fi
done

if [ -n "$GROUP" ]; then
  echo ""
  echo "=== Detailed: $GROUP ==="
  kafka-consumer-groups --bootstrap-server "$BROKER" \
    --describe --group "$GROUP"

  echo ""
  echo "=== Members: $GROUP ==="
  kafka-consumer-groups --bootstrap-server "$BROKER" \
    --describe --group "$GROUP" --members --verbose
fi

echo ""
echo "=== Topic Partition Summary ==="
kafka-topics --bootstrap-server "$BROKER" --list | head -10 | while read -r topic; do
  PARTS=$(kafka-topics --bootstrap-server "$BROKER" \
    --describe --topic "$topic" 2>/dev/null | \
    grep -o 'PartitionCount: [0-9]*' | awk '{print $2}')
  echo "$topic: $PARTS partitions"
done
EOF

chmod +x /usr/local/bin/check-kafka-lag.sh

# Usage:
/usr/local/bin/check-kafka-lag.sh kafka:9092 mygroup

# Monitor lag:
watch -n 10 /usr/local/bin/check-kafka-lag.sh
```
Kafka Consumer Lag Checklist
| Check | Command | Expected |
|---|---|---|
| Lag status | describe group | Lag decreasing |
| Partitions | describe topic | Sufficient for consumers |
| Consumers | describe members | Active consumers |
| Processing rate | metrics | Matches production |
| Rebalances | logs | Minimal rebalances |
| Downstream | latency check | Fast enough |
Verify the Fix
```bash
# After fixing consumer lag

# 1. Check lag is decreasing:
kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group mygroup
# LAG column shrinking

# 2. Check consumers:
kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group mygroup --members
# All consumers active

# 3. Monitor the trend:
watch -n 10 'kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group mygroup'
# Lag trending toward zero

# 4. Check processing time:
grep "processing" consumer.log
# Acceptable latency

# 5. Verify throughput via metrics:
# records-consumed-rate stable and at least matching the production rate

# 6. Confirm no rebalances:
grep -i "rebalance" consumer.log | tail
# Stable assignment
```
Related Issues
- [Fix Kafka Consumer Lag High](/articles/fix-kafka-consumer-lag-high)
- [Fix Kafka Broker Not Available](/articles/fix-kafka-broker-not-available)
- [Fix Kafka Producer Failed](/articles/fix-kafka-producer-failed)