LLM Applications in Production 2026: RAG Optimization, Prompt Caching, Streaming, and Cost Control

Introduction
Between 2024 and 2026, LLM APIs crossed the threshold from "impressive demo" to "core infrastructure." The companies that shipped fast in 2023 learned the hard way what production LLM systems actually demand: latency that doesn't embarrass you, costs that don't crater your margin, context windows that stay coherent across long sessions, and reliability that survives token storms, provider outages, and malformed outputs.
The tooling matured fast. Anthropic Claude introduced prompt caching. OpenAI rolled out automatic prefix caching and the Batch API. Vector databases became commodity infrastructure. Cross-encoder re-ranking went from research paper to pip install. And yet most teams are still leaving significant performance and cost on the table because they never moved beyond the basic client.messages.create() call they copy-pasted from the quickstart.
This post covers the six engineering patterns that separate working LLM demos from production-grade LLM applications: RAG architecture optimization, prompt caching and cost control, streaming responses, context window management, reliability and evaluation, and production architecture patterns. Each section includes complete, runnable Python code with comments explaining the cost and latency impact of every decision.
The numbers matter here. At $3.00 per million input tokens and $15.00 per million output tokens (Claude Sonnet pricing), a system processing 100,000 queries per day with an average of 2,000 input tokens and 500 output tokens spends $600/day on input and $750/day on output — $1,350/day, or $492,750/year. A 40% cache hit rate on system prompts cuts that by more than $200/day. Hybrid search that eliminates 30% of irrelevant retrieved chunks saves another $90/day. These aren't rounding errors. They're the difference between a profitable product and one that burns cash.
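Those figures are worth sanity-checking in code. A minimal back-of-envelope cost model, using exactly the volumes and prices quoted above:

```python
# Back-of-envelope daily/annual cost model for the workload described above.
QUERIES_PER_DAY = 100_000
INPUT_TOKENS = 2_000       # avg input tokens per query
OUTPUT_TOKENS = 500        # avg output tokens per query
INPUT_PRICE = 3.00         # $/MTok, Claude Sonnet input
OUTPUT_PRICE = 15.00       # $/MTok, Claude Sonnet output

input_cost = QUERIES_PER_DAY * INPUT_TOKENS * INPUT_PRICE / 1_000_000
output_cost = QUERIES_PER_DAY * OUTPUT_TOKENS * OUTPUT_PRICE / 1_000_000
daily = input_cost + output_cost

print(f"input ${input_cost:.0f}/day, output ${output_cost:.0f}/day")
print(f"total ${daily:.0f}/day = ${daily * 365:,.0f}/year")
```

Running this reproduces the $600 input, $750 output, and $1,350/day totals used throughout this post.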
1. RAG Architecture Optimization
Retrieval-Augmented Generation became the default architecture for knowledge-intensive LLM applications. The basic pattern — embed a query, find similar document chunks, stuff them in the prompt — works well enough to ship a demo. Production requires every layer of that pipeline to be deliberate.
Chunking Strategy
Chunk size is the most consequential decision in a RAG pipeline, and most teams get it wrong by picking a fixed size arbitrarily. Fixed-size chunking (e.g., 512 tokens, 50-token overlap) is fast and predictable but routinely splits semantically complete units — a sentence, a code block, a numbered list item — across chunk boundaries. The retrieved chunk is coherent in isolation but loses meaning.
Semantic chunking uses embedding similarity to find natural breakpoints: when the embedding distance between consecutive sentences exceeds a threshold, start a new chunk. This produces variable-length chunks that respect document structure. The tradeoff is 3-5x slower indexing — acceptable for offline ingestion, problematic for real-time document addition.
Sentence-window chunking is a practical middle ground: index at the sentence level for precision retrieval, then expand each hit to a ±3 sentence window before passing to the LLM. The small index unit gives you high-precision retrieval; the expanded context gives the LLM enough surrounding text to answer correctly. This approach consistently outperforms both fixed and semantic chunking on question-answering benchmarks at reasonable indexing cost.
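A minimal sketch of sentence-window chunking follows. The naive regex sentence splitter and the `window` parameter are illustrative choices; a production pipeline would use a proper sentence tokenizer (nltk, spaCy, blingfire):

```python
import re
from typing import List, Tuple

def sentence_window_chunks(text: str, window: int = 3) -> List[Tuple[str, str]]:
    """Index each sentence alone; expand hits to +/- `window` sentences.
    Returns (index_unit, expanded_context) pairs."""
    # Naive splitter: break on whitespace that follows ., !, or ?
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]
    chunks = []
    for i, sent in enumerate(sentences):
        lo, hi = max(0, i - window), min(len(sentences), i + window + 1)
        chunks.append((sent, " ".join(sentences[lo:hi])))
    return chunks
```

At query time you embed and match against the first element of each pair (high-precision sentence), then pass the second element (expanded window) to the LLM.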
Embedding Model Selection
OpenAI's text-embedding-3-large (3072 dimensions, ~$0.13/million tokens) remains the default for teams that want strong out-of-the-box performance without operational overhead. For high-volume applications, local models eliminate per-query cost entirely. BGE-M3 from BAAI supports 8192-token input, produces 1024-dimensional embeddings, and runs comfortably on a single A10G GPU. At $0.80/hour on major cloud providers, an always-on instance costs roughly $584/month, so break-even versus OpenAI's API is on the order of 4.5 billion tokens/month — self-hosting pays off for latency, data control, or genuinely large volume, not for modest traffic.
Nomic Embed v2 is a strong alternative with a permissive Apache 2.0 license, Matryoshka representation learning (you can truncate to 256 dimensions without significant accuracy loss), and competitive MTEB benchmark scores. For multilingual applications, mE5-large or multilingual-E5-large outperform most alternatives without requiring separate models per language.
Always evaluate embedding models on your own documents and queries, not just MTEB benchmarks. Domain shift is real — a model trained on web text may underperform on medical records or legal documents regardless of its aggregate benchmark score.
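One lightweight way to follow that advice is a recall@k harness over labeled (query, relevant-doc) pairs drawn from your own corpus. This is a sketch: the `embed` callable is a stand-in for whichever model you are testing (OpenAI API, BGE-M3, Nomic), so the same loop compares candidates head-to-head:

```python
import numpy as np
from typing import Callable, Dict, List

def recall_at_k(
    embed: Callable[[List[str]], np.ndarray],  # candidate embedding model
    docs: List[str],
    labeled_queries: Dict[str, int],  # query -> index of its relevant doc
    k: int = 5,
) -> float:
    """Fraction of queries whose relevant doc appears in the top-k by cosine."""
    doc_vecs = embed(docs)
    doc_vecs = doc_vecs / np.linalg.norm(doc_vecs, axis=1, keepdims=True)
    hits = 0
    for query, gold_idx in labeled_queries.items():
        q = embed([query])[0]
        q = q / np.linalg.norm(q)
        # Cosine similarity against every doc; keep the k best
        top_k = np.argsort(doc_vecs @ q)[-k:]
        hits += gold_idx in top_k
    return hits / len(labeled_queries)
```

Even 50-100 labeled pairs from real user queries will surface domain-shift problems that MTEB aggregate scores hide.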
Hybrid Search and Re-Ranking
Dense vector search alone misses exact keyword matches. BM25 keyword search alone misses semantic variations. Hybrid search combines both, and Reciprocal Rank Fusion (RRF) merges the ranked lists without requiring score normalization:
import numpy as np
from rank_bm25 import BM25Okapi
from sentence_transformers import CrossEncoder
from typing import List, Dict

# Reciprocal Rank Fusion — combines dense and sparse rankings
# k=60 is standard; higher k reduces the impact of top-ranked docs
def reciprocal_rank_fusion(
    dense_results: List[Dict],
    sparse_results: List[Dict],
    k: int = 60
) -> List[Dict]:
    """
    Merge two ranked lists using RRF.
    Cost impact: zero — pure CPU, no API calls.
    Latency: ~1ms for lists up to 1000 items.
    """
    scores: Dict[str, float] = {}
    doc_map: Dict[str, Dict] = {}
    for rank, doc in enumerate(dense_results):
        doc_id = doc["id"]
        scores[doc_id] = scores.get(doc_id, 0) + 1 / (rank + k)
        doc_map[doc_id] = doc
    for rank, doc in enumerate(sparse_results):
        doc_id = doc["id"]
        scores[doc_id] = scores.get(doc_id, 0) + 1 / (rank + k)
        doc_map[doc_id] = doc
    ranked = sorted(scores.items(), key=lambda x: x[1], reverse=True)
    return [doc_map[doc_id] for doc_id, _ in ranked]
# Cross-encoder re-ranking — the most impactful single improvement to RAG quality
# Cross-encoders score (query, document) pairs jointly, not independently
# ms-marco-MiniLM-L-6-v2: 22M params, ~4ms/pair on CPU, excellent for top-20 re-ranking
reranker = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")

def rerank_chunks(
    query: str,
    candidates: List[Dict],
    top_k: int = 5
) -> List[Dict]:
    """
    Re-rank retrieved chunks with a cross-encoder.
    Cost: ~40ms CPU for 20 candidates — worth it, dramatically improves recall@5.
    Run this AFTER hybrid search narrows to top 20; don't run on 100+ candidates.
    """
    pairs = [(query, doc["text"]) for doc in candidates]
    scores = reranker.predict(pairs)
    ranked = sorted(
        zip(candidates, scores),
        key=lambda x: x[1],
        reverse=True
    )
    return [doc for doc, _ in ranked[:top_k]]
def build_rag_pipeline(vector_store, bm25_index: BM25Okapi, documents: List[Dict]):
    """
    Complete RAG pipeline: hybrid search + re-rank + metadata filter.
    """
    def retrieve(
        query: str,
        query_embedding: List[float],
        metadata_filter: Dict = None,
        dense_top_k: int = 20,
        sparse_top_k: int = 20,
        final_top_k: int = 5
    ) -> List[Dict]:
        # Metadata filter before vector search — eliminates irrelevant results
        # before spending compute on embedding comparison
        # Cost impact: reduces tokens sent to LLM by 20-40% in typical deployments
        filter_kwargs = {}
        if metadata_filter:
            filter_kwargs["filter"] = metadata_filter

        # Dense retrieval
        dense_results = vector_store.similarity_search_by_vector(
            query_embedding,
            k=dense_top_k,
            **filter_kwargs
        )

        # Sparse retrieval (BM25 operates on tokenized text)
        tokenized_query = query.lower().split()
        bm25_scores = bm25_index.get_scores(tokenized_query)
        top_sparse_idx = np.argsort(bm25_scores)[-sparse_top_k:][::-1]
        sparse_results = [documents[i] for i in top_sparse_idx if bm25_scores[i] > 0]

        # Merge with RRF
        merged = reciprocal_rank_fusion(dense_results, sparse_results)

        # Re-rank top candidates with cross-encoder
        # Only re-rank top 20 to keep latency under 100ms
        reranked = rerank_chunks(query, merged[:20], top_k=final_top_k)
        return reranked

    return retrieve
Context compression with LLMLingua reduces retrieved chunk token count by 40-60% with minimal accuracy loss by removing low-perplexity tokens from retrieved documents. At $0.003/1K input tokens, compressing 2,000 tokens of retrieved context to 1,200 tokens saves $0.0024 per query — $2,400/day at 1 million daily queries.
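The claimed savings are easy to verify. A sketch of the arithmetic (numbers taken from the paragraph above; the compression itself would run through LLMLingua's PromptCompressor, which is not shown here):

```python
INPUT_PRICE_PER_TOKEN = 3.00 / 1_000_000  # $3.00/MTok Sonnet input

def compression_savings(tokens_before: int, tokens_after: int,
                        queries_per_day: int):
    """Dollar savings from shrinking retrieved context before the LLM call."""
    per_query = (tokens_before - tokens_after) * INPUT_PRICE_PER_TOKEN
    return per_query, per_query * queries_per_day

# 2,000 retrieved tokens compressed to 1,200, at 1M queries/day
per_query, per_day = compression_savings(2_000, 1_200, 1_000_000)
```

The 800 saved tokens per query are worth $0.0024 each, which compounds to $2,400/day at a million queries. Savings scale linearly with both compression ratio and traffic, so compression is most valuable on high-volume endpoints with long retrieved contexts.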

2. Prompt Caching and Cost Control
Prompt caching is the highest-leverage cost optimization available in 2026. Anthropic charges $0.30/MTok for cached input reads on Claude Sonnet, versus $3.00/MTok for uncached — a 90% discount. OpenAI's automatic prefix caching gives a 50% discount on prompt prefixes longer than 1,024 tokens without requiring any code change.
Anthropic Prompt Caching
The key insight is to structure prompts so stable content (system instructions, reference documents, few-shot examples) comes first, and dynamic content (the user's query, conversation history) comes last. Anthropic caches the stable prefix; you pay full price only for the dynamic suffix.
import anthropic
from typing import List, Dict, Optional
client = anthropic.Anthropic()
# System prompt with cache_control — mark stable content for caching
# Minimum cacheable size: 1,024 tokens for Sonnet/Opus, 2,048 for Haiku
# Cache TTL: 5 minutes with the default "ephemeral" type, refreshed on each hit;
# a 1-hour TTL is available at a higher cache-write price
# Cost: $3.75/MTok to CREATE a cache entry, $0.30/MTok to READ it
# Break-even: the $0.75/MTok write premium is recovered on the first cache read
SYSTEM_PROMPT = """You are an expert software engineer assistant specializing in
distributed systems, LLM applications, and production infrastructure. You provide
precise, actionable technical guidance with working code examples.
When answering questions:
- Lead with the direct answer, then explain the reasoning
- Include complete code examples, not snippets
- Call out cost and latency implications explicitly
- Flag common production pitfalls
Your knowledge base includes the following reference documentation:
[... large stable reference document, 2000+ tokens ...]
""" # In practice, load from file; must exceed 1024 tokens for caching
def chat_with_caching(
    user_message: str,
    conversation_history: List[Dict],
    retrieved_context: Optional[str] = None
) -> anthropic.types.Message:
    """
    Structured for maximum cache hits:
    1. System prompt (stable, cached) — 90% discount on reads
    2. Retrieved context (semi-stable, can cache if same docs reused)
    3. Conversation history (dynamic, NOT cached)
    4. Current user message (dynamic, NOT cached)
    """
    # Build messages: stable context first, dynamic last
    messages = []

    # Retrieved context as a cacheable user turn if it's the same document set
    # This is valuable when many queries hit the same knowledge base pages
    if retrieved_context:
        messages.append({
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": f"Reference context for this conversation:\n\n{retrieved_context}",
                    # Cache this if the same context appears in multiple turns
                    "cache_control": {"type": "ephemeral"}
                }
            ]
        })
        messages.append({
            "role": "assistant",
            "content": "Understood. I'll use this context to answer your questions."
        })

    # Dynamic conversation history (no cache — changes every turn)
    messages.extend(conversation_history)

    # Current user message (always dynamic)
    messages.append({"role": "user", "content": user_message})

    response = client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=2048,
        system=[
            {
                "type": "text",
                "text": SYSTEM_PROMPT,
                # Cache the system prompt — this is the highest-value cache target
                # At 2000 tokens, 1000 req/day: saves ~$5/day vs uncached
                "cache_control": {"type": "ephemeral"}
            }
        ],
        messages=messages
    )

    # Log cache performance — track hit rate to validate your cache strategy
    # Note: Anthropic reports input_tokens, cache_read_input_tokens, and
    # cache_creation_input_tokens as disjoint counts — no subtraction needed
    usage = response.usage
    cache_read_tokens = getattr(usage, "cache_read_input_tokens", 0) or 0
    cache_create_tokens = getattr(usage, "cache_creation_input_tokens", 0) or 0
    uncached_tokens = usage.input_tokens

    # Cost calculation for observability
    cost_uncached = uncached_tokens * 3.00 / 1_000_000
    cost_cached_reads = cache_read_tokens * 0.30 / 1_000_000
    cost_cache_creation = cache_create_tokens * 3.75 / 1_000_000
    cost_output = usage.output_tokens * 15.00 / 1_000_000
    total_cost = cost_uncached + cost_cached_reads + cost_cache_creation + cost_output
    print(f"Cache stats: {cache_read_tokens} read / {cache_create_tokens} created / "
          f"{uncached_tokens} uncached | Cost: ${total_cost:.5f}")
    return response
# OpenAI automatic prefix caching — no code changes required
# Caching activates automatically on prompts > 1024 tokens
# 50% discount on cached prefix tokens
# Structure: long stable system prompt first, dynamic content last
from openai import AsyncOpenAI
import asyncio

openai_client = AsyncOpenAI()

async def openai_cached_completion(
    user_message: str,
    conversation_history: List[Dict]
) -> dict:
    """
    OpenAI prefix caching is automatic — just ensure the stable prefix
    is long (>1024 tokens) and consistent across requests.
    Discount: 50% off cached input tokens ($0.075/MTok vs $0.15/MTok for GPT-4o-mini)
    """
    # The system message must be identical across requests for cache hits
    # Even a single token difference creates a new cache entry
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        # Stable few-shot examples go here — they'll be cached
        # Dynamic history and user message go last
        *conversation_history,
        {"role": "user", "content": user_message}
    ]
    response = await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        max_tokens=1024
    )

    # Check cache hit in usage stats
    usage = response.usage
    if hasattr(usage, "prompt_tokens_details") and usage.prompt_tokens_details:
        cached = usage.prompt_tokens_details.cached_tokens
        # Savings: 50% of the $0.15/MTok input price on cached tokens
        print(f"OpenAI cache hit: {cached} tokens cached "
              f"(saved ${cached * 0.075 / 1_000_000:.6f})")
    return response
Model Routing
A complexity classifier routes simple queries (factual lookups, short answers) to cheap models (GPT-4o-mini at $0.15/MTok input) and complex queries (multi-step reasoning, code generation) to expensive models (Claude Sonnet at $3.00/MTok input). This alone typically cuts LLM spend by 35-50% in mixed-complexity workloads.
async def classify_query_complexity(query: str) -> str:
    """
    Cheap classifier — use the fast model to decide which model to use.
    GPT-4o-mini at $0.15/MTok is 20x cheaper than Claude Sonnet.
    Cost of classification: ~200 tokens = $0.00003. Worth it above ~500 queries/day.
    """
    response = await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": (
                    "Classify query complexity as SIMPLE or COMPLEX.\n"
                    "SIMPLE: factual lookup, yes/no, short definition, basic how-to\n"
                    "COMPLEX: multi-step reasoning, code generation, architectural design, "
                    "synthesis across multiple sources\n"
                    "Respond with only the word SIMPLE or COMPLEX."
                )
            },
            {"role": "user", "content": query}
        ],
        max_tokens=5
    )
    return response.choices[0].message.content.strip()
async def routed_completion(query: str, conversation_history: List[Dict]) -> str:
    """Route to cheap or expensive model based on query complexity."""
    complexity = await classify_query_complexity(query)
    if complexity == "SIMPLE":
        # GPT-4o-mini: $0.15/MTok input, $0.60/MTok output
        response = await openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[*conversation_history, {"role": "user", "content": query}],
            max_tokens=512
        )
        return response.choices[0].message.content
    else:
        # Claude Sonnet: $3.00/MTok input, $15.00/MTok output
        # Use for complex reasoning — the quality gap justifies the cost
        # Run the sync Anthropic client in a thread so it doesn't block the event loop
        response = await asyncio.to_thread(
            client.messages.create,
            model="claude-sonnet-4-5",
            max_tokens=2048,
            messages=[*conversation_history, {"role": "user", "content": query}]
        )
        return response.content[0].text
3. Streaming Responses
Streaming is not a nice-to-have — it is a core reliability and UX pattern for any LLM application with a human in the loop. The reason is simple: users perceive a system that shows the first word in 300ms and streams the rest over 4 seconds as dramatically faster than one that returns the complete answer after 4.3 seconds, even though the total generation time is the same. The metric that matters for perceived responsiveness is Time to First Token (TTFT), not total generation time.
TTFT targets for production systems: under 300ms for real-time chat, under 1 second for document analysis, under 2 seconds for complex multi-step reasoning. These are achievable with the right infrastructure placement — LLM API calls from a server co-located with the provider's endpoints shave 50-150ms vs calls from user devices.
# FastAPI streaming endpoint
import asyncio
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import anthropic

app = FastAPI()
# Async client — a sync client inside an async generator would block the event loop
stream_client = anthropic.AsyncAnthropic()

class StreamRequest(BaseModel):
    message: str
    conversation_id: str
    system_prompt: str = ""

async def generate_stream(message: str, system: str):
    """
    Generator that yields SSE-formatted chunks.
    Cost note: you pay for ALL tokens generated even on cancelled streams.
    Implement server-side cancellation to avoid paying for abandoned requests.
    """
    try:
        async with stream_client.messages.stream(
            model="claude-sonnet-4-5",
            max_tokens=2048,
            system=system or "You are a helpful assistant.",
            messages=[{"role": "user", "content": message}]
        ) as stream:
            async for text in stream.text_stream:
                # SSE format: data: <payload>\n\n
                # Wrap in JSON to carry metadata alongside content
                chunk = json.dumps({"type": "text", "content": text})
                yield f"data: {chunk}\n\n"

            # Send final usage stats so client can track cost
            final_message = await stream.get_final_message()
            usage = {
                "type": "usage",
                "input_tokens": final_message.usage.input_tokens,
                "output_tokens": final_message.usage.output_tokens,
                # Approximate cost at Sonnet pricing
                "cost_usd": round(
                    final_message.usage.input_tokens * 3.00 / 1_000_000 +
                    final_message.usage.output_tokens * 15.00 / 1_000_000,
                    6
                )
            }
            yield f"data: {json.dumps(usage)}\n\n"
            yield "data: [DONE]\n\n"
    except anthropic.APIError as e:
        error = json.dumps({"type": "error", "message": str(e)})
        yield f"data: {error}\n\n"
        yield "data: [DONE]\n\n"

@app.post("/stream")
async def stream_endpoint(request: StreamRequest):
    return StreamingResponse(
        generate_stream(request.message, request.system_prompt),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Critical for nginx — disables response buffering
            "Connection": "keep-alive"
        }
    )
On the browser side, a production client needs incremental SSE parsing, cancellation, and reconnection on dropped streams, which is essential for mobile users and unreliable connections. Partial content already shown to the user must be tracked so a reconnect appends rather than replaces. The client below implements parsing and cancellation; reconnection is a retry loop layered on top:
// JavaScript SSE client over fetch, with cancellation
// (EventSource only supports GET, so POST streaming uses fetch + ReadableStream)
class LLMStreamClient {
  constructor(endpoint) {
    this.endpoint = endpoint;
    this.controller = null;
  }

  async stream(message, onChunk, onDone, onError) {
    // AbortController allows client-side cancellation
    // Without this, navigating away still consumes tokens server-side
    this.controller = new AbortController();
    const response = await fetch(this.endpoint, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ message }),
      signal: this.controller.signal
    });
    if (!response.ok) {
      onError(new Error(`HTTP ${response.status}`));
      return;
    }
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';
    let finalUsage = null; // stash usage so onDone fires exactly once, at [DONE]
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split('\n\n');
        buffer = lines.pop(); // Keep incomplete chunk in buffer
        for (const line of lines) {
          if (!line.startsWith('data: ')) continue;
          const data = line.slice(6);
          if (data === '[DONE]') {
            onDone(finalUsage);
            return;
          }
          try {
            const parsed = JSON.parse(data);
            if (parsed.type === 'text') onChunk(parsed.content);
            if (parsed.type === 'usage') finalUsage = parsed;
            if (parsed.type === 'error') onError(new Error(parsed.message));
          } catch (e) {
            // Malformed JSON in stream — log and continue
            console.warn('Stream parse error:', e, 'Raw:', data);
          }
        }
      }
    } catch (e) {
      if (e.name !== 'AbortError') onError(e);
    }
  }

  cancel() {
    // Client-side cancel — sends abort signal to fetch
    // Server still generates tokens until it processes the disconnect
    // FastAPI detects client disconnect within ~500ms via request.is_disconnected()
    if (this.controller) this.controller.abort();
  }
}

4. Context Window Management
Modern LLMs support 128K to 1M token context windows, but "fits in context" and "performs well in context" are different claims. Research on needle-in-a-haystack benchmarks consistently shows degraded recall on information placed in the middle of very long contexts — models attend more strongly to the beginning and end of the prompt. Stuffing every available document into a 128K context window degrades answer quality compared to a well-curated 8K context.
The right mental model is a sliding window: keep the system prompt and the most relevant retrieved context fixed, summarize older conversation turns when the rolling history grows beyond budget, and always track token counts before sending.
import tiktoken
from typing import List, Dict, Optional, Tuple

# tiktoken for OpenAI models; Anthropic has its own token counting API
# Always count BEFORE sending — surprise context overruns are expensive
enc = tiktoken.encoding_for_model("gpt-4o")

def count_tokens_openai(text: str) -> int:
    """Count tokens for OpenAI models. ~0.1ms per call."""
    return len(enc.encode(text))

def count_tokens_anthropic(messages: List[Dict], system: str) -> int:
    """Use Anthropic's token counting API — exact, model-specific."""
    # Sync call against the Anthropic client created earlier
    response = client.messages.count_tokens(
        model="claude-sonnet-4-5",
        system=system,
        messages=messages
    )
    return response.input_tokens
class ConversationManager:
    """
    Manages conversation history within a token budget using rolling summarization.
    Strategy:
    - Keep last N turns verbatim (recent context is highest value)
    - Summarize older turns when budget exceeded (preserves key facts, saves tokens)
    - Entity extraction for persistent facts (user preferences, key decisions)
    Token budget allocation (example for 8K context):
    - System prompt: 1500 tokens (reserved)
    - Retrieved context: 3000 tokens (reserved for RAG)
    - Conversation history: 2500 tokens (managed here)
    - Response buffer: 1000 tokens (reserved for output)
    """
    def __init__(
        self,
        system_prompt: str,
        max_history_tokens: int = 2500,
        summarize_threshold: int = 2000,  # Summarize when history exceeds this
        keep_recent_turns: int = 4  # Always keep last N turns verbatim
    ):
        self.system_prompt = system_prompt
        self.max_history_tokens = max_history_tokens
        self.summarize_threshold = summarize_threshold
        self.keep_recent_turns = keep_recent_turns
        self.history: List[Dict] = []
        self.summary: Optional[str] = None

    def _count_history_tokens(self) -> int:
        total = 0
        for msg in self.history:
            total += count_tokens_openai(str(msg.get("content", "")))
        if self.summary:
            total += count_tokens_openai(self.summary)
        return total

    async def _summarize_old_turns(self, turns_to_summarize: List[Dict]) -> str:
        """
        Summarize older conversation turns.
        Cost: ~500 input + ~200 output tokens ≈ $0.0002 with GPT-4o-mini.
        Saves ~2000 tokens on every subsequent Sonnet request ≈ $0.006/request.
        Break-even: the first subsequent request.
        """
        conversation_text = "\n".join([
            f"{msg['role'].upper()}: {msg['content']}"
            for msg in turns_to_summarize
        ])
        response = await openai_client.chat.completions.create(
            model="gpt-4o-mini",  # Use cheap model for summarization
            messages=[
                {
                    "role": "system",
                    "content": (
                        "Summarize this conversation segment concisely. "
                        "Preserve: key decisions made, facts established, "
                        "user preferences, unresolved questions. "
                        "Omit: pleasantries, repeated information, verbose explanations. "
                        "Output a 2-4 sentence summary."
                    )
                },
                {"role": "user", "content": conversation_text}
            ],
            max_tokens=200
        )
        return response.choices[0].message.content

    async def add_turn(self, role: str, content: str):
        """Add a turn and compress history if over token budget."""
        self.history.append({"role": role, "content": content})
        # Check if we need to compress
        if self._count_history_tokens() > self.summarize_threshold:
            # Split: keep recent turns verbatim, summarize the rest
            recent = self.history[-self.keep_recent_turns:]
            old = self.history[:-self.keep_recent_turns]
            if old:
                new_summary = await self._summarize_old_turns(old)
                # Append to existing summary if present
                if self.summary:
                    self.summary = f"{self.summary}\n\nLater: {new_summary}"
                else:
                    self.summary = new_summary
                self.history = recent

    def get_messages_for_api(self) -> Tuple[List[Dict], int]:
        """
        Return messages formatted for API, prepending summary if present.
        Also returns token count for budget enforcement.
        """
        messages = []
        if self.summary:
            messages.append({
                "role": "user",
                "content": f"[Conversation summary from earlier: {self.summary}]"
            })
            messages.append({
                "role": "assistant",
                "content": "Understood, I have that context."
            })
        messages.extend(self.history)
        token_count = self._count_history_tokens()
        return messages, token_count
5. Reliability and Evaluation
LLM APIs have higher variance failure modes than traditional HTTP services: rate limiting under load, partial stream failures, context length errors from unexpected input sizes, and occasional model degradation that produces coherent but incorrect outputs. A production LLM client handles all of these.
import asyncio
import random
from dataclasses import dataclass
from enum import Enum
import anthropic
from openai import AsyncOpenAI
from pydantic import BaseModel, ValidationError
from typing import Dict, List, Optional, Type, TypeVar

T = TypeVar('T', bound=BaseModel)

class LLMProvider(Enum):
    ANTHROPIC = "anthropic"
    OPENAI = "openai"

@dataclass
class LLMConfig:
    provider: LLMProvider
    model: str
    max_tokens: int = 1024
    timeout: float = 30.0  # Hard timeout — LLMs can genuinely hang on large outputs
class ResilientLLMClient:
    """
    Production-grade LLM client with:
    - Exponential backoff retries on rate limits and transient errors
    - Provider fallback (Anthropic → OpenAI)
    - Hard timeout enforcement
    - Structured output with retry on parse failure
    """
    def __init__(self):
        self.anthropic = anthropic.Anthropic()
        self.openai = AsyncOpenAI()
        # Primary + fallback provider chain
        self.primary = LLMConfig(
            provider=LLMProvider.ANTHROPIC,
            model="claude-sonnet-4-5",
            timeout=30.0
        )
        self.fallback = LLMConfig(
            provider=LLMProvider.OPENAI,
            model="gpt-4o",
            timeout=30.0
        )
    async def _call_with_timeout(
        self,
        config: LLMConfig,
        messages: List[Dict],
        system: str = ""
    ) -> str:
        """Single LLM call with hard timeout. Raises TimeoutError if exceeded."""
        try:
            if config.provider == LLMProvider.ANTHROPIC:
                # asyncio.to_thread runs the sync Anthropic client off the event loop
                response = await asyncio.wait_for(
                    asyncio.to_thread(
                        lambda: self.anthropic.messages.create(
                            model=config.model,
                            max_tokens=config.max_tokens,
                            system=system,
                            messages=messages
                        )
                    ),
                    timeout=config.timeout
                )
                return response.content[0].text
            else:  # OpenAI
                response = await asyncio.wait_for(
                    self.openai.chat.completions.create(
                        model=config.model,
                        max_tokens=config.max_tokens,
                        messages=[
                            {"role": "system", "content": system},
                            *messages
                        ]
                    ),
                    timeout=config.timeout
                )
                return response.choices[0].message.content
        except asyncio.TimeoutError:
            # Timeout — happens on very long outputs or provider latency spikes
            raise TimeoutError(f"LLM call timed out after {config.timeout}s")
    async def complete(
        self,
        messages: List[Dict],
        system: str = "",
        max_retries: int = 3
    ) -> str:
        """
        Complete with exponential backoff retries and provider fallback.
        Jitter prevents thundering herd on rate limit recovery.
        """
        last_error = None
        for attempt in range(max_retries):
            try:
                return await self._call_with_timeout(self.primary, messages, system)
            except (anthropic.RateLimitError, anthropic.APIStatusError) as e:
                last_error = e
                # Exponential backoff with full jitter: sleep(random(0, 2^attempt))
                # Full jitter outperforms equal jitter for distributed systems
                wait = random.uniform(0, 2 ** attempt)
                print(f"Primary provider error (attempt {attempt + 1}): {e}. "
                      f"Retrying in {wait:.1f}s")
                await asyncio.sleep(wait)
            except TimeoutError as e:
                last_error = e
                print(f"Primary provider timeout (attempt {attempt + 1})")

        # All retries exhausted — try fallback provider
        print(f"Falling back to {self.fallback.provider.value} after {max_retries} failures")
        try:
            return await self._call_with_timeout(self.fallback, messages, system)
        except Exception as e:
            raise RuntimeError(
                f"Both providers failed. Primary: {last_error}. Fallback: {e}"
            )
    async def complete_structured(
        self,
        messages: List[Dict],
        output_schema: Type[T],
        system: str = "",
        max_parse_retries: int = 2
    ) -> T:
        """
        Complete and parse into a Pydantic model.
        Retries with the parse error in the prompt on validation failure.
        """
        schema_instruction = (
            f"\n\nRespond with valid JSON matching this schema:\n"
            f"{output_schema.model_json_schema()}\n"
            f"Output ONLY the JSON object, no explanation."
        )
        current_messages = list(messages)
        for attempt in range(max_parse_retries + 1):
            response_text = await self.complete(current_messages, system + schema_instruction)
            try:
                # Handle markdown code fences that models sometimes add
                json_text = response_text.strip()
                if json_text.startswith("```"):
                    json_text = json_text.split("```")[1]
                    if json_text.startswith("json"):
                        json_text = json_text[4:]
                return output_schema.model_validate_json(json_text)
            except (ValidationError, ValueError) as e:
                if attempt < max_parse_retries:
                    # Add parse error to conversation so the model can self-correct
                    current_messages.append({"role": "assistant", "content": response_text})
                    current_messages.append({
                        "role": "user",
                        "content": f"That response failed validation: {e}. "
                                   f"Please correct it and respond with valid JSON only."
                    })
                else:
                    raise ValueError(
                        f"Failed to parse structured output after {max_parse_retries} retries. "
                        f"Last response: {response_text[:200]}"
                    )
# LLM-as-judge for automated quality evaluation
# Cost: ~500 tokens per evaluation ≈ $0.0015 at Sonnet input pricing
# (Sonnet is the ResilientLLMClient's primary model)
# Use for: regression testing on prompt changes, production quality sampling
class EvaluationResult(BaseModel):
    score: int  # 1-5
    reasoning: str
    passed: bool

llm_client = ResilientLLMClient()

async def llm_judge_quality(
    question: str,
    answer: str,
    reference_answer: Optional[str] = None
) -> EvaluationResult:
    """
    Score answer quality with an LLM judge.
    Calibrate against human labels before deploying to production.
    Run on 5% sample in production, 100% in staging regression tests.
    """
    reference_section = ""
    if reference_answer:
        reference_section = f"\nReference answer: {reference_answer}"
    result = await llm_client.complete_structured(
        messages=[{
            "role": "user",
            "content": (
                f"Question: {question}\n"
                f"Answer: {answer}"
                f"{reference_section}\n\n"
                "Score the answer 1-5 where:\n"
                "5: Complete, accurate, well-structured\n"
                "4: Mostly correct, minor gaps\n"
                "3: Partially correct, notable gaps\n"
                "2: Mostly incorrect or misleading\n"
                "1: Wrong or unhelpful"
            )
        }],
        output_schema=EvaluationResult,
        system="You are an expert evaluator. Be precise and critical."
    )
    return result
Latency SLOs for production LLM services: TTFT p50 under 400ms, p95 under 1.2s. Total generation p50 under 4s for typical outputs, p95 under 15s. Anything slower than these thresholds should trigger investigation — provider latency spikes, context window pressure, or infrastructure bottlenecks between your service and the LLM API.
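Enforcing those SLOs means measuring them. A minimal sketch of a percentile tracker over raw TTFT samples — in a real deployment this belongs in your metrics stack (Prometheus histograms or similar) rather than an in-process list, and the nearest-rank percentile method is one of several reasonable choices:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class LatencyTracker:
    """Collect TTFT samples (seconds) and check them against SLO thresholds."""
    p50_slo: float = 0.4
    p95_slo: float = 1.2
    samples: List[float] = field(default_factory=list)

    def record(self, ttft_seconds: float) -> None:
        self.samples.append(ttft_seconds)

    def percentile(self, p: float) -> float:
        # Nearest-rank percentile over the collected samples
        ordered = sorted(self.samples)
        idx = max(0, int(round(p / 100 * len(ordered))) - 1)
        return ordered[idx]

    def slo_report(self) -> dict:
        p50, p95 = self.percentile(50), self.percentile(95)
        return {
            "p50": p50, "p95": p95,
            "p50_ok": p50 <= self.p50_slo,
            "p95_ok": p95 <= self.p95_slo,
        }
```

Wiring `record()` to the time between request dispatch and the first streamed chunk gives you the TTFT distribution; alert when `p95_ok` goes false for a sustained window rather than on single samples.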
6. Production Architecture Patterns
The LLM API call is rarely the bottleneck in a well-architected system. The bottlenecks are queue management for burst traffic, result deduplication for repeated queries, and observability gaps that make it impossible to diagnose cost spikes or quality regressions.
# Semantic result caching — cache LLM responses for semantically similar queries
# Not exact string matching; uses embedding similarity to detect near-duplicate queries
# Hit rate in practice: 15-35% depending on query diversity
# Saves: ~$0.018 per cache hit at 2000-token average input (Sonnet pricing)
import time
from openai import OpenAI

# Sync client for embedding lookups — cheap, fast calls
embedding_client = OpenAI()

class SemanticCache:
    """
    Cache LLM responses by query embedding similarity.
    Backend: Redis with vector search (Redis Stack) or any vector DB.
    TTL: 1 hour for factual queries, 24h for stable reference questions.
    """
    def __init__(self, similarity_threshold: float = 0.95, ttl_seconds: int = 3600):
        self.threshold = similarity_threshold
        self.ttl = ttl_seconds
        # In production: use Redis + vector index
        # This demo uses in-memory storage
        self._cache: List[Dict] = []

    def _get_embedding(self, text: str) -> List[float]:
        """Get embedding for cache key."""
        # Use a fast, cheap embedding model for cache lookups
        # text-embedding-3-small: $0.02/MTok — negligible vs LLM call cost
        response = embedding_client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
def get(self, query: str) -> Optional[str]:
"""Look up cached response by semantic similarity."""
if not self._cache:
return None
query_emb = self._get_embedding(query)
now = time.time()
best_score = 0
best_entry = None
for entry in self._cache:
if now - entry["timestamp"] > self.ttl:
continue
# Cosine similarity
score = np.dot(query_emb, entry["embedding"]) / (
np.linalg.norm(query_emb) * np.linalg.norm(entry["embedding"])
)
if score > best_score:
best_score = score
best_entry = entry
if best_score >= self.threshold and best_entry:
return best_entry["response"]
return None
def set(self, query: str, response: str):
"""Cache a query-response pair."""
embedding = self._get_embedding(query)
self._cache.append({
"query": query,
"embedding": embedding,
"response": response,
"timestamp": time.time()
})
# Per-user token budget enforcement
# Prevents single users from exhausting shared rate limits
# Track daily token spend per user_id; block or throttle at threshold
class TokenBudgetEnforcer:
    """
    Track and enforce per-user daily token budgets.
    Storage: Redis with TTL, keyed by user_id:YYYY-MM-DD.
    """

    def __init__(self, daily_token_limit: int = 100_000):
        self.limit = daily_token_limit
        # In production: use Redis
        self._usage: Dict[str, int] = {}

    def check_and_increment(self, user_id: str, tokens_requested: int) -> bool:
        """
        Returns True if user is within budget, False if over limit.
        Atomically checks and increments — use Redis INCRBY for production.
        """
        key = f"{user_id}:{time.strftime('%Y-%m-%d')}"
        current = self._usage.get(key, 0)
        if current + tokens_requested > self.limit:
            return False
        self._usage[key] = current + tokens_requested
        return True
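One caveat on the dict-backed demo: the read-check-write sequence is not atomic, so two concurrent requests can both pass the check and overshoot the cap. Redis INCRBY closes that race server-side; here is an in-process stand-in for the same invariant using a lock — illustrative, not the production storage:

```python
import threading
import time
from typing import Dict

class AtomicBudget:
    """Lock-guarded check-and-increment: a stand-in for the Redis
    INCRBY + EXPIRE pattern, so concurrent requests cannot both
    slip under the daily cap."""

    def __init__(self, daily_limit: int = 100_000):
        self.limit = daily_limit
        self._usage: Dict[str, int] = {}
        self._lock = threading.Lock()

    def check_and_increment(self, user_id: str, tokens: int) -> bool:
        key = f"{user_id}:{time.strftime('%Y-%m-%d')}"
        with self._lock:  # read, check, and write as one critical section
            current = self._usage.get(key, 0)
            if current + tokens > self.limit:
                return False
            self._usage[key] = current + tokens
            return True
```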
Observability is non-negotiable. Every LLM request should emit a structured log entry with: user_id, model, prompt_tokens, completion_tokens, cached_tokens, cost_usd, latency_ms, ttft_ms, request_id, session_id. This data drives cost attribution, quality monitoring, and capacity planning. Without it, you're operating blind.
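A sketch of that log record as a dataclass, with cost derived from token counts at assumed Sonnet-class rates ($3/MTok input, $0.30/MTok cache reads, $15/MTok output — swap in your provider's price sheet):

```python
import json
from dataclasses import asdict, dataclass

@dataclass
class LLMRequestLog:
    user_id: str
    model: str
    prompt_tokens: int
    completion_tokens: int
    cached_tokens: int
    latency_ms: float
    ttft_ms: float
    request_id: str
    session_id: str
    cost_usd: float = 0.0

def emit(log: LLMRequestLog,
         input_per_mtok: float = 3.00,
         cached_per_mtok: float = 0.30,
         output_per_mtok: float = 15.00) -> str:
    """Fill in cost_usd from the token counts and return one JSON log line."""
    cold = log.prompt_tokens - log.cached_tokens
    log.cost_usd = round(
        cold / 1e6 * input_per_mtok
        + log.cached_tokens / 1e6 * cached_per_mtok
        + log.completion_tokens / 1e6 * output_per_mtok,
        6,
    )
    return json.dumps(asdict(log))
```

One JSON line per request, shipped to your log pipeline, is enough to answer per-user cost attribution and cache-hit-rate questions without any extra instrumentation.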
Multi-tenant deployments must isolate usage tracking and, where compliance requires it, prompt/response logging per tenant. Store conversation history in tenant-partitioned storage. Rotate API keys per-environment, not globally — a compromised development key should not affect production.
Conclusion
The LLM production stack in 2026 is not complicated, but it requires discipline at every layer. The patterns in this post address the four places where teams consistently waste resources or sacrifice reliability.
On cost: prompt caching alone, applied to the system prompt, cuts the price of cached input tokens by 50-90% versus cold input, depending on provider. Model routing with a cheap classifier cuts LLM spend by 35-50% on mixed-complexity workloads. These are not marginal improvements — they determine whether a product is economically viable at scale.
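These claims are easy to sanity-check with back-of-envelope arithmetic. A sketch using the Sonnet-class rates quoted throughout this post ($3/MTok input, $0.30/MTok cache reads, $15/MTok output); the 1,500-token cached prefix in the example is an assumption:

```python
def daily_cost_usd(queries: int, input_tokens: int, output_tokens: int,
                   cache_hit_rate: float = 0.0, cached_prefix_tokens: int = 0,
                   input_rate: float = 3.00, cache_read_rate: float = 0.30,
                   output_rate: float = 15.00) -> float:
    """Daily spend given per-query token averages and a prompt-cache hit rate."""
    hits = queries * cache_hit_rate
    cached = hits * cached_prefix_tokens          # tokens billed at cache-read rate
    cold_input = queries * input_tokens - cached  # tokens billed at full input rate
    return (cold_input / 1e6 * input_rate
            + cached / 1e6 * cache_read_rate
            + queries * output_tokens / 1e6 * output_rate)
```

On the workload from the introduction (100,000 queries/day at 2,000 input and 500 output tokens), this gives $1,350/day cold; a 40% hit rate on a 1,500-token cached prefix brings the same load to $1,188/day.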
On latency: streaming is not optional for interactive applications. TTFT under 300ms is achievable and required. Hybrid search with cross-encoder re-ranking adds 50ms of retrieval latency and meaningfully improves answer quality — the tradeoff is almost always worth it.
On reliability: exponential backoff with provider fallback handles the vast majority of LLM API failures transparently. Structured output with parse-error retry loops catches the long tail of model output failures. Hard timeouts prevent hung requests from blocking your async workers.
On accuracy: RAG quality comes from retrieval precision, not context window size. Semantic chunking with sentence-window expansion, hybrid dense+sparse search, and cross-encoder re-ranking produce retrievals that compete with significantly larger context approaches at 30-40% lower token cost.
The teams shipping the best LLM products in 2026 are not the ones with the biggest context windows — they are the ones who instrument every API call, measure cache hit rates, run eval suites before every prompt change, and treat the LLM as a component in a system rather than a magic box. Build the boring infrastructure first. The product quality follows.
Sources
- Anthropic Prompt Caching Documentation
- OpenAI Prompt Caching Guide
- LLMLingua: Prompt Compression Research
- BGE-M3 Embedding Model
- ms-marco-MiniLM Cross-Encoder
- Reciprocal Rank Fusion (Cormack et al., 2009)
- Lost in the Middle: LLM Long-Context Performance Study
Enjoyed this post? Follow AmtocSoft for AI tutorials from beginner to professional.