Date: 2026-05-29
Time: 11:10
LiveProjection: Auto-Updating Event Projection

LiveProjection is a subclass of Projection that subscribes to an EventStore's event stream so that its derived state updates synchronously and automatically whenever new events are appended. The base Projection class requires explicit catch_up() calls to process new events; LiveProjection removes that requirement by hooking into the store's publish-subscribe mechanism.
This is the push-based counterpart to Projection's pull-based model. It trades control over when processing happens for the guarantee that the projection is always current immediately after any append() or append_batch() call returns.
Preconditions:
- store must be a live EventStore instance (the projection registers a callback on it).
- Handlers must be registered with .when() *before* events start flowing, or those early events will be silently skipped; handler registration does not trigger any replay.

Postconditions:
- self._on_event is in store._subscribers.
- After any store.append() returns, this projection's state and position reflect that event (if a handler was registered for it).

Invariants:
- _position is monotonically increasing; the event.event_id <= self._position guard ensures idempotency.
- _position tracks the last *seen* event, not the last *handled* event. Events with no matching handler still advance the position.

| Parameter | Type | Description |
|-----------|------|-------------|
| name | str | Identifier for this projection, used as the key in the snapshot store (store._snapshots[name]). Must be unique per store if snapshots are used. |
| store | EventStore | The event store to subscribe to. The projection mutates store._subscribers and optionally store._snapshots. |
| snapshot_interval | Optional[int] | If set, auto-saves a snapshot every N events processed. None disables auto-snapshotting. A value of 0 would disable it too (falsy check). |
Constructor returns the LiveProjection instance. The class exposes state via inherited properties: .state (the derived dict) and .position (last processed event ID).
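The snapshot_interval semantics from the parameter table can be sketched as a single predicate. This is a minimal illustration, assuming the check is a plain truthiness test followed by a "reached the interval" comparison; the function name should_snapshot and the >= comparison are assumptions, not taken from the source.

```python
from typing import Optional


def should_snapshot(events_since_snapshot: int,
                    snapshot_interval: Optional[int]) -> bool:
    """Hypothetical helper: True when an auto-snapshot is due.

    None and 0 are both falsy, so either value disables auto-snapshotting.
    """
    if not snapshot_interval:
        return False
    return events_since_snapshot >= snapshot_interval


disabled_none = should_snapshot(500, None)   # None disables snapshots
disabled_zero = should_snapshot(500, 0)      # 0 is falsy, so it also disables
not_yet = should_snapshot(99, 100)           # interval not reached
due = should_snapshot(100, 100)              # interval reached
```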
Construction:
1. Calls Projection.__init__ to set up handlers, state, position = 0, and snapshot tracking.
2. Appends self._on_event to the store's _subscribers list. This is the subscription.
On each event (_on_event):
1. Idempotency guard: If event.event_id <= self._position, return immediately. This prevents double-processing when catch_up() has already handled the event, or if events somehow arrive out of order.
2. Dispatch: Look up event.event_type in the handlers. If a handler exists, call it with (self._state, event); the handler mutates state in place.
3. Advance position: Set _position = event.event_id regardless of whether a handler matched.
4. Snapshot check: Increment _events_since_snapshot. If the interval has been reached, save a snapshot and reset the counter.
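The construction and per-event steps above can be sketched roughly as follows. This is an illustration under stated assumptions, not the actual implementation: the EventStore and Event classes are hypothetical minimal stand-ins, the Projection base class is flattened into LiveProjection for brevity, and the snapshot layout (a dict with "state" and "position") is invented for the example.

```python
import copy
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Event:
    event_id: int
    stream_id: str
    event_type: str
    data: Dict[str, Any] = field(default_factory=dict)


class EventStore:
    """Hypothetical stand-in: persists the event, then notifies subscribers."""

    def __init__(self) -> None:
        self._events: List[Event] = []
        self._subscribers: List[Callable[[Event], None]] = []
        self._snapshots: Dict[str, Any] = {}

    def append(self, stream_id: str, event_type: str, data: Dict[str, Any]) -> Event:
        event = Event(len(self._events) + 1, stream_id, event_type, data)
        self._events.append(event)
        for notify in self._subscribers:
            notify(event)
        return event


class LiveProjection:
    def __init__(self, name: str, store: EventStore,
                 snapshot_interval: Optional[int] = None) -> None:
        self.name = name
        self._store = store
        self._handlers: Dict[str, Callable[[dict, Event], None]] = {}
        self._state: dict = {}
        self._position = 0
        self._snapshot_interval = snapshot_interval
        self._events_since_snapshot = 0
        store._subscribers.append(self._on_event)  # the subscription

    def when(self, event_type: str, handler: Callable[[dict, Event], None]) -> None:
        self._handlers[event_type] = handler

    def _on_event(self, event: Event) -> None:
        # 1. Idempotency guard: ignore anything at or before the current position.
        if event.event_id <= self._position:
            return
        # 2. Dispatch: call the handler, if one is registered for this type.
        handler = self._handlers.get(event.event_type)
        if handler is not None:
            handler(self._state, event)
        # 3. Advance position even when no handler matched.
        self._position = event.event_id
        # 4. Snapshot check (assumed layout; a falsy interval disables it).
        self._events_since_snapshot += 1
        if self._snapshot_interval and self._events_since_snapshot >= self._snapshot_interval:
            self._store._snapshots[self.name] = {
                "state": copy.deepcopy(self._state),
                "position": self._position,
            }
            self._events_since_snapshot = 0


store = EventStore()
proj = LiveProjection("totals", store, snapshot_interval=2)
proj.when("OrderPlaced", lambda s, e: s.update(
    {e.stream_id: s.get(e.stream_id, 0) + e.data["amount"]}))
store.append("order-1", "OrderPlaced", {"amount": 50})
store.append("order-1", "Unhandled", {})   # no handler, position still advances
proj._on_event(store._events[0])           # redelivery is ignored by the guard
```

In this sketch the second append has no matching handler, yet it still moves the position to 2 and triggers the snapshot, which matches the "last seen, not last handled" behaviour described above.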
Side effects:
- store._subscribers, at construction: adds a bound method reference. There is no unsubscribe mechanism; the projection stays subscribed for the lifetime of the store.
- self._state, on every matching event: handlers modify the dict in place.
- store._snapshots, when the snapshot interval is reached: deep-copies state into the store.
- store.append(): the callback executes within the append call stack. A slow handler blocks the append from returning. An exception in _on_event would propagate up through append() and leave the store in a partially-notified state (some subscribers called, others not).

Error handling:
There is none. If a handler raises, the exception propagates up through EventStore.append() to the caller. The event is already persisted and appended to _events before subscribers are notified, so:
- _position was NOT updated (the handler raises before the assignment self._position = event.event_id runs).
- A subsequent catch_up() call would reprocess the failed event (successfully, assuming the handler was fixed or the data issue resolved).

This is a reasonable failure mode for event sourcing, but callers of store.append() need to be aware that subscriber exceptions surface there.
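The failure mode described here can be reproduced with a small sketch. The Store and Proj classes below are hypothetical, stripped-down stand-ins (events are plain tuples), written only to show where the exception surfaces and what state is left behind; they are not the real EventStore or LiveProjection.

```python
class Store:
    """Hypothetical store: persist first, then notify subscribers in order."""

    def __init__(self):
        self._events = []
        self._subscribers = []

    def append(self, event_type):
        event = (len(self._events) + 1, event_type)
        self._events.append(event)          # persisted before notification
        for notify in self._subscribers:
            notify(event)                   # a raising subscriber aborts the loop
        return event


class Proj:
    """Hypothetical projection with the same guard/dispatch/advance order."""

    def __init__(self, store):
        self.position = 0
        self.handlers = {}
        store._subscribers.append(self.on_event)

    def on_event(self, event):
        event_id, event_type = event
        if event_id <= self.position:
            return
        handler = self.handlers.get(event_type)
        if handler is not None:
            handler(event)                  # may raise
        self.position = event_id            # skipped if the handler raised


def broken_handler(event):
    raise ValueError("bad payload")


store = Store()
first = Proj(store)     # subscribed first, handler raises
second = Proj(store)    # subscribed second, never gets notified
first.handlers["OrderPlaced"] = broken_handler
seen_by_second = []
second.handlers["OrderPlaced"] = seen_by_second.append

try:
    store.append("OrderPlaced")
    raised = False
except ValueError:
    raised = True       # the subscriber exception surfaces at the append call
```

After this runs, the event is in the store, neither projection has advanced, and the second subscriber was never called, which is the partially-notified state mentioned above.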
```python
store = EventStore()
projection = LiveProjection("order_totals", store, snapshot_interval=100)

# Register handlers BEFORE any appends
projection.when("OrderPlaced", lambda state, e: state.update(
    {e.stream_id: state.get(e.stream_id, 0) + e.data["amount"]}
))

# Optionally catch up on historical events first
projection.load_snapshot()  # restore from last snapshot
projection.catch_up()       # replay events since snapshot

# From here, state is always current
store.append("order-1", "OrderPlaced", {"amount": 50})
assert projection.state["order-1"] == 50  # immediately visible
```
Caller obligations:
- Call load_snapshot() and catch_up() after handler registration to process the backlog; the live subscription only handles *new* events appended after construction.

Dependencies:
- Projection (base class): provides handlers, state, position, catch_up(), save_snapshot(), load_snapshot().
- EventStore._subscribers: the pub-sub list. This is a plain list[Callable], not a formal observer pattern.
- Event: the dataclass consumed by handlers. event_id is assumed to be monotonically increasing (assigned as len(self._events) + 1 in the store).
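The caller obligations can be illustrated by contrasting a correct setup with a late handler registration. The sketch below uses hypothetical minimal classes; in particular, catch_up() is assumed to replay stored events through the same position guard, which the source does not spell out.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class Event:
    event_id: int
    event_type: str
    amount: int


class EventStore:
    def __init__(self) -> None:
        self._events: List[Event] = []
        self._subscribers: List[Callable[[Event], None]] = []

    def append(self, event_type: str, amount: int) -> Event:
        event = Event(len(self._events) + 1, event_type, amount)
        self._events.append(event)
        for notify in self._subscribers:
            notify(event)
        return event


class LiveProjection:
    def __init__(self, store: EventStore) -> None:
        self._store = store
        self._handlers: Dict[str, Callable[[dict, Event], None]] = {}
        self.state = {"total": 0}
        self.position = 0
        store._subscribers.append(self._on_event)

    def when(self, event_type: str, handler: Callable[[dict, Event], None]) -> None:
        self._handlers[event_type] = handler

    def catch_up(self) -> None:
        # Assumed behaviour: replay stored events; the guard skips seen ones.
        for event in self._store._events:
            self._on_event(event)

    def _on_event(self, event: Event) -> None:
        if event.event_id <= self.position:
            return
        handler = self._handlers.get(event.event_type)
        if handler is not None:
            handler(self.state, event)
        self.position = event.event_id


def add_amount(state: dict, event: Event) -> None:
    state["total"] += event.amount


# Case 1: backlog exists before construction; handlers first, then catch_up().
store = EventStore()
store.append("OrderPlaced", 10)         # event 1, projection does not exist yet
good = LiveProjection(store)
good.when("OrderPlaced", add_amount)
good.catch_up()                         # processes event 1
store.append("OrderPlaced", 5)          # event 2 arrives through the subscription

# Case 2: handler registered after an event has already flowed.
store2 = EventStore()
late = LiveProjection(store2)
store2.append("OrderPlaced", 10)        # no handler yet, position moves to 1
late.when("OrderPlaced", add_amount)
late.catch_up()                         # guard skips event 1, so it stays unhandled
```

In the second case the event is lost to the projection even after catch_up(), because the position had already advanced past it when no handler was registered.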