API Design in 2026: REST, Versioning, Pagination, and Rate Limiting Patterns

Introduction

Most API design mistakes are invisible until scale exposes them. Offset-based pagination seems fine at 10,000 records; at 10 million it skips or repeats rows under concurrent writes and forces the database to read and discard every row before the requested offset. A single versioning strategy that works for your first three clients fails when a fourth client needs a breaking change. Rate limiting by IP address seems obvious until you have enterprise customers behind corporate NAT proxies sharing a single IP, and you rate-limit an entire company because one employee hit a burst. The patterns that seemed like over-engineering at launch become the foundation you're glad you built when client count reaches double digits.

This post covers the API design patterns that hold up at scale: URL structure and resource modeling, versioning strategies with their trade-offs, cursor-based pagination for large datasets, rate limiting by identity not IP, idempotency keys for safe retries, and the OpenAPI specification workflow that makes API changes manageable across teams.

URL Design and Resource Modeling

RESTful URL design expresses resources (nouns) and uses HTTP methods (verbs) to express operations. The conventions that experienced API designers follow:

# Resources: plural nouns
GET    /api/v1/orders           # list
POST   /api/v1/orders           # create
GET    /api/v1/orders/{id}      # retrieve
PATCH  /api/v1/orders/{id}      # partial update
PUT    /api/v1/orders/{id}      # full replacement
DELETE /api/v1/orders/{id}      # delete

# Nested resources: relationships
GET    /api/v1/orders/{id}/items          # order's line items
POST   /api/v1/orders/{id}/items          # add item to order
DELETE /api/v1/orders/{id}/items/{itemId} # remove item

# Actions that don't map cleanly to CRUD: use sub-resources as verbs
POST /api/v1/orders/{id}/cancel    # cancel an order
POST /api/v1/orders/{id}/refund    # refund an order
POST /api/v1/users/{id}/verify     # verify email

# Query parameters: filtering, sorting, pagination — not resource addressing
GET /api/v1/orders?status=pending&sort=-created_at&limit=20&cursor=abc123

The rule for when to use nested resources vs query parameters: if the relationship is hierarchical (an item belongs to an order), use nested URLs. If you're filtering or searching across resources, use query parameters.
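
The sort=-created_at convention from the query-string example can be checked against an allowlist so that client input never reaches SQL directly. This is a minimal sketch: ALLOWED_SORT_FIELDS, parse_sort, and to_order_by are hypothetical names, and the column set is assumed.

```python
ALLOWED_SORT_FIELDS = {"created_at", "total_cents", "status"}  # assumed columns


def parse_sort(sort: str) -> list[tuple[str, str]]:
    """Parse 'sort' values such as '-created_at,status' into (field, direction) pairs."""
    clauses = []
    for raw in sort.split(","):
        raw = raw.strip()
        if not raw:
            continue
        # A leading '-' means descending; anything else is ascending.
        descending = raw.startswith("-")
        field = raw[1:] if descending else raw
        if field not in ALLOWED_SORT_FIELDS:
            raise ValueError(f"unsupported sort field: {field!r}")
        clauses.append((field, "DESC" if descending else "ASC"))
    return clauses


def to_order_by(clauses: list[tuple[str, str]]) -> str:
    """Render validated pairs as an ORDER BY fragment."""
    return ", ".join(f"{field} {direction}" for field, direction in clauses)


order_by = to_order_by(parse_sort("-created_at,status"))
# order_by == "created_at DESC, status ASC"
```

Rejecting unknown fields with an error (which the endpoint would map to a 400) tends to be friendlier than ignoring them silently, because clients learn about typos immediately.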

HTTP status codes communicate response semantics:

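# CreateOrderRequest, get_current_user, and order_service are defined elsewhere in the application
from fastapi import Depends, FastAPI, HTTPException, status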
from fastapi.responses import JSONResponse

app = FastAPI()

@app.post("/api/v1/orders", status_code=status.HTTP_201_CREATED)
async def create_order(body: CreateOrderRequest, user=Depends(get_current_user)):
    # 201 Created: resource created, Location header points to new resource
    order = await order_service.create(body, user.id)
    return JSONResponse(
        status_code=201,
        content=order.dict(),
        headers={"Location": f"/api/v1/orders/{order.id}"}
    )

@app.get("/api/v1/orders/{order_id}")
async def get_order(order_id: str, user=Depends(get_current_user)):
    order = await order_service.get(order_id)
    if not order:
        raise HTTPException(
            status_code=404,
            detail={"error": "not_found", "message": f"Order {order_id} not found"}
        )
    if order.user_id != user.id:
        raise HTTPException(
            status_code=403,
            detail={"error": "forbidden", "message": "You cannot access this order"}
        )
    return order

# Status codes to use correctly:
# 200 OK: success with body
# 201 Created: resource created (POST)
# 204 No Content: success with no body (DELETE, action endpoints)
# 400 Bad Request: client sent invalid data (validation error)
# 401 Unauthorized: not authenticated
# 403 Forbidden: authenticated but not authorized
# 404 Not Found: resource doesn't exist
# 409 Conflict: resource already exists or state conflict
# 422 Unprocessable Entity: semantically invalid (invalid field value)
# 429 Too Many Requests: rate limit exceeded
# 500 Internal Server Error: unexpected server error

Versioning Strategies

Breaking API changes break clients. The versioning strategies and their trade-offs:

URL versioning (/api/v1/, /api/v2/): explicit, cacheable, easy to route. Clients must explicitly migrate. The most common and pragmatic choice.

Header versioning (Accept: application/vnd.myapi.v2+json): cleaner URLs, harder to test (can't just open in browser). Popular in enterprise APIs.

Query parameter (?version=2): easy to add, but pollutes every URL and is easily forgotten in documentation.

# URL versioning implementation in FastAPI
from fastapi import FastAPI, APIRouter

app = FastAPI()

# v1 router: keep serving unchanged until the announced sunset date
v1_router = APIRouter(prefix="/api/v1")

@v1_router.get("/orders/{id}")
async def get_order_v1(id: str):
    order = await get_order(id)
    # v1 format: amount as decimal string
    return {
        "id": order.id,
        "amount": f"{order.total_cents / 100:.2f}",  # "49.99"
        "status": order.status,
    }

# v2 router: new format
v2_router = APIRouter(prefix="/api/v2")

@v2_router.get("/orders/{id}")
async def get_order_v2(id: str):
    order = await get_order(id)
    # v2 format: amount as integer cents (breaking change from v1)
    return {
        "id": order.id,
        "amount_cents": order.total_cents,     # breaking: renamed + type change
        "currency": order.currency,            # new field
        "status": order.status,
        "created_at": order.created_at.isoformat(),
    }

app.include_router(v1_router)
app.include_router(v2_router)

Version sunset policy: announce deprecation 12+ months ahead, add Sunset and Deprecation headers to v1 responses, monitor v1 traffic, contact active clients. Only sunset when v1 traffic reaches zero or after the sunset date.

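# Add deprecation headers to old versions.
# APIRouter has no middleware decorator, so register on the app and filter by path.
@app.middleware("http")
async def add_deprecation_headers(request, call_next):
    response = await call_next(request)
    if request.url.path.startswith("/api/v1/"):
        # Early drafts used "true"; RFC 9745 specifies a date value instead
        response.headers["Deprecation"] = "true"
        response.headers["Sunset"] = "Fri, 01 Jan 2027 00:00:00 GMT"
        response.headers["Link"] = '</api/v2/orders>; rel="successor-version"'
    return response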

Cursor-Based Pagination

Offset-based pagination (LIMIT 20 OFFSET 1000) has two production problems. First, cost grows with depth: the database must read and discard every skipped row (1,000 here) before returning 20, so deep pages approach a scan of the whole result set. Second, results are inconsistent when records are inserted or deleted between pages (page 2 might return records already shown on page 1, or skip some entirely).

Cursor-based pagination uses an opaque cursor (typically an encoded primary key or timestamp) to mark position. With a matching index, the database seeks straight to that position, so the cost of a page does not grow with how deep the client has paged.

from base64 import b64encode, b64decode
import json
from datetime import datetime

from fastapi import Depends, HTTPException, Query

def encode_cursor(order_id: str, created_at: datetime) -> str:
    """Encode pagination cursor — opaque to clients."""
    data = {"id": order_id, "created_at": created_at.isoformat()}
    return b64encode(json.dumps(data).encode()).decode()

def decode_cursor(cursor: str) -> tuple[str, datetime]:
    """Decode pagination cursor."""
    data = json.loads(b64decode(cursor.encode()).decode())
    return data["id"], datetime.fromisoformat(data["created_at"])

@app.get("/api/v2/orders")
async def list_orders(
    limit: int = Query(default=20, ge=1, le=100),
    cursor: str | None = None,
    status: str | None = None,
    user=Depends(get_current_user)
):
    # Build the WHERE clause and the parameter list together so each
    # $n placeholder always matches its position in params.
    conditions = ["user_id = $1"]
    params = [user.id]

    if status:
        params.append(status)
        conditions.append(f"status = ${len(params)}")

    if cursor:
        try:
            cursor_id, cursor_created_at = decode_cursor(cursor)
        except (ValueError, KeyError, TypeError):
            # Malformed cursor: a client error, not a server error
            raise HTTPException(status_code=400, detail={"error": "invalid_cursor"})
        params.extend([cursor_created_at, cursor_id])
        conditions.append(f"(created_at, id) < (${len(params) - 1}, ${len(params)})")

    params.append(limit + 1)  # fetch one extra to detect "has more"
    query = f"""
        SELECT id, total_cents, status, created_at
        FROM orders
        WHERE {' AND '.join(conditions)}
        ORDER BY created_at DESC, id DESC  -- deterministic ordering
        LIMIT ${len(params)}
    """

    orders = await db.fetch(query, *params)

    has_more = len(orders) > limit
    orders = orders[:limit]  # trim the extra

    next_cursor = None
    if has_more and orders:
        last = orders[-1]
        next_cursor = encode_cursor(last['id'], last['created_at'])

    return {
        "data": orders,
        "pagination": {
            "has_more": has_more,
            "next_cursor": next_cursor,
            "limit": limit,
        }
    }

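The client receives next_cursor and passes it as ?cursor= in the next request. With an index that covers the filter and sort columns (here user_id, created_at, id), each page query is an index seek, O(log n) plus the page size, regardless of how deep into the dataset you are.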
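
The consistency difference is easy to reproduce without a database. The sketch below uses an in-memory list as a stand-in for the orders table (ids only, newest first); page_by_offset and page_by_cursor are illustrative helpers, not part of the endpoint above.

```python
rows = [{"id": i} for i in range(10, 0, -1)]  # ids 10..1, newest first


def page_by_offset(rows, offset, limit):
    """OFFSET/LIMIT: position is counted from the current top of the list."""
    return rows[offset:offset + limit]


def page_by_cursor(rows, after_id, limit):
    """Keyset: position is 'everything sorted after the last id the client saw'."""
    remaining = [r for r in rows if after_id is None or r["id"] < after_id]
    return remaining[:limit]


first_offset = page_by_offset(rows, 0, 3)     # ids 10, 9, 8
first_cursor = page_by_cursor(rows, None, 3)  # ids 10, 9, 8

# A new order arrives between the two page requests.
rows.insert(0, {"id": 11})

second_offset = page_by_offset(rows, 3, 3)    # ids 8, 7, 6: id 8 is served twice
second_cursor = page_by_cursor(rows, first_cursor[-1]["id"], 3)  # ids 7, 6, 5
```

The offset client sees order 8 on both pages because the insert shifted every row down by one. The cursor client continues from where it stopped.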


Rate Limiting by Identity

Rate limiting by IP address fails in two scenarios: enterprise customers behind corporate NAT share one IP (you rate-limit the entire company when one employee is over the limit), and attackers with multiple IPs or rotating proxies bypass per-IP limits.

Production rate limiting is by API key or user ID, using a sliding window algorithm:

import time
import uuid

import redis.asyncio as redis
from fastapi import Header, HTTPException, Request

r = redis.from_url("redis://redis:6379")

# Constants rather than function parameters: FastAPI exposes plain int
# parameters of a dependency as query parameters a client could override.
RATE_LIMIT = 1000            # requests per window
RATE_WINDOW_SECONDS = 3600   # one hour

async def rate_limit(
    request: Request,
    api_key: str = Header(alias="X-API-Key"),
) -> None:
    """Sliding window rate limiter using Redis sorted sets."""
    limit, window_seconds = RATE_LIMIT, RATE_WINDOW_SECONDS
    key = f"rate_limit:{api_key}"
    now = time.time()
    window_start = now - window_seconds

    pipe = r.pipeline()
    # Remove entries outside the window
    pipe.zremrangebyscore(key, 0, window_start)
    # Count requests in current window (before this one is added)
    pipe.zcard(key)
    # Add current request (score=timestamp, member=unique ID)
    pipe.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
    # Set TTL so idle keys expire instead of accumulating
    pipe.expire(key, window_seconds)
    _, count, _, _ = await pipe.execute()

    if count >= limit:
        # Rejected requests were still recorded above, so a client that
        # keeps retrying stays limited until it backs off.
        oldest = await r.zrange(key, 0, 0, withscores=True)
        retry_after = int(oldest[0][1] + window_seconds - now) if oldest else window_seconds
        raise HTTPException(
            status_code=429,
            detail={
                "error": "rate_limit_exceeded",
                "limit": limit,
                "window_seconds": window_seconds,
                "retry_after": retry_after,
            },
            headers={
                "Retry-After": str(retry_after),
                "X-RateLimit-Limit": str(limit),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(int(now + retry_after)),
            }
        )

    # Expose the numbers to the response-header middleware
    request.state.rate_limit_info = {
        "limit": limit,
        "remaining": max(0, limit - count - 1),
        "reset": int(now + window_seconds),
    }

# Add rate limit headers to successful responses too
@app.middleware("http")
async def add_rate_limit_headers(request, call_next):
    response = await call_next(request)
    # Attach rate limit info from request state (set by rate_limit dependency)
    if hasattr(request.state, 'rate_limit_info'):
        info = request.state.rate_limit_info
        response.headers["X-RateLimit-Limit"] = str(info['limit'])
        response.headers["X-RateLimit-Remaining"] = str(info['remaining'])
        response.headers["X-RateLimit-Reset"] = str(info['reset'])
    return response
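
The sliding-window logic itself can be exercised without Redis. This in-memory sketch mirrors the prune, count, and record steps for a single process. SlidingWindowLimiter is a hypothetical class for tests and local development, and unlike the Redis version above it does not record rejected requests.

```python
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Per-identity sliding window over request timestamps (single process only)."""

    def __init__(self, limit: int, window_seconds: float):
        self.limit = limit
        self.window_seconds = window_seconds
        self._hits: dict[str, deque[float]] = defaultdict(deque)

    def allow(self, identity: str, now: float) -> bool:
        hits = self._hits[identity]
        # Drop timestamps that have slid out of the window.
        while hits and hits[0] <= now - self.window_seconds:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True


limiter = SlidingWindowLimiter(limit=2, window_seconds=10)
results = [
    limiter.allow("key_a", now=0.0),   # True: first request
    limiter.allow("key_a", now=1.0),   # True: second request
    limiter.allow("key_a", now=2.0),   # False: limit of 2 reached
    limiter.allow("key_b", now=2.0),   # True: separate identity, separate window
    limiter.allow("key_a", now=10.5),  # True: the request at t=0 has expired
]
```

Passing now explicitly keeps the class deterministic under test. A production caller would pass time.time() or time.monotonic().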

Limits usually vary by customer tier:

RATE_LIMIT_TIERS = {
    'free':       {'limit': 100,   'window': 3600},
    'starter':    {'limit': 1000,  'window': 3600},
    'growth':     {'limit': 10000, 'window': 3600},
    'enterprise': {'limit': 100000,'window': 3600},
}

async def get_rate_limit_for_key(api_key: str) -> dict:
    key_info = await db.api_keys.get(api_key)
    tier = key_info.tier if key_info else 'free'
    return RATE_LIMIT_TIERS[tier]

Idempotency Keys for Safe Retries

Network requests fail. Clients retry. Without idempotency, a retry might create duplicate orders, double charges, or duplicate emails. The idempotency key pattern makes any non-idempotent operation safe to retry.

import hashlib

@app.post("/api/v2/orders")
async def create_order(
    body: CreateOrderRequest,
    idempotency_key: str = Header(alias="Idempotency-Key"),
    user=Depends(get_current_user)
):
    """Create an order. Safe to retry with the same Idempotency-Key."""
    cache_key = f"idempotency:{user.id}:{idempotency_key}"

    # Check if we've seen this key before
    cached = await r.get(cache_key)
    if cached:
        # Replay the stored body with the same 201 status as the original response
        return JSONResponse(
            status_code=201,
            content=json.loads(cached),
            headers={"Idempotent-Replayed": "true"}
        )

    # Process the request
    order = await order_service.create(body, user.id)
    response_body = order.dict()

    # Cache the response for 24 hours
    await r.setex(cache_key, 86400, json.dumps(response_body))

    return JSONResponse(
        status_code=201,
        content=response_body,
        headers={"Location": f"/api/v2/orders/{order.id}"}
    )

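The client generates a unique idempotency key per logical operation (a UUID v4 works well) and includes it in the Idempotency-Key header. If the request times out, the client retries with the same key. If the server already processed it, it returns the cached response; if not, it processes normally. Either way the client gets a result and the operation takes effect once. One caveat: the handler above checks the cache and writes to it in separate steps, so two concurrent requests with the same key can both reach order creation. Closing that gap requires an atomic reservation of the key (for example, Redis SET with the NX option) before processing starts.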

Webhook Design: Event-Driven API Integrations

Webhooks push events to clients rather than requiring polling. The design decisions that determine whether your webhook implementation is reliable or frustrating:

import asyncio
import hashlib
import hmac
import json
import time
from uuid import uuid4

import httpx

# Webhook payload: include event type, timestamp, and idempotency ID
def build_webhook_payload(event_type: str, data: dict) -> dict:
    return {
        "id": f"evt_{uuid4().hex}",    # unique event ID so clients can deduplicate
        "type": event_type,            # e.g., "order.created", "payment.failed"
        "created": int(time.time()),   # Unix seconds, independent of the host time zone
        "data": data,
        "api_version": "2026-06-01",   # schema version of the payload
    }

# Signature verification: HMAC-SHA256 of the raw body
def sign_webhook(payload: str, secret: str) -> str:
    """Sign webhook payload with HMAC-SHA256."""
    timestamp = int(time.time())
    signed_payload = f"{timestamp}.{payload}"
    signature = hmac.new(
        secret.encode(), signed_payload.encode(), hashlib.sha256
    ).hexdigest()
    return f"t={timestamp},v1={signature}"

# Delivery with retry
async def deliver_webhook(endpoint_url: str, payload: dict, secret: str) -> bool:
    body = json.dumps(payload)
    signature = sign_webhook(body, secret)

    for attempt in range(5):  # retry up to 5 times
        try:
            async with httpx.AsyncClient(timeout=30.0) as client:
                response = await client.post(
                    endpoint_url,
                    content=body,
                    headers={
                        "Content-Type": "application/json",
                        "X-Webhook-Signature": signature,
                        "X-Webhook-Attempt": str(attempt + 1),
                    }
                )
                if response.is_success:  # any 2xx
                    return True
                # 3xx/4xx: don't retry (bad endpoint config, not transient)
                if response.status_code < 500:
                    return False
        except httpx.TransportError:
            pass  # retry on timeouts and connection failures

        # Exponential backoff between attempts: 1s, 2s, 4s, 8s
        if attempt < 4:
            await asyncio.sleep(2 ** attempt)

    return False  # all retries exhausted

Clients verify the webhook signature before processing:

def verify_webhook(payload: str, signature_header: str, secret: str) -> bool:
    """Verify HMAC-SHA256 webhook signature. Reject if >300 seconds old."""
    try:
        parts = dict(item.split("=", 1) for item in signature_header.split(","))
        timestamp = int(parts.get("t", 0))
    except ValueError:
        return False  # malformed header
    received_sig = parts.get("v1", "")

    # Replay attack prevention: reject webhooks older than 5 minutes
    if abs(time.time() - timestamp) > 300:
        return False

    signed_payload = f"{timestamp}.{payload}"
    expected_sig = hmac.new(
        secret.encode(), signed_payload.encode(), hashlib.sha256
    ).hexdigest()

    return hmac.compare_digest(expected_sig, received_sig)
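
A round trip shows how signing and verification fit together, and what tampering or replay looks like to the receiver. The sketch restates both sides in compact form with the clock passed in explicitly so the outcome is deterministic. The names sign and verify, the secret, and the timestamps are illustrative.

```python
import hashlib
import hmac


def sign(body: str, secret: str, timestamp: int) -> str:
    """Produce a 't=<ts>,v1=<hex>' header over '<ts>.<body>'."""
    mac = hmac.new(secret.encode(), f"{timestamp}.{body}".encode(), hashlib.sha256)
    return f"t={timestamp},v1={mac.hexdigest()}"


def verify(body: str, header: str, secret: str, now: int, tolerance: int = 300) -> bool:
    """Check signature and freshness; any malformed header fails closed."""
    try:
        parts = dict(item.split("=", 1) for item in header.split(","))
        timestamp = int(parts["t"])
    except (KeyError, ValueError):
        return False
    if abs(now - timestamp) > tolerance:
        return False
    expected = sign(body, secret, timestamp).split("v1=", 1)[1]
    return hmac.compare_digest(expected, parts.get("v1", ""))


body = '{"id": "evt_1", "type": "order.created"}'
header = sign(body, "whsec_test", timestamp=1_700_000_000)

ok = verify(body, header, "whsec_test", now=1_700_000_010)
tampered = verify(body.replace("created", "deleted"), header, "whsec_test", now=1_700_000_010)
stale = verify(body, header, "whsec_test", now=1_700_000_301)
# ok is True; tampered and stale are both False
```

Receivers should verify against the raw request body exactly as received. Parsing the JSON and re-serializing it first can change whitespace or key order and invalidate an otherwise correct signature.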

HTTP Caching: ETags and Conditional Requests

Properly implemented HTTP caching reduces server load and improves client performance for resources that don't change frequently.

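import hashlib
from datetime import datetime, timedelta

from fastapi import Depends, HTTPException, Request, Response
from fastapi.responses import JSONResponse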

@app.get("/api/v2/orders/{order_id}")
async def get_order(
    order_id: str,
    request: Request,
    user=Depends(get_current_user)
):
    order = await order_service.get(order_id, user.id)
    if not order:
        raise HTTPException(404)

    # ETag: hash of the resource content
    etag = f'"{hashlib.md5(order.json().encode()).hexdigest()}"'

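    # 304 Not Modified: if client has current version
    if_none_match = request.headers.get("If-None-Match")
    if if_none_match is not None:
        if if_none_match == etag:
            return Response(status_code=304, headers={"ETag": etag})
    else:
        # If-Modified-Since is only evaluated when If-None-Match is absent (RFC 9110)
        if_modified_since = request.headers.get("If-Modified-Since")
        if if_modified_since:
            ims_date = datetime.strptime(if_modified_since, "%a, %d %b %Y %H:%M:%S GMT")
            # HTTP dates have one-second resolution, so drop microseconds first.
            # Assumes updated_at is stored in UTC.
            if order.updated_at.replace(microsecond=0, tzinfo=None) <= ims_date:
                return Response(status_code=304, headers={"ETag": etag})

    # Last-Modified: for time-based conditional requests
    last_modified = order.updated_at.strftime("%a, %d %b %Y %H:%M:%S GMT")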

    return JSONResponse(
        content=order.dict(),
        headers={
            "ETag": etag,
            "Last-Modified": last_modified,
            "Cache-Control": "private, max-age=60",  # cache for 60s client-side
        }
    )

For list endpoints, avoid client-side caching (lists change frequently). For individual resources, ETags enable the client to request "give me the order only if it changed since I last fetched it" — reducing response body size to zero bytes for unchanged resources.

OpenAPI and API-First Design

OpenAPI (Swagger) specification defines your API as a YAML/JSON document that can generate documentation, client SDKs, mock servers, and test stubs. The API-first workflow: write the spec before writing code.

# FastAPI takes the code-first route to the same document:
# it generates OpenAPI automatically from type annotations
from typing import Annotated, Literal

from pydantic import BaseModel, Field

class CreateOrderRequest(BaseModel):
    items: Annotated[list[OrderItem], Field(min_length=1, max_length=50)]
    currency: Literal['USD', 'EUR', 'GBP']
    shipping_address: ShippingAddress
    coupon_code: str | None = None

    model_config = {
        "json_schema_extra": {
            "example": {
                "items": [{"product_id": "prod_123", "quantity": 2}],
                "currency": "USD",
                "shipping_address": {"street": "123 Main St", ...}
            }
        }
    }

# FastAPI exposes: GET /openapi.json, GET /docs (Swagger UI), GET /redoc
# Generate client SDK: openapi-generator-cli generate -i openapi.json -g typescript-fetch

# Validate requests against schema automatically
# All validation errors → 422 with field-level details

Breaking vs non-breaking changes:
- Non-breaking (safe): add optional fields, add new endpoints, add new enum values to responses (safe only when clients are documented to tolerate unknown values)
- Breaking (requires version bump): remove fields, rename fields, change field types, change required/optional status, remove enum values
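
These rules are mechanical enough to check in CI by diffing two versions of a schema. The sketch below uses a simplified field map rather than a real OpenAPI document. classify_changes is a hypothetical helper, and treating a newly added required field as breaking is an assumption that applies to request bodies.

```python
def classify_changes(old: dict[str, dict], new: dict[str, dict]) -> dict[str, list[str]]:
    """Compare schemas shaped like {field: {'type': str, 'required': bool}}."""
    breaking, safe = [], []
    for name, spec in old.items():
        if name not in new:
            breaking.append(f"removed field: {name}")
        elif new[name]["type"] != spec["type"]:
            breaking.append(f"type changed: {name}")
        elif new[name]["required"] != spec["required"]:
            breaking.append(f"required/optional changed: {name}")
    for name, spec in new.items():
        if name not in old:
            if spec["required"]:
                breaking.append(f"added required field: {name}")
            else:
                safe.append(f"added optional field: {name}")
    return {"breaking": breaking, "safe": safe}


# The v1 -> v2 order change from the versioning section, in simplified form.
v1 = {
    "id": {"type": "string", "required": True},
    "amount": {"type": "string", "required": True},
    "status": {"type": "string", "required": True},
}
v2 = {
    "id": {"type": "string", "required": True},
    "amount_cents": {"type": "integer", "required": False},
    "currency": {"type": "string", "required": False},
    "status": {"type": "string", "required": True},
    "created_at": {"type": "string", "required": False},
}
report = classify_changes(v1, v2)
# report["breaking"] == ["removed field: amount"]
```

A rename shows up as one removal plus one addition, which is the right outcome: from the client's point of view the old field is gone.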

API Authentication Patterns

Two patterns dominate production APIs:

API keys (machine-to-machine): simple, long-lived, easy to rotate. Best for server-to-server integrations.

async def validate_api_key(
    x_api_key: str = Header(alias="X-API-Key"),
) -> ApiKey:
    # Look up by SHA-256 hash so the raw key is never stored or compared directly
    key_hash = hashlib.sha256(x_api_key.encode()).hexdigest()
    key_record = await db.api_keys.find_by_hash(key_hash)

    if not key_record or key_record.revoked:
        raise HTTPException(401, detail={"error": "invalid_api_key"})

    # Track last used — helps customers audit key usage
    await db.api_keys.update(key_record.id, {"last_used_at": datetime.utcnow()})
    return key_record

# Key format: prefix_randomhex (e.g., "sk_live_abc123...")
# Prefix identifies key type (test vs live), makes them greppable in logs
# Never log the full key — log only key_id (database row ID)
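
Issuing a key in that format takes only the standard library. In this minimal sketch, generate_api_key is a hypothetical helper and the 24-byte secret length is an arbitrary choice. The plaintext is shown to the customer once, and only the hash is stored, matching the lookup in validate_api_key.

```python
import hashlib
import secrets


def generate_api_key(environment: str = "live") -> tuple[str, str]:
    """Return (plaintext_key, sha256_hash) for a new key."""
    # token_hex draws from the OS CSPRNG; 24 bytes -> 48 hex characters
    plaintext = f"sk_{environment}_{secrets.token_hex(24)}"
    key_hash = hashlib.sha256(plaintext.encode()).hexdigest()
    return plaintext, key_hash


plaintext, stored_hash = generate_api_key("test")
# plaintext starts with "sk_test_"; stored_hash is what goes in the database
```

A fast hash such as SHA-256 is reasonable here because the key is long and random. It would not be appropriate for user-chosen passwords.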

JWT Bearer tokens (user-facing APIs): short-lived, self-contained claims, no database lookup per request. Best for user authentication flows.

import os
from datetime import datetime, timedelta, timezone

import jwt
from fastapi import Header, HTTPException

SECRET_KEY = os.environ["JWT_SECRET"]
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15    # short-lived
REFRESH_TOKEN_EXPIRE_DAYS = 30

def create_access_token(user_id: str, scope: list[str]) -> str:
    now = datetime.now(timezone.utc)  # timezone-aware; utcnow() is deprecated
    payload = {
        "sub": user_id,
        "scope": scope,
        "iat": now,
        "exp": now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
        "type": "access",
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

async def get_current_user(
    authorization: str = Header(),
) -> dict:
    try:
        token = authorization.removeprefix("Bearer ")
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        if payload.get("type") != "access":
            raise ValueError("Not an access token")
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(401, detail={"error": "token_expired"})
    except (jwt.InvalidTokenError, ValueError):
        raise HTTPException(401, detail={"error": "invalid_token"})

The combination: API keys for server-to-server, JWT for browser/mobile clients. Both patterns include the error field with a machine-readable code so clients can handle specific error types programmatically.

Error Response Design

Consistent error responses make debugging and client error handling tractable:

from pydantic import BaseModel

class ErrorDetail(BaseModel):
    field: str | None = None
    message: str
    code: str  # machine-readable error code

class ErrorResponse(BaseModel):
    error: str           # top-level error type: "validation_error", "not_found"
    message: str         # human-readable description
    request_id: str      # for support — correlates with server logs
    details: list[ErrorDetail] = []  # field-level errors for 400/422

# 400 Bad Request: validation error with field details
{
    "error": "validation_error",
    "message": "Request validation failed",
    "request_id": "req_abc123",
    "details": [
        {"field": "items.0.quantity", "message": "Must be between 1 and 100", "code": "range_error"},
        {"field": "currency", "message": "Must be one of: USD, EUR, GBP", "code": "invalid_enum"}
    ]
}

# 404 Not Found: no details needed
{
    "error": "not_found",
    "message": "Order ord_abc123 not found",
    "request_id": "req_def456"
}

# 500 Internal Server Error: never expose internals
{
    "error": "internal_error",
    "message": "An unexpected error occurred",
    "request_id": "req_ghi789"
    # No stack trace, no database error message
}

CORS and Security Headers

Browser clients require CORS headers for cross-origin API requests. The minimal correct configuration:

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],  # specific origins, never "*" in production
    allow_credentials=True,    # allows cookies and auth headers
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
    allow_headers=["Authorization", "Content-Type", "X-API-Key", "Idempotency-Key"],
    max_age=86400,             # browser caches preflight for 24 hours
)

# Additional security headers middleware
@app.middleware("http")
async def add_security_headers(request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
    # Note: don't add CSP headers to API responses — CSP is for HTML responses
    return response

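Never combine allow_origins=["*"] with allow_credentials=True. The CORS specification does not allow a wildcard Access-Control-Allow-Origin on credentialed requests, so browsers reject such responses, and workarounds that echo back whatever Origin was sent give every website access to authenticated responses. Enumerate specific allowed origins.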

Conclusion

API design decisions compound. A URL structure chosen in year one, an offset-based pagination scheme that "works for now," a rate limiter that counts IPs: these choices persist because changing them is a breaking change. The patterns in this post are the ones that hold up: cursor pagination keeps page cost flat as tables grow, identity-based rate limiting handles enterprise customers, idempotency keys make payment APIs safe to retry, and URL versioning gives you a clear path for breaking changes.

The OpenAPI-first workflow ties everything together — a spec that documents, validates, and generates clients from a single source of truth. In 2026, APIs that aren't documented in OpenAPI are APIs that are hard to integrate with. The spec is the contract; the implementation proves the contract.

Good API design is also the foundation of developer experience. Consistent error codes, predictable pagination, documented idempotency, clear versioning timelines — these are what distinguish an API that developers trust from one they route around. The best API is the one that surprises clients least: predictable, consistent, and explicit about what happens when things go wrong. Get these foundations right early, and the API becomes a stable platform that many clients depend on. Get them wrong, and every breaking change becomes a negotiation.
