LLM Error Handling Playbook: Retries, Fallbacks, and Circuit Breakers in Production
LLM APIs fail. Rate limits, timeouts, model availability issues, and occasional server errors are part of the reality of building on top of AI providers. This playbook covers the patterns that keep your application running when providers have problems.
The Error Taxonomy
Before writing retry logic, understand which errors are retriable:
| HTTP Status | Error Type | Retriable? | Strategy |
| 400 | Bad Request | No | Fix the request |
| 401 | Unauthorized | No | Fix your API key |
| 403 | Forbidden | No | Check permissions |
| 422 | Unprocessable | No | Fix the request format |
| 429 | Rate Limited | Yes | Backoff + retry |
| 500 | Server Error | Yes | Retry with backoff |
| 502 | Bad Gateway | Yes | Retry with backoff |
| 503 | Service Unavailable | Yes | Retry with backoff |
| 504 | Gateway Timeout | Yes | Retry with backoff |
| Timeout | Network Timeout | Yes | Retry with backoff |
Never retry 4xx errors (except 429). They won't succeed on retry — the request is fundamentally wrong.
Pattern 1: Exponential Backoff with Jitter
The standard retry pattern. Wait longer after each failure, add randomness (jitter) to prevent thundering herd.
Python
import time
import random
import anthropic
from anthropic import RateLimitError, APIStatusError, APITimeoutError
def call_with_backoff(
    messages: list,
    model: str = "claude-sonnet-4-5",
    max_retries: int = 5,
    base_delay: float = 1.0,
    max_delay: float = 60.0
) -> anthropic.types.Message:
    """Call Claude with exponential backoff on retriable errors.

    Args:
        messages: Conversation messages in Anthropic format.
        model: Model identifier to request.
        max_retries: Total number of attempts before giving up.
        base_delay: Initial backoff delay, in seconds.
        max_delay: Cap on any single backoff delay, in seconds.

    Returns:
        The successful Message response.

    Raises:
        ValueError: if max_retries < 1 (the original silently returned None).
        RateLimitError / APITimeoutError / APIStatusError: re-raised once
            retries are exhausted, or immediately for non-retriable 4xx.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")
    client = anthropic.Anthropic()
    for attempt in range(max_retries):
        try:
            return client.messages.create(
                model=model,
                max_tokens=4096,
                messages=messages
            )
        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise
            # Prefer a provider-supplied Retry-After delay when present.
            retry_after = getattr(e, 'retry_after', None)
            wait = None
            if retry_after is not None:
                try:
                    wait = float(retry_after)
                except (TypeError, ValueError):
                    # Retry-After may be an HTTP-date instead of seconds;
                    # fall back to computed backoff rather than crashing.
                    wait = None
            if wait is None:
                # Exponential backoff with full jitter
                wait = min(base_delay * (2 ** attempt), max_delay)
                wait = random.uniform(0, wait)  # Full jitter
            print(f"Rate limited. Waiting {wait:.1f}s (attempt {attempt+1}/{max_retries})")
            time.sleep(wait)
        except APITimeoutError:
            if attempt == max_retries - 1:
                raise
            wait = min(base_delay * (2 ** attempt), max_delay)
            wait = random.uniform(wait * 0.5, wait)  # Decorrelated jitter
            print(f"Timeout. Retrying in {wait:.1f}s")
            time.sleep(wait)
        except APIStatusError as e:
            if e.status_code not in (500, 502, 503, 504):
                raise  # Don't retry 4xx errors
            if attempt == max_retries - 1:
                raise
            wait = min(base_delay * (2 ** attempt), max_delay)
            wait = random.uniform(0, wait)
            print(f"Server error {e.status_code}. Retrying in {wait:.1f}s")
            time.sleep(wait)
TypeScript
import Anthropic from "@anthropic-ai/sdk";
const client = new Anthropic();

/**
 * Call Claude with exponential backoff on retriable errors.
 *
 * Fixes over the original:
 * - `wait` was definitely-unassigned under TS strict mode when the error
 *   was not retriable (the assignment happened after the type checks).
 * - Jitter was applied to a server-supplied Retry-After value, which could
 *   retry sooner than the provider asked; now only computed backoff is
 *   jittered.
 */
async function callWithBackoff(
  messages: Anthropic.MessageParam[],
  options: {
    model?: string;
    maxRetries?: number;
    baseDelay?: number;
    maxDelay?: number;
  } = {}
): Promise<Anthropic.Message> {
  const {
    model = "claude-sonnet-4-5",
    maxRetries = 5,
    baseDelay = 1000,
    maxDelay = 60000,
  } = options;
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await client.messages.create({
        model,
        max_tokens: 4096,
        messages,
      });
    } catch (error) {
      if (attempt === maxRetries - 1) throw error;
      // Capped exponential backoff with 50-100% jitter.
      const backoff = Math.min(baseDelay * Math.pow(2, attempt), maxDelay);
      const jittered = backoff * (0.5 + Math.random() * 0.5);
      // null means "not retriable".
      let wait: number | null = null;
      if (error instanceof Anthropic.RateLimitError) {
        // Check for Retry-After header
        const retryAfter = error.headers?.['retry-after'];
        const serverWaitMs = retryAfter ? parseFloat(retryAfter) * 1000 : NaN;
        // Respect the provider-requested delay verbatim; otherwise back off.
        wait = Number.isFinite(serverWaitMs) ? serverWaitMs : jittered;
      } else if (error instanceof Anthropic.APIConnectionTimeoutError) {
        wait = jittered;
      } else if (
        error instanceof Anthropic.InternalServerError ||
        (error instanceof Anthropic.APIError &&
          [502, 503, 504].includes(error.status as number))
      ) {
        wait = jittered;
      }
      if (wait === null) throw error; // Don't retry 4xx and unknown errors
      console.log(`Retrying in ${(wait / 1000).toFixed(1)}s (attempt ${attempt + 1}/${maxRetries})`);
      await new Promise((resolve) => setTimeout(resolve, wait));
    }
  }
  throw new Error("Unreachable");
}
Pattern 2: Model Fallback Chain
When your primary model is unavailable or too slow, fall back to alternatives.
from dataclasses import dataclass
from typing import Optional
import anthropic
from openai import OpenAI
@dataclass
class ModelConfig:
    """A (provider, model) pair plus generation limits for the fallback chain."""
    provider: str # "anthropic" | "openai" | "google"
    model: str
    max_tokens: int = 4096
# Ordered from most- to least-preferred. NOTE(review): call_with_fallback only
# dispatches on "anthropic" and "openai"; the "google" value mentioned above is
# not handled anywhere in this file — confirm before adding it to the chain.
FALLBACK_CHAIN = [
    ModelConfig("anthropic", "claude-sonnet-4-5"),
    ModelConfig("openai", "gpt-4o"),
    ModelConfig("openai", "gpt-4o-mini"), # Cheapest fallback
]
def call_with_fallback(
    messages: list,
    fallback_chain: list[ModelConfig] = FALLBACK_CHAIN,
    task_context: str = ""
) -> tuple[str, ModelConfig]:
    """Try each model in the chain until one succeeds.

    Args:
        messages: Messages in Anthropic format; converted for OpenAI models.
        fallback_chain: Ordered list of models to try, most preferred first.
        task_context: Unused here; reserved for caller-side logging/routing.

    Returns:
        Tuple of (response text, ModelConfig that produced it).

    Raises:
        RuntimeError: if every model in the chain failed. The last underlying
            error is chained as __cause__ (the original dropped the chain).
    """
    anthropic_client = anthropic.Anthropic()
    openai_client = OpenAI()
    last_error = None
    for config in fallback_chain:
        try:
            if config.provider == "anthropic":
                response = anthropic_client.messages.create(
                    model=config.model,
                    max_tokens=config.max_tokens,
                    messages=messages,
                    timeout=30.0  # Hard timeout
                )
                return response.content[0].text, config
            elif config.provider == "openai":
                # Convert message format if needed
                oai_messages = convert_to_openai_format(messages)
                response = openai_client.chat.completions.create(
                    model=config.model,
                    max_tokens=config.max_tokens,
                    messages=oai_messages,
                    timeout=30.0
                )
                return response.choices[0].message.content, config
            else:
                # The original silently skipped unknown providers; surface the
                # misconfiguration so it at least shows up in the failure log.
                raise ValueError(f"Unsupported provider: {config.provider}")
        except Exception as e:
            last_error = e
            print(f"Model {config.model} failed: {e}. Trying next...")
            continue
    raise RuntimeError(f"All models failed. Last error: {last_error}") from last_error
def convert_to_openai_format(messages: list) -> list:
    """Convert Anthropic-style messages to OpenAI chat format.

    String content passes through unchanged. List content is flattened to a
    single string by joining the "text" fields of text blocks with spaces;
    non-text blocks (images, tool results) are dropped. Messages whose
    content is neither a str nor a list are omitted, matching the original
    behavior.

    Args:
        messages: Message dicts with "role" and "content" keys.

    Returns:
        A new list of {"role", "content"} dicts with string content.
    """
    converted = []
    for msg in messages:
        content = msg.get("content")
        if isinstance(content, str):
            converted.append({"role": msg["role"], "content": content})
        elif isinstance(content, list):
            # Keep only text blocks. (The original's `else str(block)` branch
            # was unreachable: the filter already guarantees each block is a
            # dict with type == "text".)
            text = " ".join(
                block.get("text", "")
                for block in content
                if isinstance(block, dict) and block.get("type") == "text"
            )
            converted.append({"role": msg["role"], "content": text})
    return converted
Pattern 3: Circuit Breaker
A circuit breaker prevents cascading failures. After N consecutive failures, it "opens" the circuit and immediately returns an error for a cooldown period, instead of making calls that will certainly fail.
import time
from enum import Enum
from threading import Lock
from dataclasses import dataclass, field
class CircuitState(Enum):
    """Lifecycle states for a circuit breaker."""
    CLOSED = "closed" # Normal operation
    OPEN = "open" # Failing — reject all calls
    HALF_OPEN = "half_open" # Testing recovery
@dataclass
class CircuitBreaker:
    """Thread-safe circuit breaker for wrapping flaky provider calls.

    CLOSED: calls pass through; consecutive failures are counted.
    OPEN: calls fail fast with CircuitOpenError until recovery_timeout.
    HALF_OPEN: probing; success_threshold successes close the circuit,
    any failure reopens it immediately.
    """
    failure_threshold: int = 5      # Open after this many consecutive failures
    recovery_timeout: float = 60.0  # Seconds to stay OPEN before probing again
    success_threshold: int = 2      # Successes needed in HALF_OPEN to close
    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    success_count: int = 0
    last_failure_time: float = 0.0
    _lock: Lock = field(default_factory=Lock)

    def call(self, fn, *args, **kwargs):
        """Invoke fn(*args, **kwargs) through the breaker.

        Raises CircuitOpenError without calling fn while the circuit is OPEN
        and the recovery timeout has not elapsed; otherwise propagates
        whatever fn raises.
        """
        with self._lock:
            if self.state == CircuitState.OPEN:
                # Check if recovery timeout has elapsed
                if time.time() - self.last_failure_time > self.recovery_timeout:
                    self.state = CircuitState.HALF_OPEN
                    self.success_count = 0
                    print("Circuit HALF_OPEN: testing recovery")
                else:
                    remaining = self.recovery_timeout - (time.time() - self.last_failure_time)
                    raise CircuitOpenError(f"Circuit open. Retry in {remaining:.0f}s")
        # fn runs OUTSIDE the lock: Lock is non-reentrant, so holding it here
        # would deadlock in _on_success/_on_failure (and serialize all calls
        # behind one slow network request).
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        """Record a successful call and close the circuit if warranted."""
        with self._lock:
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.success_threshold:
                    self.state = CircuitState.CLOSED
                    self.failure_count = 0
                    print("Circuit CLOSED: service recovered")
            elif self.state == CircuitState.CLOSED:
                self.failure_count = 0  # Reset on success

    def _on_failure(self):
        """Record a failed call; open the circuit when appropriate."""
        with self._lock:
            self.failure_count += 1
            self.last_failure_time = time.time()
            # A failed probe in HALF_OPEN reopens immediately. (The original
            # reopened only via the count check, which worked by accident
            # because failure_count was never reset on entering HALF_OPEN.)
            if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
                print(f"Circuit OPEN after {self.failure_count} failures")
class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is OPEN and the recovery timeout has not yet elapsed."""
    pass
