Date: 2026-05-29
Time: 14:13

MultiLeaderCluster

MultiLeaderCluster is a simulation harness for multi-leader (multi-master) replication, the architecture described in Chapter 5 of Designing Data-Intensive Applications (DDIA), where multiple nodes independently accept writes and asynchronously replicate changes to each other. It owns a set of ReplicaNode instances, wires them together according to a chosen replication topology, and drives the sync protocol that propagates pending writes between nodes.
The class exists to make the replication topology and conflict resolution strategy configurable at the cluster level, so callers can write to any node directly and then call sync() to simulate an asynchronous replication round — without manually wiring up which node talks to which.

Preconditions:

- node_ids must be non-empty and contain unique IDs (duplicates silently overwrite earlier nodes in the dict).
- If strategy is CUSTOM_MERGE, then merge_fn must be a valid callable with the signature (key, local_value, remote_value, local_ts, remote_ts) -> merged_value. This is not enforced at construction; the failure surfaces inside apply_remote_change when the first conflict occurs.

Postconditions:

- After sync(), all changes that were pending at the start of the round have been delivered to their replication targets (all peers for ALL_TO_ALL, next neighbor for RING).
- If sync_until_converged() returns successfully, all_converged() is True.

Invariants:

- Node order is fixed (_node_order is a list, not derived from dict key order), which matters for RING topology where the neighbor relationship is positional.
- sync() is snapshot-isolated within a round: it collects all pending changes *before* delivering any, so a write propagated in round N doesn't cascade further within the same round.

Parameters:

| Parameter | Type | Default | Meaning |
|-----------|------|---------|---------|
| node_ids | list[str] | — | Identifiers for each replica node. Order defines the ring for RING topology. |
| strategy | ConflictStrategy | LAST_WRITE_WINS | How concurrent writes to the same key are resolved. LAST_WRITE_WINS (LWW) compares (timestamp, node_id) tuples lexicographically. |
| merge_fn | Optional[Callable] | None | User-supplied merge function, required when strategy is CUSTOM_MERGE. Ignored otherwise. |
| topology | Topology | ALL_TO_ALL | Replication graph shape. ALL_TO_ALL sends every change to every other node per round. RING sends only to the next node in the list. |
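
The lexicographic (timestamp, node_id) comparison mentioned for LAST_WRITE_WINS can be illustrated with a small sketch. The `Versioned` record and `lww_winner` helper are hypothetical names introduced here for illustration, and the sketch assumes the larger tuple wins; the real logic lives inside ReplicaNode and may be organized differently.

```python
from typing import Any, NamedTuple


class Versioned(NamedTuple):
    """Hypothetical record shape: a value tagged with its write timestamp and origin."""
    value: Any
    timestamp: int
    node_id: str


def lww_winner(local: Versioned, remote: Versioned) -> Versioned:
    """Return the write with the larger (timestamp, node_id) tuple (assumed rule)."""
    # Python compares tuples element by element, so node_id only
    # matters when the two timestamps are equal.
    if (remote.timestamp, remote.node_id) > (local.timestamp, local.node_id):
        return remote
    return local


a = Versioned("Alice", timestamp=1, node_id="dc1")
b = Versioned("Bob", timestamp=1, node_id="dc2")
print(lww_winner(a, b).value)  # Bob: timestamps tie, and "dc2" > "dc1"
```

Because the tie-break depends only on the two tuples, every node that sees both writes picks the same winner regardless of arrival order.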

Edge cases:

- Single-node cluster: sync() returns 0, all_converged() returns True.
- Empty node_ids would create a cluster with no nodes; sync() returns 0, all_converged() returns True (falls through the <= 1 check). Not explicitly guarded.

Methods:

| Method | Returns | Conditions |
|--------|---------|------------|
| node(id) | ReplicaNode | Raises KeyError if id is unknown. |
| sync() | int: number of change deliveries attempted | Counts every apply_remote_change call, including idempotent no-ops (where the change was already seen). |
| all_converged() | bool | True when all nodes have identical key sets and matching (value, timestamp, is_tombstone) tuples for every key. |
| sync_until_converged() | int: rounds taken | Raises RuntimeError if not converged after max_rounds. |

sync():

1. Snapshot pending changes: iterates _node_order and calls get_pending_changes() on each node. This drains and returns each node's outbound log, so the same change is never sent twice across rounds.
2. Deliver based on topology:
   - ALL_TO_ALL: every change is delivered to every other node via apply_remote_change. A change from node A reaches B and C in the same round.
   - RING: each node's changes go only to its next neighbor (index (i+1) % len). A change from node 0 reaches node 1 this round, then node 1 forwards it to node 2 *next* round (because apply_remote_change enqueues it in node 1's pending list, but that list was already drained for this round).
3. The return count is the number of apply_remote_change calls, not the number of *accepted* changes. Idempotent skips inside apply_remote_change (via the _seen set) are still counted.
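
The snapshot-then-deliver round described above can be sketched with a toy model. `ToyNode`, `sync_round`, and the explicit `change_id` argument are hypothetical stand-ins written for this illustration: the real ReplicaNode uses Lamport clocks and conflict resolution, which are left out here, and the `Topology` enum below only mirrors the names used in this document.

```python
from enum import Enum


class Topology(Enum):  # stand-in for the module's enum
    ALL_TO_ALL = "all_to_all"
    RING = "ring"


class ToyNode:
    """Simplified stand-in for ReplicaNode: no clocks, no conflict handling."""

    def __init__(self, node_id):
        self.node_id = node_id
        self.store = {}
        self.seen = set()   # change IDs already applied (idempotency)
        self.pending = []   # outbound log

    def put(self, key, value, change_id):
        self.apply((change_id, key, value))

    def apply(self, change):
        change_id, key, value = change
        if change_id in self.seen:
            return False                 # idempotent no-op
        self.seen.add(change_id)
        self.store[key] = value
        self.pending.append(change)      # re-enqueue so it can be forwarded later
        return True

    def drain(self):
        drained, self.pending = self.pending, []
        return drained


def sync_round(nodes, topology):
    # Phase 1: snapshot every outbound log before delivering anything.
    snapshot = [node.drain() for node in nodes]
    deliveries = 0
    # Phase 2: deliver according to the topology.
    for i, changes in enumerate(snapshot):
        if topology is Topology.RING:
            targets = [nodes[(i + 1) % len(nodes)]] if len(nodes) > 1 else []
        else:
            targets = [n for n in nodes if n is not nodes[i]]
        for change in changes:
            for target in targets:
                target.apply(change)
                deliveries += 1          # counts attempts, including no-ops
    return deliveries


ring = [ToyNode(n) for n in ("a", "b", "c")]
ring[0].put("x", 1, change_id=("a", 1))
sync_round(ring, Topology.RING)   # round 1: a -> b
sync_round(ring, Topology.RING)   # round 2: b -> c
print([n.store for n in ring])    # [{'x': 1}, {'x': 1}, {'x': 1}]
```

In this toy, a third ring round still reports one delivery (c forwards the change back to a, which skips it as already seen), which is the same attempts-versus-accepted distinction the return count makes.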

all_converged():

1. Takes the first node as the reference.
2. Checks that all other nodes have the same set of keys.
3. For each key, compares (value, timestamp, is_tombstone), that is, tuple indices (0, 1, 3). It deliberately skips index 2 (origin_node_id), because different nodes may record different origins for the same resolved value depending on arrival order.
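
The three steps above can be sketched as a standalone function. This version assumes each store is a plain dict mapping a key to a (value, timestamp, origin_node_id, is_tombstone) tuple, which matches the index layout described here; how the real ReplicaNode exposes its store is not shown in this document, so the function signature is illustrative only.

```python
def all_converged(stores):
    """stores: list of dicts, key -> (value, timestamp, origin_node_id, is_tombstone)."""
    if len(stores) <= 1:
        return True                       # zero or one node is trivially converged
    reference = stores[0]                 # step 1: first node is the reference
    for other in stores[1:]:
        if other.keys() != reference.keys():
            return False                  # step 2: key sets must match
        for key, ref in reference.items():
            entry = other[key]
            # Step 3: compare indices 0, 1, 3 and skip index 2 (origin_node_id).
            if (ref[0], ref[1], ref[3]) != (entry[0], entry[1], entry[3]):
                return False
    return True


a = {"x": (1, 5, "dc1", False)}
b = {"x": (1, 5, "dc2", False)}
print(all_converged([a, b]))  # True: only the recorded origin differs
```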

sync_until_converged():

Calls sync() then all_converged() in a loop, up to max_rounds times. For ALL_TO_ALL, one round suffices if there are no custom-merge cascades. For RING with N nodes, a change written at one node needs at least N−1 rounds to reach every node, because each round advances it by exactly one hop.
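
The loop just described can be sketched independently of the cluster class. In this version `sync` and `all_converged` are passed in as callables so the snippet runs on its own; that parameterization and the error message text are assumptions made for illustration, not the method's actual signature.

```python
from typing import Callable


def sync_until_converged(
    sync: Callable[[], int],
    all_converged: Callable[[], bool],
    max_rounds: int,
) -> int:
    """Run sync rounds until convergence; return the number of rounds taken."""
    for rounds in range(1, max_rounds + 1):
        sync()                      # one replication round
        if all_converged():         # checked after each round, as described above
            return rounds
    raise RuntimeError(f"not converged after {max_rounds} rounds")


# Fake cluster state: a change that needs two hops, as in a 3-node ring.
state = {"hops_left": 2}


def fake_sync() -> int:
    state["hops_left"] -= 1
    return 1


def fake_converged() -> bool:
    return state["hops_left"] <= 0


print(sync_until_converged(fake_sync, fake_converged, max_rounds=5))  # 2
```

Because convergence is checked only after a sync, the result is always at least 1, even for a cluster that was already converged.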

Side effects:

- sync() drains every node's pending queue: calling get_pending_changes() is destructive (it clears _pending). Calling sync() again without intervening writes does not resend anything already drained; the second call delivers only what apply_remote_change re-enqueued in the meantime (for example, changes being forwarded around a RING, or merged values under CUSTOM_MERGE).
- apply_remote_change mutates node state: it updates the store, clock, _seen set, _pending queue, and conflict log on each destination node.
- Custom merge results are enqueued into _pending with a synthetic timestamp and canonical origin, meaning the merged value will propagate further in subsequent rounds.

Errors:

- node() raises KeyError for unknown node IDs (dict lookup, no wrapping).
- sync_until_converged() raises RuntimeError if convergence isn't reached within max_rounds.
- If strategy is CUSTOM_MERGE and merge_fn is None, a TypeError will propagate from within apply_remote_change when the first conflict triggers merge_fn(...).
- There is no validation of node_ids: duplicate IDs, empty lists, or non-string values are accepted silently.
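
Since none of these conditions are checked at construction, a caller who wants to fail fast could validate the arguments before building the cluster. The sketch below is a hypothetical helper, not part of the module: `validate_cluster_args` is an invented name, and the `ConflictStrategy` enum is a local stand-in that only mirrors the member names used in this document.

```python
from enum import Enum
from typing import Callable, Optional, Sequence


class ConflictStrategy(Enum):  # stand-in for the module's enum
    LAST_WRITE_WINS = "last_write_wins"
    CUSTOM_MERGE = "custom_merge"


def validate_cluster_args(
    node_ids: Sequence[str],
    strategy: ConflictStrategy,
    merge_fn: Optional[Callable] = None,
) -> None:
    """Raise early for inputs the cluster would otherwise accept silently."""
    if not node_ids:
        raise ValueError("node_ids must be non-empty")
    if any(not isinstance(n, str) for n in node_ids):
        raise TypeError("node_ids must all be strings")
    if len(set(node_ids)) != len(node_ids):
        raise ValueError("node_ids must be unique")
    if strategy is ConflictStrategy.CUSTOM_MERGE and not callable(merge_fn):
        raise TypeError("CUSTOM_MERGE requires a callable merge_fn")


validate_cluster_args(["dc1", "dc2"], ConflictStrategy.LAST_WRITE_WINS)  # passes silently
```

Calling a helper like this before constructing the cluster turns the deferred TypeError into an immediate one, at the cost of duplicating rules the class itself does not enforce.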

Example:

    # Assumes MultiLeaderCluster and Topology are already imported
    # from the module documented here.
    cluster = MultiLeaderCluster(["dc1", "dc2", "dc3"])

    # Write to different leaders concurrently
    cluster.node("dc1").put("user:1", "Alice")
    cluster.node("dc2").put("user:1", "Bob")  # concurrent write → conflict

    # Replicate
    cluster.sync()

    # Check convergence
    assert cluster.all_converged()

    # Or just sync until done (useful for RING topology)
    cluster = MultiLeaderCluster(["a", "b", "c"], topology=Topology.RING)
    cluster.node("a").put("x", 1)
    rounds = cluster.sync_until_converged()  # takes 2 rounds for 3 nodes (N - 1 hops)

Caller obligations:

- Call sync() after writes to propagate them. Writes are purely local until synced.
- Provide merge_fn when using CUSTOM_MERGE, or the first conflict will crash.

Depends on:

- ReplicaNode (same module): the actual replica with Lamport clock, store, conflict detection, and the _seen deduplication set.
- ConflictStrategy and Topology enums (same module): configuration discriminators.
- ConflictRecord dataclass (same module): returned by apply_remote_change on conflict, accumulated in each node's conflict log.

See also:

- multi-leader-replication/multileader.py:ReplicaNode.apply_remote_change - The core conflict detection and resolution logic; explains how _seen provides idempotency and how LWW and custom merge differ.
- multi-leader-replication/testmultileader.py - Test cases that exercise concurrent writes, ring convergence timing, tombstone propagation, and custom merge functions.
- ring-topology-convergence - How many sync rounds RING requires for N nodes, and how apply_remote_change re-enqueuing into _pending enables store-and-forward propagation.
- multi-leader-replication/multileader.py:ReplicaNode.record_seen - The idempotency mechanism that prevents duplicate application of the same change, keyed on (timestamp, origin) pairs.
- custom-merge-cascade-behavior - How CUSTOM_MERGE generates synthetic changes with new timestamps and canonical origins, and whether this can cause infinite convergence loops.

Key points:

- multi-leader-sync-snapshot-isolation - sync() drains all nodes' pending queues before delivering any changes, preventing intra-round cascading; a change propagated in round N cannot cascade further until round N+1.
- multi-leader-ring-convergence-rounds - RING topology requires at least N−1 sync() rounds to fully propagate a single-source change across N nodes, because each round advances the change by exactly one hop.
- multi-leader-convergence-skips-origin - all_converged() compares (value, timestamp, is_tombstone) but intentionally ignores origin_node_id (tuple index 2), since different arrival orders can record different origins for the same resolved state.
- multi-leader-sync-count-includes-noops - The count returned by sync() includes idempotent no-op deliveries (changes already in the target's _seen set), so it reflects replication *attempts*, not accepted changes.
- multi-leader-custom-merge-requires-merge-fn - Constructing a cluster with CUSTOM_MERGE strategy and merge_fn=None is accepted silently, but will raise TypeError at the first actual conflict during sync().
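
For the CUSTOM_MERGE requirement, a merge function with the documented signature (key, local_value, remote_value, local_ts, remote_ts) -> merged_value might look like the sketch below. `union_merge` is a hypothetical example, and it assumes timestamps are mutually comparable values; the exact timestamp type passed by apply_remote_change is not specified in this document.

```python
def union_merge(key, local_value, remote_value, local_ts, remote_ts):
    """Union set-like values; otherwise keep the newer write."""
    set_types = (set, frozenset)
    if isinstance(local_value, set_types) and isinstance(remote_value, set_types):
        return set(local_value) | set(remote_value)
    if remote_ts != local_ts:
        return remote_value if remote_ts > local_ts else local_value
    # Timestamps tie: pick deterministically so that both replicas,
    # which see local and remote swapped, still agree on the result.
    return max(local_value, remote_value, key=repr)


print(union_merge("tags", {"a"}, {"b"}, 1, 1) == {"a", "b"})  # True

# Assumed wiring, using the parameter names from the table in this document:
# cluster = MultiLeaderCluster(ids, strategy=ConflictStrategy.CUSTOM_MERGE,
#                              merge_fn=union_merge)
```

The function is written to return the same result when its local and remote arguments are swapped, since two replicas resolving the same conflict each see the other's write as the remote one.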