## Introduction
API rate limiting and throttling errors occur when clients exceed the allowed request rate set by API providers, resulting in HTTP 429 (Too Many Requests), HTTP 503 (Service Unavailable), or HTTP 403 (Forbidden) responses. Rate limiting protects APIs from abuse, ensures fair resource allocation, and prevents system overload. Common rate limiting implementations include fixed window counters, sliding window logs, token bucket, and leaky bucket algorithms. When rate limited, APIs return error responses with headers indicating limit status and retry timing (X-RateLimit-Limit, X-RateLimit-Remaining, X-RateLimit-Reset, Retry-After). Common causes include burst traffic exceeding limits, multiple clients sharing IP without coordination, missing authentication (lower unauthenticated limits), aggressive polling, webhook retry storms, distributed systems without centralized rate tracking, and misconfigured retry logic causing retry cascades. The fix requires understanding rate limit headers, implementing proper backoff strategies, request queuing, and architectural patterns for rate limit handling. This guide provides production-proven troubleshooting for API rate limiting across REST, GraphQL, and WebSocket APIs.
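To make one of the algorithm names above concrete, here is a minimal sketch of a sliding window log limiter. The class name and limits are illustrative, not any provider's implementation; real providers typically track windows server-side per API key.

```python
import time
from collections import deque

class SlidingWindowLog:
    """Allow at most `limit` requests in any rolling `window` seconds."""

    def __init__(self, limit, window):
        self.limit = limit
        self.window = window
        self.timestamps = deque()  # Timestamps of accepted requests

    def allow(self, now=None):
        now = time.time() if now is None else now
        # Drop timestamps that have aged out of the window
        while self.timestamps and now - self.timestamps[0] >= self.window:
            self.timestamps.popleft()
        if len(self.timestamps) < self.limit:
            self.timestamps.append(now)
            return True
        return False

limiter = SlidingWindowLog(limit=3, window=60)
print([limiter.allow(now=t) for t in (0, 1, 2, 3, 61)])
# → [True, True, True, False, True]
# The fourth request at t=3 is rejected; by t=61 the t=0 entry has expired
```

The tradeoff versus a fixed window counter is memory (one timestamp per accepted request) in exchange for no burst at window boundaries.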
## Symptoms
- HTTP 429 Too Many Requests response
- HTTP 503 Service Unavailable with rate limit message: `{"error": "rate_limit_exceeded", "retry_after": 60}`
- HTTP 403 Forbidden with `API rate limit exceeded` message
- Response headers show `X-RateLimit-Remaining: 0`
- `Retry-After: 300` header present
- Requests succeed intermittently, fail under load
- GitHub API: `403 API rate limit exceeded`
- Google API: `429 Resource has been exhausted`
- AWS API: `ThrottlingException: Rate exceeded`
- Stripe API: `429 Too many requests`
- Connection dropped without response (aggressive throttling)
## Common Causes
- Request rate exceeds API tier limits (free vs paid tiers)
- Burst traffic from application startup or batch jobs
- Multiple application instances sharing single API key
- Missing or expired authentication (lower rate limits)
- Polling too frequently instead of webhooks
- Retry logic without backoff causing cascade
- No request queuing or rate limiting on client side
- Distributed services without coordinated rate tracking
- Single IP making requests for multiple users
- Webhook delivery retries overwhelming endpoint
- Cache miss storm hitting API simultaneously
- CDN or proxy caching not configured for rate-limited APIs
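The "retry logic without backoff causing cascade" cause is worth quantifying: clients that fail at the same moment and back off deterministically retry in lockstep, re-creating the original traffic spike. The small simulation below (all names and parameters are our own, for illustration) counts the worst 100 ms collision for 100 synchronized clients, with and without jitter:

```python
import random
from collections import Counter

def retry_offsets(base=1.0, retries=5, jitter=False, rng=None):
    """Offsets (seconds after the shared failure) at which one client retries."""
    rng = rng or random
    offsets, t = [], 0.0
    for attempt in range(retries):
        delay = base * (2 ** attempt)
        if jitter:
            delay = rng.uniform(0, delay)  # "full jitter"
        t += delay
        offsets.append(t)
    return offsets

def peak_per_100ms(clients=100, jitter=False, seed=42):
    """Worst-case number of retries landing in the same 100 ms bucket."""
    rng = random.Random(seed)
    buckets = Counter(
        int(t * 10)
        for _ in range(clients)
        for t in retry_offsets(jitter=jitter, rng=rng)
    )
    return max(buckets.values())

print(peak_per_100ms(jitter=False))  # 100 - every client retries in lockstep
print(peak_per_100ms(jitter=True))   # far lower - jitter spreads the retries out
```

Without jitter, every retry wave hits the API as hard as the original burst; with full jitter the waves flatten, which is why the backoff examples in step 2 add it.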
## Step-by-Step Fix
### 1. Diagnose rate limit issues
Identify rate limit headers:
```bash
# Make test request and check headers
curl -i https://api.github.com/users/octocat

# GitHub rate limit headers:
# X-RateLimit-Limit: 60
# X-RateLimit-Remaining: 0
# X-RateLimit-Reset: 1711987200
# Retry-After: 300

# Common rate limit headers:
# X-RateLimit-Limit: Maximum requests allowed
# X-RateLimit-Remaining: Requests remaining in window
# X-RateLimit-Reset: Unix timestamp when limit resets
# Retry-After: Seconds to wait before retrying (RFC 6585 / HTTP standard)
# X-Retry-After: Alternative header name
# RateLimit-Limit: IETF draft standard (draft-ietf-httpapi-ratelimit-headers)
# RateLimit-Remaining: IETF draft standard
# RateLimit-Reset: IETF draft standard
```
Check rate limit status:
```bash
# GitHub - Check rate limit status
curl -i https://api.github.com/rate_limit

# Output:
# {
#   "resources": {
#     "core": {
#       "limit": 60,
#       "remaining": 0,
#       "reset": 1711987200,
#       "used": 60
#     },
#     "search": {
#       "limit": 10,
#       "remaining": 10,
#       "reset": 1711987200,
#       "used": 0
#     }
#   }
# }

# GitLab - Check rate limit
curl -i https://gitlab.com/api/v4/application/rate_limit

# Stripe - Rate limit info in response
# Check headers on any request

# Google APIs - Check quota
# https://console.cloud.google.com/apis/api/[api-name]/quotas

# AWS - Check CloudWatch metrics
# CloudWatch > API Gateway > Count / 4XXError / 429 errors
```
Analyze rate limit patterns:
```python
# Log rate limit headers over time.
# Add to your API client logging:
import logging
from datetime import datetime

import requests

response = requests.get('https://api.example.com/data')
logging.info(
    f"Rate limit: remaining={response.headers.get('X-RateLimit-Remaining')}, "
    f"reset={response.headers.get('X-RateLimit-Reset')}"
)

# Check when limits reset
reset_timestamp = int(response.headers.get('X-RateLimit-Reset', 0))
reset_time = datetime.fromtimestamp(reset_timestamp)
print(f"Limit resets at: {reset_time}")

# Calculate wait time before retrying
wait_seconds = reset_timestamp - int(datetime.now().timestamp())
print(f"Wait {wait_seconds} seconds before retrying")
```
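Since providers disagree on header names (three families appear in the list above), a small normalizing helper keeps the rest of the client simple. This is a sketch: the `RateLimitInfo` name and the fallback order are our own choices, and `Retry-After` is handled in both its delta-seconds and HTTP-date forms.

```python
from dataclasses import dataclass
from email.utils import parsedate_to_datetime
from typing import Optional

@dataclass
class RateLimitInfo:
    limit: Optional[int]
    remaining: Optional[int]
    reset_epoch: Optional[int]
    retry_after_seconds: Optional[float]

def parse_rate_limit_headers(headers, now_epoch):
    """Normalize common rate limit headers; providers vary in which they send."""

    def _int(name):
        for prefix in ('X-RateLimit-', 'RateLimit-'):
            value = headers.get(prefix + name)
            if value is not None:
                return int(value)
        return None

    retry_after = headers.get('Retry-After')
    seconds = None
    if retry_after is not None:
        if retry_after.isdigit():
            seconds = float(retry_after)  # delta-seconds form
        else:
            # HTTP-date form, e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
            seconds = parsedate_to_datetime(retry_after).timestamp() - now_epoch

    return RateLimitInfo(_int('Limit'), _int('Remaining'), _int('Reset'), seconds)

info = parse_rate_limit_headers(
    {'X-RateLimit-Limit': '60', 'X-RateLimit-Remaining': '0', 'Retry-After': '300'},
    now_epoch=0,
)
print(info.remaining, info.retry_after_seconds)  # 0 300.0
```

Feeding every response through one parser like this also gives you a single place to hook in the logging and metrics shown later in this guide.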
### 2. Implement exponential backoff
Basic retry with exponential backoff:
```python
# Python - Exponential backoff with retry
import time

import requests

def api_request_with_backoff(url, max_retries=5):
    """Make API request with exponential backoff for rate limits."""
    base_delay = 1    # Start with 1 second
    max_delay = 300   # Maximum 5 minutes

    for attempt in range(max_retries):
        try:
            response = requests.get(url)

            # Success - not rate limited
            if response.status_code != 429:
                return response

            # Rate limited - honor Retry-After if present
            retry_after = response.headers.get('Retry-After')
            if retry_after:
                delay = min(int(retry_after), max_delay)
            else:
                # Exponential backoff: 1s, 2s, 4s, 8s, 16s
                delay = min(base_delay * (2 ** attempt), max_delay)

            print(f"Rate limited. Waiting {delay}s before retry {attempt + 1}/{max_retries}")
            time.sleep(delay)

        except requests.RequestException:
            if attempt == max_retries - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

    raise Exception(f"Failed after {max_retries} retries")

# Usage
response = api_request_with_backoff('https://api.example.com/data')
```
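The Python version above backs off deterministically, so clients that fail together retry together. A common remedy (popularized by AWS's "exponential backoff and jitter" guidance) is full jitter: sleep a uniform random amount up to the exponential cap. A sketch of just the delay calculation, with illustrative defaults:

```python
import random

def backoff_delay(attempt, base=1.0, cap=300.0, rng=random):
    """Full jitter: uniform in [0, min(cap, base * 2**attempt)]."""
    return rng.uniform(0, min(cap, base * (2 ** attempt)))

# Delays stay bounded by the exponential envelope:
rng = random.Random(0)
for attempt in range(6):
    d = backoff_delay(attempt, rng=rng)
    assert 0 <= d <= min(300.0, 2 ** attempt)
```

Swapping this in for the `delay = min(base_delay * (2 ** attempt), max_delay)` line above is enough to de-synchronize a fleet of clients.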
```javascript
// Node.js - Exponential backoff with jitter
async function apiRequestWithBackoff(url, maxRetries = 5) {
  const baseDelay = 1000;   // 1 second
  const maxDelay = 300000;  // 5 minutes

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      const response = await fetch(url);

      // Not rate limited
      if (response.status !== 429) {
        return response;
      }

      // Get retry-after header
      const retryAfter = response.headers.get('Retry-After');
      let delay;

      if (retryAfter) {
        delay = Math.min(parseInt(retryAfter, 10) * 1000, maxDelay);
      } else {
        // Exponential backoff with jitter
        delay = Math.min(baseDelay * Math.pow(2, attempt), maxDelay);
        // Add jitter (±25%)
        delay = delay * (0.75 + Math.random() * 0.5);
      }

      console.log(`Rate limited. Waiting ${delay}ms before retry ${attempt + 1}/${maxRetries}`);
      await sleep(delay);

    } catch (error) {
      if (attempt === maxRetries - 1) {
        throw error;
      }
      await sleep(baseDelay * Math.pow(2, attempt));
    }
  }

  throw new Error(`Failed after ${maxRetries} retries`);
}

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}
```
Backoff with circuit breaker:
```python
# Python - Circuit breaker pattern
from datetime import datetime, timedelta

import requests

class RateLimitError(Exception):
    """Raised when the API responds with HTTP 429."""

class CircuitBreaker:
    def __init__(self, failure_threshold=5, reset_timeout=300):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.last_failure = None
        self.state = 'closed'  # closed, open, half-open

    def call(self, func, *args, **kwargs):
        if self.state == 'open':
            if datetime.now() - self.last_failure > timedelta(seconds=self.reset_timeout):
                self.state = 'half-open'
            else:
                raise Exception("Circuit breaker is open")

        try:
            result = func(*args, **kwargs)
            if self.state == 'half-open':
                self.state = 'closed'
                self.failures = 0
            return result
        except RateLimitError:
            self.failures += 1
            self.last_failure = datetime.now()
            if self.failures >= self.failure_threshold:
                self.state = 'open'
            raise

# Usage
breaker = CircuitBreaker(failure_threshold=5, reset_timeout=300)

def api_call():
    response = requests.get('https://api.example.com/data')
    if response.status_code == 429:
        raise RateLimitError("Rate limited")
    return response

try:
    result = breaker.call(api_call)
except RateLimitError:
    print("Circuit breaker triggered - waiting before retry")
```
### 3. Implement request queuing
Token bucket rate limiter:
```python
# Python - Token bucket implementation
import threading
import time

import requests

class TokenBucket:
    def __init__(self, rate, capacity):
        """
        rate: tokens per second (request rate)
        capacity: maximum bucket size (burst capacity)
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.time()
        self.lock = threading.Lock()

    def consume(self, tokens=1, blocking=True, timeout=None):
        """Consume tokens from bucket."""
        start_time = time.time()

        while True:
            with self.lock:
                # Add tokens based on elapsed time
                now = time.time()
                elapsed = now - self.last_update
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.last_update = now

                # Check if we can consume
                if self.tokens >= tokens:
                    self.tokens -= tokens
                    return True

                # Not enough tokens
                if not blocking:
                    return False

                # Calculate wait time
                needed = tokens - self.tokens
                wait_time = needed / self.rate

            # Check timeout (outside the lock so other threads can proceed)
            if timeout and (time.time() - start_time) > timeout:
                return False

            # Wait for tokens to refill
            time.sleep(min(wait_time, 0.1))  # Poll every 100ms max

    def wait_time(self, tokens=1):
        """Get time until tokens available."""
        with self.lock:
            if self.tokens >= tokens:
                return 0
            needed = tokens - self.tokens
            return needed / self.rate

# Usage
limiter = TokenBucket(rate=10, capacity=20)  # 10 req/s, burst 20

def make_api_request():
    if limiter.consume(blocking=True, timeout=30):
        return requests.get('https://api.example.com/data')
    raise Exception("Request timeout - rate limited")
```
Request queue with priority:
```python
# Python - Priority request queue
import heapq
import threading
import time
from dataclasses import dataclass, field
from typing import Any

@dataclass(order=True)
class PrioritizedRequest:
    priority: int
    timestamp: float = field(compare=False)
    request_data: Any = field(compare=False)

class RequestQueue:
    def __init__(self, rate_limiter):
        self.queue = []
        self.rate_limiter = rate_limiter
        self.lock = threading.Lock()
        self.worker_thread = threading.Thread(target=self._process_queue, daemon=True)
        self.worker_thread.start()

    def enqueue(self, request_data, priority=0):
        """Add request to queue. Lower priority number = higher priority."""
        with self.lock:
            heapq.heappush(self.queue, PrioritizedRequest(priority, time.time(), request_data))

    def _process_queue(self):
        """Background worker to process queued requests."""
        while True:
            request = None
            with self.lock:
                # Only consume a rate limit token when there is work queued
                if self.queue and self.rate_limiter.consume(blocking=False):
                    request = heapq.heappop(self.queue)

            if request:
                try:
                    # Make actual API call
                    result = self._make_request(request.request_data)
                    request.request_data['callback'](result)
                except Exception as e:
                    request.request_data['error_callback'](e)
            else:
                time.sleep(0.1)

    def _make_request(self, data):
        # Actual API call logic
        pass

# Usage
queue = RequestQueue(TokenBucket(rate=10, capacity=20))

# High priority request
queue.enqueue({'url': 'https://api.example.com/critical'}, priority=0)

# Normal priority request
queue.enqueue({'url': 'https://api.example.com/normal'}, priority=1)

# Low priority / batch request
queue.enqueue({'url': 'https://api.example.com/batch'}, priority=10)
```
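When a full token bucket or priority queue is more machinery than the problem needs, a decorator that enforces a minimum gap between calls often suffices. A thread-safe sketch (decorator name and rate are illustrative):

```python
import threading
import time
from functools import wraps

def rate_limited(calls_per_second):
    """Decorator enforcing a minimum interval between calls (thread-safe)."""
    min_interval = 1.0 / calls_per_second
    lock = threading.Lock()
    last_call = [0.0]  # mutable cell shared by all wrapped calls

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            with lock:
                wait = last_call[0] + min_interval - time.monotonic()
                if wait > 0:
                    time.sleep(wait)
                last_call[0] = time.monotonic()
            return func(*args, **kwargs)
        return wrapper
    return decorator

@rate_limited(calls_per_second=50)
def fetch(i):
    return i  # stand-in for the real API call

start = time.monotonic()
results = [fetch(i) for i in range(10)]
elapsed = time.monotonic() - start
# 10 calls at 50/s take at least ~0.18 s (9 enforced gaps of 20 ms)
```

Unlike the token bucket, this allows no burst at all, which is sometimes exactly what a strict per-second limit requires.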
### 4. Handle specific API rate limits
GitHub API:
```python
# GitHub API rate limiting
# Authenticated: 5000 requests/hour
# Unauthenticated: 60 requests/hour
import time
from datetime import datetime

import requests

class GitHubClient:
    def __init__(self, token=None):
        self.session = requests.Session()
        if token:
            self.session.headers['Authorization'] = f'token {token}'
        self.rate_limit_reset = None

    def _check_rate_limit(self, response):
        """Check and handle GitHub rate limit."""
        remaining = int(response.headers.get('X-RateLimit-Remaining', 60))
        reset_time = int(response.headers.get('X-RateLimit-Reset', 0))

        if remaining < 5:
            print(f"Warning: Only {remaining} requests remaining")

        if response.status_code == 403 and 'rate limit' in response.text.lower():
            reset_dt = datetime.fromtimestamp(reset_time)
            wait_time = (reset_dt - datetime.now()).total_seconds() + 60  # Add buffer
            print(f"Rate limited. Reset at {reset_dt}, waiting {wait_time}s")
            time.sleep(wait_time)
            return False
        return True

    def request(self, endpoint):
        """Make rate-limit-aware request."""
        response = self.session.get(f'https://api.github.com{endpoint}')

        if not self._check_rate_limit(response):
            # Retry after waiting
            return self.request(endpoint)

        return response

# Use GraphQL for more efficient queries
# GraphQL has a separate rate limit (scored on nodes/points, not request count)
graphql_query = """
query {
  repository(owner: "octocat", name: "Hello-World") {
    issues(first: 100) {
      nodes {
        title
        number
      }
    }
  }
}
"""
```
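GitHub also documents that conditional requests answered with `304 Not Modified` do not count against the REST rate limit, so caching ETags is an easy win for polling workloads. A sketch of the idea — the `ETagCache` class and its method names are ours, not part of any library:

```python
import requests

class ETagCache:
    """Cache response bodies by URL and replay them on 304 Not Modified."""

    def __init__(self):
        self._etags = {}   # url -> last seen ETag
        self._bodies = {}  # url -> cached JSON body

    def headers_for(self, url):
        etag = self._etags.get(url)
        return {'If-None-Match': etag} if etag else {}

    def get(self, url, session=None):
        session = session or requests
        response = session.get(url, headers=self.headers_for(url))
        if response.status_code == 304:
            # Unchanged: GitHub does not charge this against the rate limit
            return self._bodies[url]
        if 'ETag' in response.headers:
            self._etags[url] = response.headers['ETag']
            self._bodies[url] = response.json()
        return response.json()

cache = ETagCache()
# First call pays a rate limit token; repeats are free while the resource is unchanged:
# data = cache.get('https://api.github.com/users/octocat')
```

This composes well with the `GitHubClient` above: send `If-None-Match` on every poll and only pay for responses that actually changed.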
Google APIs:
```python
# Google API rate limiting
from google.api_core import retry
from google.api_core.exceptions import ResourceExhausted

# Use built-in retry with exponential backoff
@retry.Retry(
    initial=1.0,
    maximum=60.0,
    multiplier=2.0,
    deadline=300.0,
    predicate=retry.if_exception_type(ResourceExhausted)
)
def call_google_api():
    from google.cloud import storage
    client = storage.Client()
    bucket = client.get_bucket('my-bucket')
    return bucket.list_blobs()

# Or manage quotas in Google Cloud Console
# Quota page: https://console.cloud.google.com/apis/api/[api]/quotas
# Request a quota increase if needed
```
AWS API:
```python
# AWS API throttling
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

# Configure retry with exponential backoff
config = Config(
    retries={
        'max_attempts': 10,
        'mode': 'adaptive'  # or 'standard'
    }
)

# Create client with retry config
client = boto3.client('dynamodb', config=config)

# Handle throttling manually
def make_request_with_retry(operation, max_retries=5):
    from random import random
    from time import sleep

    for attempt in range(max_retries):
        try:
            return operation()
        except ClientError as e:
            if e.response['Error']['Code'] == 'ThrottlingException':
                # Exponential backoff with jitter
                delay = (2 ** attempt) + random()
                sleep(delay)
            else:
                raise

    raise Exception(f"Failed after {max_retries} retries due to throttling")

# Use batch operations to reduce API calls
# Instead of 100 PutItem calls, use batch_write_item
```
Stripe API:
```python
# Stripe API rate limiting
import time

import stripe
from stripe.error import RateLimitError

stripe.api_key = 'sk_test_...'

def create_charge_with_retry(customer_id, amount):
    max_retries = 5

    for attempt in range(max_retries):
        try:
            return stripe.Charge.create(
                amount=amount,
                currency='usd',
                customer=customer_id
            )
        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise

            # Honor Retry-After from the error's response headers if present,
            # otherwise fall back to exponential backoff
            retry_after = int((e.headers or {}).get('Retry-After', 2 ** attempt))
            time.sleep(retry_after)

# Stripe has different limits per endpoint
# Check limit headers on response
```
### 5. Optimize API usage patterns
Implement caching:
```python
# Python - Cache API responses
import time
from functools import lru_cache

import requests

# Simple LRU cache for API responses
@lru_cache(maxsize=1000)
def cached_api_call(endpoint, params_hash):
    """Cache API responses keyed on endpoint plus a params hash."""
    response = requests.get(f'https://api.example.com{endpoint}')
    response.raise_for_status()
    return response.json()

# Usage with cache key
def get_user_data(user_id):
    import hashlib
    params_hash = hashlib.md5(str(user_id).encode()).hexdigest()
    return cached_api_call(f'/users/{user_id}', params_hash)

# Time-based cache with expiration
class ExpiringCache:
    def __init__(self, ttl_seconds=300):
        self.cache = {}
        self.ttl = ttl_seconds

    def get(self, key):
        if key in self.cache:
            value, timestamp = self.cache[key]
            if time.time() - timestamp < self.ttl:
                return value
            del self.cache[key]
        return None

    def set(self, key, value):
        self.cache[key] = (value, time.time())

# Usage
cache = ExpiringCache(ttl_seconds=60)

def api_with_cache(endpoint):
    cached = cache.get(endpoint)
    if cached:
        return cached

    response = requests.get(f'https://api.example.com{endpoint}')
    cache.set(endpoint, response.json())
    return response.json()
```
Use webhooks instead of polling:
```python
# Instead of polling every 5 seconds:
# BAD - Polling (uses rate limit)
import time

import requests

def poll_for_updates():
    while True:
        response = requests.get('https://api.example.com/updates')
        process_updates(response.json())
        time.sleep(5)  # 12 requests per minute!

# GOOD - Use webhooks (no rate limit impact)
from flask import Flask, request

app = Flask(__name__)

@app.route('/webhook/updates', methods=['POST'])
def webhook_handler():
    update = request.json
    process_updates([update])  # Process as received
    return {'status': 'ok'}

# Register webhook with API provider
# POST https://api.example.com/webhooks
# {
#   "url": "https://myapp.com/webhook/updates",
#   "events": ["*"]
# }
```
Batch requests:
```python
# Batch multiple operations into a single request
import requests

# Instead of:
for item in items:
    requests.post('https://api.example.com/items', json=item)
# N requests, N rate limit tokens

# Use a batch endpoint:
requests.post('https://api.example.com/items/batch', json={'items': items})
# 1 request, 1 rate limit token

# GitHub GraphQL - batch queries via aliases
query = """
query {
  repo1: repository(owner: "org", name: "repo1") {
    issues(first: 10) { nodes { title } }
  }
  repo2: repository(owner: "org", name: "repo2") {
    issues(first: 10) { nodes { title } }
  }
}
"""
```
### 6. Monitor rate limit usage
Prometheus metrics:
```python
# Track rate limit metrics with prometheus_client
import time

import requests
from prometheus_client import Counter, Gauge, Histogram

# Metrics
api_requests_total = Counter('api_requests_total', 'Total API requests', ['status', 'endpoint'])
rate_limit_remaining = Gauge('api_rate_limit_remaining', 'Rate limit remaining', ['endpoint'])
rate_limit_reset_seconds = Gauge('api_rate_limit_reset_seconds', 'Seconds until rate limit reset', ['endpoint'])
rate_limit_hits_total = Counter('api_rate_limit_hits_total', 'Times rate limited', ['endpoint'])
request_latency = Histogram('api_request_latency_seconds', 'API request latency')

# In API client
def make_request(endpoint):
    start = time.time()
    response = requests.get(f'https://api.example.com{endpoint}')

    api_requests_total.labels(status=response.status_code, endpoint=endpoint).inc()

    if 'X-RateLimit-Remaining' in response.headers:
        remaining = int(response.headers['X-RateLimit-Remaining'])
        rate_limit_remaining.labels(endpoint=endpoint).set(remaining)

    if response.status_code == 429:
        rate_limit_hits_total.labels(endpoint=endpoint).inc()

    request_latency.observe(time.time() - start)
    return response
```
Prometheus alert rules:
```yaml
groups:
  - name: api_rate_limits
    rules:
      - alert: RateLimitApproaching
        expr: api_rate_limit_remaining < 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "API rate limit nearly exhausted"
          description: "{{ $labels.endpoint }} has {{ $value }} requests remaining"

      - alert: RateLimitExceeded
        expr: rate(api_rate_limit_hits_total[5m]) > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "API rate limit being hit"
          description: "{{ $value }} rate limit errors per second on {{ $labels.endpoint }}"

      - alert: HighRateLimitErrorRate
        expr: |
          sum(rate(api_requests_total{status="429"}[5m]))
          /
          sum(rate(api_requests_total[5m]))
          > 0.1
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "More than 10% of API requests are rate limited"
```
## Prevention
- Implement client-side rate limiting to stay under API limits
- Use exponential backoff with jitter for retries
- Cache responses to reduce API calls
- Use webhooks instead of polling where available
- Batch multiple operations into single requests
- Monitor rate limit headers and set up alerts
- Use authenticated requests for higher limits
- Implement request queuing for burst traffic
- Distribute API calls across multiple keys/accounts
- Document rate limits and design architecture accordingly
- Test rate limit handling in staging environment
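For the "distribute API calls across multiple keys/accounts" item, a small rotator that skips keys currently cooling down is usually enough. This is a sketch with invented names; note that some providers' terms of service forbid splitting one workload across keys to evade limits, so check before adopting the pattern.

```python
import itertools

class KeyRotator:
    """Round-robin across API keys, skipping keys that are cooling down."""

    def __init__(self, keys):
        self.keys = list(keys)
        self.cooldown_until = {k: 0.0 for k in self.keys}
        self._cycle = itertools.cycle(self.keys)

    def next_key(self, now):
        # Try each key at most once per call; return the first usable one
        for _ in range(len(self.keys)):
            key = next(self._cycle)
            if self.cooldown_until[key] <= now:
                return key
        raise RuntimeError("All API keys are rate limited")

    def mark_limited(self, key, now, retry_after):
        """Record a 429 on `key`; it becomes usable again after retry_after."""
        self.cooldown_until[key] = now + retry_after

rotator = KeyRotator(['key-a', 'key-b'])
print(rotator.next_key(now=0))            # key-a
rotator.mark_limited('key-a', now=0, retry_after=60)
print(rotator.next_key(now=0))            # key-b
```

Feed `retry_after` from the `Retry-After` header (step 1) so a throttled key sits out exactly as long as the provider demands.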
## Related Errors
- **HTTP 503 Service Unavailable**: Server temporarily overloaded
- **HTTP 504 Gateway Timeout**: Upstream server timeout
- **HTTP 408 Request Timeout**: Client took too long to send request
- **HTTP 403 Forbidden**: Access denied (may include rate limit message)
- **ETIMEDOUT**: Connection timeout from network layer