# Usage: one breaker per provider, so one provider's outage cannot trip the other's circuit.
breakers = {
    provider_name: CircuitBreaker(failure_threshold=5, recovery_timeout=60)
    for provider_name in ("anthropic", "openai")
}
def resilient_llm_call(messages: list, provider: str = "anthropic") -> str:
    """Route the call through the provider's breaker; if that circuit is open, retry once through the other provider's breaker."""
    try:
        return breakers[provider].call(call_provider, messages, provider)
    except CircuitOpenError:
        # Pick whichever provider we did NOT just try.
        alt_provider = "openai" if provider == "anthropic" else "anthropic"
        print(f"{provider} circuit open, falling back to {alt_provider}")
        fallback_breaker = breakers[alt_provider]
        return fallback_breaker.call(call_provider, messages, alt_provider)
Pattern 4: Timeout Handling
Always set timeouts. LLM providers can occasionally hang on requests.
import asyncio
import anthropic
async def call_with_timeout(
    messages: list,
    timeout_seconds: float = 30.0
) -> str:
    """Call Claude with a hard asyncio-level timeout.

    Args:
        messages: Messages in Anthropic format.
        timeout_seconds: Wall-clock budget for the whole request.

    Returns:
        The text of the first content block of the response.

    Raises:
        TimeoutError: if the call does not complete within the budget.
    """
    # `async with` closes the client's underlying connection pool on exit —
    # the original constructed a fresh AsyncAnthropic per call and never
    # closed it, leaking connections.
    async with anthropic.AsyncAnthropic() as client:
        try:
            response = await asyncio.wait_for(
                client.messages.create(
                    model="claude-sonnet-4-5",
                    max_tokens=4096,
                    messages=messages
                ),
                timeout=timeout_seconds
            )
        except asyncio.TimeoutError:
            # Suppress the chain: the asyncio traceback adds no information.
            raise TimeoutError(f"LLM call exceeded {timeout_seconds}s timeout") from None
    return response.content[0].text
