File: change-data-capture/cdc.py

Date: 2026-05-29

Time: 11:06


Purpose

This file implements an in-memory Change Data Capture (CDC) system, a core pattern from DDIA Chapter 11 (Stream Processing). It intercepts every mutation to a simple relational database and records it as an immutable, ordered event in an append-only log. Downstream consumers then derive secondary data structures (materialized views, search indexes) from that log rather than querying the primary database directly.

This is the "unbundled database" idea in miniature: writes go to one place, and all derived state is built asynchronously from the change stream.

Key Components

ChangeEvent (dataclass)

The unit of the log. Each event captures a before/after snapshot of a row, tagged with table name, primary key, operation type, a monotonic sequence number, and a wall-clock timestamp. The before/after fields follow the convention: INSERT has before=None, DELETE has after=None, UPDATE has both.
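The before/after convention can be made concrete with a small sketch. The names EventSketch and Op, the field names, and the validation in __post_init__ are illustrative assumptions, not the actual definitions in cdc.py.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional


class Op(Enum):
    """Hypothetical operation type; the real enum may differ."""
    INSERT = "insert"
    UPDATE = "update"
    DELETE = "delete"


@dataclass(frozen=True)
class EventSketch:
    """Hypothetical stand-in for ChangeEvent."""
    table: str
    key: Any
    op: Op
    seq: int
    before: Optional[dict]
    after: Optional[dict]
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )

    def __post_init__(self) -> None:
        # Enforce the convention: INSERT has no before image,
        # DELETE has no after image, UPDATE carries both.
        if self.op is Op.INSERT and (self.before is not None or self.after is None):
            raise ValueError("INSERT needs before=None and an after image")
        if self.op is Op.DELETE and (self.after is not None or self.before is None):
            raise ValueError("DELETE needs after=None and a before image")
        if self.op is Op.UPDATE and (self.before is None or self.after is None):
            raise ValueError("UPDATE needs both before and after images")
```

Freezing the dataclass is one way to model the "immutable event" property; whether the real ChangeEvent is frozen is not stated in this note.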

CDCLog

The append-only event log. Internally a list with a monotonically increasing _next_seq counter. It provides:

CDCDatabase

A minimal in-memory relational database that emits change events on every mutation. Tables are defined with create_table(name, columns, primary_key) and stored as {pk_value: row_dict} maps. Every insert, update, and delete call appends a corresponding ChangeEvent to the shared CDCLog. Read operations (select, scan) do not generate events.
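A minimal sketch of "every mutation emits an event, reads do not" might look like the following. TinyDB, its method signatures, and the plain-dict event shape are assumptions made for illustration; the real CDCDatabase also takes a column list and appends ChangeEvent objects to a CDCLog.

```python
from typing import Any, Optional


class TinyDB:
    """Hypothetical stand-in for CDCDatabase; events are plain dicts."""

    def __init__(self) -> None:
        self.tables = {}    # table name -> {pk value: row dict}
        self.pk = {}        # table name -> primary-key column
        self.log = []       # append-only list of event dicts
        self._next_seq = 0  # monotonically increasing sequence number

    def create_table(self, name: str, primary_key: str) -> None:
        self.tables[name] = {}
        self.pk[name] = primary_key

    def _emit(self, table: str, key: Any, op: str,
              before: Optional[dict], after: Optional[dict]) -> None:
        self.log.append({"seq": self._next_seq, "table": table, "key": key,
                         "op": op, "before": before, "after": after})
        self._next_seq += 1

    def insert(self, table: str, row: dict) -> None:
        key = row[self.pk[table]]
        if key in self.tables[table]:
            raise KeyError(f"duplicate primary key {key!r}")
        self.tables[table][key] = dict(row)
        self._emit(table, key, "INSERT", None, dict(row))

    def update(self, table: str, key: Any, changes: dict) -> None:
        before = dict(self.tables[table][key])  # KeyError if the row is missing
        after = {**before, **changes}
        self.tables[table][key] = after
        self._emit(table, key, "UPDATE", before, dict(after))

    def delete(self, table: str, key: Any) -> None:
        before = self.tables[table].pop(key)    # KeyError if the row is missing
        self._emit(table, key, "DELETE", before, None)

    def select(self, table: str, key: Any) -> Optional[dict]:
        # Reads return a copy and never touch the log.
        row = self.tables[table].get(key)
        return dict(row) if row is not None else None
```

Copying rows into the event (dict(row)) keeps later mutations of the stored row from silently changing an already-logged event.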

CDCConsumer

A pull-based consumer that reads new events from the log. It maintains its own _position (cursor), supports handler registration filtered by table and/or operation type, and processes events in order via poll(). The seek() method allows rewinding or fast-forwarding the cursor.
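The cursor-plus-filtered-handlers idea can be sketched as below. ConsumerSketch, the on() registration method, and the use of a list index as the cursor are assumptions; the real CDCConsumer may track sequence numbers instead and name its registration method differently.

```python
from typing import Callable, Optional


class ConsumerSketch:
    """Hypothetical pull-based consumer over a shared list of event dicts."""

    def __init__(self, log: list) -> None:
        self.log = log
        self.position = 0   # index of the next unread event
        self.handlers = []  # (table filter, op filter, callback) triples

    def on(self, callback: Callable[[dict], None],
           table: Optional[str] = None, op: Optional[str] = None) -> None:
        # A filter of None matches every table or every operation.
        self.handlers.append((table, op, callback))

    def poll(self) -> int:
        """Process all unread events in log order; return how many were read."""
        processed = 0
        while self.position < len(self.log):
            event = self.log[self.position]
            for table, op, callback in self.handlers:
                if table in (None, event["table"]) and op in (None, event["op"]):
                    callback(event)
            self.position += 1
            processed += 1
        return processed

    def seek(self, position: int) -> None:
        # Rewind to replay old events, or jump forward to skip them.
        self.position = position
```

Because the consumer owns its position, replay is just seek(0) followed by poll(); the log itself keeps no per-consumer state.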

MaterializedView

A derived table that mirrors a source table's state by replaying the CDC stream. It supports an optional transform function that can reshape or filter rows — if the transform returns None, the row is excluded from the view (effectively a filter). Refreshed explicitly via refresh().
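How a transform can both reshape and filter is easier to see in code. The function apply_to_view and the dict-shaped events are hypothetical. The sketch also assumes that a row whose transform starts returning None after an UPDATE is removed from the view, which this note does not confirm for the real MaterializedView.

```python
from typing import Callable, Optional


def apply_to_view(view: dict, event: dict,
                  transform: Optional[Callable[[dict], Optional[dict]]] = None) -> None:
    """Apply one change event to a view stored as {key: row}."""
    key = event["key"]
    if event["op"] == "DELETE":
        view.pop(key, None)
        return
    row = dict(event["after"])
    if transform is not None:
        row = transform(row)
    if row is None:
        # Filtered out: make sure no stale earlier version lingers.
        view.pop(key, None)
    else:
        view[key] = row
```

For example, a transform that returns an upper-cased name for adults and None for everyone else yields a view containing only adult rows in reshaped form.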

SearchIndex

A keyword inverted index maintained from the CDC stream. Tokenization is whitespace-split + lowercase on specified columns. On UPDATE, it fully removes old tokens before adding new ones — correct but not optimized for unchanged columns.
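The remove-then-add update strategy can be sketched as follows. IndexSketch and its method names are illustrative assumptions; only the tokenization rule (lowercase, whitespace split on chosen columns) comes from the description above.

```python
from collections import defaultdict


class IndexSketch:
    """Hypothetical inverted index driven by before/after row images."""

    def __init__(self, columns: list) -> None:
        self.columns = columns
        self.postings = defaultdict(set)  # token -> set of primary keys

    def _tokens(self, row: dict) -> set:
        tokens = set()
        for col in self.columns:
            value = row.get(col)
            if value is not None:
                tokens.update(str(value).lower().split())
        return tokens

    def _remove(self, key, row: dict) -> None:
        for tok in self._tokens(row):
            self.postings[tok].discard(key)
            if not self.postings[tok]:
                del self.postings[tok]  # drop empty posting lists

    def _add(self, key, row: dict) -> None:
        for tok in self._tokens(row):
            self.postings[tok].add(key)

    def apply(self, event: dict) -> None:
        # UPDATE removes every old token, then adds every new one,
        # even for columns whose value did not change.
        if event["before"] is not None:
            self._remove(event["key"], event["before"])
        if event["after"] is not None:
            self._add(event["key"], event["after"])

    def search(self, word: str) -> set:
        return set(self.postings.get(word.lower(), set()))
```

The same apply() covers all three operations because INSERT has no before image and DELETE has no after image. An optimized variant could diff the two token sets and touch only the symmetric difference.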

create_snapshot

A standalone function that captures the current database state as a list of synthetic INSERT events (all with sequence_number=-1) and returns it together with the current log position. This is the bootstrap mechanism: a new consumer can load the snapshot, then seek() to the returned position and start tailing the live log.
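A rough sketch of the snapshot step, assuming tables are stored as {table: {key: row}} and the log position is simply the current length of the log list. The function name snapshot and the dict event shape are hypothetical and differ from the real create_snapshot signature.

```python
def snapshot(tables: dict, log: list):
    """Return (synthetic INSERT events, log position to resume from)."""
    events = [
        {"seq": -1, "table": table, "key": key, "op": "INSERT",
         "before": None, "after": dict(row)}
        for table, rows in tables.items()
        for key, row in rows.items()
    ]
    # Events appended after this point are exactly the ones the
    # snapshot does not already reflect.
    return events, len(log)
```

A late-joining consumer would apply the synthetic events first and then read log[position:] onward. Taking the state and the position together matters: if a write landed between the two, the consumer would either miss that change or apply it twice.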

Patterns

Event Sourcing / Log-centric architecture: The CDC log is the source of truth for all changes. Derived data structures are projections of that log, not independent copies.

Pull-based consumption: Consumers own their cursor position and explicitly call poll() or refresh(). There's no push/callback registration with the log itself — this avoids coupling and makes replay trivial.

Before/after snapshots: Every change event captures the full row state before and after the mutation. This makes events self-contained — a consumer doesn't need to query the source database to understand what changed.

Log compaction: CDCLog.compact() implements Kafka-style log compaction — keep only the latest event per key. This bounds log growth while preserving the ability to reconstruct current state.
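Keeping only the latest event per key can be sketched in a few lines. This is a standalone function over dict events, not the real CDCLog.compact(). It assumes the compaction key is (table, primary key) and that a trailing DELETE is retained as a tombstone; neither detail is confirmed by this note.

```python
def compact(events: list) -> list:
    """Keep only the most recent event for each (table, key) pair."""
    latest = {}
    for ev in events:  # events are assumed to be in log order
        latest[(ev["table"], ev["key"])] = ev  # later events overwrite earlier ones
    # Restore sequence order so consumers still see a monotonic log.
    return sorted(latest.values(), key=lambda ev: ev["seq"])
```

Current state survives compaction because each retained event carries the full after image of its row. What is lost is the history: a consumer replaying a compacted log sees an UPDATE for a key it never saw inserted, so it has to treat UPDATE as an upsert.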

Snapshot + tail: create_snapshot implements the standard pattern for bootstrapping new consumers without replaying the entire log from the beginning.

Dependencies

Imports: Standard library only (typing, enum, dataclasses, datetime). No external dependencies, which is consistent with the project's approach of implementing concepts from scratch.

Imported by: test_cdc.py and tester_test_cdc.py, the unit tests and a generated test wrapper, respectively.

Flow

A typical lifecycle:

1. Create a CDCDatabase, define tables via create_table.

2. Perform mutations (insert, update, delete) — each appends a ChangeEvent to the internal CDCLog.

3. Create downstream consumers: a CDCConsumer with handlers, a MaterializedView, or a SearchIndex, all pointing to db.cdc_log.

4. Periodically call poll() / refresh() on each consumer — they read new events from their last position, process them, and advance their cursor.

5. Optionally call cdc_log.compact() to reclaim space.

6. To bootstrap a new consumer late, call create_snapshot() to get synthetic events + position, process the synthetics, then seek() to the position and start polling.

Invariants

Error Handling

Errors are raised eagerly at the database mutation layer:

There is no error handling at the consumer layer — if a handler raises, the exception propagates out of poll() / refresh(). There's no dead-letter queue, retry, or error event. Consumers that crash mid-poll will re-process all events from their last committed position on the next call (at-least-once semantics).
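The at-least-once behaviour follows from committing the cursor only after handlers succeed. The sketch below assumes the position is committed once per poll, after the whole batch; the poll function and the state dict are hypothetical, and the real consumer may commit at a different granularity.

```python
def poll(log: list, state: dict, handler) -> int:
    """Run handler on every unread event, then commit the new position."""
    batch = log[state["position"]:]
    for event in batch:
        handler(event)  # an exception here skips the commit below
    state["position"] += len(batch)
    return len(batch)
```

If the handler raises on the second of three events, the position stays where it was, and the next poll() delivers the first event again. Handlers therefore need to be idempotent, for example by overwriting a keyed row rather than incrementing a counter.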

Topics to Explore

Beliefs