File: partitioned-log/partitioned_log.py

Date: 2026-05-29

Time: 09:01


Purpose

This file implements a Kafka-style partitioned append-only log — the core abstraction from DDIA Chapter 11 (Stream Processing). It models the key components of a message broker: topics split into partitions, producers that write messages with key-based or round-robin routing, consumers that track read positions via offsets, and consumer groups that coordinate partition assignment across members. The Broker class ties it all together, adding persistence, retention enforcement, and log compaction.

This is a teaching implementation — everything runs in-process with Python lists as the storage backend, no networking, no replication. The goal is to demonstrate the *semantics* of a partitioned log, not production performance.

Key Components

Data Classes

Message — The unit of data in the log. Carries a key (used for partition routing and compaction), value (the payload), timestamp, and optional headers. The topic, partition, and offset fields are set by Topic.append at write time — they're None on construction and populated as metadata once the message lands in a partition.

RecordMetadata — The acknowledgment returned to a producer after a successful write. Contains the coordinates needed to locate the message: topic, partition, offset, timestamp.
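A minimal sketch of how these two data classes might be laid out, based only on the fields named above. The types, defaults, and the choice to freeze RecordMetadata are assumptions, not the module's actual definitions.

```python
import time
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Message:
    # Fields supplied by the producer.
    key: Optional[str]
    value: str
    timestamp: float = field(default_factory=time.time)
    headers: dict = field(default_factory=dict)
    # Log coordinates: None on construction, filled in at append time.
    topic: Optional[str] = None
    partition: Optional[int] = None
    offset: Optional[int] = None


@dataclass(frozen=True)  # frozen is an assumption: an ack is read-only here
class RecordMetadata:
    topic: str
    partition: int
    offset: int
    timestamp: float


msg = Message(key="user-1", value="signed_up")
ack = RecordMetadata(topic="events", partition=0, offset=0, timestamp=msg.timestamp)
```

Until the message is appended, msg.topic, msg.partition, and msg.offset remain None.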

Topic

Owns the partitioned storage for a single named topic. Each partition is a list[Message] with a separate base_offsets tracker that advances when messages are truncated or compacted away.
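The role of the base offset can be illustrated with a small sketch. PartitionSketch and its method names are hypothetical, and the index arithmetic in read() assumes offsets stay contiguous, which holds after front truncation but not after compaction.

```python
class PartitionSketch:
    """One partition: a list of records plus the logical offset of its first entry."""

    def __init__(self):
        self.records = []      # retained records, oldest first
        self.base_offset = 0   # logical offset of records[0]

    def append(self, value):
        # The next offset is independent of how many records were dropped earlier.
        offset = self.base_offset + len(self.records)
        self.records.append(value)
        return offset

    def truncate_front(self, count):
        # Drop the oldest records and advance the base so later offsets are unchanged.
        count = min(count, len(self.records))
        del self.records[:count]
        self.base_offset += count

    def read(self, offset, max_records=10):
        # Requests below the base offset start at the oldest retained record.
        start = max(offset - self.base_offset, 0)
        return self.records[start:start + max_records]


p = PartitionSketch()
offsets = [p.append(v) for v in "abcd"]
p.truncate_front(2)
```

After the truncation, the next append still receives offset 4, so offsets already handed to consumers never change meaning.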

Producer

Routes messages to partitions and writes them through the broker.

Partitioning strategy in send():

1. Explicit partition — caller specifies partition=N

2. Key-based — MD5 hash of the key mod partition count (deterministic routing, same key always hits the same partition)

3. Round-robin — a per-topic counter cycles through partitions for keyless messages
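The three-step strategy above can be sketched as a single function. choose_partition is a hypothetical name, and the bounds check and the way the MD5 digest is turned into an integer are assumptions about details the summary does not spell out.

```python
import hashlib
from collections import defaultdict

# Per-topic counters used only for keyless messages.
_round_robin = defaultdict(int)


def choose_partition(topic, num_partitions, key=None, partition=None):
    # 1. Explicit partition wins (the range check is an assumption).
    if partition is not None:
        if not 0 <= partition < num_partitions:
            raise ValueError(f"partition {partition} out of range")
        return partition
    # 2. Key-based: a stable hash, so the same key always maps to the same partition.
    if key is not None:
        digest = hashlib.md5(key.encode("utf-8")).hexdigest()
        return int(digest, 16) % num_partitions
    # 3. Round-robin: cycle through the partitions of this topic.
    chosen = _round_robin[topic] % num_partitions
    _round_robin[topic] += 1
    return chosen
```

MD5 is used here for its stable distribution across runs, not for security; Python's built-in hash() is randomized per process for strings and would break deterministic routing.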

send_batch() is a convenience method that iterates over a list of dicts and calls send() for each. flush() is a no-op stub, since there is no buffering to flush.

Consumer

Tracks read position per assigned (topic, partition) pair.

ConsumerGroup

Manages partition assignment across consumers using round-robin rebalancing.

rebalance() collects all partitions from all subscribed topics, sorts consumers by ID, and distributes the partitions round-robin. It runs on every add_consumer or remove_consumer call. After reassignment, each consumer's offsets are re-initialized.
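A minimal sketch of the round-robin assignment described here. assign_round_robin is a hypothetical name, and ordering the partitions by topic name is an assumption, since the summary only states that consumers are sorted by ID.

```python
def assign_round_robin(consumer_ids, topic_partitions):
    """Distribute partitions across consumers.

    consumer_ids: iterable of consumer ID strings.
    topic_partitions: dict mapping topic name -> partition count.
    Returns a dict mapping consumer ID -> list of (topic, partition) pairs.
    """
    ordered = sorted(consumer_ids)
    assignment = {cid: [] for cid in ordered}
    if not ordered:
        return assignment
    # Flatten every partition of every topic into one ordered list.
    all_partitions = [
        (topic, p)
        for topic in sorted(topic_partitions)
        for p in range(topic_partitions[topic])
    ]
    # Deal partitions out like cards: partition i goes to consumer i mod N.
    for i, tp in enumerate(all_partitions):
        assignment[ordered[i % len(ordered)]].append(tp)
    return assignment


plan = assign_round_robin(["c2", "c1"], {"events": 3})
```

With more consumers than partitions, the extra consumers end up with empty assignments, which mirrors the usual behavior of partitioned consumer groups.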

Broker

Central coordinator. Owns topic registry, consumer groups, committed offsets, and optional disk persistence.

Patterns

Dependencies

Imports: All stdlib — hashlib (key hashing), json/os (persistence), time (timestamps), uuid (consumer IDs), dataclasses, typing.

Imported by: test_partitioned_log.py and test_smoke.py. The module has no downstream production consumers; it is a standalone reference implementation.

Flow

Write path: Producer.send() → resolve partition → construct Message → Topic.append() (assigns offset, stamps metadata) → Broker.persist_message() (append JSONL if persistence enabled) → return RecordMetadata.
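The JSONL persistence step can be sketched as follows. persist_record and load_records are hypothetical helpers, and the record fields and file naming are assumptions; the only detail taken from the summary is that each message is appended as one JSON line.

```python
import json
import os
import tempfile


def persist_record(path, record):
    # Append mode keeps the file an append-only log: one JSON object per line.
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")


def load_records(path):
    # Replay the file in write order; a missing file means an empty log.
    if not os.path.exists(path):
        return []
    with open(path, encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]


with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "events-0.jsonl")  # hypothetical file name
    persist_record(log_path, {"offset": 0, "key": "user-1", "value": "signed_up"})
    persist_record(log_path, {"offset": 1, "key": None, "value": "heartbeat"})
    replayed = load_records(log_path)
```

Because every write is a single appended line, recovery is a straightforward replay of the file from top to bottom.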

Read path: Consumer.poll() → iterate assigned (topic, partition) pairs → Topic.read() (linear scan from current offset) → advance consumer's offset cursor → optionally auto-commit → return messages.
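A sketch of this read path under simplifying assumptions: partitions are plain lists of (offset, value) pairs, and read_from and poll are hypothetical stand-ins rather than the module's real signatures. The linear scan tolerates gaps in the offset sequence, such as those left by compaction.

```python
def read_from(partition, start_offset, max_records):
    """Linear scan over (offset, value) pairs, returning those at or past start_offset."""
    out = []
    for offset, value in partition:
        if offset >= start_offset:
            out.append((offset, value))
            if len(out) == max_records:
                break
    return out


def poll(partitions, positions, committed=None, max_records=10):
    """partitions: dict (topic, partition) -> list of (offset, value).
    positions: dict (topic, partition) -> next offset to read (updated in place).
    committed: optional dict that receives the new positions (auto-commit)."""
    batch = []
    for tp in sorted(positions):
        remaining = max_records - len(batch)
        if remaining <= 0:
            break
        records = read_from(partitions[tp], positions[tp], remaining)
        if records:
            # Advance the cursor to one past the last offset delivered.
            positions[tp] = records[-1][0] + 1
            batch.extend((tp, off, val) for off, val in records)
    if committed is not None:
        committed.update(positions)
    return batch


partitions = {
    ("events", 0): [(0, "a"), (1, "b"), (3, "d")],  # offset 2 was compacted away
    ("events", 1): [(0, "x")],
}
positions = {("events", 0): 1, ("events", 1): 0}
committed = {}
batch = poll(partitions, positions, committed)
```

A second poll with the same positions returns an empty batch, since every cursor already points past the last stored record.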

Group coordination: Consumer.subscribe() → ConsumerGroup.add_consumer() → rebalance() (round-robin assignment) → Consumer.init_offsets() (resolve committed or reset offset per partition).
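The offset resolution in the last step might look like the sketch below. resolve_start_offset is a hypothetical helper, and the "earliest"/"latest" policy names are borrowed from Kafka's auto.offset.reset setting as an assumption; the summary only says that a committed or reset offset is chosen.

```python
def resolve_start_offset(committed, tp, base_offset, end_offset, reset="earliest"):
    """Pick the offset a consumer should start from for one (topic, partition).

    committed: dict (topic, partition) -> committed offset for the group.
    base_offset: offset of the oldest retained record.
    end_offset: offset the next appended record would receive.
    """
    # A committed offset always takes priority over the reset policy.
    if tp in committed:
        return committed[tp]
    if reset == "earliest":
        return base_offset
    if reset == "latest":
        return end_offset
    raise ValueError(f"unknown reset policy: {reset!r}")


committed = {("events", 0): 5}
resumed = resolve_start_offset(committed, ("events", 0), base_offset=2, end_offset=9)
from_start = resolve_start_offset(committed, ("events", 1), base_offset=2, end_offset=9)
```

Here the partition with a committed offset resumes at 5, while the newly assigned partition falls back to the oldest retained offset.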

Retention/compaction: Broker.enforce_retention() iterates partitions, truncating by size and then by time. Broker.compact() deduplicates by key within each partition. Both advance base_offsets to maintain offset continuity.
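Key-based compaction can be sketched as keeping only the most recent record per key. compact_partition is a hypothetical name; retaining keyless records and setting the new base offset to the first surviving offset are assumptions, since the summary does not describe those details.

```python
def compact_partition(records):
    """records: list of (offset, key, value) tuples in offset order.

    Returns (survivors, new_base_offset). Surviving records keep their
    original offsets, so the result may contain gaps.
    """
    # First pass: remember the highest offset seen for each key.
    latest_offset = {}
    for offset, key, _value in records:
        if key is not None:
            latest_offset[key] = offset
    # Second pass: keep a record only if it is the latest for its key.
    survivors = [
        r for r in records
        if r[1] is None or latest_offset[r[1]] == r[0]
    ]
    new_base = survivors[0][0] if survivors else 0
    return survivors, new_base


records = [(0, "a", "v1"), (1, "b", "v1"), (2, "a", "v2"), (3, None, "x")]
survivors, new_base = compact_partition(records)
```

The older value for key "a" at offset 0 is dropped, and the remaining records keep offsets 1, 2, and 3, so consumers holding those offsets are unaffected.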

Invariants

Error Handling

Minimal and fail-fast:

Topics to Explore

Beliefs