What's Actually Happening
A Kafka producer fails to send messages after exhausting its retry attempts. The producer cannot deliver messages to the Kafka cluster, causing data loss or application errors.
The Error You'll See
Retries exhausted:

```
org.apache.kafka.common.errors.RetriesExhaustedException:
Failed to send message after 3 retries
```

Producer timeout:

```
org.apache.kafka.common.errors.TimeoutException:
Failed to update metadata after 60000 ms.
```

Network exception:

```
org.apache.kafka.common.errors.NetworkException:
The server disconnected before a response was received.
```

Batch too large:

```
org.apache.kafka.common.errors.RecordBatchTooLargeException:
The batch is larger than the server's max.message.bytes
```

Why This Happens
1. Broker unavailable - Kafka brokers are down or unreachable
2. Network issues - connection timeouts or failures
3. Topic not found - the topic doesn't exist or auto-create is disabled
4. Producer misconfigured - insufficient retries or timeouts
5. Message too large - exceeds the broker's max.message.bytes
6. Disk full - the broker cannot accept writes
Step 1: Check Broker Status
```bash
# Check Kafka brokers are running
systemctl status kafka

# Or in Kubernetes
kubectl get pods -n kafka

# Confirm brokers are reachable and list supported API versions
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

# KRaft mode: inspect the cluster metadata snapshot
kafka-metadata-shell.sh --snapshot /var/lib/kafka/data/__cluster_metadata-0/00000000000000000000.log

# ZooKeeper mode: list registered broker IDs
zookeeper-shell.sh localhost:2181 <<< "ls /brokers/ids"

# Check topic leadership
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic mytopic

# Check under-replicated partitions
kafka-topics.sh --bootstrap-server localhost:9092 --describe --under-replicated-partitions
```
Step 2: Check Network Connectivity
```bash
# Test connection to each Kafka broker
nc -zv kafka-broker-1 9092
nc -zv kafka-broker-2 9092
nc -zv kafka-broker-3 9092

# Check from the producer host
ping kafka-broker-1
telnet kafka-broker-1 9092

# Check firewall rules
iptables -L -n | grep 9092

# Allow the Kafka port if blocked
iptables -I INPUT -p tcp --dport 9092 -j ACCEPT

# Check DNS resolution
nslookup kafka-broker-1
dig kafka-broker-1

# Test with kafka-console-producer
echo "test message" | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
```
Step 3: Check Topic Configuration
```bash
# Check if the topic exists
kafka-topics.sh --bootstrap-server localhost:9092 --list | grep mytopic

# If missing, create it
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic mytopic --partitions 3 --replication-factor 2

# Check topic configuration
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic mytopic

# Check topic-specific settings
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name mytopic --describe

# Check max.message.bytes
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name mytopic --describe | grep max.message.bytes

# Increase max.message.bytes if needed
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name mytopic --alter --add-config max.message.bytes=10485760
```
Step 4: Check Producer Configuration
```java
// Common producer config
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-broker-1:9092,kafka-broker-2:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

// Retry configuration
props.put("retries", 10);                  // number of retries
props.put("retry.backoff.ms", 100);        // wait between retries
props.put("delivery.timeout.ms", 120000);  // total time allowed for delivery

// Batch configuration
props.put("batch.size", 16384);        // batch size in bytes
props.put("linger.ms", 5);             // wait before sending a batch
props.put("buffer.memory", 33554432);  // total buffer memory

// Request configuration
props.put("max.request.size", 1048576);  // max request size
props.put("request.timeout.ms", 30000);  // per-request timeout
props.put("acks", "all");                // acknowledgment level

// If retries are being exhausted, raise the budget:
props.put("retries", 20);
props.put("delivery.timeout.ms", 300000); // 5 minutes
```
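Note that recent Kafka clients reject configurations where `delivery.timeout.ms` is smaller than `linger.ms + request.timeout.ms` (the delivery budget must cover at least one full request attempt). A minimal sketch of that sanity check, with the class name `ProducerTimeoutCheck` purely illustrative:

```java
import java.util.Properties;

public class ProducerTimeoutCheck {
    // The producer needs delivery.timeout.ms >= linger.ms + request.timeout.ms,
    // otherwise it cannot complete even a single request within the delivery budget.
    static boolean deliveryTimeoutIsConsistent(Properties props) {
        long delivery = Long.parseLong(props.getProperty("delivery.timeout.ms", "120000"));
        long linger   = Long.parseLong(props.getProperty("linger.ms", "0"));
        long request  = Long.parseLong(props.getProperty("request.timeout.ms", "30000"));
        return delivery >= linger + request;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("linger.ms", "5");
        props.put("request.timeout.ms", "30000");
        props.put("delivery.timeout.ms", "120000");
        System.out.println(deliveryTimeoutIsConsistent(props)); // prints "true"
    }
}
```

Running this check before constructing the producer surfaces the misconfiguration early instead of at `KafkaProducer` construction time.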
Step 5: Check Broker Disk Space
```bash
# Check disk space on brokers
ssh kafka-broker-1 "df -h /var/lib/kafka/data"

# Check Kafka log directories
du -sh /var/lib/kafka/data/*

# If the disk is full, delete old records (truncates partitions to the given offsets)
kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file offsets.json

# Or increase the disk size

# Check broker config for log retention
grep -E "log.retention|log.segment" /etc/kafka/server.properties

# Adjust retention in server.properties:
# log.retention.hours=168
# log.retention.bytes=1073741824
# log.segment.bytes=1073741824

# Check broker-level retention settings
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default --describe | grep log.retention
```
Step 6: Handle Large Messages
```bash
# Message size limits:
# Producer side:
#   max.request.size = 1048576         # 1MB default
# Broker side:
#   message.max.bytes = 1048588        # broker-wide default (max.message.bytes per topic)
#   replica.fetch.max.bytes = 1048576
# Consumer side:
#   fetch.max.bytes = 52428800

# If messages are larger, increase all of them.

# Producer (Java):
#   props.put("max.request.size", 10485760); // 10MB

# Broker (server.properties):
#   message.max.bytes=10485760
#   replica.fetch.max.bytes=10485760

# Topic override:
kafka-configs.sh --bootstrap-server localhost:9092 --entity-type topics --entity-name mytopic --alter --add-config max.message.bytes=10485760

# Or compress messages on the producer:
#   props.put("compression.type", "gzip");
```
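Because the producer checks the serialized record size (payload plus batch/header overhead) against `max.request.size`, it helps to estimate before sending. A rough pre-flight check, assuming a conservative fixed overhead (the real client accounting is more detailed, and `RecordSizeCheck` is an illustrative name):

```java
public class RecordSizeCheck {
    // Conservative estimate of per-record/batch header overhead; the client's
    // actual accounting varies by message format version.
    static final int ESTIMATED_OVERHEAD = 512;

    static boolean fitsInRequest(byte[] key, byte[] value, int maxRequestSize) {
        int keyLen = key == null ? 0 : key.length;
        int valLen = value == null ? 0 : value.length;
        return keyLen + valLen + ESTIMATED_OVERHEAD <= maxRequestSize;
    }

    public static void main(String[] args) {
        byte[] payload = new byte[2 * 1024 * 1024]; // 2MB value
        System.out.println(fitsInRequest(null, payload, 1048576));  // 1MB default: prints "false"
        System.out.println(fitsInRequest(null, payload, 10485760)); // raised to 10MB: prints "true"
    }
}
```

Records that fail this check would be rejected client-side with a `RecordTooLargeException` before ever reaching the broker.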
Step 7: Check Broker Logs
```bash
# Tail the Kafka server log
tail -f /var/log/kafka/server.log

# Look for errors (-E enables alternation)
grep -iE "error|exception|failed" /var/log/kafka/server.log | tail -20

# Common errors:
# - NotLeaderForPartitionException
# - LeaderNotAvailableException
# - ReplicaNotAvailableException
# - KafkaStorageException

# Check controller activity
grep "Controller" /var/log/kafka/server.log

# Check for GC issues
grep "GC" /var/log/kafka/server.log

# Confirm the broker responds
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
```
Step 8: Implement Error Handling
```java
// Producer with a callback
Producer<String, String> producer = new KafkaProducer<>(props);

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // By the time the callback fires, the producer's internal retries
            // are already exhausted, so route the record to a fallback
            System.err.println("Send failed: " + exception.getMessage());
            sendToDeadLetterQueue(record);
        } else {
            System.out.println("Sent to partition " + metadata.partition());
        }
    }
});

// Idempotent producer (exactly-once delivery within a partition):
props.put("enable.idempotence", true);

// Transactional producer (set transactional.id before constructing the producer):
props.put("transactional.id", "my-transactional-id");
producer.initTransactions();
producer.beginTransaction();
try {
    producer.send(record);
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}
```
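For synchronous sends (`send(record).get()`), an application-level retry loop with exponential backoff mirrors what `retries` and `retry.backoff.ms` do inside the producer. A self-contained sketch, with `RetryHelper` an illustrative name rather than a Kafka API:

```java
import java.util.concurrent.Callable;

public class RetryHelper {
    // Retry an operation with exponential backoff, capped at 10 seconds,
    // rethrowing the last exception once attempts are exhausted.
    static <T> T withRetries(Callable<T> op, int maxAttempts, long backoffMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                    backoffMs = Math.min(backoffMs * 2, 10_000);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Simulate an operation that fails twice, then succeeds
        int[] calls = {0};
        String result = withRetries(() -> {
            if (++calls[0] < 3) throw new RuntimeException("transient failure");
            return "sent";
        }, 5, 10);
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "sent after 3 attempts"
    }
}
```

In a real producer, the `Callable` body would wrap `producer.send(record).get(timeout, unit)`, and the final rethrow is where dead-letter-queue handling belongs.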
Step 9: Monitor Producer Metrics
```bash
# Enable JMX for producer metrics
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
  -Dcom.sun.management.jmxremote.port=9999 \
  -Dcom.sun.management.jmxremote.authenticate=false \
  -Dcom.sun.management.jmxremote.ssl=false"

# Key producer metrics:
# record-send-rate:       records sent per second
# record-error-rate:      failed records per second
# request-latency-avg:    average request latency
# record-queue-time-avg:  time records wait in the send buffer
# buffer-available-bytes: available buffer memory
# buffer-exhausted-rate:  rate of sends dropped because the buffer was full

# Inspect via JMX
jconsole <producer-host>:9999

# Prometheus metrics (if using the JMX exporter)
curl http://producer:7071/metrics | grep kafka_producer

# Alert on a sustained error rate (Prometheus alerting rule):
# - alert: KafkaProducerErrorRate
#   expr: rate(kafka_producer_record_error_rate[5m]) > 0
#   for: 2m
#   labels:
#     severity: warning
#   annotations:
#     summary: "Kafka producer errors detected"
```
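Producer metrics are registered under the `kafka.producer` JMX domain of the application's own JVM, so they can also be read programmatically from the platform MBeanServer. A minimal lookup sketch (run standalone, with no producer in the JVM, the result set is simply empty):

```java
import java.lang.management.ManagementFactory;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class ProducerMetricsLookup {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        // Pattern matching every producer's metrics MBean, whatever its client-id
        ObjectName pattern = new ObjectName("kafka.producer:type=producer-metrics,client-id=*");
        Set<ObjectName> names = server.queryNames(pattern, null);
        System.out.println("producer metric beans found: " + names.size());
    }
}
```

Inside an application with a live `KafkaProducer`, the same query returns one MBean per client-id, whose attributes include `record-error-rate` and the other metrics listed above.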
Step 10: Configure Dead Letter Queue
```bash
# Create the DLQ topic
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic mytopic-dlq --partitions 3 --replication-factor 2
```

Application code:

```java
public void sendWithFallback(ProducerRecord<String, String> record) {
    try {
        producer.send(record).get(30, TimeUnit.SECONDS);
    } catch (Exception e) {
        log.error("Failed to send to main topic, sending to DLQ", e);
        ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            record.topic() + "-dlq",
            record.key(),
            record.value() + "|ERROR:" + e.getMessage()
        );
        producer.send(dlqRecord);
    }
}
```

For Kafka Connect, configure the DLQ in the connector config:

```
"errors.deadletterqueue.topic.name": "mytopic-dlq",
"errors.deadletterqueue.topic.replication.factor": 2,
"errors.retry.timeout": 60000,
"errors.retry.delay.max.ms": 1000
```
Kafka Producer Retries Checklist
| Check | Command | Expected |
|---|---|---|
| Broker running | systemctl status kafka | Active |
| Topic exists | kafka-topics.sh --list | Topic listed |
| Network | nc -zv \<broker\> 9092 | Connected |
| Disk space | df -h | > 10% free |
| Producer retries | producer config | >= 10 |
| Message size | producer/broker config | < max.message.bytes |
| Buffer memory | producer config | Adequate |
Verify the Fix
```bash
# After fixing producer issues:

# 1. Test a producer send (should succeed with no error)
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test

# 2. Check producer metrics (send rate should be > 0)
curl producer:7071/metrics | grep kafka_producer_record_send_rate

# 3. Verify messages were delivered (messages should be visible)
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

# 4. Check the error rate (should be 0)
curl producer:7071/metrics | grep kafka_producer_record_error_rate

# 5. Monitor under load: run a load test (all records should be sent)
kafka-producer-perf-test.sh --topic test --num-records 10000 --record-size 100 --throughput 1000 --producer-props bootstrap.servers=localhost:9092

# 6. Check broker logs (should show no errors)
tail /var/log/kafka/server.log
```
Related Issues
- [Fix Kafka Consumer Lag Growing](/articles/fix-kafka-consumer-lag-growing)
- [Fix Kafka Broker Not Starting](/articles/fix-kafka-broker-not-starting)
- [Fix Kafka No Leader For Partition](/articles/fix-kafka-no-leader-for-partition)