Date: 2026-05-29
Time: 07:58
event-sourcing-store/event_store.py
This file implements an event sourcing system, one of the core patterns from DDIA Chapter 11 (Stream Processing). It owns three responsibilities:
1. Durable, append-only event storage — events are immutable facts, never updated or deleted
2. Projections — derived read models built by replaying events through registered handlers
3. Snapshots — periodic state checkpoints so projections don't have to replay from the beginning every time
This is the kind of infrastructure that backs systems like bank account ledgers or order histories, where you store *what happened* rather than *current state*, and derive current state on demand.
Event (dataclass)
The fundamental unit of the system. Each event captures:
- event_id: a monotonically increasing global sequence number (1-based)
- stream_id: groups events into logical aggregates (e.g., "order-123")
- event_type: a string tag like "ItemAdded" that projections dispatch on
- data: the event payload as a plain dict
- timestamp: wall-clock time of append
- metadata: optional sidecar dict for correlation IDs, user context, etc.
Events are immutable by convention only: the dataclass does not set frozen=True, so Python does not prevent attribute assignment at runtime.
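To make the field list and the frozen=True remark concrete, here is a minimal sketch. The field names come from the list above; the types, the defaults, and the FrozenEvent class are assumptions for illustration and are not taken from event_store.py.

```python
from dataclasses import dataclass, field, FrozenInstanceError
from datetime import datetime, timezone
from typing import Any


@dataclass
class Event:
    """Sketch of the event record; types and defaults are assumed."""
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any]
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass(frozen=True)
class FrozenEvent:
    """Hypothetical variant: frozen=True makes attribute assignment raise."""
    event_id: int
    stream_id: str
    event_type: str


e = Event(1, "order-123", "ItemAdded", {"sku": "A1"})
e.event_type = "Oops"  # allowed: a plain dataclass does not block assignment

f = FrozenEvent(1, "order-123", "ItemAdded")
try:
    f.event_type = "Oops"
    frozen_rejected = False
except FrozenInstanceError:
    frozen_rejected = True
```

Note that frozen=True only blocks rebinding attributes; a dict stored in a frozen instance could still be mutated in place.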
EventStore
The append-only log. Its contract:
- append(stream_id, event_type, data, ...): writes a single event. Supports optimistic concurrency via expected_version: if the stream's current version doesn't match, raises ConcurrencyConflict. Returns the created Event.
- append_batch(stream_id, events, ...): atomically appends multiple events to one stream. The concurrency check happens once *before* any writes, so either all events land or none do. This holds only in memory within a single process; there's no WAL or transaction log backing it.
- read_stream(stream_id, from_version): returns events for a stream, filtered by event_id >= from_version. Note: from_version filters on the global event_id, not the stream-local position.
- read_all(from_position): global ordered read, the foundation for projections' catch_up.
- stream_version(stream_id): returns the count of events in a stream (used for optimistic concurrency checks).
Persistence is optional: pass persist_path and events are appended as newline-delimited JSON. On startup, the file is replayed to rebuild in-memory state.
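The contract above can be sketched with a small in-memory stand-in. MiniStore is a hypothetical class written for this illustration; it mirrors the described behavior (1-based global event_id, version as event count, read_stream filtering on global event_id) but it is not the code in event_store.py and omits persistence, batches, and subscribers.

```python
from dataclasses import dataclass
from typing import Any, Optional


class ConcurrencyConflict(Exception):
    """Raised when expected_version does not match the stream's version."""


@dataclass
class Event:
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any]


class MiniStore:
    """Hypothetical in-memory stand-in for EventStore; no persistence."""

    def __init__(self) -> None:
        self._events: list[Event] = []
        self._streams: dict[str, list[int]] = {}  # stream_id -> indexes into _events

    def stream_version(self, stream_id: str) -> int:
        # Version is the number of events in the stream, not an event_id.
        return len(self._streams.get(stream_id, []))

    def append(self, stream_id: str, event_type: str, data: dict[str, Any],
               expected_version: Optional[int] = None) -> Event:
        current = self.stream_version(stream_id)
        if expected_version is not None and current != expected_version:
            raise ConcurrencyConflict(
                f"{stream_id}: expected version {expected_version}, found {current}"
            )
        event = Event(len(self._events) + 1, stream_id, event_type, data)
        self._streams.setdefault(stream_id, []).append(len(self._events))
        self._events.append(event)
        return event

    def read_stream(self, stream_id: str, from_version: int = 0) -> list[Event]:
        stream = [self._events[i] for i in self._streams.get(stream_id, [])]
        # Filters on the global event_id, as the description above notes.
        return [e for e in stream if e.event_id >= from_version]


store = MiniStore()
store.append("order-1", "OrderCreated", {}, expected_version=0)
store.append("order-2", "OrderCreated", {}, expected_version=0)
third = store.append("order-1", "ItemAdded", {"sku": "A1"}, expected_version=1)

try:
    # A stale writer still believes order-1 is at version 1.
    store.append("order-1", "ItemAdded", {"sku": "B2"}, expected_version=1)
    conflict = False
except ConcurrencyConflict:
    conflict = True
```

In this run order-1 ends at version 2 while its latest event_id is 3, and read_stream("order-1", from_version=2) returns only the event with event_id 3 because the filter applies to the global id.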
Projection
A pull-based read model. You register handlers via when(event_type, handler), where the handler signature is (state: dict, event: Event) -> None (it mutates state in place). Calling catch_up() replays unprocessed events from the store.
Key details:
- _position tracks the last processed event_id, so catch_up is idempotent and incremental
- _state is a plain dict; the projection's handlers define its shape
- snapshot_interval: every N events, state is deep-copied into store._snapshots
LiveProjection(Projection)
A push-based variant that subscribes to the store's _subscribers list. Events are processed synchronously inside append(), so the projection is always up-to-date after every write. Same snapshot logic applies.
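The push-based flow can be sketched as follows. MiniStore and MiniLiveProjection are hypothetical names for this illustration; the sketch assumes the subscriber list holds plain callables and that the projection registers a bound method on it, which matches the description but is not copied from event_store.py. Snapshots are left out.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Event:
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any]


class MiniStore:
    """Hypothetical store that notifies subscribers inside append()."""

    def __init__(self) -> None:
        self._events: list[Event] = []
        self._subscribers: list[Callable[[Event], None]] = []

    def append(self, stream_id: str, event_type: str, data: dict[str, Any]) -> Event:
        event = Event(len(self._events) + 1, stream_id, event_type, data)
        self._events.append(event)
        for notify in self._subscribers:
            notify(event)  # synchronous: the caller waits for every subscriber
        return event


class MiniLiveProjection:
    """Hypothetical push-based projection; state is updated on every append."""

    def __init__(self, store: MiniStore) -> None:
        self._state: dict[str, Any] = {}
        self._handlers: dict[str, Callable[[dict, Event], None]] = {}
        self._position = 0
        store._subscribers.append(self._on_event)

    def when(self, event_type: str, handler: Callable[[dict, Event], None]):
        self._handlers[event_type] = handler
        return self

    def _on_event(self, event: Event) -> None:
        handler = self._handlers.get(event.event_type)
        if handler is not None:
            handler(self._state, event)
        self._position = event.event_id


def count_item(state: dict, event: Event) -> None:
    state["items"] = state.get("items", 0) + 1


store = MiniStore()
live = MiniLiveProjection(store).when("ItemAdded", count_item)

store.append("order-1", "OrderCreated", {})
store.append("order-1", "ItemAdded", {"sku": "A1"})
store.append("order-1", "ItemAdded", {"sku": "B2"})
```

No catch_up() call is needed here: by the time append() returns, the projection has already seen the event.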
reconstruct_state(store, stream_id, handlers, up_to)
A standalone utility that replays a single stream's events through handlers to produce state at a point in time. This is the classic event-sourcing "rehydrate an aggregate" operation, useful for ad-hoc queries or temporal debugging without maintaining a persistent projection.
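A rough sketch of this kind of replay is shown below. To stay self-contained it takes a plain list of events instead of a store, and it assumes up_to is an inclusive upper bound on the global event_id; the source does not say how up_to is interpreted, so treat that as a guess. The function name reconstruct_state_sketch and the account events are made up for the example.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Event:
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any]


Handler = Callable[[dict, Event], None]


def reconstruct_state_sketch(events: list[Event], stream_id: str,
                             handlers: dict[str, Handler],
                             up_to: Optional[int] = None) -> dict:
    """Replay one stream's events (ordered by event_id) into a fresh dict.

    Assumption: up_to is an inclusive bound on the global event_id.
    """
    state: dict = {}
    for event in events:
        if up_to is not None and event.event_id > up_to:
            break
        if event.stream_id != stream_id:
            continue
        handler = handlers.get(event.event_type)
        if handler is not None:
            handler(state, event)
    return state


def deposited(state: dict, event: Event) -> None:
    state["balance"] = state.get("balance", 0) + event.data["amount"]


def withdrawn(state: dict, event: Event) -> None:
    state["balance"] = state.get("balance", 0) - event.data["amount"]


log = [
    Event(1, "acct-1", "Deposited", {"amount": 100}),
    Event(2, "acct-2", "Deposited", {"amount": 50}),
    Event(3, "acct-1", "Withdrawn", {"amount": 30}),
    Event(4, "acct-1", "Deposited", {"amount": 5}),
]
handlers = {"Deposited": deposited, "Withdrawn": withdrawn}

now = reconstruct_state_sketch(log, "acct-1", handlers)
at_three = reconstruct_state_sketch(log, "acct-1", handlers, up_to=3)
at_one = reconstruct_state_sketch(log, "acct-1", handlers, up_to=1)
```

Each call starts from an empty dict, so nothing is cached between queries; that is the trade-off against a projection, which keeps state and a position between calls.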
- EventStore is the system of record; projections are disposable and rebuildable.
- Writes go through append; reads go through projections or reconstruct_state. The read and write models are separate, though they live in the same process.
- expected_version implements optimistic concurrency control (OCC). No locks: the writer declares what version it thinks the stream is at, and the store rejects the write if someone else got there first.
- _subscribers is a simple pub/sub mechanism. LiveProjection uses it; you could also attach any Callable[[Event], None].
Imports: All stdlib (copy, json, os, dataclasses, datetime, typing). No external dependencies.
Imported by: test_event_store.py and test_verify.py. The test suites exercise the store, projections, snapshots, and concurrency behavior.
Write path
1. Caller invokes append() or append_batch()
2. If expected_version is set, the current stream version is checked; a mismatch raises ConcurrencyConflict
3. A new Event is created with the next sequential event_id
4. The event is appended to _events and its index is recorded in _streams[stream_id]
5. If persistence is enabled, the event is serialized to NDJSON and appended to disk
6. All _subscribers are notified synchronously
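Steps 5 and 6 above could look roughly like this. The helper persist_and_notify is a hypothetical name, and serializing with dataclasses.asdict is an assumption; the source only says that events are written as newline-delimited JSON and that subscribers are then called in order.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass
from typing import Any, Callable


@dataclass
class Event:
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any]


def persist_and_notify(path: str, event: Event,
                       subscribers: list[Callable[[Event], None]]) -> None:
    # Step 5: one JSON object per line, file opened in append mode.
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps(asdict(event)) + "\n")
    # Step 6: synchronous fan-out; an exception here stops later subscribers.
    for notify in subscribers:
        notify(event)


seen: list[Event] = []
path = os.path.join(tempfile.mkdtemp(), "events.ndjson")

persist_and_notify(path, Event(1, "order-123", "OrderCreated", {}), [seen.append])
persist_and_notify(path, Event(2, "order-123", "ItemAdded", {"sku": "A1"}), [seen.append])

with open(path, encoding="utf-8") as f:
    lines = f.read().splitlines()
```

Because each event is written with its own append, a batch is a sequence of independent line writes, which is why a crash partway through can leave only some of a batch on disk.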
Projection catch-up
1. catch_up() calls read_all(from_position=self._position + 1)
2. For each event, if a handler is registered for its event_type, the handler mutates state
3. _position advances to the event's event_id
4. If a snapshot interval is configured and the threshold is reached, save_snapshot() deep-copies state into store._snapshots
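The catch-up loop above can be sketched as follows. MiniProjection is a hypothetical class: it reads from a shared list rather than a store, keeps snapshots on itself rather than in store._snapshots, and uses a simple "N events since the last snapshot" counter as the threshold, which is an assumption about how the interval is applied.

```python
import copy
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class Event:
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any]


Handler = Callable[[dict, Event], None]


class MiniProjection:
    """Hypothetical pull-based projection over a shared event list."""

    def __init__(self, log: list[Event], snapshot_interval: int = 0) -> None:
        self._log = log
        self._handlers: dict[str, Handler] = {}
        self._state: dict = {}
        self._position = 0          # last processed global event_id
        self._since_snapshot = 0
        self.snapshot_interval = snapshot_interval
        self.snapshots: list[tuple[int, dict]] = []  # (position, state copy)

    def when(self, event_type: str, handler: Handler) -> "MiniProjection":
        self._handlers[event_type] = handler
        return self

    def catch_up(self) -> int:
        """Process events after _position; return how many were processed."""
        processed = 0
        for event in [e for e in self._log if e.event_id > self._position]:
            handler = self._handlers.get(event.event_type)
            if handler is not None:
                handler(self._state, event)
            self._position = event.event_id
            processed += 1
            self._since_snapshot += 1
            if self.snapshot_interval and self._since_snapshot >= self.snapshot_interval:
                # Deep copy so later handler calls cannot alter the snapshot.
                self.snapshots.append((self._position, copy.deepcopy(self._state)))
                self._since_snapshot = 0
        return processed


def count_item(state: dict, event: Event) -> None:
    state["count"] = state.get("count", 0) + 1


log = [Event(i, "cart-1", "ItemAdded", {}) for i in range(1, 6)]
proj = MiniProjection(log, snapshot_interval=2).when("ItemAdded", count_item)

first = proj.catch_up()    # processes events 1..5
second = proj.catch_up()   # nothing new, so nothing is reprocessed
log.append(Event(6, "cart-1", "ItemAdded", {}))
third = proj.catch_up()    # processes only event 6
```

Calling catch_up() twice in a row is harmless because the second call finds no event with an id above _position.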
Recovery on startup
1. EventStore.__init__ detects an existing file at persist_path
2. _load_from_file reads each NDJSON line, reconstructs Event objects, and rebuilds _events and _streams
3. Projections can then load_snapshot() and catch_up() from there. This only helps if snapshots were persisted; currently they live only in memory on the store.
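The replay in step 2 might look something like the sketch below. The function load_from_file here is a free-standing illustration rather than the store's private method, the four-field record layout is assumed, and skipping blank lines is a choice made for the example. A corrupt line raises json.JSONDecodeError, which is left to propagate.

```python
import json
import os
import tempfile
from dataclasses import dataclass
from typing import Any


@dataclass
class Event:
    event_id: int
    stream_id: str
    event_type: str
    data: dict[str, Any]


def load_from_file(path: str) -> tuple[list[Event], dict[str, list[int]]]:
    """Rebuild the event list and the per-stream index from an NDJSON file."""
    events: list[Event] = []
    streams: dict[str, list[int]] = {}
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue  # assumption: tolerate blank lines
            event = Event(**json.loads(line))
            streams.setdefault(event.stream_id, []).append(len(events))
            events.append(event)
    return events, streams


records = [
    {"event_id": 1, "stream_id": "order-1", "event_type": "OrderCreated", "data": {}},
    {"event_id": 2, "stream_id": "order-2", "event_type": "OrderCreated", "data": {}},
    {"event_id": 3, "stream_id": "order-1", "event_type": "ItemAdded", "data": {"sku": "A1"}},
]
path = os.path.join(tempfile.mkdtemp(), "events.ndjson")
with open(path, "w", encoding="utf-8") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")

events, streams = load_from_file(path)
next_event_id = len(events) + 1  # the id the next append would receive
```

After the replay, the stream versions (2 for order-1, 1 for order-2) and the next event_id fall out of the rebuilt structures, so no separate counters need to be stored on disk.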
- event_id = len(self._events) + 1, so IDs are 1-based and gap-free within a process lifetime.
- stream_version returns the *count* of events in a stream, not the latest event_id. A stream with 3 events has version 3 regardless of their global event IDs.
- append_batch does the concurrency check once, then appends all events. If the process crashes mid-batch, the NDJSON file could contain a partial batch; there's no write-ahead log or transaction boundary on disk.
- _position is compared against event_id, which is global. This means read_all(from_position=self._position + 1) correctly resumes from where the projection left off across all streams.
- ConcurrencyConflict: the only custom exception. Raised by append and append_batch when expected_version doesn't match. This is the primary safety mechanism for concurrent writers.
- If _persist_event or _load_from_file fails (disk full, corrupt JSON, permissions), the exception propagates directly to the caller.
- An exception raised in _on_event or in the append loop will short-circuit notification of remaining subscribers and bubble up.
- event-sourcing-store/test_event_store.py: Covers the full API surface including concurrency conflicts, snapshots, and live projections
- event-sourcing-store/event_store.py:reconstruct_state: The temporal query function; compare how it differs from projection-based reads for point-in-time state
- batch-atomicity-gap: The NDJSON persistence has no transaction boundary for append_batch; worth comparing with the write-ahead-log implementation in write-ahead-log/wal.py
- change-data-capture/cdc.py: CDC and event sourcing are complementary patterns from DDIA Ch. 11; compare how they solve the "derived data" problem differently
- snapshot-persistence: Snapshots are stored in-memory on store._snapshots and lost on restart; a production system would persist them separately
- event-ids-are-1-based-sequential: event_id starts at 1 and increments by 1 for each appended event, with no gaps within a process lifetime
- stream-version-is-event-count: stream_version() returns the number of events in a stream, not the latest event_id, and this is what expected_version checks against
- batch-append-not-crash-safe: append_batch can leave a partial batch on disk if the process crashes mid-write, since events are individually appended to the NDJSON file without a transaction marker
- live-projection-is-synchronous: LiveProjection processes events inside the append() call path via the subscriber mechanism, meaning the caller blocks until all projections have updated
- snapshots-are-in-memory-only: Projection snapshots are stored in store._snapshots (a dict attached at runtime), not persisted to disk, so they are lost on process restart