# Synchronous version with httpx timeout
# NOTE: `read` dominates for LLM calls — generation time counts against it.
client = anthropic.Anthropic(
    timeout=anthropic.Timeout(
        connect=5.0, # 5s to connect
        read=60.0, # 60s to read response
        write=10.0, # 10s to write request
        pool=5.0 # 5s to get connection from pool
    )
)
Pattern 5: Rate Limit Budget Management
Proactively manage rate limits rather than reacting to 429 errors:
/**
 * Proactive client-side rate-limit budget over a fixed 60s window.
 * Reserve capacity with waitIfNeeded() before each call, then reconcile
 * with recordActualUsage() once the provider reports real token usage.
 */
class RateLimitBudget {
  private tokensUsed: number = 0;
  private requestsThisMinute: number = 0;
  private windowStart: number = Date.now();
  // Anthropic defaults: 40K tokens/min, 2000 req/min (Tier 1)
  constructor(
    private maxTokensPerMinute: number = 40_000,
    private maxRequestsPerMinute: number = 2000
  ) {}

  /** Block until there is budget for a request of ~estimatedTokens tokens. */
  async waitIfNeeded(estimatedTokens: number = 1000): Promise<void> {
    const now = Date.now();
    const windowElapsed = now - this.windowStart;
    // Reset window if 60 seconds have passed
    if (windowElapsed >= 60_000) {
      this.tokensUsed = 0;
      this.requestsThisMinute = 0;
      this.windowStart = now;
    }
    // Check against 90% of the real limits to leave headroom for estimate error.
    const wouldExceedTokens = this.tokensUsed + estimatedTokens > this.maxTokensPerMinute * 0.9; // 90% threshold
    const wouldExceedRequests = this.requestsThisMinute >= this.maxRequestsPerMinute * 0.9;
    if (wouldExceedTokens || wouldExceedRequests) {
      const waitMs = 60_000 - windowElapsed + 100; // Wait until window resets
      console.log(`Rate limit budget: waiting ${(waitMs/1000).toFixed(1)}s`);
      await new Promise(resolve => setTimeout(resolve, waitMs));
      // Reset after waiting
      this.tokensUsed = 0;
      this.requestsThisMinute = 0;
      this.windowStart = Date.now();
    }
    this.tokensUsed += estimatedTokens;
    this.requestsThisMinute++;
  }

