## Introduction

Elasticsearch returns HTTP 429 when the bulk thread pool queue is full and cannot accept new requests. This is a backpressure mechanism: rather than queuing work indefinitely and risking an OutOfMemoryError, Elasticsearch rejects excess requests immediately. Applications that do not handle 429 responses properly lose data or create retry storms.
## Symptoms

- Bulk indexing returns `{"error":{"type":"es_rejected_execution_exception"}}`
- HTTP 429 responses from `POST /_bulk` endpoints
- `GET /_nodes/stats/thread_pool` shows the `write` queue at capacity
- `GET /_cat/thread_pool?v` shows the `write` pool's `rejected` count increasing
- Application data loss for rejected bulk requests
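With per-item error reporting enabled, rejections show up as `status: 429` entries inside the bulk response body rather than as a top-level failure. A minimal sketch of detecting them (the response below is a trimmed, hand-written example, not captured from a live cluster):

```python
def rejected_items(bulk_response):
    """Return (action, result) pairs from a bulk response that were rejected with 429."""
    rejected = []
    for item in bulk_response.get("items", []):
        # Each item is keyed by its action type ("index", "create", ...)
        for action, result in item.items():
            if result.get("status") == 429:
                rejected.append((action, result))
    return rejected

sample = {
    "errors": True,
    "items": [
        {"index": {"_index": "my_index", "_id": "1", "status": 201}},
        {"index": {"_index": "my_index", "_id": "2", "status": 429,
                   "error": {"type": "es_rejected_execution_exception"}}},
    ],
}
print(len(rejected_items(sample)))  # -> 1
```

Only the rejected items need to be re-sent; items with 2xx statuses in the same response were indexed successfully.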
## Common Causes

- Bulk request rate exceeds the cluster's indexing capacity
- Bulk payloads too large (too many documents in a single request)
- Thread pool queue size too small for burst traffic
- Slow indexing due to heavy mapping/analysis causing queue backup
- No retry logic in the application for 429 responses
## Step-by-Step Fix

1. **Check thread pool status:**

   ```bash
   # Quote the URL so the shell does not treat "&" as a background operator
   curl -s "localhost:9200/_cat/thread_pool/write?v&h=node_name,active,queue,rejected,completed"
   # If queue is at max (200 by default) and rejected is increasing, you have a problem
   ```
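For a monitoring script, that `_cat` output (with the `v` header flag) parses cleanly into per-node records. A sketch, assuming the exact columns requested above; the sample text is illustrative, not captured from a real cluster:

```python
def parse_cat_thread_pool(text):
    """Parse whitespace-separated _cat output (including its header row) into dicts."""
    lines = [line for line in text.splitlines() if line.strip()]
    header = lines[0].split()
    return [dict(zip(header, line.split())) for line in lines[1:]]

sample = """node_name active queue rejected completed
node-1    8      200   1523     991234
node-2    2      0     0        871002
"""

stats = parse_cat_thread_pool(sample)
hot = [row["node_name"] for row in stats if int(row["rejected"]) > 0]
print(hot)  # -> ['node-1']
```

Note that `rejected` is cumulative since node start, so an alerting check should compare deltas between polls, not the absolute value.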
2. **Implement exponential backoff retry in the application:**

   ```python
   import time

   from elasticsearch import Elasticsearch
   from elasticsearch.helpers import bulk

   es = Elasticsearch(["http://localhost:9200"])

   def bulk_with_retry(es, actions, max_retries=5):
       for attempt in range(max_retries):
           try:
               success, errors = bulk(es, actions, raise_on_error=False)
               if errors:
                   # The error items describe the failures but do not include the
                   # original documents, so retry the whole batch; explicit _ids
                   # make re-indexing already-succeeded documents idempotent.
                   time.sleep(2 ** attempt * 0.1)
                   continue
               return success
           except Exception as e:
               if "rejected" in str(e) or "429" in str(e):
                   time.sleep(2 ** attempt * 0.1)
                   continue
               raise
       raise Exception(f"Bulk failed after {max_retries} retries")
   ```
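The sleep schedule used above (0.1 s, 0.2 s, 0.4 s, ...) is easy to get subtly wrong, so it helps to factor it into a small helper; the cap and jitter below are additions, not part of the snippet above, and jitter in particular prevents many clients from retrying in lockstep:

```python
import random

def backoff_delay(attempt, base=0.1, cap=30.0, jitter=False):
    """Exponential backoff: base * 2**attempt, capped, with optional full jitter."""
    delay = min(base * (2 ** attempt), cap)
    return random.uniform(0, delay) if jitter else delay

print([backoff_delay(a) for a in range(4)])  # -> [0.1, 0.2, 0.4, 0.8]
```

Recent versions of the Python client can also do this loop for you: `helpers.bulk` forwards `max_retries`, `initial_backoff`, and `max_backoff` keyword arguments to `streaming_bulk`, which retries chunks rejected with 429 using exponential backoff.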
3. **Reduce bulk request size and rate:**

   ```python
   def stream_bulk(docs, batch_size=500, delay_ms=100):
       """Yield documents in batches with delays to avoid overwhelming ES"""
       batch = []
       for doc in docs:
           batch.append({"index": {"_index": "my_index", "_id": doc["id"]}})
           batch.append(doc)
           if len(batch) >= batch_size * 2:
               yield batch
               batch = []
               time.sleep(delay_ms / 1000)
       if batch:
           yield batch
   ```
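Document count is only a proxy: what the cluster actually absorbs is the serialized NDJSON payload, and keeping each bulk request in the low-megabyte range matters as much as the document count. A rough per-batch size estimate (the helper name and the use of `json.dumps` are illustrative, not part of the client library):

```python
import json

def bulk_payload_bytes(batch):
    """Approximate NDJSON size of a bulk batch: one JSON line per entry plus a newline."""
    return sum(len(json.dumps(entry).encode("utf-8")) + 1 for entry in batch)

batch = [
    {"index": {"_index": "my_index", "_id": "1"}},
    {"message": "hello"},
]
print(bulk_payload_bytes(batch))
```

A batching loop can flush early when this estimate crosses a byte threshold, instead of relying on document count alone.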
4. **Increase thread pool queue size (temporary fix):**

   Thread pool settings are static node settings, so they cannot be changed through the cluster settings API. Set the value in `elasticsearch.yml` on each data node and perform a rolling restart:

   ```yaml
   thread_pool.write.queue_size: 1000
   ```

   A larger queue only buys headroom during bursts; it increases heap usage and indexing latency, so the request rate still needs to be fixed on the client side.
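Since a bigger queue only postpones rejections, the durable fix is client-side throttling. Beyond the fixed sleeps in step 3, a token bucket smooths bursts while allowing short spikes; a minimal sketch, not tied to any Elasticsearch client API:

```python
import time

class TokenBucket:
    """Allow up to `rate` bulk requests per second, with bursts up to `capacity`."""

    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def acquire(self):
        """Block until a token is available, then consume it."""
        while True:
            now = time.monotonic()
            # Refill proportionally to elapsed time, never exceeding capacity
            self.tokens = min(self.capacity,
                              self.tokens + (now - self.last) * self.rate)
            self.last = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            time.sleep((1 - self.tokens) / self.rate)

# Throttle to 5 bulk requests/second, with bursts of up to 10:
bucket = TokenBucket(rate=5, capacity=10)
```

Calling `bucket.acquire()` before each bulk request keeps the sustained rate below the cluster's capacity instead of relying on rejections as the only brake.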