Production AI: Reliability and Cost Management
Why Production Is Different
In development, you can tolerate failures: if an LLM call times out, you retry manually. In production, LLM calls come from thousands of users. Every failure degrades the experience; every failed call wastes money. This tutorial covers patterns that make production AI systems reliable and cost-effective.
Pattern 1: Retries with Exponential Backoff
LLM APIs fail: network issues, rate limits, temporary server errors. A well-designed system retries intelligently.
Why Exponential Backoff
If you retry immediately, you might hit the same overloaded server. If you wait the same time every retry, you give the server no time to recover. Exponential backoff waits longer each time, allowing the server to stabilize.
Implementation
```python
import time
import random
from openai import OpenAI

def call_llm_with_retry(
    client,
    model: str,
    prompt: str,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> str:
    """
    Call LLM with exponential backoff retry logic.
    """
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.choices[0].message.content
        except Exception as e:
            # Don't retry on validation errors (bad input)
            if "validation" in str(e).lower():
                raise
            if attempt == max_retries - 1:
                raise  # Last attempt, give up
            # Calculate delay: 1s, 2s, 4s, ...
            delay = base_delay * (2 ** attempt)
            # Add jitter to prevent thundering herd
            delay += random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)

# Usage
client = OpenAI()
result = call_llm_with_retry(client, "gpt-4", "Write a poem")
```
Key Points
- First retry after 1 second
- Second retry after 2 seconds
- Third retry after 4 seconds
- Add random jitter (0-1 second) to prevent multiple clients retrying at the same time
- Don't retry validation errors (bad input won't change)
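The schedule in these points can be written as a small pure function, which is easy to unit-test in isolation; `max_jitter` is a parameter name introduced here for illustration:

```python
import random

def backoff_delay(attempt: int, base_delay: float = 1.0, max_jitter: float = 1.0) -> float:
    """Delay before retry `attempt` (0-indexed): exponential growth plus jitter."""
    return base_delay * (2 ** attempt) + random.uniform(0, max_jitter)

# attempt 0 -> 1-2s, attempt 1 -> 2-3s, attempt 2 -> 4-5s
delays = [backoff_delay(a) for a in range(3)]
```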
Pattern 2: Timeouts
Some requests hang indefinitely. A timeout kills the request and lets you fail gracefully or retry.
Implementation
```python
import asyncio
from openai import AsyncOpenAI

async def call_llm_with_timeout(
    client,
    model: str,
    prompt: str,
    timeout_seconds: float = 30.0
) -> str:
    """
    Call LLM with a timeout.
    """
    try:
        response = await asyncio.wait_for(
            client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}]
            ),
            timeout=timeout_seconds
        )
        return response.choices[0].message.content
    except asyncio.TimeoutError:
        print(f"Request timed out after {timeout_seconds} seconds")
        raise

# Usage
async def main():
    client = AsyncOpenAI()
    return await call_llm_with_timeout(client, "gpt-4", "Write a poem", timeout_seconds=30)

result = asyncio.run(main())
```
Guidelines
- Set timeouts based on expected latency + buffer
- For coding tasks: 30-60 seconds
- For chat: 10-30 seconds
- Always have a fallback when timeout occurs
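The async example relies on `asyncio.wait_for`; with the synchronous client, one stdlib option is to run the call in a worker thread and bound the wait. A sketch, with an important caveat in the comments: the underlying request is not cancelled, it just stops being waited on:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout

def call_with_timeout(fn, *args, timeout_seconds: float = 30.0, **kwargs):
    """Run a blocking call in a worker thread; raise TimeoutError if it takes too long.

    Caveat: the thread (and the underlying HTTP request) keeps running after
    the timeout -- this only stops *waiting* for it.
    """
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, *args, **kwargs)
    try:
        return future.result(timeout=timeout_seconds)
    except FuturesTimeout:
        raise TimeoutError(f"Call exceeded {timeout_seconds}s")
    finally:
        pool.shutdown(wait=False)  # don't block on the abandoned call
```

You would wrap the actual request as `call_with_timeout(client.chat.completions.create, model=..., messages=..., timeout_seconds=30)`.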
Pattern 3: Fallback Models
Your primary model (GPT-4) may be expensive, slow, or temporarily unavailable. Fall back to a cheaper or faster alternative.
Implementation
```python
import time
from openai import OpenAI

def call_llm_with_fallback(
    client,
    prompt: str,
    primary_model: str = "gpt-4",
    fallback_model: str = "gpt-3.5-turbo",
    max_retries: int = 2
) -> tuple[str, str]:
    """
    Try primary model. If it fails, fall back to cheaper model.
    Returns (response, model_used).
    """
    models_to_try = [primary_model, fallback_model]
    for model in models_to_try:
        for attempt in range(max_retries):
            try:
                response = client.chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": prompt}],
                    timeout=30
                )
                return response.choices[0].message.content, model
            except Exception as e:
                if attempt == max_retries - 1:
                    # This model failed, try next
                    print(f"{model} failed: {e}. Trying next model...")
                    break
                time.sleep(2 ** attempt)
    raise Exception("All models failed")

# Usage
client = OpenAI()
result, model_used = call_llm_with_fallback(client, "Write a poem")
print(f"Used model: {model_used}")
```
Fallback Chains
- Primary: GPT-4 (smartest)
- Secondary: GPT-3.5-turbo (cheaper, fast)
- Tertiary: Local Mistral (free, but slower)
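The two-model function above generalizes to a chain of any length. A sketch that walks a list of `(model_name, call_fn)` pairs; the callable signature here is an assumption, chosen so the chain can wrap different providers behind one interface:

```python
def call_with_fallback_chain(callers, prompt: str):
    """Try each (model_name, call_fn) pair in order; return (text, model_name)."""
    errors = []
    for model_name, call_fn in callers:
        try:
            return call_fn(prompt), model_name
        except Exception as e:
            errors.append(f"{model_name}: {e}")
    raise RuntimeError("All models failed: " + "; ".join(errors))

# Usage with stand-in callables; real ones would wrap each provider's SDK
def flaky_primary(prompt):
    raise RuntimeError("rate limited")  # simulate an outage

def cheap_fallback(prompt):
    return f"response from fallback: {prompt}"

chain = [("gpt-4", flaky_primary), ("gpt-3.5-turbo", cheap_fallback)]
text, model_used = call_with_fallback_chain(chain, "Write a poem")
```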
Pattern 4: Circuit Breakers
If a service is failing repeatedly, stop calling it. Wait before retrying.
Implementation
```python
from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"        # Normal operation
    OPEN = "open"            # Failing, reject calls
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout_seconds: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if datetime.now() > self.last_failure_time + timedelta(seconds=self.timeout_seconds):
                self.state = CircuitState.HALF_OPEN
                self.failure_count = 0
            else:
                raise Exception("Circuit breaker is open. Service unavailable.")
        try:
            result = func(*args, **kwargs)
            self.failure_count = 0
            self.state = CircuitState.CLOSED
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            # A failed probe in HALF_OPEN reopens immediately
            if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            raise

# Usage (call_llm is any function that makes the LLM request)
breaker = CircuitBreaker(failure_threshold=5, timeout_seconds=60)
try:
    result = breaker.call(call_llm, "Write a poem")
except Exception as e:
    print(f"Request failed: {e}")
```
How It Works
- Closed: Normal operation. All requests go through.
- Open: Service failing. Requests are immediately rejected to prevent cascading failures.
- Half-Open: After timeout, try one request to see if service recovered.
- Closed again: If half-open request succeeds, resume normal operation.
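These transitions are easy to watch with a deliberately failing function. The sketch below re-declares a compressed breaker so it runs standalone; it mirrors the class above but collapses the state enum into an `opened_at` timestamp:

```python
from datetime import datetime, timedelta

class Breaker:
    """Compressed circuit breaker for demonstration (mirrors the class above)."""
    def __init__(self, threshold: int = 3, timeout_seconds: int = 60):
        self.threshold = threshold
        self.timeout = timedelta(seconds=timeout_seconds)
        self.failures = 0
        self.opened_at = None  # None means closed (or half-open probe allowed)

    def call(self, func):
        if self.opened_at is not None:  # open: reject until timeout elapses
            if datetime.now() < self.opened_at + self.timeout:
                raise RuntimeError("circuit open")
            self.opened_at, self.failures = None, 0  # half-open: allow a probe
        try:
            result = func()
            self.failures = 0
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = datetime.now()
            raise

breaker = Breaker(threshold=3)

def flaky():
    raise ConnectionError("backend down")

for _ in range(3):  # three failures trip the breaker
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass

try:
    breaker.call(flaky)  # rejected instantly; flaky() is not even invoked
except RuntimeError as exc:
    print(exc)  # circuit open
```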
Pattern 5: Idempotency
If you retry a request, the same request might be processed twice. Make your operations idempotent so duplicates are harmless.
Implementation
```python
import hashlib

def generate_idempotency_key(user_id: str, request_data: str) -> str:
    """
    Generate a unique key for this request.
    Same user + same data = same key.
    """
    content = f"{user_id}:{request_data}"
    return hashlib.sha256(content.encode()).hexdigest()

def call_llm_with_idempotency(
    client,
    user_id: str,
    prompt: str,
    request_cache: dict
) -> str:
    """
    Call LLM with idempotency. Cached responses are reused.
    """
    key = generate_idempotency_key(user_id, prompt)
    # Check cache first
    if key in request_cache:
        print(f"Cache hit for key {key}")
        return request_cache[key]
    # Call LLM
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    result = response.choices[0].message.content
    # Cache for future retries
    request_cache[key] = result
    return result

# Usage
cache = {}
result = call_llm_with_idempotency(client, "user_123", "Write a poem", cache)
```
Why Idempotency Matters
If a network failure occurs after the LLM responds but before you receive it, you'll retry. Without idempotency, the request is processed twice. With idempotency, you get the cached response and pay nothing.
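The in-memory dict disappears on restart, at which point retries pay again. A sketch of a disk-backed store using stdlib `sqlite3`; the table and file names are illustrative:

```python
import hashlib
import sqlite3

class IdempotencyStore:
    """Persist request keys and responses so retries survive process restarts."""
    def __init__(self, path: str = "idempotency.db"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS responses (key TEXT PRIMARY KEY, body TEXT)"
        )

    def get(self, key: str):
        row = self.conn.execute(
            "SELECT body FROM responses WHERE key = ?", (key,)
        ).fetchone()
        return row[0] if row else None

    def put(self, key: str, body: str):
        with self.conn:  # commit on success
            self.conn.execute(
                "INSERT OR REPLACE INTO responses VALUES (?, ?)", (key, body)
            )

def make_key(user_id: str, prompt: str) -> str:
    return hashlib.sha256(f"{user_id}:{prompt}".encode()).hexdigest()

# Usage (":memory:" for the demo; use a file path in production)
store = IdempotencyStore(":memory:")
key = make_key("user_123", "Write a poem")
if store.get(key) is None:
    store.put(key, "response text")  # would be the actual LLM response
```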
Pattern 6: Token Budgeting
Tokens cost money. Track token usage and set budgets.
Implementation
```python
class TokenBudget:
    def __init__(self, monthly_budget: int = 1_000_000):
        self.monthly_budget = monthly_budget
        self.tokens_used = 0

    def check_budget(self, prompt_tokens: int, max_output_tokens: int) -> bool:
        """
        Check if we have budget for this request.
        """
        estimated_total = prompt_tokens + max_output_tokens
        return self.tokens_used + estimated_total <= self.monthly_budget

    def record_usage(self, prompt_tokens: int, completion_tokens: int):
        self.tokens_used += prompt_tokens + completion_tokens
        usage_percent = (self.tokens_used / self.monthly_budget) * 100
        print(f"Token usage: {usage_percent:.1f}%")
        if usage_percent > 80:
            print("WARNING: Approaching token budget limit")

# Usage
budget = TokenBudget(monthly_budget=1_000_000)
if budget.check_budget(prompt_tokens=100, max_output_tokens=500):
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Write a poem"}],
        max_tokens=500
    )
    budget.record_usage(
        prompt_tokens=response.usage.prompt_tokens,
        completion_tokens=response.usage.completion_tokens
    )
else:
    print("Token budget exceeded")
```
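Token counts map directly to dollars. A sketch of the arithmetic; the per-million-token rates below are placeholder values, not current pricing, so substitute your provider's real rates:

```python
def estimate_cost(prompt_tokens: int, completion_tokens: int,
                  input_rate: float = 30.0, output_rate: float = 60.0) -> float:
    """Cost in dollars given per-million-token rates (placeholder values)."""
    return (prompt_tokens * input_rate + completion_tokens * output_rate) / 1_000_000

# 100k prompt tokens + 20k completion tokens at the placeholder rates
cost = estimate_cost(100_000, 20_000)  # 4.2
```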
Pattern 7: Prompt Trimming
Long prompts use more tokens. Trim unnecessary context.
Implementation
```python
def trim_context(context: str, max_tokens: int = 2000) -> str:
    """
    Trim context to fit within token limit.
    Keeps the most recent content (the tail of the string).
    """
    # Rough estimate: 1 token ≈ 4 characters
    max_chars = max_tokens * 4
    if len(context) <= max_chars:
        return context
    # Keep the last max_chars characters (most relevant)
    trimmed = context[-max_chars:]
    # Add a marker to indicate truncation
    return "[...previous context truncated...]\n" + trimmed

# Usage
full_context = "A very long context string..."
trimmed = trim_context(full_context, max_tokens=2000)
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": f"Context:\n{trimmed}\n\nQuestion: ..."}]
)
```
Pattern 8: Caching LLM Responses
Many requests repeat verbatim, and identical requests deserve identical responses. Cache them instead of paying for the same answer twice.
Implementation
```python
from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_llm_call(model: str, prompt: str) -> str:
    """
    Call LLM with caching. Python's lru_cache handles deduplication.
    """
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# First call: hits the API
result1 = cached_llm_call("gpt-4", "What is Python?")
# Second call: returns cached result (no API call)
result2 = cached_llm_call("gpt-4", "What is Python?")
```
When to Cache
- Common questions that produce stable answers
- Reference data (company info, product specs)
- Not for personalized or time-sensitive queries
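Time-sensitive answers can often still be cached briefly. A TTL wrapper is a middle ground between `lru_cache` (which never expires entries) and no cache at all; a stdlib-only sketch:

```python
import time

class TTLCache:
    """Dict-like cache whose entries expire after ttl_seconds."""
    def __init__(self, ttl_seconds: float = 300.0):
        self.ttl = ttl_seconds
        self.store = {}  # key -> (value, stored_at)

    def get(self, key):
        entry = self.store.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if time.monotonic() - stored_at > self.ttl:
            del self.store[key]  # expired; force a fresh call
            return None
        return value

    def put(self, key, value):
        self.store[key] = (value, time.monotonic())

# Usage: check the cache before calling the LLM, put() after
cache = TTLCache(ttl_seconds=300)
cache.put("what is python", "Python is a programming language...")
```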
Pattern 9: Model Tiering
Use the cheapest model that solves the problem.
Implementation
```python
def select_model_for_task(task_complexity: str) -> str:
    """
    Route to an appropriate model based on task complexity.
    """
    if task_complexity == "simple":
        # Spell check, grammar, classification
        return "gpt-3.5-turbo"
    elif task_complexity == "medium":
        # Code generation, summarization
        return "gpt-4-turbo"
    else:
        # Complex reasoning, advanced coding
        return "gpt-4"

# Usage
task = "Check if this SQL query is valid"
model = select_model_for_task("simple")
response = client.chat.completions.create(
    model=model,
    messages=[{"role": "user", "content": task}]
)
```
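Something has to assign `task_complexity` in the first place. A crude heuristic based on keywords and prompt length; the markers and thresholds are assumptions to tune against your own traffic:

```python
def classify_complexity(prompt: str) -> str:
    """Rough routing heuristic: keyword and length based (tune for your workload)."""
    hard_markers = ("prove", "architecture", "refactor", "multi-step", "reason")
    lowered = prompt.lower()
    if any(marker in lowered for marker in hard_markers):
        return "complex"
    if len(prompt) > 500:  # long context often implies summarization/generation
        return "medium"
    return "simple"

print(classify_complexity("Check if this SQL query is valid"))  # simple
```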
Pattern 10: Batching
Process multiple requests together to reduce overhead.
Implementation
```python
def batch_process(items: list[str], batch_size: int = 10) -> list[str]:
    """
    Process items in batches to reduce API calls and costs.
    """
    results = []
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        # Combine batch into single prompt
        prompt = f"Process these {len(batch)} items:\n"
        for item in batch:
            prompt += f"- {item}\n"
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        # Parse response to extract individual results (skip blank lines)
        batch_results = [
            line for line in response.choices[0].message.content.split("\n")
            if line.strip()
        ]
        results.extend(batch_results)
    return results

# Usage
items = [f"Item {i}" for i in range(100)]
results = batch_process(items, batch_size=10)
```
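Splitting the response on newlines is fragile: models add preambles and blank lines, and a miscount silently misaligns results with inputs. A safer sketch asks the model for numbered output and parses it defensively, raising on a count mismatch so the caller can retry:

```python
import re

def parse_numbered(text: str, expected: int) -> list[str]:
    """Extract '1. foo' style lines; raise if the count is off so callers can retry."""
    matches = re.findall(r"^\s*(\d+)[.)]\s*(.+)$", text, flags=re.MULTILINE)
    results = [body.strip() for _, body in matches]
    if len(results) != expected:
        raise ValueError(f"Expected {expected} results, parsed {len(results)}")
    return results

# Tolerates preamble and blank lines
sample = "Here you go:\n1. alpha\n2. beta\n\n3. gamma"
print(parse_numbered(sample, expected=3))  # ['alpha', 'beta', 'gamma']
```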
Monitoring and Alerting
Track These Metrics
- Request latency: How long requests take
- Error rate: Percentage of failed requests
- Token usage: Daily and monthly trends
- Cost: Actual spending vs. budget
- Cache hit rate: Percentage of cached responses
Example Monitoring
```python
from datetime import datetime

class MetricsCollector:
    def __init__(self):
        self.requests = []
        self.errors = 0
        self.total_tokens = 0

    def record_request(self, latency_ms: float, tokens: int, success: bool):
        self.requests.append({
            "timestamp": datetime.now(),
            "latency": latency_ms,
            "tokens": tokens,
            "success": success
        })
        self.total_tokens += tokens
        if not success:
            self.errors += 1

    def error_rate(self) -> float:
        if not self.requests:
            return 0.0
        failed = sum(1 for r in self.requests if not r["success"])
        return (failed / len(self.requests)) * 100

    def avg_latency(self) -> float:
        if not self.requests:
            return 0.0
        latencies = [r["latency"] for r in self.requests]
        return sum(latencies) / len(latencies)

# Usage
metrics = MetricsCollector()
metrics.record_request(latency_ms=150, tokens=250, success=True)
print(f"Error rate: {metrics.error_rate()}%")
print(f"Avg latency: {metrics.avg_latency()}ms")
```
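Averages hide tail latency: one slow outlier barely moves the mean but dominates user experience. A nearest-rank percentile helper (stdlib only) complements `avg_latency`:

```python
def percentile(values: list[float], pct: float) -> float:
    """Nearest-rank percentile; pct in [0, 100]. Returns 0.0 for empty input."""
    if not values:
        return 0.0
    ordered = sorted(values)
    rank = max(1, round(pct / 100 * len(ordered)))  # nearest-rank, 1-indexed
    return ordered[rank - 1]

latencies = [100, 120, 130, 150, 900]  # one slow outlier
print(percentile(latencies, 95))  # 900, while the mean is only 280
```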
Summary
Production AI systems need:
- Retries with exponential backoff
- Timeouts to prevent hanging
- Fallback models for resilience
- Circuit breakers to prevent cascading failures
- Idempotency for safe retries
- Token budgeting and trimming
- Caching to reduce costs
- Model tiering for cost optimization
- Batching to reduce overhead
- Monitoring and alerting for visibility
Implement these patterns and your production AI systems will be reliable, cost-effective, and maintainable.