Function: catch_up in event-sourcing-store/eventstore.py

Date: 2026-05-29

Time: 11:14

Projection.catch_up — Poll-based event replay

Purpose

catch_up is the polling mechanism for a Projection. It reads all events the projection hasn't seen yet from the EventStore, applies registered handlers to update derived state, and optionally takes snapshots at configured intervals. This is the pull-based counterpart to LiveProjection.on_event (which is push-based via the subscriber callback).

It exists because projections are read models — materialized views derived from the event log. The event log is the source of truth; projections must periodically or on-demand replay new events to stay current.

Contract

Preconditions:

Postconditions:

Invariant: self._position only advances forward, never backwards.

Parameters

None beyond self. The method takes no arguments — it implicitly reads the gap between its own _position and the store's current tail.

Return Value

Returns int — the count of events processed this call. Zero means the projection was already caught up. The caller can use this to decide whether derived state changed.

Algorithm

1. Query the gap: Call self.store.read_all(from_position=self._position + 1) to get every event the projection hasn't seen. The +1 skips the event already processed at the current position.

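2. For each event, in order:
   - Look up the handler registered for the event's type. If one exists, call it with the current state and the event.
   - Set self._position to the event's event_id. This assignment sits outside the handler check, so it runs whether or not a handler was found.
   - Count the event as processed and, if a snapshot interval is configured, take a snapshot when the interval is reached.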

3. Return the total count.
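The steps above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in the store: the Event, InMemoryStore, and SketchProjection classes, the event_type field, and the snapshot trigger are all hypothetical stand-ins so the example runs on its own. The ordering inside the loop (handler first, then position) follows the description in Error Handling.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class Event:
    # Hypothetical event shape; only event_id and event_type matter here.
    event_id: int
    event_type: str
    data: dict = field(default_factory=dict)


class InMemoryStore:
    """Stand-in for the EventStore: an append-only list with 1-based IDs."""

    def __init__(self) -> None:
        self._events: List[Event] = []

    def append(self, event_type: str, data: dict) -> Event:
        event = Event(len(self._events) + 1, event_type, data)
        self._events.append(event)
        return event

    def read_all(self, from_position: int = 1) -> List[Event]:
        # Events with event_id >= from_position, in ascending ID order.
        return [e for e in self._events if e.event_id >= from_position]


class SketchProjection:
    def __init__(self, store: InMemoryStore, snapshot_interval: int = 0) -> None:
        self.store = store
        self._position = 0                      # ID of the last event processed
        self._state: dict = {}
        self._handlers: Dict[str, Callable[[dict, Event], None]] = {}
        self._snapshot_interval = snapshot_interval
        self.snapshots: List[tuple] = []        # (position, copy of state)

    def when(self, event_type: str, handler: Callable[[dict, Event], None]) -> None:
        self._handlers[event_type] = handler

    def catch_up(self) -> int:
        count = 0
        for event in self.store.read_all(from_position=self._position + 1):
            handler = self._handlers.get(event.event_type)
            if handler is not None:
                handler(self._state, event)     # may raise
            self._position = event.event_id     # outside the if: always advances
            count += 1
            # Assumed trigger: snapshot every snapshot_interval events in this call.
            if self._snapshot_interval and count % self._snapshot_interval == 0:
                self.snapshots.append((self._position, dict(self._state)))
        return count


def add_order(state: dict, event: Event) -> None:
    state["total"] = state.get("total", 0) + event.data["amount"]


store = InMemoryStore()
store.append("OrderPlaced", {"amount": 30})
store.append("CustomerRenamed", {})             # no handler registered
store.append("OrderPlaced", {"amount": 12})

projection = SketchProjection(store)
projection.when("OrderPlaced", add_order)

first = projection.catch_up()     # processes all three events
second = projection.catch_up()    # nothing new to read
```

In this sketch the unhandled CustomerRenamed event still counts as processed and still advances the position, which matches the behaviour described for events without a handler.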

Side Effects

Error Handling

None. If a handler raises, the exception propagates uncaught, so a poison event (one whose handler throws) halts the projection mid-catch-up. Because _position is assigned after the handler call within the same iteration, a raising handler leaves _position at the previous event's event_id. Events processed earlier in the same call keep their effects on state and position, but the caller never receives a count for them.

The position assignment sits outside the if block that checks for a handler, so events with no registered handler still advance _position. Only a raising handler prevents the advance. Concretely, if event N has a handler that raises, _position was set to event N-1's event_id on the prior iteration. On retry, read_all(from_position=N) returns event N again, the handler throws again, and the projection is stuck: a classic poison-message problem.
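The stuck behaviour can be reproduced with a small stand-in. The sketch below does not use the real classes: the events list, the handlers dict, and the module-level catch_up function are hypothetical and only mirror the ordering described above (handler call first, position assignment second).

```python
from typing import Callable, Dict, List, Tuple

# Hypothetical log: (event_id, event_type) pairs in ascending ID order.
events: List[Tuple[int, str]] = [
    (1, "OrderPlaced"),
    (2, "Corrupt"),
    (3, "OrderPlaced"),
]

state = {"orders": 0}
position = 0   # ID of the last event processed


def on_order(s: dict, event: Tuple[int, str]) -> None:
    s["orders"] += 1


def on_corrupt(s: dict, event: Tuple[int, str]) -> None:
    raise ValueError(f"cannot apply event {event[0]}")


handlers: Dict[str, Callable[[dict, Tuple[int, str]], None]] = {
    "OrderPlaced": on_order,
    "Corrupt": on_corrupt,
}


def catch_up() -> int:
    global position
    count = 0
    for event in [e for e in events if e[0] >= position + 1]:
        handler = handlers.get(event[1])
        if handler is not None:
            handler(state, event)   # a raise here skips the assignment below
        position = event[0]
        count += 1
    return count


failures = 0
for _ in range(3):                  # retry three times
    try:
        catch_up()
    except ValueError:
        failures += 1
```

Every retry fails on event 2. The position stays at 1, and event 3 is never applied even though its handler would succeed.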

Usage Patterns


```python
# Typical: build a read model and catch up on demand
projection = Projection("order-totals", store)
projection.when("OrderPlaced", lambda state, e: ...)
projection.when("OrderCancelled", lambda state, e: ...)

# Initial catch-up from beginning
projection.catch_up()

# Later, after more events were appended
new_count = projection.catch_up()
if new_count > 0:
    print(f"Processed {new_count} new events")

# With snapshots for faster restart
projection = Projection("order-totals", store, snapshot_interval=100)
projection.load_snapshot()  # restore from last snapshot
projection.catch_up()       # only replay events since snapshot
```

The caller's obligation is to call catch_up whenever it wants fresh state. For automatic updates, use LiveProjection instead.

Dependencies

Assumptions Not Enforced by Types

1. event_id values are monotonically increasing: the from_position=self._position + 1 logic assumes event_id acts as a sequential cursor. If IDs have gaps, events could be skipped.

2. Handlers mutate _state in place: the signature Callable[[dict, Event], None] doesn't enforce mutation, so a handler that returns a new dict instead of mutating would have its result silently discarded.

3. read_all returns events ordered by event_id: if they came back out of order, position tracking would break (earlier events would be skipped on the next catch-up).

4. No concurrent callers — there's no locking. Two threads calling catch_up simultaneously would corrupt state.
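Assumption 2 is easy to violate by accident. The sketch below uses a hypothetical apply helper that calls a handler and ignores its return value, which is what the Callable[[dict, Event], None] signature implies. Event is replaced by a plain dict for brevity, and none of these names come from the store itself.

```python
from typing import Callable


def apply(state: dict, event: dict, handler: Callable[[dict, dict], None]) -> None:
    # Hypothetical stand-in for the call site: the return value is discarded.
    handler(state, event)


def mutating_handler(state: dict, event: dict) -> None:
    # Matches the contract: changes the dict the projection holds.
    state["total"] = state.get("total", 0) + event["amount"]


def returning_handler(state: dict, event: dict) -> dict:
    # Breaks the contract: builds a new dict that nobody keeps.
    return {**state, "total": state.get("total", 0) + event["amount"]}


event = {"amount": 10}

mutated: dict = {}
apply(mutated, event, mutating_handler)

untouched: dict = {}
apply(untouched, event, returning_handler)
```

After both calls, mutated holds the new total while untouched is still empty, and no error is raised to signal the difference.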