CQRS and Event Sourcing: The Patterns Behind High-Scale Systems

Introduction
There is a category of bugs that CRUD systems cannot prevent: the lost update. Two users open the same record at the same time, make different changes, and the second save silently overwrites the first. The data is consistent — the database is happy — but a user's work has disappeared without any error message. In financial systems, this is not just a UX problem; it is a correctness failure that can cost real money.
There is a performance problem that CRUD systems handle awkwardly: different parts of your application read data in completely different shapes. Your REST API returns normalized JSON objects. Your analytics dashboard needs aggregated counts and sums. Your search index wants denormalized documents. You end up with a single data model — your database tables — that serves all these use cases poorly. Every query requires joins, aggregations, or expensive transformations.
And there is an audit and debugging problem that CRUD systems are blind to: when something goes wrong in production, you can see the current state of your data, but not how it got there. The record shows a balance of $0, but which transactions led to that? The log might tell you if you have good structured logging. But the database itself has no memory of the past — only the present.
CQRS (Command Query Responsibility Segregation) and Event Sourcing are two architectural patterns that solve these problems. They are often used together, but they are distinct ideas. CQRS separates the write model from the read model, allowing each to be optimized independently. Event Sourcing replaces mutable state with an immutable sequence of events — the current state is always derived by replaying history.
These patterns are not appropriate for every application. They add significant complexity. But for high-scale systems where auditability, correctness, and flexible querying all matter — financial systems, healthcare records, distributed e-commerce — they are the foundation. Understanding them deeply will change how you think about system design.
The Problem: What CRUD Gets Wrong at Scale
Traditional CRUD (Create, Read, Update, Delete) works by maintaining current state in a mutable data store. When a user updates their profile, the old values are overwritten. When an order is fulfilled, its status changes from "Pending" to "Fulfilled". The database always reflects the world as it is now, not as it was.
This model has three fundamental tensions:
Tension 1: Read and Write Patterns Diverge
The shape of data for writing and the shape for reading are almost never the same. When you create an invoice, you store normalized data: invoice_id, customer_id, line_items[]. When you display an invoice to the customer, you need a denormalized view: customer name, address, product names, computed subtotals, tax, total. To get there, you join invoices, customers, products, and compute aggregates in the query.
As the system grows, the write model and the read model pull further apart. The write model needs strong consistency and validation. The read model needs performance and denormalization. Forcing both through the same ORM and database table is like using the same road for freight trucks and sports cars — neither is well-served.
CQRS's answer: have two models. The command model handles writes with full validation and business logic. The query model is a denormalized, pre-computed view optimized for reads. Keep them in sync asynchronously.
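The split can be sketched in a few lines of TypeScript. The names here (Invoice, InvoiceView, projectInvoice) are illustrative, not from any real codebase:

```typescript
// Write model: normalized, what the command side validates and stores
interface InvoiceLineItem {
  productId: string;
  quantity: number;
  unitPrice: number;
}
interface Invoice {
  invoiceId: string;
  customerId: string;
  lineItems: InvoiceLineItem[];
}

// Read model: denormalized, with joins and arithmetic done ahead of time
interface InvoiceView {
  invoiceId: string;
  customerName: string;   // joined from customers at projection time
  productNames: string[]; // joined from products at projection time
  subtotal: number;
  tax: number;
  total: number;
}

// A projection turns one into the other; the lookups are passed in as
// plain arguments to keep the sketch self-contained
function projectInvoice(
  invoice: Invoice,
  customerName: string,
  productNames: string[],
  taxRate: number
): InvoiceView {
  const subtotal = invoice.lineItems.reduce(
    (sum, li) => sum + li.quantity * li.unitPrice,
    0
  );
  const tax = subtotal * taxRate;
  return {
    invoiceId: invoice.invoiceId,
    customerName,
    productNames,
    subtotal,
    tax,
    total: subtotal + tax,
  };
}
```

The point of the sketch: the expensive work (joins, aggregation) happens once, when the read model is built, rather than on every query.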
Tension 2: You Cannot Answer "What Happened?"
With mutable state, you know the current value of every field. You do not know why it changed, when it changed, or what it was before. Implementing audit logs as an afterthought is error-prone: developers add updated_at timestamps and updated_by user IDs, but these are add-on metadata, not a first-class record of changes.
In financial systems, "what happened" is not optional — it is the core of the business. A bank account is not a row with a balance column. It is a ledger: a sequence of deposits, withdrawals, and interest calculations. The balance is derived from the ledger, not stored independently.
Event Sourcing makes this the primary model. Instead of storing the current balance, you store the sequence of events: AccountOpened, MoneyDeposited(100), MoneyWithdrawn(30), InterestApplied(0.50). The current balance (70.50) is computed by replaying these events. The history is the truth; the current state is a cache.
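That derivation is literally a fold over the history. A minimal sketch, with simplified event shapes named after the examples above:

```typescript
// Each event type carries only what it needs; replaying them in order
// reconstructs the current balance from nothing but history.
type LedgerEvent =
  | { type: 'AccountOpened'; initialDeposit: number }
  | { type: 'MoneyDeposited'; amount: number }
  | { type: 'MoneyWithdrawn'; amount: number }
  | { type: 'InterestApplied'; amount: number };

function replayBalance(events: LedgerEvent[]): number {
  return events.reduce((balance, event) => {
    switch (event.type) {
      case 'AccountOpened':   return event.initialDeposit;
      case 'MoneyDeposited':  return balance + event.amount;
      case 'MoneyWithdrawn':  return balance - event.amount;
      case 'InterestApplied': return balance + event.amount;
    }
  }, 0);
}

const history: LedgerEvent[] = [
  { type: 'AccountOpened', initialDeposit: 0 },
  { type: 'MoneyDeposited', amount: 100 },
  { type: 'MoneyWithdrawn', amount: 30 },
  { type: 'InterestApplied', amount: 0.5 },
];
console.log(replayBalance(history)); // 70.5
```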
Tension 3: Concurrency and Lost Updates
Standard CRUD protects against corruption with database transactions, but not against the lost update problem at the application level. If two background jobs both load a user record, modify different fields, and save, one save overwrites the other. Optimistic locking (version numbers) helps but requires discipline to implement consistently across every update path.
Event Sourcing provides a natural solution: the event stream is the single source of truth. To prevent concurrent modification, you use optimistic concurrency on the stream: a write to the event stream fails if the stream's version number has changed since you read it. This is the same as optimistic locking, but it is enforced by the architecture rather than bolted on.
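A minimal in-memory sketch of that version check (a production event store such as EventStoreDB or a Postgres-backed one enforces the same rule durably):

```typescript
class ConcurrencyConflictError extends Error {
  constructor(expected: number, actual: number) {
    super(`Expected stream version ${expected}, but stream is at ${actual}`);
  }
}

interface StoredEvent {
  version: number;
  type: string;
  payload: unknown;
}

class InMemoryStream {
  private events: StoredEvent[] = [];

  get version(): number {
    return this.events.length;
  }

  // Append fails atomically if the stream has moved since the caller read it
  append(newEvents: Omit<StoredEvent, 'version'>[], expectedVersion: number): void {
    if (this.version !== expectedVersion) {
      throw new ConcurrencyConflictError(expectedVersion, this.version);
    }
    for (const e of newEvents) {
      this.events.push({ ...e, version: this.version + 1 });
    }
  }
}
```

Two writers that both read version 0 cannot both succeed: the first append advances the stream to version 1, so the second append's expectedVersion of 0 no longer matches and it is rejected.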
How It Works: CQRS Architecture
CQRS splits your application into two halves along a fundamental boundary:
- Commands express intent to change state: PlaceOrder, UpdateShippingAddress, CancelOrder. They are validated, applied to the domain model, and rejected if the business rules are not satisfied. Commands produce events.
- Queries retrieve data without modifying it: GetOrderDetails, SearchProducts, GetDashboardStats. They read from pre-built, optimized read models.

The write side and read side are completely independent systems. The write side cares about correctness and consistency; it can afford to be slower. The read side cares about query performance; it can afford to be slightly stale, lagging writes by the projection latency (eventual consistency).
Event Sourcing: Rebuilding State from Events
Event Sourcing takes the write side of CQRS further: instead of storing the current state of a domain aggregate, you store every event that has ever happened to it. The current state is computed by replaying the event history.
Implementation Guide: Bank Account with Event Sourcing in TypeScript
Let us implement a complete bank account aggregate using event sourcing. This example covers all the core concepts: events, aggregate state, applying events, command validation, optimistic concurrency, and projections.
// ============================================================
// types.ts — Core event sourcing types
// ============================================================
/**
* Base interface for all domain events.
* Every event is immutable and carries enough data to stand alone
* — it should not require context from other events to be understood.
*/
export interface DomainEvent {
readonly eventId: string; // UUID — globally unique
readonly eventType: string; // Discriminator for the apply() switch
readonly aggregateId: string; // Which aggregate does this event belong to
readonly aggregateVersion: number; // Position in this aggregate's stream
readonly occurredAt: Date; // When the event happened
readonly payload: Record<string, unknown>; // Event-specific data
}
/**
* A command expresses intent to change state.
* Commands can fail (validation, business rules).
* Events cannot fail — they are records of what happened.
*/
export interface Command {
readonly commandId: string;
readonly commandType: string;
}
/**
* The event store interface. In production this is backed by
* PostgreSQL (with an append-only events table), EventStoreDB,
* or AWS DynamoDB Streams.
*/
export interface EventStore {
/** Append events to a stream, failing if expectedVersion is wrong */
appendToStream(
streamId: string,
events: DomainEvent[],
expectedVersion: number
): Promise<void>;
/** Load all events for a stream, optionally from a starting version */
loadStream(
streamId: string,
fromVersion?: number
): Promise<DomainEvent[]>;
}
// ============================================================
// bank-account-events.ts — All possible bank account events
// ============================================================
export type BankAccountEvent =
| AccountOpenedEvent
| MoneyDepositedEvent
| MoneyWithdrawnEvent
| AccountClosedEvent
| TransferInitiatedEvent;
export interface AccountOpenedEvent extends DomainEvent {
eventType: 'AccountOpened';
payload: {
accountHolderId: string;
initialDeposit: number;
currency: string;
};
}
export interface MoneyDepositedEvent extends DomainEvent {
eventType: 'MoneyDeposited';
payload: {
amount: number;
transactionReference: string;
description: string;
};
}
export interface MoneyWithdrawnEvent extends DomainEvent {
eventType: 'MoneyWithdrawn';
payload: {
amount: number;
transactionReference: string;
description: string;
};
}
export interface AccountClosedEvent extends DomainEvent {
eventType: 'AccountClosed';
payload: {
reason: string;
closedBy: string;
};
}
export interface TransferInitiatedEvent extends DomainEvent {
eventType: 'TransferInitiated';
payload: {
destinationAccountId: string;
amount: number;
transferId: string;
};
}
// ============================================================
// bank-account-aggregate.ts — The domain aggregate
// ============================================================
import { randomUUID } from 'crypto';
import { DomainEvent } from './types';
import { BankAccountEvent, AccountOpenedEvent, MoneyDepositedEvent, MoneyWithdrawnEvent, AccountClosedEvent } from './bank-account-events';
/**
* The current state of a bank account, derived by replaying events.
* This is the aggregate's "snapshot" — rebuilt from scratch when loaded.
*/
interface BankAccountState {
accountId: string;
balance: number;
currency: string;
accountHolderId: string;
status: 'Active' | 'Closed';
version: number; // Current event stream version
}
/**
* BankAccount aggregate — the core domain object.
*
* Key design decisions:
* 1. State is private and read-only — only apply() can modify it
* 2. Business methods (deposit, withdraw) return new events, not modified state
* 3. apply() is the single place where state transitions happen
* 4. apply() is deterministic and side-effect-free (pure function of state + event)
*/
export class BankAccount {
private state: BankAccountState | null = null;
private pendingEvents: DomainEvent[] = [];
private constructor() {}
/**
* Restore a BankAccount aggregate from its event history.
* This is how you load a BankAccount from the event store.
*/
static fromEvents(events: BankAccountEvent[]): BankAccount {
const account = new BankAccount();
for (const event of events) {
account.applyEvent(event, false); // false = don't add to pendingEvents
}
return account;
}
/**
* Create a new bank account (handle the OpenAccount command).
* Returns a BankAccount instance with one pending event: AccountOpened.
*/
static open(params: {
accountId: string;
accountHolderId: string;
initialDeposit: number;
currency: string;
}): BankAccount {
const account = new BankAccount();
if (params.initialDeposit < 0) {
throw new Error('Initial deposit cannot be negative');
}
const event: AccountOpenedEvent = {
eventId: randomUUID(),
eventType: 'AccountOpened',
aggregateId: params.accountId,
aggregateVersion: 1,
occurredAt: new Date(),
payload: {
accountHolderId: params.accountHolderId,
initialDeposit: params.initialDeposit,
currency: params.currency,
},
};
account.applyEvent(event, true); // true = add to pendingEvents for saving
return account;
}
/**
* Deposit money into the account.
* Validates business rules, then creates a MoneyDeposited event.
*/
deposit(params: {
amount: number;
transactionReference: string;
description?: string;
}): void {
this.ensureInitialized();
this.ensureActive();
if (params.amount <= 0) {
throw new Error(`Deposit amount must be positive, got: ${params.amount}`);
}
if (params.amount > 1_000_000) {
throw new Error('Deposits over $1,000,000 require manual review');
}
const event: MoneyDepositedEvent = {
eventId: randomUUID(),
eventType: 'MoneyDeposited',
aggregateId: this.state!.accountId,
aggregateVersion: this.state!.version + 1,
occurredAt: new Date(),
payload: {
amount: params.amount,
transactionReference: params.transactionReference,
description: params.description ?? 'Deposit',
},
};
this.applyEvent(event, true);
}
/**
* Withdraw money from the account.
* Validates business rules (sufficient funds, account active).
*/
withdraw(params: {
amount: number;
transactionReference: string;
description?: string;
}): void {
this.ensureInitialized();
this.ensureActive();
if (params.amount <= 0) {
throw new Error(`Withdrawal amount must be positive, got: ${params.amount}`);
}
if (params.amount > this.state!.balance) {
throw new Error(
`Insufficient funds: balance ${this.state!.balance}, requested ${params.amount}`
);
}
const event: MoneyWithdrawnEvent = {
eventId: randomUUID(),
eventType: 'MoneyWithdrawn',
aggregateId: this.state!.accountId,
aggregateVersion: this.state!.version + 1,
occurredAt: new Date(),
payload: {
amount: params.amount,
transactionReference: params.transactionReference,
description: params.description ?? 'Withdrawal',
},
};
this.applyEvent(event, true);
}
/**
* Close the account.
*/
close(params: { reason: string; closedBy: string }): void {
this.ensureInitialized();
this.ensureActive();
if (this.state!.balance !== 0) {
throw new Error(
`Cannot close account with non-zero balance: ${this.state!.balance}`
);
}
const event: AccountClosedEvent = {
eventId: randomUUID(),
eventType: 'AccountClosed',
aggregateId: this.state!.accountId,
aggregateVersion: this.state!.version + 1,
occurredAt: new Date(),
payload: { reason: params.reason, closedBy: params.closedBy },
};
this.applyEvent(event, true);
}
// ---- Getters ----
get currentBalance(): number {
this.ensureInitialized();
return this.state!.balance;
}
get currentVersion(): number {
this.ensureInitialized();
return this.state!.version;
}
get isActive(): boolean {
return this.state?.status === 'Active';
}
/** Pending events to be saved to the event store */
get uncommittedEvents(): DomainEvent[] {
return [...this.pendingEvents];
}
/** Call after saving to the event store to clear the pending events list */
markEventsAsCommitted(): void {
this.pendingEvents = [];
}
// ---- Private: state transitions ----
/**
* Apply an event to the aggregate's state.
* This is the ONLY place where state changes happen.
* It must be pure: same event + same state = same new state.
*/
private applyEvent(event: BankAccountEvent, isNew: boolean): void {
switch (event.eventType) {
case 'AccountOpened':
this.state = {
accountId: event.aggregateId,
balance: event.payload.initialDeposit as number,
currency: event.payload.currency as string,
accountHolderId: event.payload.accountHolderId as string,
status: 'Active',
version: event.aggregateVersion,
};
break;
case 'MoneyDeposited':
this.state!.balance += event.payload.amount as number;
this.state!.version = event.aggregateVersion;
break;
case 'MoneyWithdrawn':
this.state!.balance -= event.payload.amount as number;
this.state!.version = event.aggregateVersion;
break;
case 'AccountClosed':
this.state!.status = 'Closed';
this.state!.version = event.aggregateVersion;
break;
case 'TransferInitiated':
this.state!.balance -= event.payload.amount as number;
this.state!.version = event.aggregateVersion;
break;
default:
// Unrecognized event type — ignore (forward compatibility)
console.warn('Unrecognized event type, skipping:', (event as any).eventType);
}
if (isNew) {
this.pendingEvents.push(event);
}
}
private ensureInitialized(): void {
if (!this.state) {
throw new Error('BankAccount aggregate is not initialized — load from events first');
}
}
private ensureActive(): void {
if (this.state?.status !== 'Active') {
throw new Error('Operation rejected: account is not active');
}
}
}
// ============================================================
// bank-account-repository.ts — Loading and saving aggregates
// ============================================================
import { EventStore } from './types';
import { BankAccount } from './bank-account-aggregate';
import { BankAccountEvent } from './bank-account-events';
export class BankAccountRepository {
constructor(private readonly eventStore: EventStore) {}
/**
* Load a BankAccount by replaying its entire event history.
* For accounts with thousands of events, use snapshots instead (see below).
*/
async load(accountId: string): Promise<BankAccount | null> {
const streamId = `BankAccount-${accountId}`;
const events = await this.eventStore.loadStream(streamId) as BankAccountEvent[];
if (events.length === 0) {
return null; // Account does not exist
}
return BankAccount.fromEvents(events);
}
/**
* Save all uncommitted events from the aggregate to the event store.
*
* The expectedVersion is the version the aggregate had when loaded.
* If another process has appended events since then, this throws a
* ConcurrencyException — the caller must reload and retry.
*/
async save(
account: BankAccount,
expectedVersion: number
): Promise<void> {
const uncommittedEvents = account.uncommittedEvents;
if (uncommittedEvents.length === 0) {
return; // Nothing to save
}
const streamId = `BankAccount-${uncommittedEvents[0].aggregateId}`;
// This throws if stream version != expectedVersion
await this.eventStore.appendToStream(streamId, uncommittedEvents, expectedVersion);
// Clear pending events — they are now durably stored
account.markEventsAsCommitted();
}
}
// ============================================================
// deposit-command-handler.ts — Command handler with retry
// ============================================================
import { BankAccountRepository } from './bank-account-repository';
interface DepositCommand {
commandId: string;
accountId: string;
amount: number;
transactionReference: string;
description?: string;
}
export class DepositCommandHandler {
constructor(private readonly repo: BankAccountRepository) {}
/**
* Handle a DepositCommand.
*
* Uses optimistic concurrency: load the aggregate at its current version,
* apply the business operation, then save. If another write has happened
* concurrently, the save throws a ConcurrencyException and we retry.
*
* Maximum 3 retries is usually enough — genuine contention on a single
* account is rare. If it isn't, consider command-level queuing per account.
*/
async handle(command: DepositCommand): Promise<void> {
const MAX_RETRIES = 3;
for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
const account = await this.repo.load(command.accountId);
if (!account) {
throw new Error(`Account not found: ${command.accountId}`);
}
const versionAtLoad = account.currentVersion;
try {
// Apply the business operation — may throw if business rules fail
account.deposit({
amount: command.amount,
transactionReference: command.transactionReference,
description: command.description,
});
// Save with optimistic concurrency check
await this.repo.save(account, versionAtLoad);
return; // Success
} catch (error: any) {
if (error.code === 'CONCURRENCY_CONFLICT' && attempt < MAX_RETRIES) {
console.warn(
`Concurrency conflict on attempt ${attempt}/${MAX_RETRIES} for account ${command.accountId}, retrying...`
);
// Brief delay before retry to reduce contention
await new Promise(r => setTimeout(r, attempt * 50));
continue;
}
throw error; // Business error or max retries exceeded
}
}
}
}
// ============================================================
// account-balance-projection.ts — Building a read model
// ============================================================
/**
* Projections (also called "read models" or "view models") listen to events
* and build optimized views for queries.
*
* This projection maintains an account_balances table that can be queried
* instantly without replaying the event stream.
*/
import { Pool } from 'pg';
import { DomainEvent } from './types';
import { BankAccountEvent } from './bank-account-events';
export class AccountBalanceProjection {
constructor(private readonly db: Pool) {}
/**
* Process an event and update the read model accordingly.
* Called by the event bus subscriber for every new event on BankAccount streams.
*
* This method must be idempotent — it may be called more than once
* for the same event if the consumer restarts.
*/
async handle(event: BankAccountEvent): Promise<void> {
switch (event.eventType) {
case 'AccountOpened':
await this.db.query(`
INSERT INTO account_balances (account_id, balance, currency, account_holder_id, status, last_updated, stream_version)
VALUES ($1, $2, $3, $4, 'Active', $5, $6)
ON CONFLICT (account_id) DO NOTHING
`, [
event.aggregateId,
event.payload.initialDeposit,
event.payload.currency,
event.payload.accountHolderId,
event.occurredAt,
event.aggregateVersion,
]);
break;
case 'MoneyDeposited':
await this.db.query(`
UPDATE account_balances
SET balance = balance + $1,
last_updated = $2,
stream_version = $3
WHERE account_id = $4
AND stream_version = $3 - 1 -- Idempotency: only apply once
`, [
event.payload.amount,
event.occurredAt,
event.aggregateVersion,
event.aggregateId,
]);
break;
case 'MoneyWithdrawn':
await this.db.query(`
UPDATE account_balances
SET balance = balance - $1,
last_updated = $2,
stream_version = $3
WHERE account_id = $4
AND stream_version = $3 - 1
`, [
event.payload.amount,
event.occurredAt,
event.aggregateVersion,
event.aggregateId,
]);
break;
case 'AccountClosed':
await this.db.query(`
UPDATE account_balances
SET status = 'Closed',
last_updated = $1,
stream_version = $2
WHERE account_id = $3
AND stream_version = $2 - 1
`, [
event.occurredAt,
event.aggregateVersion,
event.aggregateId,
]);
break;
}
}
}
// Query the read model — fast, no event replay needed
export async function getAccountBalance(
db: Pool,
accountId: string
): Promise<{ balance: number; currency: string; status: string } | null> {
const result = await db.query(
'SELECT balance, currency, status FROM account_balances WHERE account_id = $1',
[accountId]
);
return result.rows[0] ?? null;
}
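The idempotency guard in the SQL above (stream_version must equal the event's version minus one) can be demonstrated without a database. This in-memory read model, with deliberately simplified event shapes, applies each event at most once even when the subscriber redelivers it:

```typescript
interface BalanceRow {
  balance: number;
  streamVersion: number;
}

class InMemoryBalanceProjection {
  private rows = new Map<string, BalanceRow>();

  handle(e: { accountId: string; version: number; type: 'Opened' | 'Deposited'; amount: number }): void {
    if (e.type === 'Opened') {
      // Mirrors ON CONFLICT DO NOTHING: a redelivered open is a no-op
      if (!this.rows.has(e.accountId)) {
        this.rows.set(e.accountId, { balance: e.amount, streamVersion: e.version });
      }
      return;
    }
    const row = this.rows.get(e.accountId);
    // Only apply if this event is exactly the next one; redeliveries are no-ops
    if (row && row.streamVersion === e.version - 1) {
      row.balance += e.amount;
      row.streamVersion = e.version;
    }
  }

  balanceOf(accountId: string): number | null {
    return this.rows.get(accountId)?.balance ?? null;
  }
}
```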
Comparison and Tradeoffs

Benefits of Event Sourcing
| Benefit | Description |
|---------|-------------|
| Complete audit log | Every state change is permanently recorded with who, what, and when |
| Temporal queries | Replay events to any point in time: "what was the balance on Jan 1?" |
| Event replay | Deploy new projections and build new read models from historical events |
| Debugging | Reproduce bugs by replaying the exact event sequence that caused them |
| Integration events | Events are a natural integration mechanism for other services |
| Optimistic concurrency | Built-in, architecture-level conflict detection |
Costs of Event Sourcing
| Cost | Mitigation |
|------|-----------|
| Complexity | More moving parts than CRUD; higher cognitive load for team |
| Schema evolution | Old events cannot be changed; forward/backward compatibility required |
| Eventual consistency | Read models lag behind writes (usually milliseconds) |
| Replay performance | Loading aggregates with long histories is slow without snapshots |
| Querying difficulty | Cannot query event store like a SQL table; need projections for all queries |
Snapshots for Performance
For aggregates with long event histories, replaying thousands of events on every load is slow. The solution is snapshots: periodically save the current aggregate state as a point-in-time snapshot. On load, start from the most recent snapshot instead of event 1:
interface Snapshot {
aggregateId: string;
aggregateVersion: number;
state: Record<string, unknown>;
snapshotAt: Date;
}
interface SnapshotStore {
getLatest(streamId: string): Promise<Snapshot | null>;
save(snapshot: Snapshot): Promise<void>;
}
// Modified load strategy with snapshot support.
// Note: this sketch assumes BankAccount also exposes fromSnapshot() and
// applyPublicEvent(), plus accountId and serializeState() accessors, which
// are small additions not shown in the aggregate class above.
async function loadWithSnapshot(
accountId: string,
eventStore: EventStore,
snapshotStore: SnapshotStore
): Promise<BankAccount> {
const snapshot = await snapshotStore.getLatest(`BankAccount-${accountId}`);
let events: BankAccountEvent[];
let fromVersion = 0;
if (snapshot) {
// Start from the snapshot version — only load events after it
fromVersion = snapshot.aggregateVersion;
events = await eventStore.loadStream(
`BankAccount-${accountId}`,
fromVersion + 1 // Events after the snapshot
) as BankAccountEvent[];
const account = BankAccount.fromSnapshot(snapshot.state);
for (const event of events) {
account.applyPublicEvent(event);
}
return account;
}
// No snapshot — load full history
events = await eventStore.loadStream(`BankAccount-${accountId}`) as BankAccountEvent[];
return BankAccount.fromEvents(events);
}
// Take a snapshot every 100 events
async function maybeSnapshot(account: BankAccount, snapshotStore: SnapshotStore) {
const SNAPSHOT_INTERVAL = 100;
if (account.currentVersion % SNAPSHOT_INTERVAL === 0) {
await snapshotStore.save({
aggregateId: account.accountId,
aggregateVersion: account.currentVersion,
state: account.serializeState(),
snapshotAt: new Date(),
});
}
}
When NOT to Use CQRS / Event Sourcing
These patterns add significant complexity. Do not use them when:
- Simple CRUD is sufficient: a user management system, a content CMS, settings pages — these do not need event sourcing
- Team is unfamiliar with the patterns: the learning curve is real; introducing these in a startup's early days usually slows you down more than the patterns help
- Audit is not a business requirement: if no one cares about history, storing history is pure overhead
- Consistency is more important than flexibility: if your domain genuinely needs synchronous, strongly consistent reads immediately after writes, eventual consistency from projections is a pain point
- Small scale: a system handling 100 requests/day gains nothing from CQRS's read/write separation
The right test: does your domain have complex business logic with multiple concurrent writers, meaningful history that people need to query, and multiple consumers who need the same data in different shapes? If yes, CQRS and Event Sourcing will repay the complexity investment.
Production Considerations
The Full CQRS + Event Sourcing Request Lifecycle
The complete flow of a command, from API request through to the read model being updated:
1. Write path: the API dispatches the command (e.g. DepositMoney) to the command handler, which loads the aggregate from the event store, applies the business rules, and appends the resulting MoneyDeposited event with an optimistic concurrency check: the append fails if the expected stream version (say, version 5) already exists.
2. On success, the repository publishes MoneyDeposited to the event bus and the API returns 200 OK.
3. Async projection (milliseconds later): a subscriber delivers the event to the projection, which updates the account_balances read model (e.g. UPDATE account_balances SET balance = 550 WHERE account_id = 'acc-001').
4. Read path: a GetAccountBalance query reads the read model directly (SELECT balance FROM account_balances WHERE account_id = 'acc-001') and returns {balance: 550, currency: "USD"}; no event replay is involved.
Event Store Implementation Options
| Option | Best For | Notes |
|--------|----------|-------|
| EventStoreDB | Purpose-built event sourcing | Native projections, subscriptions, excellent tooling |
| PostgreSQL (append-only table) | Teams already using Postgres | Low ops overhead; implement optimistic concurrency with version check |
| AWS DynamoDB Streams | Serverless / AWS-native | Good at scale; streams enable Lambda-based projections |
| Kafka (log compaction OFF) | High-throughput event distribution | Not a dedicated event store; no per-stream optimistic concurrency, so pair it with a proper store on the write side |
A minimal PostgreSQL event store schema:
-- The events table: append-only, never UPDATE or DELETE
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
stream_id TEXT NOT NULL, -- e.g. 'BankAccount-acc-001'
stream_version INT NOT NULL, -- Version within this stream
event_id UUID NOT NULL UNIQUE, -- Global deduplication key
event_type TEXT NOT NULL, -- e.g. 'MoneyDeposited'
payload JSONB NOT NULL, -- Event data
occurred_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
metadata JSONB, -- Correlation IDs, user info, etc.
-- Optimistic concurrency: (stream_id, stream_version) must be unique
CONSTRAINT uq_stream_version UNIQUE (stream_id, stream_version)
);
-- Index for loading a stream efficiently
CREATE INDEX idx_events_stream ON events (stream_id, stream_version);
-- Index for global event feed (projections subscribing to all events)
CREATE INDEX idx_events_occurred ON events (occurred_at, id);
-- Append events with an optimistic concurrency check.
-- A stale append matches zero rows in the guard below and inserts nothing
-- (so check the affected row count); if two writers race past the guard,
-- the unique constraint on (stream_id, stream_version) rejects the loser.
INSERT INTO events (stream_id, stream_version, event_id, event_type, payload)
SELECT
$1 AS stream_id,
$2 AS stream_version,
$3 AS event_id,
$4 AS event_type,
$5 AS payload
WHERE NOT EXISTS (
SELECT 1 FROM events
WHERE stream_id = $1 AND stream_version >= $2
);
Schema Evolution
Events are immutable — you cannot change an event that has already been stored. But your application code evolves. Strategies for handling this:
1. Upcasters: transform old event shapes to the current format at read time. Store a V1 event, read it through an upcaster that adds new fields with defaults before passing to the aggregate.
2. Weak schema: use JSONB with default values in your application code — missing fields use sensible defaults, extra fields are ignored.
3. Versioned events: MoneyDeposited_V1, MoneyDeposited_V2 — explicit versioning. Cleaner but verbose.
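Strategy 1 can be sketched concretely. The schemaVersion envelope and the USD default below are illustrative assumptions, not part of the event schema defined earlier:

```typescript
// V1 of MoneyDeposited predates multi-currency support; V2 adds `currency`.
interface MoneyDepositedV1 {
  amount: number;
}
interface MoneyDepositedV2 {
  amount: number;
  currency: string;
}

// Stored payloads carry a schema version alongside the raw data
type StoredPayload = { schemaVersion: number; data: Record<string, unknown> };

// The upcaster runs at read time, before the event reaches the aggregate,
// lifting every historical shape to the one current code understands
function upcastMoneyDeposited(stored: StoredPayload): MoneyDepositedV2 {
  switch (stored.schemaVersion) {
    case 1: {
      const v1 = stored.data as unknown as MoneyDepositedV1;
      return { amount: v1.amount, currency: 'USD' }; // default for pre-currency events
    }
    case 2:
      return stored.data as unknown as MoneyDepositedV2;
    default:
      throw new Error(`Unknown schema version: ${stored.schemaVersion}`);
  }
}
```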
Observability
Key metrics for event-sourced systems:
- Projection lag: time between an event being written and the projection updating the read model (alert if > 5 seconds under normal load)
- Event replay time: how long it takes to rebuild a projection from scratch (important for disaster recovery planning)
- Concurrency conflict rate: ConcurrencyException count per minute; high rates indicate hot aggregates that need per-aggregate command queuing
- Event store growth rate: events are never deleted; plan storage accordingly
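Projection lag, the first metric above, can be computed per event as the gap between occurredAt and the wall clock at apply time. A minimal sketch (a real system would export this to Prometheus or CloudWatch rather than hold it in memory):

```typescript
class ProjectionLagTracker {
  private maxLagMs = 0;

  // Called by the projection for every event it applies
  record(eventOccurredAt: Date, appliedAt: Date = new Date()): number {
    const lagMs = appliedAt.getTime() - eventOccurredAt.getTime();
    this.maxLagMs = Math.max(this.maxLagMs, lagMs);
    return lagMs;
  }

  // True if the worst observed lag breaches the alert threshold (5s by default,
  // matching the guidance above)
  shouldAlert(thresholdMs = 5_000): boolean {
    return this.maxLagMs > thresholdMs;
  }
}
```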
Conclusion
CQRS and Event Sourcing are powerful patterns that address fundamental limitations of mutable-state, CRUD-oriented systems. CQRS separates the write model — with its focus on validation, business rules, and correctness — from the read model, which can be freely denormalized and optimized for query patterns. Event Sourcing makes the history of state changes the ground truth, enabling audit logs, temporal queries, and replay-driven projections as first-class features.
The implementation in TypeScript we built covers the full lifecycle: creating events, applying them to aggregate state, saving to an event store with optimistic concurrency, and building read models through projections. These are not abstract patterns — they are the architecture of systems like Stripe's payment ledger, Shopify's order management, and GitHub's repository events feed.
The tradeoffs are real. Schema evolution requires discipline. Eventual consistency in projections requires your team and users to accept that the "current balance" shown in the UI may be milliseconds behind the latest write. Rebuilding aggregates from events requires snapshot strategies for long-lived entities.
Use these patterns when your domain warrants them: complex business logic, meaningful history, multiple consumers of the same data, and concurrency requirements that make simple CRUD unreliable. Start with CQRS alone — the read/write separation — before adding Event Sourcing. Get comfortable with projections before tackling event store implementation. The patterns are modular; you do not have to adopt everything at once.
*Working on an event-sourced system or considering these patterns for your team? Reach out on [LinkedIn](https://linkedin.com/in/toc-am-b301373b4/) or drop a comment below. Follow AmtocSoft Tech Insights for more deep-dives into advanced distributed systems design.*