## Introduction
Kafka consumer lag grows unbounded when consumers cannot process messages as fast as producers write them, so the lag (the difference between the latest offset and the committed offset) climbs indefinitely. Eventually the retained backlog consumes broker disk space, causes memory pressure on consumers, and violates SLAs for real-time processing. Lag can stem from slow downstream dependencies, inefficient message processing, consumer group instability, offset commit failures, or a fundamental capacity mismatch between production and consumption rates.
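As a concrete illustration of that definition, per-partition lag is simply the log-end offset minus the committed offset. A minimal sketch (the `LagCalc` class and sample offsets are ours, not a Kafka API):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: lag per partition = log-end offset - committed offset.
public class LagCalc {
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    public static void main(String[] args) {
        // {partition -> [committed offset, log-end offset]}, sample numbers
        Map<Integer, long[]> partitions = new LinkedHashMap<>();
        partitions.put(0, new long[]{1_000_000L, 5_000_000L});
        partitions.put(1, new long[]{2_000_000L, 2_500_000L});

        long total = 0;
        for (Map.Entry<Integer, long[]> e : partitions.entrySet()) {
            long l = lag(e.getValue()[1], e.getValue()[0]);
            System.out.println("partition " + e.getKey() + " lag=" + l);
            total += l;
        }
        System.out.println("total lag=" + total); // 4500000 for these sample offsets
    }
}
```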
## Symptoms
- Consumer lag metrics show continuous growth over hours/days
- `kafka_consumer_group_lag` exceeds millions of messages
- Consumer processing latency increases as backlog grows
- Consumer group shows frequent rebalancing events
- Some partitions show lag while others are current (skew)
- Issue appears after traffic increase, new deploy with slower processing, or downstream service degradation
- Consumer logs show `CommitFailedException`, `RebalanceException`, or `TimeoutException`
## Common Causes
- Message processing slower than production rate (capacity mismatch)
- Downstream dependency (database, API) causing processing delays
- Consumer group rebalancing storm preventing steady consumption
- Offset commit failures causing re-processing
- Poison pill messages causing repeated failures
- Consumer crashing or OOM before committing offsets
- Partition skew (uneven key distribution) overloading single consumer
- Producer batching too large, causing burst consumption patterns
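Partition skew in particular is easy to reproduce: with key-based partitioning, one hot key pins most of the traffic to a single partition while the rest sit idle. A toy simulation (the `partitionFor` helper mimics, but is not, Kafka's default murmur2-based partitioner, and the traffic mix is invented):

```java
import java.util.Arrays;

// Toy model of key-based partitioning: a single hot key overloads one partition.
public class SkewDemo {
    // Simplified stand-in for the default key-hash partitioner (not Kafka's murmur2).
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 6;
        long[] counts = new long[numPartitions];

        // 90% of messages share one key, so one partition takes ~900 of 1000 messages
        for (int i = 0; i < 1000; i++) {
            String key = (i % 10 == 0) ? "user-" + i : "hot-tenant";
            counts[partitionFor(key, numPartitions)]++;
        }
        System.out.println(Arrays.toString(counts));
    }
}
```

Whichever consumer owns the hot partition falls behind while its peers stay current, which matches the "some partitions lag, others are current" symptom above.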
## Step-by-Step Fix
### 1. Measure current consumer lag
Check lag across consumer groups:
```bash
# Kafka CLI: describe consumer groups
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --describe --group <group-name>

# Output:
# GROUP        TOPIC     PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      CONSUMER-ID        HOST        CLIENT-ID
# my-consumer  my-topic  0          1000000         5000000         4000000  consumer-1-abc123  /10.0.1.10  consumer-1
# my-consumer  my-topic  1          2000000         2500000         500000   consumer-2-def456  /10.0.1.11  consumer-2

# Sum lag across all groups
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --describe --all-groups | grep -v "LAG" | awk '{sum += $6} END {print "Total lag:", sum}'

# Check lag over time (run periodically)
watch -n 5 'kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group <group-name> | grep <topic-name>'
```
Using Kafka Manager or UI:
```bash
# With kafka-manager
curl -s http://kafka-manager:9000/api/consumers/<cluster>/<group> | jq

# With Burrow (LinkedIn's consumer lag monitoring tool)
curl -s http://burrow:8000/v3/kafka/<cluster>/consumer/<group>/lag | jq
```
### 2. Identify slow partitions and consumers
Find partition skew:
```bash
# Top 10 partitions by lag
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --describe --group <group-name> | \
  awk 'NR>1 {print $3, $6}' | sort -k2 -rn | head -10

# Latest offset per partition
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list broker1:9092,broker2:9092 \
  --topic my-topic \
  --time -1   # -1 = latest offset

# Compare with consumer assignment
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --describe --group <group-name> --members --verbose
```
Check consumer processing rate:
```bash
# Prometheus queries (if exporting kafka_consumer metrics)
# Records consumed per second
rate(kafka_consumer_records_consumed_total[5m])

# Consumer lag
kafka_consumer_group_lag

# Consumer rebalance rate
rate(kafka_consumer_rebalances_total[5m])

# p99 processing latency (application metric)
histogram_quantile(0.99, rate(message_processing_duration_seconds_bucket[5m]))
```
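Once these metrics give you a production rate and a per-consumer processing rate, the minimum consumer count is a ceiling division. A sketch with hypothetical rates (the `CapacityMath` class is ours):

```java
// Minimum consumers needed for a given production rate; rates are hypothetical.
public class CapacityMath {
    static int consumersNeeded(double produceRatePerSec, double perConsumerRatePerSec) {
        return (int) Math.ceil(produceRatePerSec / perConsumerRatePerSec);
    }

    public static void main(String[] args) {
        // e.g. producers writing 10,000 msg/s, each consumer handling 800 msg/s
        System.out.println(consumersNeeded(10_000, 800));     // 13
        // With the 2x peak headroom suggested under Prevention
        System.out.println(consumersNeeded(2 * 10_000, 800)); // 25
    }
}
```

If the required count exceeds the topic's partition count, scaling consumers alone will not help; see step 6.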
### 3. Check for rebalancing storms
Frequent rebalancing prevents consumption:
```bash
# Check consumer group state
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --describe --group <group-name> --state

# Output:
# GROUP        STATE               COORDINATOR (ID)  ASSIGNMENT-STRATEGY  NUM-MEMBERS
# my-consumer  Stable              broker1:9092 (1)  range                5
# my-consumer  PreparingRebalance  broker1:9092 (1)  range                3

# If STATE frequently shows PreparingRebalance or CompletingRebalance,
# consumers are unstable and rebalancing repeatedly
```
Check rebalance frequency:
```bash
# From consumer logs
grep "Rebalance" /var/log/kafka-consumer/app.log | tail -20

# Count rebalances per time bucket
grep "Rebalance" /var/log/kafka-consumer/app.log | \
  awk '{print $1, $2}' | uniq -c | awk '{print $2, $1}'

# Or from metrics
rate(kafka_consumer_rebalances_total[5m])
```
Fix rebalancing issues:
```java
// Java consumer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");
props.put("group.id", "my-consumer");

// Increase session timeout (time before a silent consumer is considered dead)
props.put("session.timeout.ms", "45000");      // Default: 10000 (45000 since Kafka 3.0)

// Heartbeat interval (recommended: at most session.timeout.ms / 3)
props.put("heartbeat.interval.ms", "15000");   // Default: 3000

// Increase max poll interval (max time between poll() calls before eviction)
props.put("max.poll.interval.ms", "600000");   // Default: 300000

// Reduce max poll records (process fewer per poll, return to poll() sooner)
props.put("max.poll.records", "100");          // Default: 500

// Static membership avoids a rebalance on clean restart
props.put("group.instance.id", "consumer-1");  // Must be unique per instance
```
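These timeouts interact: heartbeats should fit several times into the session timeout, and a full poll batch must finish within `max.poll.interval.ms` or the consumer is evicted and triggers another rebalance. A small sanity-check sketch of those rule-of-thumb relationships (the `ConsumerTuning` helpers are ours, not a Kafka API):

```java
// Rule-of-thumb sanity checks on consumer timeout settings (helper names are ours).
public class ConsumerTuning {
    // Heartbeats should fit at least 3x into the session timeout
    static boolean heartbeatOk(int heartbeatMs, int sessionTimeoutMs) {
        return heartbeatMs * 3 <= sessionTimeoutMs;
    }

    // A full poll batch must finish within max.poll.interval.ms
    static boolean pollBudgetOk(int maxPollRecords, long perRecordMs, long maxPollIntervalMs) {
        return maxPollRecords * perRecordMs < maxPollIntervalMs;
    }

    public static void main(String[] args) {
        System.out.println(heartbeatOk(15_000, 45_000));        // true
        System.out.println(pollBudgetOk(100, 150, 300_000));    // true: 15s of work per poll
        System.out.println(pollBudgetOk(500, 1_000, 300_000));  // false: 500s of work per poll
    }
}
```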
### 4. Check offset commit failures
Failed commits cause re-processing:
```java
// WRONG: auto-commit with processing failures
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "5000");
// If the offset is committed before processing finishes and the consumer
// crashes, the message is lost; if processing succeeds but the commit
// fails, the message is re-processed

// CORRECT: manual commit after processing
props.put("enable.auto.commit", "false");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("my-topic"));

try {
    while (!shutdown) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

        for (ConsumerRecord<String, String> record : records) {
            processMessage(record); // May throw exception
        }

        // Commit only after successful processing
        consumer.commitSync();
    }
} catch (CommitFailedException e) {
    logger.error("Commit failed - consumer group may have rebalanced", e);
    // Consumer will re-process from the last committed offset
}
```
Check commit status:
```bash
# Get committed offsets and member assignments
kafka-consumer-groups.sh --bootstrap-server broker1:9092 \
  --describe --group <group-name> --members

# Compare with the log-end offset
kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list broker1:9092 \
  --topic my-topic \
  --time -1
```
### 5. Identify poison pill messages
Single bad message blocking partition:
```java
// Poison pill handling with bounded retries
int retryCount = 0;
final int maxRetries = 3;

while (!shutdown) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

    for (ConsumerRecord<String, String> record : records) {
        try {
            processMessage(record);
            retryCount = 0;
        } catch (PoisonPillException e) {
            retryCount++;
            logger.error("Poison pill at offset {}", record.offset(), e);

            if (retryCount >= maxRetries) {
                // Send to dead letter queue, then skip past the bad message
                sendToDLQ(record, e);
                consumer.seek(new TopicPartition(record.topic(), record.partition()),
                              record.offset() + 1);
                retryCount = 0;
            } else {
                // Seek back to re-process on the next poll
                consumer.seek(new TopicPartition(record.topic(), record.partition()),
                              record.offset());
            }
            break; // Discard the rest of this batch; next poll() resumes from the seek
        }
    }

    consumer.commitSync();
}
```
Dead letter queue pattern:
```java
void sendToDLQ(ConsumerRecord<String, String> record, Exception e) {
    ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
        "my-topic-dlq", record.key(), record.value()
    );

    // Add error metadata as headers
    dlqRecord.headers().add("error-message", e.getMessage().getBytes(StandardCharsets.UTF_8));
    dlqRecord.headers().add("original-topic", record.topic().getBytes(StandardCharsets.UTF_8));
    dlqRecord.headers().add("original-partition",
        ByteBuffer.allocate(4).putInt(record.partition()).array());
    dlqRecord.headers().add("original-offset",
        ByteBuffer.allocate(8).putLong(record.offset()).array());

    producer.send(dlqRecord);
}
```
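Because the partition and offset are written into headers as raw bytes, a DLQ reprocessor must decode them the same way. A `ByteBuffer` round-trip sketch (the `DlqHeaders` class is ours):

```java
import java.nio.ByteBuffer;

// Round-trip for the binary header values written by sendToDLQ above.
public class DlqHeaders {
    static byte[] encodeOffset(long offset) {
        return ByteBuffer.allocate(8).putLong(offset).array();
    }
    static long decodeOffset(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }
    static byte[] encodePartition(int partition) {
        return ByteBuffer.allocate(4).putInt(partition).array();
    }
    static int decodePartition(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getInt();
    }

    public static void main(String[] args) {
        System.out.println(decodeOffset(encodeOffset(4_000_000L)));  // 4000000
        System.out.println(decodePartition(encodePartition(3)));     // 3
    }
}
```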
### 6. Scale consumer group
Add parallelism to increase throughput:
```bash
# Check current partition count
kafka-topics.sh --bootstrap-server broker1:9092 \
  --describe --topic my-topic

# Output:
# Topic: my-topic  PartitionCount: 6  ReplicationFactor: 3
# Topic: my-topic  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
# ...

# Max active consumers per group = number of partitions
# With 6 partitions, at most 6 consumers can consume in parallel

# Increase partitions (if the partition key allows; this changes key-to-partition mapping)
kafka-topics.sh --bootstrap-server broker1:9092 \
  --alter --topic my-topic --partitions 12

# Or create a new topic with more partitions
kafka-topics.sh --bootstrap-server broker1:9092 \
  --create --topic my-topic-v2 --partitions 24 --replication-factor 3
# and migrate with Kafka Streams
```
Consumer scaling configuration:
```yaml
# Kubernetes HPA for Kafka consumers
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: kafka-consumer-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: kafka-consumer
  minReplicas: 3
  maxReplicas: 24   # Match partition count
  metrics:
    - type: External
      external:
        metric:
          name: kafka_consumer_lag
          selector:
            matchLabels:
              consumer-group: my-consumer
        target:
          type: AverageValue
          averageValue: "10000"   # Scale to maintain lag < 10k
```
### 7. Optimize message processing
Reduce per-message processing time:
```java
// WRONG: synchronous downstream calls per message
void processMessage(ConsumerRecord<String, String> record) {
    // Each message makes a DB call
    database.save(record.value());     // ~50ms latency

    // Each message makes an API call
    apiClient.notify(record.value());  // ~100ms latency

    // Total: ~150ms per message
    // At 1000 messages per batch: 150 seconds!
}

// CORRECT: batch downstream operations
List<String> batch = new ArrayList<>();
final int batchSize = 100;

void processMessage(ConsumerRecord<String, String> record) {
    batch.add(record.value());

    if (batch.size() >= batchSize) {
        flushBatch();
    }
}

void flushBatch() {
    // Batched DB insert
    database.saveAll(batch);       // ~100ms for 100 records

    // Batched API call
    apiClient.notifyAll(batch);    // ~200ms for 100 records

    batch.clear();
}
```
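One caveat with a purely size-triggered batch: at low traffic, a partial batch can sit unflushed indefinitely, so batchers usually also flush on age. A hypothetical buffer (names and limits are ours) that returns a batch when either limit is hit:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical batch buffer that flushes on size OR age, so partial
// batches do not sit forever at low traffic.
public class BatchBuffer {
    private final List<String> batch = new ArrayList<>();
    private final int maxSize;
    private final long maxAgeMs;
    private long firstAddedAtMs = -1;

    public BatchBuffer(int maxSize, long maxAgeMs) {
        this.maxSize = maxSize;
        this.maxAgeMs = maxAgeMs;
    }

    // Returns the batch to flush when either limit is hit, otherwise null.
    public List<String> add(String value, long nowMs) {
        if (batch.isEmpty()) {
            firstAddedAtMs = nowMs;
        }
        batch.add(value);
        if (batch.size() >= maxSize || nowMs - firstAddedAtMs >= maxAgeMs) {
            List<String> out = new ArrayList<>(batch);
            batch.clear();
            return out;
        }
        return null;
    }

    public static void main(String[] args) {
        BatchBuffer buffer = new BatchBuffer(100, 1_000);
        System.out.println(buffer.add("msg-1", 0L));      // null: not full, not old
        System.out.println(buffer.add("msg-2", 2_000L));  // [msg-1, msg-2]: age limit hit
    }
}
```

In a real consumer, `nowMs` would come from `System.currentTimeMillis()` and the flush would also run on a timer between polls.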
Async processing pattern:
```java
// Async processing with completion tracking
ExecutorService executor = Executors.newFixedThreadPool(10);
List<CompletableFuture<Void>> futures = new ArrayList<>(records.count());

for (ConsumerRecord<String, String> record : records) {
    futures.add(CompletableFuture.runAsync(() -> processMessage(record), executor));
}

// Wait for every message in the batch to finish
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

// Only then commit
consumer.commitSync();
```
### 8. Check downstream dependency health
Slow dependencies cause processing delays:
```java
// Add timing instrumentation
void processMessage(ConsumerRecord<String, String> record) {
    long start = System.currentTimeMillis();

    try {
        long dbStart = System.currentTimeMillis();
        database.save(record.value());
        logger.info("DB save took {}ms", System.currentTimeMillis() - dbStart);

        long apiStart = System.currentTimeMillis();
        apiClient.notify(record.value());
        logger.info("API call took {}ms", System.currentTimeMillis() - apiStart);

        logger.info("Total processing: {}ms", System.currentTimeMillis() - start);
    } catch (Exception e) {
        logger.error("Processing failed after {}ms", System.currentTimeMillis() - start, e);
        throw e;
    }
}
```
Circuit breaker for downstream:
```java
// Using resilience4j
CircuitBreaker circuitBreaker = CircuitBreaker.ofDefaults("downstream");

void processMessage(ConsumerRecord<String, String> record) {
    // If the circuit is open, skip processing and divert to a retry queue
    Supplier<Void> supplier = CircuitBreaker.decorateSupplier(circuitBreaker, () -> {
        apiClient.notify(record.value());
        return null;
    });

    try {
        supplier.get();
    } catch (CallNotPermittedException e) {
        logger.warn("Circuit breaker open, sending to retry queue");
        sendToRetryQueue(record);
        // Still commit so the message is not immediately re-processed
    }
}
```
### 9. Implement lag-based alerting
Set up monitoring and alerts:
```yaml
# Prometheus alerting rules
groups:
  - name: kafka_consumer
    rules:
      - alert: KafkaConsumerLagWarning
        expr: kafka_consumer_group_lag > 100000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer lag exceeding threshold"
          description: "Consumer group {{ $labels.group }} has lag of {{ $value }}"

      - alert: KafkaConsumerLagCritical
        expr: kafka_consumer_group_lag > 1000000
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Kafka consumer lag critical"
          description: "Consumer group {{ $labels.group }} has lag of {{ $value }} - immediate action required"

      - alert: KafkaConsumerLagGrowth
        expr: rate(kafka_consumer_group_lag[5m]) > 1000
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer lag growing rapidly"
          description: "Lag growing at {{ $value }} messages/second"

      - alert: KafkaConsumerRebalancing
        expr: rate(kafka_consumer_rebalances_total[5m]) > 0.1
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer group rebalancing frequently"
          description: "{{ $value }} rebalances/second"
```
### 10. Implement lag-based backpressure
Slow down producers when consumers can't keep up:
```java
// Producer-side lag-based backpressure
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "broker1:9092");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

// Check lag before sending (getConsumerLag() is an application helper,
// e.g. built on AdminClient offset lookups)
long currentLag = getConsumerLag();
long maxLag = 1_000_000;

if (currentLag > maxLag) {
    logger.warn("Lag {} exceeds max {}, applying backpressure", currentLag, maxLag);

    // Reduce the send rate
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
    }

    // Or reject non-critical messages
    if (!message.isCritical()) {
        throw new BackpressureException("Consumer lag too high");
    }
}

// Send with acknowledgment callback
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        logger.error("Send failed", exception);
    }
});
```
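A smoother variant of the fixed one-second sleep is to scale the pause with how far lag overshoots the threshold, capped at a maximum. A sketch with illustrative constants (the `LagBackoff` class and its formula are ours):

```java
// Illustrative lag-proportional pause: grows linearly with overshoot, capped.
public class LagBackoff {
    static long pauseMs(long lag, long maxLag, long maxPauseMs) {
        if (lag <= maxLag) {
            return 0;
        }
        // overshoot = how many multiples of maxLag we are past the threshold
        double overshoot = (double) (lag - maxLag) / maxLag;
        return Math.min(maxPauseMs, (long) (overshoot * 1_000));
    }

    public static void main(String[] args) {
        System.out.println(pauseMs(500_000, 1_000_000, 5_000));    // 0: under threshold
        System.out.println(pauseMs(2_000_000, 1_000_000, 5_000));  // 1000: 1x overshoot
        System.out.println(pauseMs(20_000_000, 1_000_000, 5_000)); // 5000: capped
    }
}
```

The producer would call `Thread.sleep(pauseMs(...))` in place of the fixed delay, so backpressure ramps up gradually instead of switching on and off at the threshold.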
## Prevention
- Monitor lag continuously with Prometheus/Burrow
- Set up alerts for lag growth rate, not just absolute value
- Size consumer group for 2x peak production rate
- Implement circuit breakers for downstream dependencies
- Use poison pill handling to prevent partition blocking
- Test consumer scaling procedures regularly
- Document runbooks for lag incidents
- Implement dead letter queues for failed messages
## Related Errors
- **CommitFailedException**: Consumer group rebalanced during commit
- **TimeoutException**: Poll or commit exceeded timeout
- **RebalanceInProgressException**: Rebalance started during operation
- **NotLeaderForPartitionException**: Broker leadership changed