What's Actually Happening

Kafka consumer lag is continuously increasing: consumers cannot keep up with the message production rate, so processing falls further and further behind real time.

The Error You'll See

Consumer lag monitoring:

```bash
$ kafka-consumer-groups --describe --group mygroup

GROUP    TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
mygroup  mytopic  0          1000            100000          99000
mygroup  mytopic  1          2000            150000          148000
```

Monitoring alert:

```
Alert: Kafka consumer lag > 100000 for group mygroup
Current lag: 250000
```

Processing delay:

```
Messages produced at time T are being processed at T+30 minutes
```
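
A 30-minute delay like this can be sanity-checked: once consumption outpaces production, the catch-up time is roughly the lag divided by the surplus consumption rate. A small sketch (the rates below are hypothetical):

```python
def catchup_seconds(lag: int, consume_rate: float, produce_rate: float) -> float:
    """Estimated seconds to drain the backlog; infinite if consumers can't keep up."""
    surplus = consume_rate - produce_rate
    if surplus <= 0:
        return float("inf")  # lag keeps growing
    return lag / surplus

# Hypothetical: 250,000 messages behind, consuming 6,000/s, producing 5,000/s
print(catchup_seconds(250_000, 6_000, 5_000))  # 250.0 seconds
```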

Why This Happens

  1. Consumer too slow - processing time per message is too high
  2. Too few consumers - not enough consumers in the group
  3. Partition imbalance - uneven partition distribution
  4. Consumer rebalances - frequent rebalancing stalls processing
  5. Downstream bottleneck - a slow database or API
  6. Resource limits - CPU, memory, or network constraints
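
For causes 1 and 2, a back-of-the-envelope check is to divide the production rate by the throughput one consumer can sustain. A rough sketch (both rates here are hypothetical; the result is still capped by the partition count):

```python
import math

def consumers_needed(produce_rate: float, per_consumer_rate: float) -> int:
    """Minimum consumers needed to keep up, ignoring rebalance overhead."""
    return math.ceil(produce_rate / per_consumer_rate)

# Hypothetical: 50,000 msg/s produced, each consumer handles ~9,000 msg/s
print(consumers_needed(50_000, 9_000))  # 6
```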

Step 1: Check Consumer Lag Status

```bash
# Check consumer group lag:
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup

# List all consumer groups:
kafka-consumer-groups --bootstrap-server kafka:9092 --list

# Check lag for all groups:
for group in $(kafka-consumer-groups --bootstrap-server kafka:9092 --list); do
  echo "=== $group ==="
  kafka-consumer-groups --bootstrap-server kafka:9092 \
    --describe --group $group | grep -v PARTITION
done

# Monitor lag in real time:
watch -n 5 'kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group mygroup'

# Calculate total lag (LAG is the 6th column of the describe output):
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup | \
  awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum+=$6} END {print "Total lag:", sum+0}'
```
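
If you would rather compute the total lag programmatically, the same describe output can be parsed in a few lines of Python. A sketch (the sample mirrors the CLI output shown earlier; column order can vary between Kafka versions, so verify it against your broker):

```python
def total_lag(describe_output: str) -> int:
    """Sum the LAG column of kafka-consumer-groups --describe output."""
    total = 0
    for line in describe_output.strip().splitlines()[1:]:  # skip header row
        fields = line.split()
        if len(fields) >= 6 and fields[5].isdigit():  # LAG is the 6th column
            total += int(fields[5])
    return total

sample = """GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
mygroup mytopic 0 1000 100000 99000
mygroup mytopic 1 2000 150000 148000"""
print(total_lag(sample))  # 247000
```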

Step 2: Check Topic Partitions

```bash
# Check topic partitions:
kafka-topics --bootstrap-server kafka:9092 \
  --describe --topic mytopic

# Output shows:
# Topic: mytopic  PartitionCount: 3  ReplicationFactor: 1

# Check partition count vs consumer count.
# Rule: consumers <= partitions - extra consumers will sit idle.

# Increase partitions (if needed):
kafka-topics --bootstrap-server kafka:9092 \
  --alter --topic mytopic --partitions 6

# Note: the partition count cannot be decreased!

# Check partition leadership:
kafka-topics --bootstrap-server kafka:9092 \
  --describe --topic mytopic | grep Leader

# Check for under-replicated partitions:
kafka-topics --bootstrap-server kafka:9092 \
  --describe --topic mytopic --under-replicated-partitions
```

Step 3: Check Consumer Group Members

```bash
# Check consumer group members:
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup --members --verbose

# Output shows:
# CONSUMER-ID      HOST       PARTITIONS
# consumer-1-uuid  /10.0.0.1  0,1
# consumer-2-uuid  /10.0.0.2  2,3

# Count active consumers (skip the header line):
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup --members | \
  awk 'NR>1 && NF {n++} END {print n+0}'

# Check partition assignment:
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup --members --verbose | \
  awk '{print $1, $NF}' | sort

# Verify all partitions are assigned (row count should equal the partition count):
kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup | \
  awk 'NR>1 && NF {n++} END {print n+0}'
```

Step 4: Check Consumer Performance

```bash
# Check consumer logs for processing time:
grep "processing time" /var/log/consumer.log | tail -100

# Check consumer metrics via JMX:
# kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*

# Key metrics:
# - records-consumed-rate
# - bytes-consumed-rate
# - records-lag-max
# - fetch-rate

# Check consumer configuration (in consumer code or properties):
# fetch.min.bytes
# fetch.max.wait.ms
# max.poll.records
# max.partition.fetch.bytes

# Test consumer throughput:
# produce test messages, then measure the consumption rate
kafka-producer-perf-test --topic test \
  --num-records 100000 --record-size 1000 \
  --throughput -1 --producer-props bootstrap.servers=kafka:9092

kafka-consumer-perf-test --topic test \
  --messages 100000 --bootstrap-server kafka:9092
```

Step 5: Optimize Consumer Configuration

```properties
# Consumer configuration for high throughput.
# Comments are kept on their own lines: an inline "# ..." after a value
# would be read as part of the value by java.util.Properties.

# Increase batch size (1MB minimum, 50MB maximum per fetch):
fetch.min.bytes=1048576
fetch.max.bytes=52428800

# Increase records per poll (default: 500):
max.poll.records=10000

# Increase the poll interval to 10 minutes (if processing is slow):
max.poll.interval.ms=600000

# Max wait for fetch.min.bytes to accumulate:
fetch.max.wait.ms=500

# Session timeout (30s) and heartbeat interval (10s) for stability:
session.timeout.ms=30000
heartbeat.interval.ms=10000

# Receive buffer (1MB):
receive.buffer.bytes=1048576

# For a Java consumer, the same settings can be applied programmatically:
# Properties props = new Properties();
# props.put("max.poll.records", "10000");
# props.put("fetch.min.bytes", "1048576");
```

Step 6: Scale Consumers

```bash
# Add more consumers to the group:

# Start additional instances with the same consumer group ID:
# java -jar consumer.jar --group mygroup

# Kubernetes - scale the deployment:
kubectl scale deployment consumer --replicas=6

# Ensure partitions >= consumers!
# Extra consumers will sit idle.

# Check the consumer count against the partition count:
PARTITIONS=$(kafka-topics --bootstrap-server kafka:9092 \
  --describe --topic mytopic | \
  grep -o 'PartitionCount: [0-9]*' | awk '{print $2}')
CONSUMERS=$(kafka-consumer-groups --bootstrap-server kafka:9092 \
  --describe --group mygroup --members | \
  awk 'NR>1 && NF {n++} END {print n+0}')
echo "Partitions: $PARTITIONS, Consumers: $CONSUMERS"

# If consumers > partitions, reduce consumers or increase partitions
```
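
The partitions-vs-consumers rule can be expressed directly: effective parallelism is the minimum of the two, and anything beyond the partition count sits idle. A small sketch:

```python
def group_parallelism(partitions: int, consumers: int) -> tuple:
    """Return (active consumers, idle consumers) for one topic."""
    active = min(partitions, consumers)
    return active, consumers - active

print(group_parallelism(6, 8))  # (6, 2): two consumers are idle
print(group_parallelism(6, 4))  # (4, 0): room to scale up to 6
```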

Step 7: Fix Rebalancing Issues

```bash
# Frequent rebalances cause lag.

# Check for rebalance events:
grep "Revoking previously assigned partitions" /var/log/consumer.log

# The consumer must send a heartbeat within session.timeout.ms,
# and call poll() within max.poll.interval.ms.

# Increase timeouts (consumer properties):
# session.timeout.ms=60000       (60 seconds)
# max.poll.interval.ms=600000    (10 minutes)

# Use static membership (Kafka 2.3+) so that short restarts
# do not trigger a rebalance (consumer properties):
# group.instance.id=consumer-1
# session.timeout.ms=300000
```

Step 8: Fix Downstream Bottlenecks

```bash
# Check what is slowing processing:

# Database latency - monitor query times:
grep "query time" /var/log/consumer.log

# API call latency - check API response times.

# Synchronous processing (slow):
# while (true) {
#   records = consumer.poll(Duration.ofMillis(100));
#   for (record : records) {
#     process(record);  // SLOW
#   }
# }

# Asynchronous processing:
# while (true) {
#   records = consumer.poll(Duration.ofMillis(100));
#   CompletableFuture<?>[] futures = records.stream()
#     .map(r -> CompletableFuture.runAsync(() -> process(r)))
#     .toArray(CompletableFuture[]::new);
#   CompletableFuture.allOf(futures).join();
# }

# Use batch inserts instead of single-row inserts:
# INSERT INTO table VALUES (...), (...), (...);

# Use connection pooling:
# ensure database connections are pooled efficiently
```
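
The sync-vs-async pattern above can be sketched in Python with a thread pool; `process` here is a stand-in for a slow, I/O-bound per-record handler (simulated with a short sleep), not a real Kafka consumer:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def process(record):
    """Stand-in for a slow I/O-bound handler (DB write, API call)."""
    time.sleep(0.05)
    return record * 2

records = list(range(20))

# Sequential: roughly 20 * 0.05s = 1s
start = time.monotonic()
seq = [process(r) for r in records]
seq_time = time.monotonic() - start

# Parallel: records processed concurrently, bounded by the pool size
start = time.monotonic()
with ThreadPoolExecutor(max_workers=10) as pool:
    par = list(pool.map(process, records))
par_time = time.monotonic() - start

assert seq == par  # same results, much less wall-clock time
print(f"sequential {seq_time:.2f}s vs parallel {par_time:.2f}s")
```

Note that once records are processed out of order or concurrently, offsets should only be committed after the whole batch has finished, otherwise a crash can skip unprocessed records.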

Step 9: Monitor Consumer Metrics

```bash
# Enable JMX metrics:
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=9999 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false"

# Key consumer metrics to monitor:
# records-lag-max        - maximum lag across partitions
# records-consumed-rate  - records consumed per second
# bytes-consumed-rate    - bytes consumed per second
# commit-rate            - offset commit rate
# join-rate              - consumer group join rate (rebalances)
# sync-rate              - consumer group sync rate

# Prometheus JMX exporter:
# java -javaagent:jmx_prometheus_javaagent.jar=7071:config.yaml -jar consumer.jar

# Grafana: import a Kafka consumer dashboard and monitor the lag trend over time.

# Set up alerts:
# - lag > 10000 for 5 minutes
# - lag increasing for 10 minutes
```
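
The "lag increasing for 10 minutes" alert boils down to checking that a window of lag samples is monotonically rising. A sketch (the sample values are hypothetical, one reading per minute):

```python
def lag_rising(samples: list, window: int) -> bool:
    """True if the last `window` lag samples are strictly increasing."""
    recent = samples[-window:]
    return len(recent) == window and all(a < b for a, b in zip(recent, recent[1:]))

history = [1200, 1500, 2100, 2800, 3900]    # steadily climbing lag
print(lag_rising(history, 4))               # True: fire the alert
print(lag_rising([500, 400, 450, 300], 4))  # False: lag is recovering
```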

Step 10: Kafka Consumer Lag Verification Script

```bash
# Create verification script:
cat << 'EOF' > /usr/local/bin/check-kafka-lag.sh
#!/bin/bash

BROKER=${1:-"kafka:9092"}
GROUP=${2:-""}

echo "=== Consumer Groups ==="
kafka-consumer-groups --bootstrap-server $BROKER --list

echo ""
echo "=== Groups with High Lag ==="
for g in $(kafka-consumer-groups --bootstrap-server $BROKER --list); do
  LAG=$(kafka-consumer-groups --bootstrap-server $BROKER \
    --describe --group $g 2>/dev/null | \
    awk 'NR>1 && $6 ~ /^[0-9]+$/ {sum+=$6} END {print sum+0}')
  if [ "$LAG" -gt 1000 ]; then
    echo "Group: $g, Total Lag: $LAG"
  fi
done

if [ -n "$GROUP" ]; then
  echo ""
  echo "=== Detailed: $GROUP ==="
  kafka-consumer-groups --bootstrap-server $BROKER \
    --describe --group $GROUP

  echo ""
  echo "=== Members: $GROUP ==="
  kafka-consumer-groups --bootstrap-server $BROKER \
    --describe --group $GROUP --members --verbose
fi

echo ""
echo "=== Topic Partition Summary ==="
kafka-topics --bootstrap-server $BROKER --list | head -10 | while read topic; do
  PARTS=$(kafka-topics --bootstrap-server $BROKER \
    --describe --topic $topic 2>/dev/null | \
    grep -o 'PartitionCount: [0-9]*' | awk '{print $2}')
  echo "$topic: $PARTS partitions"
done
EOF

chmod +x /usr/local/bin/check-kafka-lag.sh

# Usage:
/usr/local/bin/check-kafka-lag.sh kafka:9092 mygroup

# Monitor lag:
watch -n 10 /usr/local/bin/check-kafka-lag.sh
```

Kafka Consumer Lag Checklist

| Check | Command | Expected |
|---|---|---|
| Lag status | describe group | Lag decreasing |
| Partitions | describe topic | Sufficient for consumers |
| Consumers | describe members | Active consumers |
| Processing rate | metrics | Matches production |
| Rebalances | logs | Minimal rebalances |
| Downstream | latency check | Fast enough |

Verify the Fix

```bash
# After fixing consumer lag

# 1. Check that lag is decreasing:
kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group mygroup
# LAG column decreasing

# 2. Check consumers:
kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group mygroup --members
# All consumers active

# 3. Monitor the trend:
watch -n 10 'kafka-consumer-groups --bootstrap-server kafka:9092 --describe --group mygroup'
# Lag trending toward zero

# 4. Check processing time:
grep "processing" consumer.log
# Acceptable latency

# 5. Verify throughput via metrics:
# records-consumed-rate stable

# 6. Check for rebalances:
grep "rebalance" consumer.log | tail
# Stable assignment
```

  • [Fix Kafka Consumer Lag High](/articles/fix-kafka-consumer-lag-high)
  • [Fix Kafka Broker Not Available](/articles/fix-kafka-broker-not-available)
  • [Fix Kafka Producer Failed](/articles/fix-kafka-producer-failed)