Now that you have explored tools for building full-stack AI apps, this final section covers shipping AI features to production.

Production AI: Reliability and Cost Management

Why Production Is Different

In development, you can tolerate failures: if an LLM call times out, you retry it by hand. In production, thousands of users trigger LLM calls. Every failure degrades their experience, and failed or repeated calls can waste money. This tutorial covers patterns that make production AI systems more reliable and cost-effective.

Pattern 1: Retries with Exponential Backoff

LLM APIs fail. Network issues, rate limits, temporary server errors. A well-designed system retries intelligently.

Why Exponential Backoff

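If you retry immediately, you will probably hit the same overloaded server. If you wait the same fixed interval before every retry, all clients keep sending requests at a steady rate, so a struggling server gets little relief. Exponential backoff doubles the wait after each failed attempt, which reduces the load and gives the server time to stabilize.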

Implementation

import random
import time

import openai
from openai import OpenAI

def call_llm_with_retry(
    client,
    model: str,
    prompt: str,
    max_retries: int = 3,
    base_delay: float = 1.0
) -> str:
    """
    Call LLM with exponential backoff retry logic.
    max_retries is the total number of attempts, including the first.
    """
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.choices[0].message.content
        except (openai.BadRequestError, openai.AuthenticationError):
            # Don't retry on bad input or bad credentials: retrying won't change the outcome
            raise
        except (openai.APIConnectionError, openai.RateLimitError, openai.InternalServerError) as e:
            # Transient failures: network issues, timeouts, rate limits, server errors
            if attempt == max_retries - 1:
                raise  # Last attempt, give up
            
            # Calculate delay: 1s, 2s, 4s, ... (doubles after each failed attempt)
            delay = base_delay * (2 ** attempt)
            # Add jitter to prevent thundering herd
            delay += random.uniform(0, 1)
            
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)

# max_retries=0 turns off the client's own built-in retries so they don't stack with ours
client = OpenAI(max_retries=0)
result = call_llm_with_retry(client, "gpt-4", "Write a poem")

Key Points

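  • max_retries counts total attempts, so the default of 3 allows at most two retries
  • First retry after about 1 second
  • Second retry after about 2 seconds
  • With a higher max_retries, the third retry waits about 4 seconds, the fourth about 8, and so on
  • Add random jitter (0-1 second) to prevent multiple clients retrying at the same time
  • Don't retry validation errors (bad input won't change)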
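
The schedule in the points above can be computed on its own, without any network call, which makes it easy to test. The following is a minimal sketch rather than part of any SDK: the helper name `backoff_delays` and the `max_delay` cap are assumptions added for illustration.

```python
import random


def backoff_delays(max_attempts: int, base_delay: float = 1.0,
                   max_delay: float = 30.0, jitter: float = 1.0) -> list[float]:
    """Return the wait before each retry (one fewer than the number of attempts)."""
    delays = []
    for attempt in range(max_attempts - 1):
        # 1, 2, 4, ... seconds, capped so a long outage never causes huge waits
        delay = min(base_delay * (2 ** attempt), max_delay)
        # Add between 0 and `jitter` seconds so clients do not retry in lockstep
        delays.append(delay + random.uniform(0, jitter))
    return delays


# With jitter disabled the schedule is deterministic.
print(backoff_delays(max_attempts=5, jitter=0.0))  # [1.0, 2.0, 4.0, 8.0]
```

Capping the delay is a design choice: without it, a tenth attempt would wait several minutes, which is rarely useful for an interactive request.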

Pattern 2: Timeouts

Some requests hang indefinitely. A timeout kills the request and lets you fail gracefully or retry.

Implementation

import asyncio
from openai import AsyncOpenAI

async def call_llm_with_timeout(
    client,
    model: str,
    prompt: str,
    timeout_seconds: float = 30.0
) -> str:
    """
    Call LLM with a timeout.
    """
    try:
        response = await asyncio.wait_for(
            client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}]
            ),
            timeout=timeout_seconds
        )
        return response.choices[0].message.content
    except asyncio.TimeoutError:
        print(f"Request timed out after {timeout_seconds} seconds")
        raise

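# Usage: await is only valid inside a coroutine, so wrap the call and run it with asyncio.run
async def main():
    client = AsyncOpenAI()
    result = await call_llm_with_timeout(client, "gpt-4", "Write a poem", timeout_seconds=30)
    print(result)

asyncio.run(main())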

Guidelines

  • Set timeouts based on expected latency + buffer
  • For coding tasks: 30-60 seconds
  • For chat: 10-30 seconds
  • Always have a fallback when timeout occurs
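
The last guideline, having a fallback when a timeout occurs, might look like the sketch below. It uses a stand-in coroutine instead of a real API call so it runs anywhere; `slow_model_call` and `call_with_timeout_fallback` are hypothetical names, and the fallback text is only an example.

```python
import asyncio


async def slow_model_call(prompt: str) -> str:
    """Stand-in for a real LLM request; sleeps to simulate a slow response."""
    await asyncio.sleep(0.2)
    return f"answer to: {prompt}"


async def call_with_timeout_fallback(prompt: str, timeout_seconds: float,
                                     fallback: str) -> str:
    """Return the model's answer, or the fallback text if the call times out."""
    try:
        return await asyncio.wait_for(slow_model_call(prompt), timeout=timeout_seconds)
    except asyncio.TimeoutError:
        # The pending call is cancelled by wait_for; degrade gracefully
        return fallback


print(asyncio.run(call_with_timeout_fallback("hi", 0.01, "Sorry, please try again.")))
```

In a real system the fallback could be a cached answer, a cheaper model, or a clear error message, depending on the feature.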

Pattern 3: Fallback Models

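Your primary model (GPT-4 in these examples) may be unavailable, rate-limited, or too expensive for every request. When it fails, fall back to a cheaper or faster alternative instead of returning an error.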

Implementation

import time
from openai import OpenAI

def call_llm_with_fallback(
    client,
    prompt: str,
    primary_model: str = "gpt-4",
    fallback_model: str = "gpt-3.5-turbo",
    max_retries: int = 2
) -> tuple[str, str]:
    """
    Try primary model. If it fails, fall back to cheaper model.
    Returns (response, model_used)
    """
    models_to_try = [primary_model, fallback_model]
    
    for model in models_to_try:
        for attempt in range(max_retries):
            try:
                response = client.chat.completions.create(
                    model=model,
                    messages=[{"role": "user", "content": prompt}],
                    timeout=30
                )
                return response.choices[0].message.content, model
            except Exception as e:
                if attempt == max_retries - 1:
                    # This model failed, try next
                    print(f"{model} failed: {e}. Trying next model...")
                    break
                time.sleep(2 ** attempt)
    
    raise Exception("All models failed")

client = OpenAI()
result, model_used = call_llm_with_fallback(client, "Write a poem")
print(f"Used model: {model_used}")

Fallback Chains

  • Primary: GPT-4 (smartest)
  • Secondary: GPT-3.5-turbo (cheaper, fast)
  • Tertiary: Local Mistral (no per-token API cost, but you run the hardware and it may be slower)
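
A chain with more than two entries can be expressed as an ordered list of callables. This is a sketch under assumptions: `call_with_fallback_chain` is a hypothetical helper, and the three functions are stand-ins that simulate failures rather than real clients.

```python
from typing import Callable


def call_with_fallback_chain(prompt: str,
                             chain: list[tuple[str, Callable[[str], str]]]) -> tuple[str, str]:
    """Try each (name, call) pair in order; return (response, name) from the first success."""
    errors = []
    for name, call in chain:
        try:
            return call(prompt), name
        except Exception as exc:  # in real code, catch the provider's specific errors
            errors.append(f"{name}: {exc}")
    raise RuntimeError("All models failed: " + "; ".join(errors))


# Stand-in callables; real ones would wrap an API client or a local model.
def primary(prompt: str) -> str:
    raise TimeoutError("primary timed out")

def secondary(prompt: str) -> str:
    raise ConnectionError("secondary unreachable")

def tertiary(prompt: str) -> str:
    return f"local answer to: {prompt}"


chain = [("primary", primary), ("secondary", secondary), ("tertiary", tertiary)]
response, used = call_with_fallback_chain("Write a poem", chain)
print(used)  # tertiary
```

Returning the name of the model that answered lets you log how often each tier is used, which is worth monitoring because fallback answers may be lower quality.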

Pattern 4: Circuit Breakers

If a service is failing repeatedly, stop calling it. Wait before retrying.

Implementation

from enum import Enum
from datetime import datetime, timedelta

class CircuitState(Enum):
    CLOSED = "closed"  # Normal operation
    OPEN = "open"     # Failing, reject calls
    HALF_OPEN = "half_open"  # Testing if recovered

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout_seconds: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout_seconds = timeout_seconds
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED
    
    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if datetime.now() > self.last_failure_time + timedelta(seconds=self.timeout_seconds):
                # Cooldown elapsed: let one trial request through
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is open. Service unavailable.")
        
        try:
            result = func(*args, **kwargs)
            # Success: reset the counter and close the circuit
            self.failure_count = 0
            self.state = CircuitState.CLOSED
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = datetime.now()
            
            # A failed trial request reopens the circuit immediately;
            # otherwise open only once the failure threshold is reached
            if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
                self.state = CircuitState.OPEN
            
            raise

# Usage
breaker = CircuitBreaker(failure_threshold=5, timeout_seconds=60)

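try:
    # Wrap any function that calls the LLM, for example call_llm_with_retry from Pattern 1
    result = breaker.call(call_llm_with_retry, client, "gpt-4", "Write a poem")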
except Exception as e:
    print(f"Request failed: {e}")

How It Works

  1. Closed: Normal operation. All requests go through.
  2. Open: Service failing. Requests are immediately rejected to prevent cascading failures.
  3. Half-Open: After timeout, try one request to see if service recovered.
  4. Closed again: If half-open request succeeds, resume normal operation.
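
The four steps above can be traced with a small breaker that takes its clock as a parameter, so the cooldown can be simulated without waiting. This is an illustrative sketch: `SimpleBreaker`, its string states, and the fake clock are assumptions for the demo, not a production implementation.

```python
class SimpleBreaker:
    """Minimal breaker with an injectable clock so transitions can be tested."""

    def __init__(self, failure_threshold, cooldown_seconds, clock):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.clock = clock            # callable returning the current time in seconds
        self.failures = 0
        self.opened_at = None         # None means the circuit is closed

    def state(self):
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.cooldown_seconds:
            return "half_open"
        return "open"

    def call(self, func):
        state = self.state()
        if state == "open":
            raise RuntimeError("circuit open")
        try:
            result = func()
        except Exception:
            self.failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit
            if state == "half_open" or self.failures >= self.failure_threshold:
                self.opened_at = self.clock()
            raise
        self.failures = 0
        self.opened_at = None
        return result


now = [0.0]                                   # fake clock we can advance by hand
breaker = SimpleBreaker(2, 60, lambda: now[0])

def failing():
    raise ConnectionError("service down")

for _ in range(2):
    try:
        breaker.call(failing)
    except ConnectionError:
        pass
print(breaker.state())        # open
now[0] = 61.0
print(breaker.state())        # half_open
breaker.call(lambda: "ok")
print(breaker.state())        # closed
```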

Pattern 5: Idempotency

If you retry a request, the same request might be processed twice. Make your operations idempotent so duplicates are harmless.

Implementation

import hashlib

def generate_idempotency_key(user_id: str, request_data: str) -> str:
    """
    Generate a unique key for this request.
    Same user + same data = same key.
    """
    content = f"{user_id}:{request_data}"
    return hashlib.sha256(content.encode()).hexdigest()

def call_llm_with_idempotency(
    client,
    user_id: str,
    prompt: str,
    request_cache: dict
) -> str:
    """
    Call LLM with idempotency. Cached responses are reused.
    """
    key = generate_idempotency_key(user_id, prompt)
    
    # Check cache first
    if key in request_cache:
        print(f"Cache hit for key {key}")
        return request_cache[key]
    
    # Call LLM
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": prompt}]
    )
    result = response.choices[0].message.content
    
    # Cache for future retries
    request_cache[key] = result
    return result

cache = {}
result = call_llm_with_idempotency(client, "user_123", "Write a poem", cache)

Why Idempotency Matters

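Retries and duplicate submissions (a double-click, a client that resends after a slow response) can deliver the same request twice. Without idempotency, the request is processed and billed twice. With idempotency, the second request finds the stored response and costs nothing extra. Note the limit of a client-side cache like this one: if the network fails after the LLM responds but before you receive the response, nothing has been stored yet, so that retry is still a fresh, billed call. The cache protects you once a response has been received and saved.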
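
Idempotency matters most when a request triggers a side effect, such as sending an email built from the LLM output. The sketch below shows the general idea with a stand-in operation; `IdempotentExecutor`, the key format, and `send_summary_email` are hypothetical names chosen for illustration.

```python
class IdempotentExecutor:
    """Runs an operation at most once per idempotency key and replays the stored result."""

    def __init__(self):
        self._results = {}     # key -> result; use a shared store in real deployments

    def run(self, key, operation):
        if key in self._results:
            return self._results[key]        # duplicate: replay, don't re-execute
        result = operation()                 # if this raises, nothing is stored
        self._results[key] = result
        return result


emails_sent = []

def send_summary_email():
    """Stand-in for a side effect that must not happen twice."""
    emails_sent.append("summary")
    return "sent"

executor = IdempotentExecutor()
executor.run("user_123:summary:2024-01", send_summary_email)
executor.run("user_123:summary:2024-01", send_summary_email)   # retry of the same request
print(len(emails_sent))  # 1
```

Because a failed operation stores nothing, it can safely be retried with the same key.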

Pattern 6: Token Budgeting

Tokens cost money. Track token usage and set budgets.

Implementation

class TokenBudget:
    def __init__(self, monthly_budget: int = 1_000_000):
        self.monthly_budget = monthly_budget
        self.tokens_used = 0
    
    def check_budget(self, prompt_tokens: int, max_output_tokens: int) -> bool:
        """
        Check if we have budget for this request.
        """
        estimated_total = prompt_tokens + max_output_tokens
        if self.tokens_used + estimated_total > self.monthly_budget:
            return False
        return True
    
    def record_usage(self, prompt_tokens: int, completion_tokens: int):
        self.tokens_used += prompt_tokens + completion_tokens
        usage_percent = (self.tokens_used / self.monthly_budget) * 100
        print(f"Token usage: {usage_percent:.1f}%")
        
        if usage_percent > 80:
            print("WARNING: Approaching token budget limit")

budget = TokenBudget(monthly_budget=1_000_000)

if budget.check_budget(prompt_tokens=100, max_output_tokens=500):
    response = client.chat.completions.create(
        model="gpt-4",
        messages=[{"role": "user", "content": "Write a poem"}],
        max_tokens=500
    )
    budget.record_usage(
        prompt_tokens=response.usage.prompt_tokens,
        completion_tokens=response.usage.completion_tokens
    )
else:
    print("Token budget exceeded")
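
Token counts become easier to reason about once they are converted to money. The helper below is a sketch: `estimate_cost_usd` is a hypothetical function, and the prices in the usage line are placeholders for illustration only, not any provider's actual rates.

```python
def estimate_cost_usd(prompt_tokens: int, completion_tokens: int,
                      input_price_per_1k: float, output_price_per_1k: float) -> float:
    """Convert token counts to dollars using per-1,000-token prices."""
    input_cost = (prompt_tokens / 1000) * input_price_per_1k
    output_cost = (completion_tokens / 1000) * output_price_per_1k
    return input_cost + output_cost


# Placeholder prices; look up your provider's current rates before relying on this.
cost = estimate_cost_usd(prompt_tokens=2000, completion_tokens=500,
                         input_price_per_1k=0.01, output_price_per_1k=0.03)
print(f"${cost:.4f}")  # $0.0350
```

Input and output tokens are priced separately here because many providers charge different rates for each.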

Pattern 7: Prompt Trimming

Long prompts use more tokens. Trim unnecessary context.

Implementation

def trim_context(context: str, max_tokens: int = 2000) -> str:
    """
    Trim context to fit within an approximate token limit.
    Keeps the most recent content (the end of the string) and drops the oldest.
    """
    # Rough estimate: 1 token ≈ 4 characters (varies by language and tokenizer)
    max_chars = max_tokens * 4
    
    if len(context) <= max_chars:
        return context
    
    # Keep the last max_chars characters (assumed to be the most relevant)
    trimmed = context[-max_chars:]
    
    # Prepend a marker so the model knows earlier context was dropped
    return "[...previous context truncated...]\n" + trimmed

# Usage
full_context = "A very long context string..."
trimmed = trim_context(full_context, max_tokens=2000)
response = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": f"Context:\n{trimmed}\n\nQuestion: ..."}]
)
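
For chat applications the context is usually a list of messages rather than one string. A sketch of the same idea applied to a message list follows; `estimate_tokens` and `trim_messages` are hypothetical helpers, and the 4-characters-per-token rule is only a rough estimate.

```python
def estimate_tokens(text: str) -> int:
    """Rough estimate using the 4-characters-per-token rule of thumb."""
    return max(1, len(text) // 4)


def trim_messages(messages: list[dict], max_tokens: int) -> list[dict]:
    """Keep system messages, then add the newest other messages that still fit."""
    system = [m for m in messages if m["role"] == "system"]
    others = [m for m in messages if m["role"] != "system"]

    budget = max_tokens - sum(estimate_tokens(m["content"]) for m in system)
    kept = []
    for message in reversed(others):          # walk from newest to oldest
        cost = estimate_tokens(message["content"])
        if cost > budget:
            break                             # stop at the first message that does not fit
        kept.append(message)
        budget -= cost
    return system + list(reversed(kept))      # restore chronological order


history = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "a" * 400},
    {"role": "assistant", "content": "b" * 400},
    {"role": "user", "content": "What did I just ask?"},
]
trimmed_history = trim_messages(history, max_tokens=120)
print([m["role"] for m in trimmed_history])
```

Trimming whole messages avoids cutting a sentence in half, at the cost of sometimes leaving part of the budget unused.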

Pattern 8: Caching LLM Responses

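Identical requests do not need to be paid for twice. LLM output is not guaranteed to be identical across calls, but when a previously generated answer is good enough, cache it and reuse it.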

Implementation

from functools import lru_cache

@lru_cache(maxsize=1000)
def cached_llm_call(model: str, prompt: str) -> str:
    """
    Call LLM with caching. Python's lru_cache handles deduplication.
    The cache lives in this process only and entries never expire.
    """
    response = client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}]
    )
    return response.choices[0].message.content

# First call: hits API
result1 = cached_llm_call("gpt-4", "What is Python?")

# Second call: returns cached result (no API call)
result2 = cached_llm_call("gpt-4", "What is Python?")

When to Cache

  • Common questions that produce stable answers
  • Reference data (company info, product specs)
  • Not for personalized or time-sensitive queries
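
For answers that go stale, a cache with a time-to-live (TTL) is one option. The class below is a minimal in-memory sketch, assuming a single process; `TTLCache` and `get_or_compute` are hypothetical names, and the clock is injectable so that expiry can be demonstrated without waiting.

```python
import time


class TTLCache:
    """Small in-memory cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds: float, clock=time.monotonic):
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        self._entries = {}                     # key -> (expires_at, value)
        self.hits = 0
        self.misses = 0

    def get_or_compute(self, key, compute):
        entry = self._entries.get(key)
        if entry is not None and entry[0] > self.clock():
            self.hits += 1
            return entry[1]
        self.misses += 1
        value = compute()                      # e.g. the actual LLM call
        self._entries[key] = (self.clock() + self.ttl_seconds, value)
        return value


now = [0.0]                                    # fake clock for the demo
calls = []

def fake_llm():
    calls.append(1)
    return "Python is a programming language."

ttl_cache = TTLCache(ttl_seconds=300, clock=lambda: now[0])
ttl_cache.get_or_compute("What is Python?", fake_llm)   # miss: computes
ttl_cache.get_or_compute("What is Python?", fake_llm)   # hit: served from cache
now[0] = 301.0
ttl_cache.get_or_compute("What is Python?", fake_llm)   # expired: computes again
print(len(calls), ttl_cache.hits, ttl_cache.misses)     # 2 1 2
```

The hit and miss counters feed directly into the cache hit rate metric.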

Pattern 9: Model Tiering

Use the cheapest model that solves the problem.

Implementation

def select_model_for_task(task_complexity: str) -> str:
    """
    Route to appropriate model based on task complexity.
    """
    if task_complexity == "simple":
        # Spell check, grammar, classification
        return "gpt-3.5-turbo"
    elif task_complexity == "medium":
        # Code generation, summarization
        return "gpt-4-turbo"
    else:
        # Complex reasoning, advanced coding
        return "gpt-4"

# Usage
task = "Check if this SQL query is valid"
model = select_model_for_task("simple")
response = client.chat.completions.create(
    model=model,
    messages=[{"role": "user", "content": task}]
)
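
When task complexity is hard to label up front, a variation on tiering is to try the cheapest model first and escalate only if its answer fails a check. This is a different technique from the static routing above, sketched with stand-in functions; `answer_with_escalation`, the tier names, and the acceptance check are all assumptions.

```python
from typing import Callable


def answer_with_escalation(prompt: str,
                           tiers: list[tuple[str, Callable[[str], str]]],
                           is_good_enough: Callable[[str], bool]) -> tuple[str, str]:
    """Try tiers from cheapest to most capable; stop at the first acceptable answer."""
    answer, name = "", ""
    for name, call in tiers:
        answer = call(prompt)
        if is_good_enough(answer):
            break
    # If no tier passed the check, the last tier's answer is returned
    return answer, name


def cheap_model(prompt: str) -> str:
    return ""                     # pretend the small model produced nothing usable

def strong_model(prompt: str) -> str:
    return "The query is valid."

tiers = [("cheap", cheap_model), ("strong", strong_model)]
answer, used = answer_with_escalation("Check if this SQL query is valid",
                                      tiers, lambda text: bool(text.strip()))
print(used)  # strong
```

Escalation pays for two calls on hard tasks, so it only saves money when most requests are answered acceptably by the cheap tier.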

Pattern 10: Batching

Process multiple requests together to reduce overhead.

Implementation

def batch_process(items: list[str], batch_size: int = 10) -> list[str]:
    """
    Process items in batches to reduce API calls and costs.
    """
    results = []
    
    for i in range(0, len(items), batch_size):
        batch = items[i:i + batch_size]
        
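        # Combine batch into single prompt and ask for one result per line
        prompt = f"Process these {len(batch)} items. Return one result per line, in the same order:\n"
        for item in batch:
            prompt += f"- {item}\n"
        
        response = client.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}]
        )
        
        # Parse response: one non-empty line per item.
        # The model may not follow the format exactly, so validate the count in production.
        content = response.choices[0].message.content
        batch_results = [line.strip() for line in content.split("\n") if line.strip()]
        results.extend(batch_results)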
    
    return results

# Usage
items = [f"Item {i}" for i in range(100)]
results = batch_process(items, batch_size=10)
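
Splitting a batched response on newlines can misalign results when the model adds a preamble or skips an item. One way to make the mapping more robust is to number the items and parse the numbers back out. The helpers below are a sketch; `build_batch_prompt` and `parse_batch_response` are hypothetical names, and the response text in the usage lines is made up for the demo.

```python
import re
from typing import Optional


def build_batch_prompt(items: list[str]) -> str:
    """Number each item so results can be matched back by index."""
    lines = [f"{i}. {item}" for i, item in enumerate(items, start=1)]
    return ("Process each item. Reply with one line per item in the form "
            "'<number>. <result>'.\n" + "\n".join(lines))


def parse_batch_response(text: str, expected: int) -> list[Optional[str]]:
    """Return results aligned with the input order; None marks a missing item."""
    results: list[Optional[str]] = [None] * expected
    for line in text.splitlines():
        match = re.match(r"\s*(\d+)\.\s*(.*\S)\s*$", line)
        if match:
            index = int(match.group(1)) - 1
            if 0 <= index < expected:
                results[index] = match.group(2)
    return results


# Simulated model output with a preamble and a skipped item.
fake_response = "Here are the results:\n1. positive\n\n3. negative\n"
print(parse_batch_response(fake_response, expected=3))  # ['positive', None, 'negative']
```

Items that come back as None can be retried individually instead of failing the whole batch.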

Monitoring and Alerting

Track These Metrics

  • Request latency: How long requests take
  • Error rate: Percentage of failed requests
  • Token usage: Daily and monthly trends
  • Cost: Actual spending vs. budget
  • Cache hit rate: Percentage of cached responses

Example Monitoring

from datetime import datetime

class MetricsCollector:
    def __init__(self):
        self.requests = []
        self.errors = 0
        self.total_tokens = 0
    
    def record_request(self, latency_ms: float, tokens: int, success: bool):
        self.requests.append({
            "timestamp": datetime.now(),
            "latency": latency_ms,
            "tokens": tokens,
            "success": success
        })
        self.total_tokens += tokens
        if not success:
            self.errors += 1
    
    def error_rate(self) -> float:
        if not self.requests:
            return 0
        failed = sum(1 for r in self.requests if not r["success"])
        return (failed / len(self.requests)) * 100
    
    def avg_latency(self) -> float:
        if not self.requests:
            return 0
        latencies = [r["latency"] for r in self.requests]
        return sum(latencies) / len(latencies)

metrics = MetricsCollector()
metrics.record_request(latency_ms=150, tokens=250, success=True)
print(f"Error rate: {metrics.error_rate()}%")
print(f"Avg latency: {metrics.avg_latency()}ms")
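
Average latency hides slow outliers, so percentiles such as p95 are often tracked alongside it. The function below is a small sketch using the nearest-rank method; `percentile` is a hypothetical helper and the latency samples are invented for the demo.

```python
import math


def percentile(values: list[float], pct: float) -> float:
    """Nearest-rank percentile: smallest value with at least pct% of samples at or below it."""
    if not values:
        raise ValueError("no samples")
    ordered = sorted(values)
    rank = max(1, math.ceil(pct * len(ordered) / 100))
    return ordered[rank - 1]


latencies_ms = [120, 130, 125, 140, 135, 128, 132, 138, 126, 2400]
print(sum(latencies_ms) / len(latencies_ms))   # 357.4, pulled up by one slow request
print(percentile(latencies_ms, 50))            # 130
print(percentile(latencies_ms, 95))            # 2400
```

Here the median shows that a typical request is fast, while p95 exposes the slow request that the mean only hints at.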

Summary

Production AI systems need:

  1. Retries with exponential backoff
  2. Timeouts to prevent hanging
  3. Fallback models for resilience
  4. Circuit breakers to prevent cascading failures
  5. Idempotency for safe retries
  6. Token budgeting and trimming
  7. Caching to reduce costs
  8. Model tiering for cost optimization
  9. Batching to reduce overhead
  10. Monitoring and alerting for visibility

Implement these patterns and your production AI systems will be more reliable, more cost-effective, and easier to maintain.
