Kafka Advanced Patterns: Partitioning, Consumer Groups, and Exactly-Once Semantics

Introduction

Kafka at scale is not the same system as Kafka in tutorials. The tutorial version has one producer, one consumer, and no concern for ordering, durability, or throughput. The production version has partition strategies that determine your maximum parallelism, consumer group rebalancing that causes processing gaps, exactly-once semantics with non-trivial performance costs, and schema evolution that breaks consumers in subtle ways.

This post is for engineers operating Kafka in production — or designing systems that will. It covers the decisions that determine how your Kafka deployment scales: partition count and key selection strategy, consumer group semantics and the rebalancing problem, idempotent producers and transactional exactly-once processing, compacted topics for event sourcing, consumer lag monitoring, and the patterns that make Kafka reliable at hundreds of thousands of events per second.

Partitions: The Unit of Parallelism

Everything in Kafka's performance model flows from partitions. A topic's partitions are the unit of both parallelism and ordering. One consumer instance per partition is the maximum parallelism — you cannot parallelize more than the partition count. Ordering is guaranteed only within a partition, not across partitions.

Partition count is set at topic creation and can only be increased afterward, never decreased (reducing it means recreating the topic and migrating data). Choosing the right count requires estimating your peak throughput and your per-consumer processing rate:

Required partitions = ceil(peak_events_per_second / events_per_consumer_per_second)

Example:
Peak: 50,000 events/second
Consumer throughput: 2,000 events/second (complex processing)
Required partitions: ceil(50,000 / 2,000) = 25
Recommended: 30 (20% headroom for burst)

Under-partition and you cannot scale horizontally. Over-partition and you increase broker overhead (each partition has a log segment file, index files, and in-memory state), increase consumer group rebalancing time, and waste resources when utilization is low. For most workloads, 12-30 partitions is the right range. For very high throughput, 50-100.
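The sizing formula above is easy to codify. A minimal sketch (the `required_partitions` name and the headroom default are illustrative, not part of any Kafka API):

```python
import math

def required_partitions(peak_eps: float, per_consumer_eps: float,
                        headroom: float = 0.2) -> int:
    """Partitions needed to absorb peak throughput, plus burst headroom."""
    base = math.ceil(peak_eps / per_consumer_eps)
    return math.ceil(base * (1 + headroom))

required_partitions(50_000, 2_000)  # 25 base partitions + 20% headroom = 30
```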

The partition key determines which partition a message goes to (via consistent hash). This choice has significant consequences:

import json

from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'kafka:9092',
    'acks': 'all',           # wait for all in-sync replicas — durability
    'enable.idempotence': True,  # exactly-once at producer level
    'max.in.flight.requests.per.connection': 5,  # must be <= 5 with idempotence
    'compression.type': 'lz4',  # often a 2-4× throughput improvement
    'linger.ms': 5,          # batch for 5ms — improves throughput
    'batch.size': 65536,     # 64KB batch size
})

def delivery_callback(err, msg):
    """Per-message delivery report: produce() is async, so log failures here."""
    if err is not None:
        logger.error("delivery_failed", error=str(err))

# Strategy 1: partition by user_id — ordering per user, balanced load
producer.produce(
    topic='user_events',
    key=user_id.encode(),    # hash(user_id) % partition_count → partition
    value=json.dumps(event),
    callback=delivery_callback
)

# Strategy 2: partition by tenant_id — ordering per tenant
# Risk: hot partition if one tenant has 10× the volume

# Strategy 3: null key — round-robin, maximum throughput, no ordering
producer.produce(
    topic='audit_logs',
    key=None,                # round-robin across all partitions
    value=json.dumps(log_entry)
)

producer.flush()  # wait for all outstanding messages to deliver

Hot partition problem: if your key space is skewed (one user generating 90% of events), most messages route to the same partition. That partition's consumer becomes the bottleneck while the others sit idle. Monitor lag per partition — uneven lag reveals hot partitions. Mitigations: composite keys (user_id + event_type) or a salted key (user_id + str(random.randint(0, 4))).
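A salted key fits in a few lines — note the trade-off in the docstring (the function name and bucket count are illustrative):

```python
import random

def salted_key(user_id: str, salt_buckets: int = 5) -> bytes:
    """Spread one hot key across salt_buckets partitions.

    Trade-off: per-user ordering now holds only within each salt bucket,
    and consumers must merge across buckets if they need a full per-user view.
    """
    salt = random.randint(0, salt_buckets - 1)
    return f"{user_id}:{salt}".encode()
```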

Consumer Groups: Parallelism and the Rebalancing Problem

A consumer group is a set of consumer instances that collectively consume a topic. Kafka assigns each partition to exactly one consumer in the group. When you scale to N consumers, each consumer receives roughly partitions/N partitions (some get one more than others when the counts don't divide evenly).
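The assignment arithmetic can be sketched in pure Python (illustrative only — Kafka's real assignors also handle multiple topics and stickiness):

```python
def range_assign(partitions: int, consumers: int) -> list[list[int]]:
    """Range-style assignment: the first (partitions % consumers)
    consumers each take one extra partition."""
    base, extra = divmod(partitions, consumers)
    assignments, start = [], 0
    for i in range(consumers):
        count = base + (1 if i < extra else 0)
        assignments.append(list(range(start, start + count)))
        start += count
    return assignments

range_assign(12, 5)  # partition counts per consumer: [3, 3, 2, 2, 2]
```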

import json
import signal

from confluent_kafka import Consumer, KafkaException, TopicPartition

consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'order_processor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,   # manual commit = at-least-once
    'max.poll.interval.ms': 300000,  # 5 minutes for long processing
    'session.timeout.ms': 45000,
    'heartbeat.interval.ms': 15000,
    'partition.assignment.strategy': 'cooperative-sticky',  # reduces rebalance impact
})

consumer.subscribe(['order_events'])

def process_and_commit(consumer, message):
    """At-least-once processing: commit only after successful processing."""
    try:
        event = json.loads(message.value())
        process_order(event)  # your business logic

        # Commit this specific offset — not auto-commit
        consumer.commit(
            offsets=[TopicPartition(
                message.topic(),
                message.partition(),
                message.offset() + 1  # +1: next offset to read
            )],
            asynchronous=False  # synchronous for correctness
        )
    except ProcessingError as e:
        # Don't commit — message will be redelivered
        logger.error("processing_failed", 
                    offset=message.offset(),
                    error=str(e))
        send_to_dead_letter_queue(message)
        # Commit past this message so it doesn't block the partition
        consumer.commit(offsets=[...], asynchronous=False)

running = True
signal.signal(signal.SIGTERM, lambda s, f: globals().update(running=False))

try:
    while running:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        process_and_commit(consumer, msg)
finally:
    consumer.close()  # triggers final offset commit and group leave

The rebalancing problem: when a consumer joins or leaves the group (deployment, crash, scaling), Kafka triggers a rebalance — all consumers stop, all partition assignments are revoked, and new assignments are distributed. During rebalance, processing stops. With the default eager assignment strategy, every consumer stops. With cooperative-sticky, only partitions being moved are briefly paused — a significant improvement for large groups.

Rebalance triggering conditions to avoid:
- max.poll.interval.ms exceeded: consumer calls poll() less frequently than configured (processing is too slow). Increase max.poll.interval.ms or reduce batch size.
- session.timeout.ms exceeded: consumer fails to send heartbeat (GC pause, I/O block). Increase session.timeout.ms but be aware this delays detection of crashed consumers.
- Consumer crash: unavoidable, but health checks and readiness probes in Kubernetes minimize undetected crashes.

Exactly-Once Semantics: The Transaction API

Kafka supports exactly-once semantics (EOS) for read-process-write workflows: read from one topic, process, write to another topic — atomically. Either both the write and the offset commit succeed, or neither does.

import json

from confluent_kafka import Producer, Consumer, TopicPartition, KafkaException

# Producer with transactional ID (unique per producer instance)
producer = Producer({
    'bootstrap.servers': 'kafka:9092',
    'transactional.id': f'payment-processor-{instance_id}',
    'enable.idempotence': True,
    'acks': 'all',
})

consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'payment_processor',
    'enable.auto.commit': False,
    'isolation.level': 'read_committed',  # only read committed records
})

producer.init_transactions()  # required before any transactions

def process_with_eos(message):
    """Exactly-once: consume + produce + commit are atomic."""
    event = json.loads(message.value())
    result = process_payment(event)

    try:
        producer.begin_transaction()

        # Produce the result event
        producer.produce(
            topic='payment_results',
            key=event['order_id'].encode(),
            value=json.dumps(result)
        )

        # Commit the input offset within the same transaction
        # This atomically marks the input as consumed AND writes the output
        producer.send_offsets_to_transaction(
            offsets=[TopicPartition(
                message.topic(),
                message.partition(),
                message.offset() + 1
            )],
            group_metadata=consumer.consumer_group_metadata()
        )

        producer.commit_transaction()  # atomic: output written + offset committed

    except KafkaException as e:
        producer.abort_transaction()   # neither output written nor offset committed
        raise

EOS has a performance cost: approximately 20-30% throughput reduction compared to at-least-once, due to the two-phase commit protocol. Use EOS when:
- Your output topic drives financial or inventory state
- Duplicate writes cause incorrect results (double-charge, double-count)
- Your downstream systems don't have their own idempotency mechanisms

Don't use EOS when:
- Processing is idempotent (duplicate-safe) — at-least-once is sufficient
- You're writing to external systems (databases, APIs) — Kafka transactions don't span external systems
- Throughput is the primary constraint
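When processing is duplicate-safe, at-least-once delivery plus consumer-side dedup often beats EOS on throughput. A minimal sketch (names are illustrative; in production the seen-set would live in a persistent store with a TTL, not in memory):

```python
def apply_once(seen: set, event_id: str, apply_fn) -> bool:
    """At-least-once + dedup: skip events whose id was already applied.
    Returns True if apply_fn ran, False if the event was a duplicate."""
    if event_id in seen:
        return False
    apply_fn()
    seen.add(event_id)
    return True
```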

Log Compaction: Kafka as an Event Store

Log compaction retains only the most recent message per key. A compacted topic is a perpetually updated changelog — useful for materializing the current state of an entity from its event history.

Use cases:
- Database CDC (Change Data Capture): each row update is a message keyed by row ID. Compacted topic = current DB snapshot
- Configuration store: service configs keyed by service name. New consumers read the latest config without replaying years of history
- Event sourcing state: order state keyed by order ID. Latest message per order = current order state
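The compaction contract itself is simple to model: latest value per key wins, and a null value (tombstone) deletes the key. A pure-Python sketch of the state a consumer would materialize from a compacted topic:

```python
def compact(log):
    """Materialize a compacted log: keep only the latest value per key;
    a None value is a tombstone that deletes the key."""
    state = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)  # tombstone
        else:
            state[key] = value
    return state

compact([("u1", "a"), ("u2", "b"), ("u1", "c"), ("u2", None)])  # {'u1': 'c'}
```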

# Topic creation with compaction
from confluent_kafka.admin import AdminClient, NewTopic, ConfigResource

admin = AdminClient({'bootstrap.servers': 'kafka:9092'})

# Create compacted topic
admin.create_topics([
    NewTopic(
        topic='user_profiles',
        num_partitions=12,
        replication_factor=3,
        config={
            'cleanup.policy': 'compact',          # compaction enabled
            'min.cleanable.dirty.ratio': '0.1',   # compact when 10% dirty
            'segment.ms': '3600000',              # roll segment every hour
            'delete.retention.ms': '86400000',    # tombstones kept 1 day
        }
    )
])

# Delete a record: produce a tombstone (null value for the key)
producer.produce(
    topic='user_profiles',
    key=user_id.encode(),
    value=None  # tombstone: this key will be deleted during compaction
)

A compaction-friendly pattern for event sourcing: the primary event topic uses time-based retention (retain 7 days). A compacted "state" topic derived from the events topic contains only current state. New consumers bootstrap from the compacted topic (current state) rather than replaying the entire event history.

Schema Evolution with Schema Registry

Schema compatibility is the silent killer of Kafka-based systems. Producer deploys a new message format. Consumer is still running the old code. Consumer throws a deserialization error. Processing stops.

Confluent Schema Registry enforces compatibility rules at produce time:

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

schema_registry_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'})

# Schema v1
user_event_schema_v1 = """
{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "timestamp", "type": "long"}
  ]
}"""

# Schema v2: backward compatible (new field with default)
user_event_schema_v2 = """
{
  "type": "record",
  "name": "UserEvent",
  "fields": [
    {"name": "user_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "session_id", "type": ["null", "string"], "default": null}
  ]
}"""
# v2 consumers reading v1 messages: session_id absent → null default (backward compatible)
# v1 consumers reading v2 messages: the unknown session_id field is ignored (forward compatible)
# Adding a field with a default keeps the schema fully compatible in both directions

avro_serializer = AvroSerializer(
    schema_registry_client,
    user_event_schema_v2,
    conf={'auto.register.schemas': True}  # register schema if not exists
)

Schema Registry enforces compatibility at registration time — not at consume time. Attempting to register an incompatible schema fails immediately, blocking the deployment. The compatibility modes:
- BACKWARD: new schema can read old data (new consumers can process old messages)
- FORWARD: old schema can read new data (old consumers can process new messages)
- FULL: both directions (safest, most restrictive)

Rules for backward-compatible Avro schema evolution:
1. Add fields only with defaults (null or a sensible value)
2. Don't remove fields that lack defaults — give the field a default in one release, remove it in a later one
3. Never change field types
4. Never rename fields (add new, deprecate old)
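The default-filling behavior that makes rule 1 safe can be sketched without Avro at all (the `resolve` function is illustrative; real resolution follows the Avro specification):

```python
def resolve(record: dict, reader_fields) -> dict:
    """Avro-style resolution sketch: fields missing from the writer's record
    take the reader schema's default; unknown writer fields are dropped."""
    return {name: record.get(name, default) for name, default in reader_fields}

v1_record = {"user_id": "u1", "event_type": "click", "timestamp": 1700000000}
v2_fields = [("user_id", None), ("event_type", None),
             ("timestamp", None), ("session_id", None)]
resolve(v1_record, v2_fields)  # session_id filled with its null default
```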

Dead Letter Queue Patterns

Not all messages can be processed. Poison pills — malformed messages, schema violations, messages that trigger unrecoverable errors — will block partition processing if not handled explicitly.

import json
from datetime import datetime, timezone

DLQ_TOPIC = "order_events.dlq"
MAX_RETRIES = 3

def safe_process(consumer, message):
    """Process with DLQ for unrecoverable errors."""
    try:
        event = json.loads(message.value())
        validate_schema(event)           # raises SchemaError on invalid
        result = process_order(event)    # raises ProcessingError on failure

        consumer.commit(asynchronous=False)
        return result

    except (json.JSONDecodeError, SchemaError) as e:
        # Non-retriable: schema/format errors won't fix themselves
        send_to_dlq(producer, message, error=str(e), error_type="schema_error")
        consumer.commit(asynchronous=False)  # commit to move past the bad message

    except ProcessingError as e:
        headers = dict(message.headers() or [])
        retry_count = int(headers.get("retry_count", b"0"))
        if e.is_retriable and retry_count < MAX_RETRIES:
            # Retriable: exponential backoff via retry topics
            retry_topic = f"order_events.retry.{retry_count + 1}"
            producer.produce(
                topic=retry_topic,
                headers={"retry_count": str(retry_count + 1)},
                key=message.key(),
                value=message.value()
            )
        else:
            send_to_dlq(producer, message, error=str(e), error_type="processing_error")
        consumer.commit(asynchronous=False)

def send_to_dlq(producer, original_message, error: str, error_type: str):
    """Preserve original message with error context in DLQ."""
    dlq_record = {
        "original_topic": original_message.topic(),
        "original_partition": original_message.partition(),
        "original_offset": original_message.offset(),
        "original_key": original_message.key().decode() if original_message.key() else None,
        "original_value": original_message.value().decode(errors='replace'),
        "error_message": error,
        "error_type": error_type,
        "failed_at": datetime.utcnow().isoformat(),
    }
    producer.produce(topic=DLQ_TOPIC, value=json.dumps(dlq_record))
    producer.flush()

The retry topic pattern uses a sequence of topics (retry.1, retry.2, retry.3) with increasing delay — a poor man's exponential backoff without a dedicated retry-queue service. Each retry topic has a consumer that delays processing based on the retry count in the headers.
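The delay schedule behind those retry topics is plain exponential backoff; a sketch (the base and cap values are illustrative):

```python
def retry_delay_seconds(retry_count: int, base: float = 30.0,
                        cap: float = 3600.0) -> float:
    """Delay before consuming from retry topic N: 30s, 60s, 120s, ... capped at 1h."""
    return min(cap, base * (2 ** (retry_count - 1)))

[retry_delay_seconds(n) for n in (1, 2, 3)]  # [30.0, 60.0, 120.0]
```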

After N retries, messages land in the DLQ. A separate process monitors the DLQ: alerts on DLQ growth rate, provides a UI for inspecting failed messages, and allows manual replay once the root cause is fixed.

Kafka Streams: Stateful Stream Processing

For transformations that go beyond simple message routing — aggregations, joins, windowed computations — Kafka Streams provides a DSL for stateful processing on top of Kafka, without an external stream processor like Flink.

Kafka Streams applications are distributed by default: each instance processes a subset of partitions. State stores (RocksDB-backed by default) are partitioned alongside the input topics. Fault tolerance is automatic: state stores are backed by changelog topics.

# Kafka Streams-style processing in Python via Faust, a stream processing library built on Kafka
import time
from datetime import timedelta

import faust

app = faust.App(
    'order_analytics',
    broker='kafka://kafka:9092',
    value_serializer='json',
)

class OrderEvent(faust.Record):
    order_id: str
    user_id: str
    amount_cents: int
    status: str
    timestamp: float

# Input topic
orders_topic = app.topic('order_events', value_type=OrderEvent)

# Output topic
order_summary_topic = app.topic('order_summaries')

# Stateful table: count + total per user
user_order_counts = app.Table('user_order_counts', default=int)
user_order_totals = app.Table('user_order_totals', default=int)

@app.agent(orders_topic)
async def process_orders(orders):
    """Count and sum orders per user."""
    async for order in orders.group_by(OrderEvent.user_id):
        if order.status == 'completed':
            user_order_counts[order.user_id] += 1
            user_order_totals[order.user_id] += order.amount_cents

            await order_summary_topic.send(
                key=order.user_id,
                value={
                    'user_id': order.user_id,
                    'order_count': user_order_counts[order.user_id],
                    'total_cents': user_order_totals[order.user_id],
                    'updated_at': time.time()
                }
            )

# Windowed aggregation: revenue per 5-minute tumbling window
revenue_5min = app.Table('revenue_5min', default=int).tumbling(
    timedelta(minutes=5), expires=timedelta(hours=1)
)

@app.agent(orders_topic)
async def windowed_revenue(orders):
    """Aggregate revenue into the current 5-minute window."""
    async for order in orders:
        if order.status == 'completed':
            revenue_5min['total'] += order.amount_cents  # applies to the current window

Kafka Streams state is stored in RocksDB on the local disk of each instance, with a Kafka changelog topic as the source of truth. When an instance fails and is replaced, the new instance rehydrates its state from the changelog topic — at ~100MB/second for typical workloads, this can take minutes for large state stores. Pre-built state stores ("standby replicas") reduce this to near-zero failover time.
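That replay time is worth estimating before sizing state stores. A back-of-envelope helper, assuming the ~100MB/s rehydration figure above (names are illustrative):

```python
def rehydration_minutes(state_bytes: int, rate_mb_per_s: float = 100.0) -> float:
    """Estimated changelog replay time for a state store of the given size."""
    return state_bytes / (rate_mb_per_s * 1024 * 1024) / 60

round(rehydration_minutes(60 * 1024**3), 2)  # 60GB store → ~10.24 minutes
```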

Consumer Lag Monitoring and Alerting

Consumer lag (the gap between the latest produced offset and the consumer's committed offset) is the primary operational metric for Kafka consumers.

from confluent_kafka import Consumer, TopicPartition
from confluent_kafka.admin import AdminClient, ConsumerGroupTopicPartitions

def get_consumer_lag(group_id: str, topic: str) -> dict[int, int]:
    """Returns {partition: lag} for a consumer group on a topic."""
    admin = AdminClient({'bootstrap.servers': 'kafka:9092'})
    consumer = Consumer({'bootstrap.servers': 'kafka:9092', 'group.id': 'lag-checker'})

    # All partitions for the topic
    metadata = admin.list_topics(topic=topic)
    partitions = [
        TopicPartition(topic, p)
        for p in metadata.topics[topic].partitions
    ]

    # Committed offsets for the group (one future per requested group)
    futures = admin.list_consumer_group_offsets(
        [ConsumerGroupTopicPartitions(group_id, partitions)]
    )
    committed = {
        tp.partition: tp.offset
        for tp in futures[group_id].result().topic_partitions
    }

    lag = {}
    for tp in partitions:
        # High watermark = latest written offset, queried per partition
        _, high = consumer.get_watermark_offsets(tp, timeout=5.0)
        lag[tp.partition] = max(0, high - committed.get(tp.partition, 0))

    consumer.close()
    return lag

Alert thresholds for consumer lag:
- Warning: lag > 10,000 messages and growing (consumer not keeping up)
- Critical: lag > 100,000 messages (significant backlog building)
- Emergency: lag growth rate > 5,000 messages/minute sustained for 10 minutes
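The thresholds above translate directly into an alert tiering function (the numbers are the illustrative ones from the list, not universal defaults):

```python
def lag_status(lag: int, growth_per_min: float, sustained_min: int = 0) -> str:
    """Map lag and growth rate to the alert tiers above."""
    if growth_per_min > 5_000 and sustained_min >= 10:
        return "emergency"
    if lag > 100_000:
        return "critical"
    if lag > 10_000 and growth_per_min > 0:
        return "warning"
    return "ok"
```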

The standard tools for Kafka lag monitoring are Burrow (LinkedIn's open-source consumer lag monitoring service) or a Prometheus Kafka exporter that exposes per-group lag metrics (e.g. kafka_consumergroup_lag).

Kafka Cluster Operations: Topic Management and Replication

Operational knowledge that every engineer working with Kafka should have:

Replication factor and ISR (In-Sync Replicas): with replication factor 3, each partition has a leader and two follower replicas. The ISR is the set of replicas caught up to the leader. With acks=all, producers wait for all ISR members to acknowledge — if one replica falls behind and leaves the ISR, acks=all waits only for the remaining members. Pair acks=all with min.insync.replicas=2 so a write fails loudly instead of being acknowledged by a lone replica.

# Check partition state: leader, replicas, ISR
kafka-topics.sh --bootstrap-server kafka:9092 \
    --describe --topic order_events
# Topic: order_events Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
# Topic: order_events Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
# Under-replication: Isr count < replication factor = potential data loss risk

# Increase partitions (only increase, never decrease)
kafka-topics.sh --bootstrap-server kafka:9092 \
    --alter --topic order_events \
    --partitions 30
# WARNING: this changes the hash routing for existing keys

# Check consumer group offsets and lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
    --describe --group order_processor
# GROUP          TOPIC         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# order_proc     order_events  0          48291           48291           0
# order_proc     order_events  1          48100           48291           191  ← lag

# Reset consumer offset (for replay or skip)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
    --group order_processor \
    --topic order_events \
    --reset-offsets --to-earliest --execute  # replay from beginning

Partition leadership rebalancing: after a broker restart, partition leaders may be concentrated on the recovered broker (or absent from it). Run kafka-leader-election.sh to redistribute leadership evenly. Uneven leadership → uneven network/CPU load on brokers.

Under-replicated partitions: the most important broker metric. kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions > 0 means one or more partitions don't have the configured replication factor in the ISR. This is a data durability alert — a second broker failure could cause data loss.

Production Configuration Reference

The configurations that matter most for production Kafka:

# Producer: durability + throughput balance (librdkafka option names)
PRODUCER_CONFIG = {
    'bootstrap.servers': 'kafka-0:9092,kafka-1:9092,kafka-2:9092',
    'acks': 'all',                    # ISR must acknowledge (durability)
    'enable.idempotence': True,       # deduplication at broker
    'max.in.flight.requests.per.connection': 5,
    'retries': 2147483647,            # retry until delivery.timeout.ms expires
    'delivery.timeout.ms': 120000,    # 2-minute delivery window
    'compression.type': 'lz4',        # fast compression, good ratio
    'batch.size': 65536,              # 64KB batches
    'linger.ms': 10,                  # 10ms batching window
    'queue.buffering.max.kbytes': 65536,  # 64MB producer buffer
}

# Consumer: at-least-once with manual commit
CONSUMER_CONFIG = {
    'bootstrap.servers': 'kafka-0:9092,kafka-1:9092,kafka-2:9092',
    'group.id': 'my-consumer-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000,   # 5min for slow processing
    'fetch.min.bytes': 1024,          # wait for 1KB before returning
    'fetch.wait.max.ms': 500,         # or 500ms, whichever comes first
    'partition.assignment.strategy': 'cooperative-sticky',
    'isolation.level': 'read_committed',  # skip uncommitted transactions
}
# Note: 'max.poll.records' is a Java-client option; with confluent_kafka,
# bound batch size via consume(num_messages=...) instead.

Conclusion

Kafka's power and complexity are inseparable. The partition model that enables horizontal scaling also requires upfront capacity planning. The consumer group model that enables parallelism also introduces the rebalancing problem. The exactly-once semantics that ensure correctness also impose throughput costs.

The patterns in this post cover the decisions you'll face operating Kafka at scale: partition key selection (ordering vs. even distribution), cooperative rebalancing (minimize stop-the-world pauses), EOS only when idempotency is absent, log compaction for state materialization, schema registry for safe evolution, and consumer lag as the primary operational metric.

Kafka in 2026 remains the de facto standard for high-throughput event streaming. Its operational complexity is real — but with the right configuration choices and monitoring, it becomes manageable.

The key decisions at each layer: partition count (plan for peak, 20-30% headroom), partition key (ordering vs. even distribution — rarely both), consumer group strategy (cooperative-sticky to minimize rebalance pain), durability level (acks=all + idempotence for financial data, lower durability for analytics), and monitoring (consumer lag + under-replicated partitions as the two essential metrics). Get these right at the start, and Kafka's capacity for scale — hundreds of thousands of events per second, retained for days or weeks — becomes a reliable foundation rather than an operational burden.
