Date: 2026-05-29
Time: 06:22
event-sourcing-store/testeventstore.pyThis is the test suite for an event sourcing store — a reference implementation of the event sourcing pattern from DDIA (Chapter 11, stream processing / turning the database inside out). It validates twelve distinct capabilities of the EventStore system, organized as a specification: append/read, stream isolation, optimistic concurrency, batch writes, projections, temporal queries, snapshots, live projections, disk persistence, and global reads.
The file doubles as executable documentation. Each test is numbered and labeled to match a feature spec, making it easy to trace requirements to coverage.
on_opened, on_deposited, on_withdrawn
Pure functions with signature (state: dict, event) -> None that mutate a state dictionary in place. They model a bank account aggregate: open sets balance from initialbalance, deposit adds, withdrawal subtracts. State is keyed by event.streamid, so a single state dict can track multiple accounts.
HANDLERS dictMaps event type strings to handler functions. Used by reconstruct_state() for temporal queries — it's the "fold function" definition for replaying events into state.
make_projection(name, store)Factory that creates a Projection, registers all three handlers via .when(), and returns it. Avoids repeating the registration boilerplate across tests.
seedaccount(store)Appends four canonical events to "account:1": open (balance 0), deposit 100, withdraw 30, deposit 50. Final balance: 120. This is the shared fixture — most tests start from this known state.
Arrange-Act-Assert — every test follows this strictly. seedaccount is the shared arrange step; assertions are always on concrete values (120, 130, etc.), not relative comparisons.
Numbered specification tests — comments like # --- 1. Basic append and read --- tie each test to a feature requirement. This is a lightweight spec-by-example approach.
No fixtures/setup classes — each test creates its own EventStore(), giving full isolation. The only shared state is the helper functions, which are stateless.
Event sourcing domain model — the bank account is the classic event sourcing example. The test implicitly validates the core ES invariant: current state = fold(initial, events).
Imports from event_store:
EventStore — the append-only event logProjection — catch-up projection that polls for new eventsLiveProjection — projection that auto-updates on append (likely via callback/subscription)ConcurrencyConflict — exception for optimistic concurrency violationsreconstruct_state — replays events up to a position to rebuild past stateExternal: pytest for test running and pytest.raises for exception assertions. tempfile and os for the disk persistence test only.
Nothing imports this file — it's a leaf in the dependency graph.
The tests exercise five distinct flows through the event store:
1. Write path: append() / appendbatch() → events get sequential eventid values, streamversion increments, globalposition increments.
2. Read path: readstream(streamid) returns only that stream's events; readall(fromposition=N) returns the global log filtered by position.
3. Projection path: Projection.catch_up() reads events from self.position forward, applies handlers, advances position, returns count processed. Subsequent calls only process new events (incremental).
4. Snapshot path: proj.savesnapshot() persists state + position → proj.loadsnapshot() restores them → catch_up() resumes from the snapshot position.
5. Live projection path: LiveProjection.catchup() does initial replay, then subsequent append() calls on the store automatically invoke the projection's handlers — no explicit catchup() needed.
testappendandread asserts events[0].eventid == 1 and events[-1].event_id == 4."a" never appear in read_stream("b").append(..., expected_version=V) succeeds only if current stream version equals V. Stale versions raise ConcurrencyConflict.catch_up() twice without new events processes zero events the second time (tested implicitly — first call processes 4, second processes 1 new event).reconstructstate(..., upto=N) replays exactly the first N events, producing the state as it was at that point.EventStore reproduces the same stream contents and version.Minimal — this is a test file, so errors are the things being tested:
testoptimisticconcurrency uses pytest.raises(ConcurrencyConflict) to verify the store rejects stale writes. This is the only explicit error path tested.testdiskpersistence uses a try/finally to clean up the temp file, preventing test pollution.event-sourcing-store/event_store.py — The implementation behind all five classes/functions imported here; understanding how LiveProjection auto-updates (callback vs. observer pattern) is keyevent-sourcing-store/eventstore.py:reconstructstate — The temporal query engine; how it handles the up_to parameter and whether it short-circuits or reads then truncateslive-projection-subscription-mechanism — How LiveProjection receives events automatically after catch_up() — likely an observer/callback registered on the store's append pathevent-sourcing-store/test_verify.py — The companion test file; likely contains verification or property-based tests for the same storesnapshot-storage-format — Where and how savesnapshot/loadsnapshot persist projection state — in-memory dict on the store, or a separate filees-event-ids-are-stream-scoped — Event IDs are sequential per-stream starting at 1, not globally unique identifierses-global-position-tracks-all-streams — global_position increments across all streams and equals the total number of events appended to the storees-live-projection-auto-updates — LiveProjection automatically applies new events on append without requiring an explicit catch_up() call after the initial onees-projection-catch-up-is-incremental — Projection.catch_up() only processes events after its current position, returning the count of newly processed eventses-persistence-uses-jsonl — Disk persistence uses a JSONL (newline-delimited JSON) file format, loaded in full on EventStore construction