Date: 2026-05-29
Time: 11:14
Projection.catch_up — Poll-based event replaycatchup is the polling mechanism for a Projection. It reads all events the projection hasn't seen yet from the EventStore, applies registered handlers to update derived state, and optionally takes snapshots at configured intervals. This is the pull-based counterpart to LiveProjection.on_event (which is push-based via the subscriber callback).
It exists because projections are read models — materialized views derived from the event log. The event log is the source of truth; projections must periodically or on-demand replay new events to stay current.
Preconditions:
self.store is an EventStore with a functioning readall method.self.position reflects the eventid of the last event this projection processed (0 if none).when() must accept (dict, Event) and mutate the dict in place.Postconditions:
self.position equals the eventid of the last event in the store (if any new events existed), or is unchanged (if none).self._state reflects all handler mutations from newly processed events.self.eventssince_snapshot is updated and potentially reset to 0 if a snapshot was triggered.Invariant: self._position only advances forward, never backwards.
None beyond self. The method takes no arguments — it implicitly reads the gap between its own _position and the store's current tail.
Returns int — the count of events processed this call. Zero means the projection was already caught up. The caller can use this to decide whether derived state changed.
1. Query the gap: Call self.store.readall(fromposition=self.position + 1) to get every event the projection hasn't seen. The +1 skips the event already processed at the current position.
2. For each event:
event.eventtype, call it with (self.state, event). Events with no registered handler are acknowledged but not handled — the position still advances.self.position = event.eventid. This happens unconditionally, even for unhandled event types — the projection tracks global position, not just "interesting" events.count and eventssince_snapshot.snapshotinterval is set and the events-since-snapshot counter has reached it, call self.save_snapshot() and reset the counter.3. Return the total count.
self._state: Handlers modify the state dict in place. This is the whole point.self._position: Advances the high-water mark.self.eventssince_snapshot: Tracks progress toward the next snapshot.savesnapshot(): Which deep-copies self.state and writes it to self.store.snapshots (a dict monkey-patched onto the store instance via setattr semantics in save_snapshot).save_snapshot mutates store state.None. If a handler raises, the exception propagates uncaught. This means a poison event (one whose handler throws) will halt the projection mid-catch-up. The position will have been advanced up to but not including the failing event's eventid assignment — actually, looking more carefully: position is set *after* the handler call on the same iteration, so if the handler raises, position is not advanced past the previous event. This means retrying catchup will re-encounter the same poison event and fail again — a classic poison-message problem.
However, there's a subtlety: position is updated unconditionally for events that have no handler. The position assignment is outside the if block. So if event N has a handler that raises, position was set to event N-1's eventid on the prior iteration. On retry, readall(from_position=N) will return event N again, and the handler will throw again. The projection is stuck.
# Typical: build a read model and catch up on demand
projection = Projection("order-totals", store)
projection.when("OrderPlaced", lambda state, e: ...)
projection.when("OrderCancelled", lambda state, e: ...)
# Initial catch-up from beginning
projection.catch_up()
# Later, after more events were appended
new_count = projection.catch_up()
if new_count > 0:
print(f"Processed {new_count} new events")
# With snapshots for faster restart
projection = Projection("order-totals", store, snapshot_interval=100)
projection.load_snapshot() # restore from last snapshot
projection.catch_up() # only replay events since snapshot
The caller's obligation is to call catch_up whenever it wants fresh state. For automatic updates, use LiveProjection instead.
EventStore.readall: The data source. Returns events filtered by fromposition.self.savesnapshot: Writes snapshot to self.store.snapshots, which is an ad-hoc attribute on the store (not part of EventStore.init_).when().1. eventid values are monotonically increasing — the fromposition=self.position + 1 logic assumes eventid acts as a sequential cursor. If IDs have gaps, events could be skipped.
2. Handlers mutate _state in place — the signature Callable[[dict, Event], None] doesn't enforce mutation; a handler that returns a new dict instead of mutating would silently do nothing.
3. readall returns events ordered by eventid — if they came back out of order, position tracking would break (earlier events would be skipped on the next catch-up).
4. No concurrent callers — there's no locking. Two threads calling catch_up simultaneously would corrupt state.