What's Actually Happening
Your Kafka consumers are processing messages more slowly than producers are writing them, so the gap between each partition's log-end offset and the group's committed offset (the consumer lag) keeps growing. The result is delayed processing and increasingly stale data downstream.
The Error You'll See
Consumer lag growing:
```bash
$ kafka-consumer-groups --bootstrap-server localhost:9092 \
    --describe --group mygroup

GROUP    TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
mygroup  mytopic  0          1000            50000           49000
mygroup  mytopic  1          1500            50000           48500
```
Consumer group rebalancing:
```
# Consumer logs show:
Revoking previously assigned partitions [mytopic-0, mytopic-1]
Joining group
Successfully synced group
```
Processing timeout:
```
ERROR: Exceeded max.poll.interval.ms while processing batch
Consumer left the group
```
Why This Happens
1. Too few consumers - not enough consumers for the partition count
2. Slow processing - handling a message takes longer than the arrival interval
3. Frequent rebalances - the group keeps rebalancing, pausing consumption each time
4. Network latency - high latency between consumers and brokers
5. Memory pressure - consumer OOMs or long GC pauses
6. Offset commit failures - offsets are not being committed
7. Poison messages - individual messages repeatedly fail processing
8. Partition skew - uneven message distribution across partitions
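Lag itself is simple arithmetic over two numbers per partition: the log-end offset and the group's last committed offset. The helper below is a minimal sketch of that calculation; in a real check the inputs would come from the CLI output above or, programmatically, from `AdminClient.listConsumerGroupOffsets` together with `AdminClient.listOffsets` using `OffsetSpec.latest()` (available since Kafka 2.5). The class and method names in the sketch are illustrative.

```java
public class ConsumerLag {

    // Lag for one partition: log-end offset minus the committed offset.
    // The committed offset is the *next* offset to consume, so the
    // difference is exactly the number of unread records; floor at 0.
    public static long partitionLag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    // Total lag for a group: sum over all partitions.
    public static long totalLag(long[] logEndOffsets, long[] committedOffsets) {
        long total = 0;
        for (int i = 0; i < logEndOffsets.length; i++) {
            total += partitionLag(logEndOffsets[i], committedOffsets[i]);
        }
        return total;
    }
}
```

Applied to the sample output above (committed offsets 1000 and 1500 against a log-end offset of 50000), this gives per-partition lags of 49000 and 48500: the LAG column is exactly this difference.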
Step 1: Monitor Consumer Lag
```bash
# Check consumer group lag:
kafka-consumer-groups --bootstrap-server localhost:9092 \
    --describe --group mygroup

# Monitor lag in real-time:
watch -n 5 'kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group mygroup'

# Get lag for all groups:
kafka-consumer-groups --bootstrap-server localhost:9092 --list | \
    xargs -I {} kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group {}

# Check consumer group status:
kafka-consumer-groups --bootstrap-server localhost:9092 \
    --describe --group mygroup --state

# Check consumer group members:
kafka-consumer-groups --bootstrap-server localhost:9092 \
    --describe --group mygroup --members --verbose
```
Step 2: Scale Consumers Appropriately
```bash
# Check partition count:
kafka-topics --bootstrap-server localhost:9092 \
    --describe --topic mytopic

# Output:
# Topic: mytopic  PartitionCount: 12

# Rule: the consumer count must be <= the partition count;
# extra consumers sit idle. For 12 partitions: 12 consumers max;
# start lower (e.g. 4-6) and scale up until lag stops growing.

# Add more consumers (in Kubernetes):
kubectl scale deployment my-consumer --replicas=6

# Verify assignment:
kafka-consumer-groups --bootstrap-server localhost:9092 \
    --describe --group mygroup --members --verbose

# Output shows partition assignments:
# CONSUMER-ID  HOST      PARTITIONS
# consumer-1   10.0.0.1  mytopic-0,mytopic-1,mytopic-2
# consumer-2   10.0.0.2  mytopic-3,mytopic-4,mytopic-5
```
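The consumers-vs-partitions rule is worth sanity-checking with a little arithmetic before scaling: consumers beyond the partition count receive nothing, and the busiest consumer carries the ceiling of partitions divided by active consumers. A small sketch (class and method names are illustrative):

```java
public class AssignmentMath {

    // Consumers beyond the partition count receive no partitions at all.
    public static int idleConsumers(int partitions, int consumers) {
        return Math.max(0, consumers - partitions);
    }

    // The busiest consumer owns ceil(partitions / activeConsumers) partitions.
    public static int maxPartitionsPerConsumer(int partitions, int consumers) {
        int active = Math.min(partitions, consumers);
        return (partitions + active - 1) / active; // ceiling division
    }
}
```

For the 12-partition topic above: 6 consumers means at most 2 partitions each, while 16 consumers would leave 4 of them idle.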
Step 3: Optimize Consumer Configuration
```properties
# Consumer configuration (consumer.properties)
# Note: .properties files only support comments on their own line;
# an inline "# ..." after a value becomes part of the value.

# Fetch settings for throughput:
# at least 1MB per fetch, wait up to 500ms, 1MB per partition
fetch.min.bytes=1048576
fetch.max.wait.ms=500
max.partition.fetch.bytes=1048576

# Poll settings: 500 records per poll, up to 5 minutes between polls
max.poll.records=500
max.poll.interval.ms=300000

# Session and heartbeat: 30s session timeout;
# the heartbeat interval should be at most 1/3 of the session timeout
session.timeout.ms=30000
heartbeat.interval.ms=10000

# Processing: manual commits, start from the latest offset for new groups
enable.auto.commit=false
auto.offset.reset=latest

# Incremental (cooperative) partition assignment:
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor
```
```java
// Java consumer configuration:
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "mygroup");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

// Performance tuning:
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1048576);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

// Use the cooperative sticky assignor for stability:
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    CooperativeStickyAssignor.class.getName());

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("mytopic"));
```
Step 4: Implement Batch Processing
```java
// Batch processing consumer:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

    if (!records.isEmpty()) {
        // Process the batch in parallel
        List<Future<ProcessResult>> futures = new ArrayList<>();
        for (ConsumerRecord<String, String> record : records) {
            futures.add(executorService.submit(() -> processRecord(record)));
        }

        // Wait for all tasks; Future.get declares three checked exceptions
        for (Future<ProcessResult> future : futures) {
            try {
                future.get(30, TimeUnit.SECONDS);
            } catch (TimeoutException | InterruptedException | ExecutionException e) {
                // Caution: committing below still advances past failed records;
                // route them to a DLQ (Step 7) if they must not be lost
                log.error("Processing failed or timed out", e);
            }
        }

        // Commit offsets only after the batch has been processed
        consumer.commitSync();
    }
}

// Or use async commit for better throughput:
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        log.error("Commit failed", exception);
    } else {
        log.debug("Committed offsets: {}", offsets);
    }
});
```
Step 5: Handle Rebalancing Issues
```java
// Consumer with rebalance listener:
consumer.subscribe(Collections.singletonList("mytopic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        log.info("Partitions revoked: {}", partitions);
        // Commit current offsets before losing the partitions
        consumer.commitSync();
        // Clean up any per-partition resources here
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        log.info("Partitions assigned: {}", partitions);
        // Initialize state for the new partitions
    }
});

// Static group membership (Kafka 2.3+):
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "consumer-1");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000); // 5 minutes

// This avoids a rebalance when a consumer restarts within the session timeout
```
Step 6: Fix Offset Commit Issues
```java
// Proper offset commit patterns:

// Option 1: Commit after each batch
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));

    for (ConsumerRecord<String, String> record : records) {
        try {
            processRecord(record);
        } catch (Exception e) {
            log.error("Failed to process record at offset {}", record.offset(), e);
            // Send to DLQ or skip
            continue;
        }
    }

    // Commit after processing
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("Commit failed", e);
        // The consumer will rebalance
    }
}

// Option 2: Manual offset tracking with explicit commit
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

for (ConsumerRecord<String, String> record : records) {
    processRecord(record);

    // Commit the *next* offset to consume, hence the +1
    currentOffsets.put(
        new TopicPartition(record.topic(), record.partition()),
        new OffsetAndMetadata(record.offset() + 1, null)
    );
}

consumer.commitSync(currentOffsets);
```
Step 7: Implement Dead Letter Queue
```java
// DLQ handling:
public void processWithDLQ(ConsumerRecord<String, String> record,
                           KafkaProducer<String, String> dlqProducer) {
    int maxRetries = 3;
    int retryCount = getRetryCount(record);

    try {
        if (retryCount < maxRetries) {
            processRecord(record);
        } else {
            // Retries exhausted: send to the dead letter topic
            ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
                "mytopic.dlq", record.key(), record.value()
            );
            dlqProducer.send(dlqRecord);
            log.warn("Sent to DLQ: offset={}", record.offset());
        }
    } catch (Exception e) {
        // Retry by not committing the offset
        log.error("Processing failed, will retry", e);
        throw e;
    }
}

private int getRetryCount(ConsumerRecord<String, String> record) {
    if (record.headers() != null) {
        Header retryHeader = record.headers().lastHeader("retry-count");
        if (retryHeader != null) {
            return Integer.parseInt(new String(retryHeader.value()));
        }
    }
    return 0;
}
```
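`getRetryCount` reads a `retry-count` header, but something has to write it: when a failed record is re-published (for example to a retry topic), the producer should carry the count forward, incremented by one. Below is a minimal sketch of that bookkeeping on raw header bytes; the `retry-count` name matches the reader above, everything else is illustrative.

```java
import java.nio.charset.StandardCharsets;

public class RetryCount {

    // A missing header means the record has not been retried yet.
    public static int parse(byte[] headerValue) {
        if (headerValue == null) {
            return 0;
        }
        return Integer.parseInt(new String(headerValue, StandardCharsets.UTF_8));
    }

    // Header value for the re-published record.
    public static byte[] increment(byte[] headerValue) {
        return String.valueOf(parse(headerValue) + 1).getBytes(StandardCharsets.UTF_8);
    }
}

// When re-publishing, attach the incremented header, e.g.:
//   Header old = record.headers().lastHeader("retry-count");
//   retryRecord.headers().add("retry-count",
//       RetryCount.increment(old == null ? null : old.value()));
```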
Step 8: Monitor Consumer Metrics
```java
// Enable JMX for the consumer JVM with the standard flags, e.g.:
//   -Dcom.sun.management.jmxremote.port=9999

// Key consumer metrics to monitor:
// - records-consumed-rate / records-consumed-total
// - bytes-consumed-total
// - fetch-rate
// - commit-rate / commit-latency-avg
// - assigned-partitions
// - records-lag / records-lag-max (per-partition consumer lag)

// Prometheus gauge over the consumer's built-in lag metrics:
public class KafkaMetrics {
    private final MeterRegistry registry;

    public void registerConsumerMetrics(KafkaConsumer<String, String> consumer) {
        Gauge.builder("kafka.consumer.lag", consumer, c -> {
            Map<MetricName, ? extends Metric> metrics = c.metrics();
            return calculateLag(metrics); // e.g. sum the records-lag values
        }).register(registry);
    }
}
```

Alternatively, use Burrow for external lag monitoring:

```toml
# burrow.toml
[general]
client-id = "burrow"

[zookeeper]
hostname = "localhost"
port = 2181

[listener "http"]
port = 8000
```

```bash
# Check lag via the Burrow API:
curl http://localhost:8000/v3/kafka/local/consumer/mygroup/lag
```
Step 9: Optimize Producer Throughput
```properties
# Producer configuration to reduce lag
# (.properties comments must be on their own line)

# Compress message batches to cut network and disk I/O
compression.type=lz4

# Batch up to 32KB, waiting up to 10ms to fill a batch
batch.size=32768
linger.ms=10

# 64MB producer buffer
buffer.memory=67108864

max.in.flight.requests.per.connection=5

# Exactly-once delivery from the producer.
# Note: enable.idempotence=true requires acks=all; use acks=1
# (faster, less safe) only with idempotence disabled.
enable.idempotence=true
acks=all
```
Step 10: Production Consumer Pattern
```java
// Complete production consumer:
public class ProductionKafkaConsumer implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private final ExecutorService executor;
    private volatile boolean running = true;

    public ProductionKafkaConsumer(Properties config, String topic) {
        this.consumer = new KafkaConsumer<>(config);
        this.topic = topic;
        this.executor = Executors.newFixedThreadPool(10);
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Collections.singletonList(topic));

            while (running) {
                try {
                    ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofSeconds(10));

                    if (!records.isEmpty()) {
                        processBatch(records);
                    }
                } catch (WakeupException e) {
                    // Ignore, shutting down
                } catch (Exception e) {
                    log.error("Consumer error", e);
                }
            }
        } finally {
            consumer.close();
            executor.shutdown();
        }
    }

    private void processBatch(ConsumerRecords<String, String> records) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();

        for (ConsumerRecord<String, String> record : records) {
            futures.add(CompletableFuture.runAsync(() -> processRecord(record), executor));
        }

        // Wait for the whole batch before committing
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

        consumer.commitSync();
    }

    public void shutdown() {
        running = false;
        consumer.wakeup();
    }
}
```
Kafka Consumer Lag Checklist
| Check | Command | Expected |
|---|---|---|
| Partition count | describe topic | Known value |
| Consumer count | describe group | <= partitions |
| Lag trend | monitor | Decreasing |
| Rebalance rate | logs | Low frequency |
| Commit success | metrics | High rate |
| Processing rate | metrics | > ingest rate |
Verify the Fix
```bash
# After optimizing:

# 1. Check that lag is decreasing
kafka-consumer-groups --bootstrap-server localhost:9092 \
    --describe --group mygroup
# Output: LAG column decreasing

# 2. Verify partition assignment
kafka-consumer-groups --bootstrap-server localhost:9092 \
    --describe --group mygroup --members --verbose
# Output: balanced assignment across members

# 3. Monitor throughput
# Records processed per second should exceed records produced per second

# Compare before/after:
# Before: LAG 49000, increasing
# After:  LAG 100, stable
```
Related Issues
- [Fix RabbitMQ Consumer Stuck](/articles/fix-rabbitmq-consumer-stuck)
- [Fix Kafka Broker Unavailable](/articles/fix-kafka-broker-unavailable)
- [Fix Kafka Producer Timeout](/articles/fix-kafka-producer-timeout)