File: event-sourcing-store/event_store.py

Date: 2026-05-29

Time: 07:58


Purpose

This file implements an event sourcing system, one of the core patterns from Chapter 11 (Stream Processing) of *Designing Data-Intensive Applications* (DDIA). It owns three responsibilities:

1. Durable, append-only event storage — events are immutable facts, never updated or deleted

2. Projections — derived read models built by replaying events through registered handlers

3. Snapshots — periodic state checkpoints so projections don't have to replay from the beginning every time

This is the kind of infrastructure that backs systems like bank account ledgers or order histories, where you store *what happened* rather than *current state*, and derive current state on demand.

Key Components

Event (dataclass)

The fundamental unit of the system. Each event captures:

Events are meant to be immutable facts, but the dataclass is declared without frozen=True, so Python does not enforce immutability at runtime: any code holding an Event can still reassign its fields.
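The difference frozen=True would make can be seen by comparing a plain dataclass with a frozen one. This is a sketch only: the class names and the fields event_type and data are hypothetical, not the file's actual Event definition.

```python
from dataclasses import FrozenInstanceError, dataclass, field
from typing import Any


@dataclass
class MutableEvent:
    # Hypothetical fields; mirrors a dataclass declared without frozen=True.
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class FrozenEvent:
    # Same hypothetical fields; frozen=True makes attribute assignment raise.
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any] = field(default_factory=dict)


mutable = MutableEvent(1, "account-42", "Deposited", {"amount": 100})
mutable.event_type = "Withdrawn"  # silently allowed: history can be rewritten

frozen = FrozenEvent(1, "account-42", "Deposited", {"amount": 100})
try:
    frozen.event_type = "Withdrawn"
except FrozenInstanceError:
    print("rewrite rejected")

# frozen=True is shallow: the dict held in a field can still be mutated.
frozen.data["amount"] = 999
```

Even with frozen=True, truly immutable payloads would need an immutable container (for example a types.MappingProxyType view or a tuple of pairs) for the data field.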

EventStore

The append-only log. Its contract:

Persistence is optional: pass persist_path and events are appended as newline-delimited JSON. On startup, the file is replayed to rebuild in-memory state.
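The append-then-replay cycle for newline-delimited JSON can be sketched in a few lines. The helper names append_to_log and replay_log and the Event fields are hypothetical; in the file itself, writing happens inside append() and replay happens at startup.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass
from typing import Any


@dataclass
class Event:
    # Hypothetical field set; the real Event may carry more (e.g. a timestamp).
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any]


def append_to_log(path: str, event: Event) -> None:
    # One JSON object per line; "a" mode only ever adds to the end of the file.
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(asdict(event)) + "\n")


def replay_log(path: str) -> list[Event]:
    # Rebuild in-memory state by reading every line back in order.
    if not os.path.exists(path):
        return []
    events = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if line:  # tolerate a trailing blank line
                events.append(Event(**json.loads(line)))
    return events


log_path = os.path.join(tempfile.mkdtemp(), "events.ndjson")
append_to_log(log_path, Event(1, "account-42", "Deposited", {"amount": 100}))
append_to_log(log_path, Event(2, "account-42", "Withdrawn", {"amount": 30}))
restored = replay_log(log_path)
print([e.event_type for e in restored])  # ['Deposited', 'Withdrawn']
```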

Projection

A pull-based read model. You register handlers via when(eventtype, handler); each handler has the signature (state: dict, event: Event) -> None and mutates state in place. Calling catchup() replays the events the projection has not yet processed.
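A pull-based projection can be sketched as a handler registry plus a position cursor. MiniStore, PullProjection, and the underscored spellings catch_up, read_all, and from_position are hypothetical stand-ins for the names described here; the sketch assumes event ids start at 1 and increase by one.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Event:
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any] = field(default_factory=dict)


class MiniStore:
    # Hypothetical stand-in for EventStore: an in-memory list with 1-based ids.
    def __init__(self) -> None:
        self.events: list[Event] = []

    def append(self, stream_id: str, event_type: str, data: dict[str, Any]) -> Event:
        event = Event(len(self.events) + 1, stream_id, event_type, data)
        self.events.append(event)
        return event

    def read_all(self, from_position: int = 1) -> list[Event]:
        # Every event whose id is >= from_position.
        return self.events[from_position - 1:]


Handler = Callable[[dict, Event], None]


class PullProjection:
    def __init__(self, store: MiniStore) -> None:
        self.store = store
        self.state: dict = {}
        self.position = 0  # id of the last event applied; 0 means none yet
        self.handlers: dict[str, Handler] = {}

    def when(self, event_type: str, handler: Handler) -> "PullProjection":
        self.handlers[event_type] = handler
        return self

    def catch_up(self) -> None:
        # Replay only events after self.position; unhandled types are skipped
        # but still advance the cursor.
        for event in self.store.read_all(from_position=self.position + 1):
            handler = self.handlers.get(event.event_type)
            if handler is not None:
                handler(self.state, event)
            self.position = event.event_id


def on_deposit(state: dict, event: Event) -> None:
    state["balance"] = state.get("balance", 0) + event.data["amount"]


def on_withdraw(state: dict, event: Event) -> None:
    state["balance"] = state.get("balance", 0) - event.data["amount"]


store = MiniStore()
balance = PullProjection(store).when("Deposited", on_deposit).when("Withdrawn", on_withdraw)
store.append("account-42", "Deposited", {"amount": 100})
balance.catch_up()
store.append("account-42", "Withdrawn", {"amount": 30})
store.append("account-42", "Renamed", {"name": "savings"})  # no handler registered
balance.catch_up()  # applies only events 2 and 3
print(balance.state, balance.position)  # {'balance': 70} 3
```

Because nothing happens until catch_up() is called, the read model can lag the log; that is the trade-off a pull-based design makes in exchange for keeping writes free of projection work.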

Key details:

LiveProjection(Projection)

A push-based variant that subscribes to the store's _subscribers list. Events are processed synchronously inside append(), so the projection is up to date as soon as each write returns. The same snapshot logic applies.
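The push model can be sketched by having append() call each subscriber before it returns. The subscribe() method, MiniStore, and LiveCounter are hypothetical; only the _subscribers list name comes from the description above.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Event:
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any] = field(default_factory=dict)


class MiniStore:
    # Hypothetical stand-in: append() notifies subscribers synchronously.
    def __init__(self) -> None:
        self.events: list[Event] = []
        self._subscribers: list[Callable[[Event], None]] = []

    def subscribe(self, callback: Callable[[Event], None]) -> None:
        self._subscribers.append(callback)

    def append(self, stream_id: str, event_type: str, data: dict[str, Any]) -> Event:
        event = Event(len(self.events) + 1, stream_id, event_type, data)
        self.events.append(event)
        for callback in self._subscribers:
            callback(event)  # runs before append() returns
        return event


class LiveCounter:
    # Hypothetical push-based projection that counts events per type.
    def __init__(self, store: MiniStore) -> None:
        self.state: dict[str, int] = {}
        self.position = 0
        store.subscribe(self._on_event)

    def _on_event(self, event: Event) -> None:
        self.state[event.event_type] = self.state.get(event.event_type, 0) + 1
        self.position = event.event_id


store = MiniStore()
counter = LiveCounter(store)
store.append("order-7", "Placed", {})
store.append("order-7", "Shipped", {})
print(counter.state, counter.position)  # {'Placed': 1, 'Shipped': 1} 2
```

In this sketch, a slow subscriber slows every append(), and a subscriber that raises makes append() raise after the event has already been stored; synchronous dispatch buys freshness at that cost.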

reconstructstate(store, stream_id, handlers, up_to)

A standalone utility that replays a single stream's events through handlers to produce state at a point in time. This is the classic event-sourcing "rehydrate an aggregate" operation — useful for ad-hoc queries or temporal debugging without maintaining a persistent projection.
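Rehydrating an aggregate amounts to a left fold over one stream's events. In this sketch the function takes a plain list of events instead of a store, and up_to is assumed to be an inclusive event_id; the real utility may interpret it differently (for example as a stream version or timestamp).

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional


@dataclass
class Event:
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any] = field(default_factory=dict)


Handler = Callable[[dict, Event], None]


def reconstruct_state(
    events: list[Event],
    stream_id: str,
    handlers: dict[str, Handler],
    up_to: Optional[int] = None,
) -> dict:
    # Fold one stream's events into a fresh dict, stopping after event_id == up_to.
    state: dict = {}
    for event in events:  # assumes the log is ordered by event_id
        if up_to is not None and event.event_id > up_to:
            break
        if event.stream_id != stream_id:
            continue
        handler = handlers.get(event.event_type)
        if handler is not None:
            handler(state, event)
    return state


def deposited(state: dict, event: Event) -> None:
    state["balance"] = state.get("balance", 0) + event.data["amount"]


def withdrawn(state: dict, event: Event) -> None:
    state["balance"] = state.get("balance", 0) - event.data["amount"]


log = [
    Event(1, "acct-1", "Deposited", {"amount": 100}),
    Event(2, "acct-2", "Deposited", {"amount": 500}),
    Event(3, "acct-1", "Withdrawn", {"amount": 40}),
    Event(4, "acct-1", "Deposited", {"amount": 10}),
]
handlers = {"Deposited": deposited, "Withdrawn": withdrawn}
print(reconstruct_state(log, "acct-1", handlers))           # {'balance': 70}
print(reconstruct_state(log, "acct-1", handlers, up_to=3))  # {'balance': 60}
```

Passing different up_to values against the same log answers "what was the balance at that point?" without any stored projection, which is the temporal-debugging use described above.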

Patterns

Dependencies

Imports: All stdlib — copy, json, os, dataclasses, datetime, typing. No external dependencies.

Imported by: testeventstore.py and test_verify.py — the test suites exercise the store, projections, snapshots, and concurrency behavior.

Flow

Write path

1. Caller invokes append() or append_batch()

2. If expected_version is set, it is compared with the stream's current version; a mismatch raises ConcurrencyConflict

3. A new Event is created with the next sequential event_id

4. The event is appended to events, and its index is recorded in _streams[stream_id]

5. If persistence is enabled, the event is serialized to NDJSON and appended to disk

6. All _subscribers are notified synchronously
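Steps 2 through 4 of the write path, the optimistic concurrency check in particular, can be sketched as follows. MiniStore and stream_version() are hypothetical, the stream version is assumed to be the count of events already in the stream (0 for a new stream), and persistence and subscriber notification (steps 5 and 6) are left out.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


class ConcurrencyConflict(Exception):
    """Raised when a writer's expected_version does not match the stream."""


@dataclass
class Event:
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any] = field(default_factory=dict)


class MiniStore:
    def __init__(self) -> None:
        self.events: list[Event] = []
        self._streams: dict[str, list[int]] = {}  # stream_id -> indexes into events

    def stream_version(self, stream_id: str) -> int:
        # Assumed definition: number of events already in the stream.
        return len(self._streams.get(stream_id, []))

    def append(self, stream_id: str, event_type: str, data: dict[str, Any],
               expected_version: Optional[int] = None) -> Event:
        # Step 2: optimistic concurrency check.
        current = self.stream_version(stream_id)
        if expected_version is not None and expected_version != current:
            raise ConcurrencyConflict(
                f"{stream_id}: expected version {expected_version}, found {current}"
            )
        # Step 3: next sequential global event_id.
        event = Event(len(self.events) + 1, stream_id, event_type, data)
        # Step 4: record the index, then append to the log.
        self._streams.setdefault(stream_id, []).append(len(self.events))
        self.events.append(event)
        return event


store = MiniStore()
store.append("cart-1", "ItemAdded", {"sku": "A"}, expected_version=0)
# Two writers both read version 1; the first append wins.
store.append("cart-1", "ItemAdded", {"sku": "B"}, expected_version=1)
try:
    store.append("cart-1", "ItemRemoved", {"sku": "A"}, expected_version=1)
except ConcurrencyConflict as exc:
    print("rejected:", exc)  # rejected: cart-1: expected version 1, found 2
```

The losing writer is expected to re-read the stream, re-validate its command against the new state, and retry with the updated version rather than overwrite blindly.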

Read path (Projection catch-up)

1. catchup() calls readall(fromposition=self.position + 1)

2. For each event, if a handler is registered for its eventtype, the handler mutates state

3. position advances to the event's event_id

4. If a snapshot interval is configured and the threshold is reached, savesnapshot() deep-copies state into store.snapshots
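The snapshot step can be sketched with a counter of events applied since the last snapshot. SnapshottingProjection, Snapshot, and the rule "snapshot after every interval applied events" are assumptions; the actual threshold might instead be based on position. A plain dict stands in for store.snapshots.

```python
import copy
from dataclasses import dataclass


@dataclass
class Snapshot:
    position: int  # id of the last event folded into state
    state: dict


class SnapshottingProjection:
    # Hypothetical: snapshot after every `interval` applied events.
    def __init__(self, name: str, snapshots: dict[str, Snapshot], interval: int) -> None:
        self.name = name
        self.snapshots = snapshots  # stands in for store.snapshots
        self.interval = interval
        self.state: dict = {"items": []}
        self.position = 0
        self._since_snapshot = 0

    def apply(self, event_id: int, item: str) -> None:
        self.state["items"].append(item)
        self.position = event_id
        self._since_snapshot += 1
        if self.interval and self._since_snapshot >= self.interval:
            self.save_snapshot()

    def save_snapshot(self) -> None:
        # deepcopy so later in-place mutations of self.state don't leak in.
        self.snapshots[self.name] = Snapshot(self.position, copy.deepcopy(self.state))
        self._since_snapshot = 0


snapshots: dict[str, Snapshot] = {}
proj = SnapshottingProjection("cart", snapshots, interval=2)
for event_id, item in enumerate(["apple", "bread", "milk"], start=1):
    proj.apply(event_id, item)
print(snapshots["cart"].position, snapshots["cart"].state)  # 2 {'items': ['apple', 'bread']}
print(proj.state)  # {'items': ['apple', 'bread', 'milk']}
```

The deep copy matters because handlers mutate state in place: with copy.copy, the snapshot would share the inner list and silently pick up "milk" after it was taken.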

Recovery path

1. EventStore.__init__ detects an existing file at persist_path

2. loadfromfile reads each NDJSON line, reconstructs Event objects, and rebuilds events and _streams

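3. Projections can then call loadsnapshot() and catchup() from the snapshot's position. Snapshots are currently held only in memory on the store, however, so they do not survive a restart; after recovery a projection has no snapshot to load and must replay from the beginning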
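The two recovery routes, resuming from a snapshot or replaying from scratch when none exists, should converge on the same state. The rebuild() function below is a hypothetical sketch using (event_id, amount) pairs for a running-total projection; it is not the file's API.

```python
import copy
from typing import Optional


def rebuild(
    log: list[tuple[int, int]],
    snapshot: Optional[tuple[int, dict]],
) -> tuple[int, dict]:
    """Return (position, state) for a hypothetical running-total projection.

    log holds (event_id, amount) pairs; snapshot is (position, state) or None.
    """
    if snapshot is not None:
        # Copy so the projection never mutates the stored snapshot.
        position, state = snapshot[0], copy.deepcopy(snapshot[1])
    else:
        position, state = 0, {"total": 0}  # no snapshot: replay from the start
    for event_id, amount in log:
        if event_id <= position:
            continue  # already reflected in the snapshot
        state["total"] += amount
        position = event_id
    return position, state


log = [(1, 10), (2, 20), (3, 5)]
print(rebuild(log, (2, {"total": 30})))  # (3, {'total': 35})
print(rebuild(log, None))                # (3, {'total': 35})
```

Agreement between the two calls is a useful invariant to test: a snapshot is only an optimization, so it must never change the state a projection ends up with.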

Invariants

Error Handling

Topics to Explore

Beliefs