Function: LiveProjection in event-sourcing-store/event_store.py

Date: 2026-05-29

Time: 11:10

LiveProjection — Auto-Updating Event Projection

Purpose

LiveProjection is a subclass of Projection that subscribes to an EventStore's event stream so its derived state updates synchronously and automatically whenever new events are appended. The base Projection class requires explicit catch_up() calls to process new events — LiveProjection eliminates that by hooking into the store's publish-subscribe mechanism.

This is the push-based counterpart to Projection's pull-based model. It trades control over when processing happens for the guarantee that the projection is always current immediately after any append() or append_batch() call returns.

Contract

Preconditions:

Postconditions:

Invariants:

Parameters

| Parameter | Type | Description |

|-----------|------|-------------|

| name | str | Identifier for this projection, used as the key in the snapshot store (store.snapshots[name]). Must be unique per store if snapshots are used. |

| store | EventStore | The event store to subscribe to. The projection mutates store.subscribers and optionally store.snapshots. |

| snapshot_interval | Optional[int] | If set, auto-saves a snapshot every N events processed. None disables auto-snapshotting. A value of 0 would disable it too (falsy check). |

Return Value

Constructor returns the LiveProjection instance. The class exposes state via inherited properties: .state (the derived dict) and .position (last processed event ID).

Algorithm

Construction:

1. Calls Projection._init to set up handlers, state, position = 0, and snapshot tracking.

2. Appends self.onevent to the store's _subscribers list — this is the subscription.

On each event (onevent):

1. Idempotency guard: If event.eventid <= self.position, bail out. This prevents double-processing if catch_up() was called before subscribing, or if events somehow arrive out of order.

2. Dispatch: Look up event.eventtype in handlers. If a handler exists, call it with (self.state, event) — the handler mutates state in place.

3. Advance position: Set position = event.eventid regardless of whether a handler matched.

4. Snapshot check: Increment eventssince_snapshot. If we've hit the interval, save a snapshot and reset the counter.

Side Effects

Error Handling

There is none. If a handler raises, the exception propagates up through EventStore.append() to the caller. The event is already persisted and appended to _events before subscribers are notified, so:

This is actually a reasonable failure mode for event sourcing, but callers of store.append() need to be aware that subscriber exceptions surface there.

Usage Patterns


store = EventStore()
projection = LiveProjection("order_totals", store, snapshot_interval=100)

# Register handlers BEFORE any appends
projection.when("OrderPlaced", lambda state, e: state.update(
    {e.stream_id: state.get(e.stream_id, 0) + e.data["amount"]}
))

# Optionally catch up on historical events first
projection.load_snapshot()   # restore from last snapshot
projection.catch_up()        # replay events since snapshot

# From here, state is always current
store.append("order-1", "OrderPlaced", {"amount": 50})
assert projection.state["order-1"] == 50  # immediately visible

Caller obligations:

Dependencies