  /**
   * Replace the pre-call estimate with provider-reported usage.
   * Pass the same estimate you gave waitIfNeeded: the original hardcoded
   * 1000 here, which corrupted the budget whenever a different estimate was
   * reserved (the usage example below reserves 2000). Clamped at 0 so a
   * window reset between reserve and reconcile can't drive it negative.
   */
  recordActualUsage(
    inputTokens: number,
    outputTokens: number,
    estimatedTokens: number = 1000
  ): void {
    const actual = inputTokens + outputTokens;
    this.tokensUsed = Math.max(0, this.tokensUsed - estimatedTokens + actual);
  }
}
// Usage
const budget = new RateLimitBudget(40_000, 2000);

async function rateLimitedCall(messages: any[]) {
  // Reserve budget up front with a rough 2K-token estimate.
  await budget.waitIfNeeded(2000);
  const response = await client.messages.create({
    model: "claude-sonnet-4-5",
    max_tokens: 1024,
    messages,
  });
  // Reconcile the reservation with what the provider actually billed.
  const { input_tokens, output_tokens } = response.usage;
  budget.recordActualUsage(input_tokens, output_tokens);
  return response;
}
The Production Error Handling Stack
Putting it all together in a production-ready wrapper:
class ProductionLLMClient:
    """LLM client combining retries, per-provider circuit breakers, and
    cross-provider fallback.

    NOTE(review): relies on a ``self._call_provider(messages, provider, model)``
    method that is not shown in this file — it must return the response text
    and raise on failure. Confirm it exists before using this class.
    """

    def __init__(self):
        self.anthropic = anthropic.Anthropic()
        self.openai = OpenAI()
        # One breaker per provider so failures stay isolated.
        self.circuit_breakers = {
            "anthropic": CircuitBreaker(),
            "openai": CircuitBreaker()
        }

    def complete(
        self,
        messages: list,
        primary: str = "anthropic",
        primary_model: str = "claude-sonnet-4-5",
        fallback: str = "openai",
        fallback_model: str = "gpt-4o",
        max_retries: int = 3
    ) -> dict:
        """Production-grade LLM call with full error handling.

        Tries the primary provider with retries, then the fallback once.

        Returns:
            Dict with keys "text", "provider", "model", "latency", plus
            "used_fallback": True when the fallback served the request.

        Raises:
            RuntimeError: when both providers fail; includes both errors and
                chains the fallback error (the original discarded the
                primary's error and dropped the chain).
        """
        start_time = time.time()
        primary_error = None
        # Try primary with retries
        for attempt in range(max_retries):
            try:
                response_text = self.circuit_breakers[primary].call(
                    self._call_provider,
                    messages, primary, primary_model
                )
                return {
                    "text": response_text,
                    "provider": primary,
                    "model": primary_model,
                    "latency": time.time() - start_time
                }
            except CircuitOpenError as e:
                primary_error = e
                break  # Don't retry if circuit is open
            except RateLimitError as e:
                primary_error = e
                if attempt < max_retries - 1:
                    # Exponential backoff with additive jitter, capped at 30s.
                    wait = min(1.0 * (2 ** attempt) + random.uniform(0, 1), 30)
                    time.sleep(wait)
                    continue
                break
            except Exception as e:
                primary_error = e
                if attempt < max_retries - 1:
                    time.sleep(min(1.0 * (2 ** attempt), 10))
                    continue
                break
        # Try fallback
        print(f"{primary} exhausted, trying {fallback}")
        try:
            response_text = self.circuit_breakers[fallback].call(
                self._call_provider,
                messages, fallback, fallback_model
            )
            return {
                "text": response_text,
                "provider": fallback,
                "model": fallback_model,
                "latency": time.time() - start_time,
                "used_fallback": True
            }
        except Exception as e:
            # Report BOTH failures so the operator sees the primary's error too.
            raise RuntimeError(
                f"All providers failed. "
                f"Primary ({primary}): {primary_error}; fallback ({fallback}): {e}"
            ) from e
Monitoring and Alerting
Set alerts on these metrics:
| Metric | Warning Threshold | Critical Threshold |
| 429 rate (per 5min) | >5% | >20% |
| Fallback usage rate | >10% | >30% |
| Circuit breaker opens | >0 | >3 |
| P99 latency | >10s | >30s |
| Timeout rate | >2% | >10% |
Any time fallback usage exceeds 10%, investigate whether your primary provider has a service degradation before your users notice.