Date: 2026-05-29
Time: 14:19
The StreamJoinProcessor in stream-join-processor/stream_join_processor.py implements a pure event-time model. Every decision (buffering, matching, expiring, and dropping) is driven by timestamps embedded in the events themselves, not by when the system processes them.
The watermark is the central coordination mechanism. It's initialized to negative infinity (line 81) and advances monotonically as events arrive:
new_watermark = max(self._watermark, event.timestamp) # line 114
This single line captures the core assumption: the watermark is the system's best guess at "how far along are we in event time." It never goes backward. When the watermark advances, it triggers expiration of buffered events whose timestamps have fallen below the window cutoff:
cutoff = self._watermark - self._window.duration # line 166
Events older than this cutoff are removed from buffers. For outer joins (LEFT or FULL_OUTER), unmatched events emit miss results before expiring — this is how the system produces "no match found" outputs.
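The two quoted lines can be combined into a minimal sketch of watermark-driven expiration. The class `WatermarkBuffer` and its method names are hypothetical and are not taken from the real StreamJoinProcessor. The sketch assumes a single buffer and treats "older than the cutoff" as a strict less-than comparison.

```python
class WatermarkBuffer:
    """Hypothetical single-sided buffer; not the real StreamJoinProcessor."""

    def __init__(self, window_duration: float) -> None:
        self.watermark = float("-inf")   # nothing seen yet
        self.window_duration = window_duration
        self.events: list[tuple[float, str]] = []  # (timestamp, key)

    def observe(self, timestamp: float, key: str) -> list[tuple[float, str]]:
        """Buffer an event, advance the watermark, and return expired events."""
        self.events.append((timestamp, key))
        # The watermark only ever moves forward.
        self.watermark = max(self.watermark, timestamp)
        cutoff = self.watermark - self.window_duration
        # Assumption: an event expires when its timestamp is strictly below the cutoff.
        expired = [e for e in self.events if e[0] < cutoff]
        self.events = [e for e in self.events if e[0] >= cutoff]
        return expired


buf = WatermarkBuffer(window_duration=10.0)
first = buf.observe(100.0, "a")    # watermark becomes 100
second = buf.observe(95.0, "b")    # out of order: watermark stays at 100
third = buf.observe(111.0, "c")    # watermark 111, cutoff 101
```

In this run the out-of-order event at t=95 does not pull the watermark back, and the event at t=111 expires both earlier events because they fall below the cutoff of 101.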
DDIA Ch. 11 identifies three fundamental tensions in stream processing around event time. This implementation addresses each one explicitly.
The watermark answers: *can we safely say that no more events with timestamp < W will arrive?* In this implementation, the answer is a simple heuristic rather than a guarantee: the watermark is the maximum timestamp seen so far. A real distributed system (like Apache Flink or Google Dataflow) would compute watermarks from source metadata, because the max-seen heuristic can be wrong when events arrive out of order across partitions.
The advance_time() method at line 155 is the escape hatch for this. It lets an external orchestrator push the watermark forward without receiving an actual event — essential for handling idle streams or end-of-input:
def advance_time(self, timestamp: float) -> list[JoinResult]:
    if timestamp <= self._watermark:
        return []
    self._watermark = timestamp
    ...
The tests use this heavily. test_left_join_miss calls p.advance_time(110.0) to force expiration and miss emission, simulating a trigger that fires based on watermark progress rather than event arrival.
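As a rough illustration of the idle-stream and end-of-input cases, the sketch below shows an orchestrator pushing the watermark forward with no new events. `PendingLeftEvents` is a hypothetical stand-in that tracks only unmatched left-side events. Flushing with `float("inf")` at end of input is an assumption about how a caller might use such a method, not something the source confirms.

```python
class PendingLeftEvents:
    """Hypothetical holder for unmatched left events: key -> timestamp."""

    def __init__(self, window: float) -> None:
        self.window = window
        self.watermark = float("-inf")
        self.pending: dict[str, float] = {}

    def add(self, key: str, timestamp: float) -> None:
        self.pending[key] = timestamp
        self.watermark = max(self.watermark, timestamp)

    def advance_time(self, timestamp: float) -> list[str]:
        """Move the watermark forward and return keys that expired unmatched."""
        if timestamp <= self.watermark:
            return []                    # stale or repeated tick: no effect
        self.watermark = timestamp
        cutoff = self.watermark - self.window
        expired = [k for k, ts in self.pending.items() if ts < cutoff]
        for k in expired:
            del self.pending[k]
        return expired


p = PendingLeftEvents(window=5.0)
p.add("k1", 100.0)
no_progress = p.advance_time(100.0)      # not past the watermark: ignored
misses = p.advance_time(110.0)           # cutoff 105, so k1 expires
p.add("k2", 112.0)
final = p.advance_time(float("inf"))     # assumed end-of-input flush
```

The guard at the top of `advance_time` is what keeps the watermark monotonic even when an external caller supplies the timestamps.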
The allowed_lateness parameter (line 74) creates a grace period below the watermark. The late-event check at line 109:
if event.timestamp < self._watermark - self._allowed_lateness:
    self._stats.late_events_dropped += 1
    return []
This is the tradeoff DDIA describes: wider lateness windows mean more complete results but higher memory costs (buffers must be kept longer) and higher latency before results are finalized. The test at line 82 (test_out_of_order_within_lateness) demonstrates the sweet spot: an event at t=103 arrives after the watermark has advanced to 105, but with allowed_lateness=3.0 the cutoff is 102, so the event is accepted and successfully joins.
Contrast this with test_late_event_dropped (line 96): allowed_lateness=2.0, watermark at 110, cutoff at 108, and an event at t=107 is dropped. The system has decided completeness isn't worth the cost of keeping that buffer state around.
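The arithmetic in those two scenarios can be checked with a small standalone function. `is_too_late` is a hypothetical helper that mirrors the quoted comparison; it is not a function from the source.

```python
def is_too_late(event_ts: float, watermark: float, allowed_lateness: float) -> bool:
    """Return True when the event falls below the lateness cutoff."""
    return event_ts < watermark - allowed_lateness


# Scenario 1: watermark 105, lateness 3.0, so cutoff 102; t=103 is accepted.
accepted_case = is_too_late(103.0, watermark=105.0, allowed_lateness=3.0)

# Scenario 2: watermark 110, lateness 2.0, so cutoff 108; t=107 is dropped.
dropped_case = is_too_late(107.0, watermark=110.0, allowed_lateness=2.0)

# An event exactly at the cutoff is kept because the comparison is strict.
boundary_case = is_too_late(102.0, watermark=105.0, allowed_lateness=3.0)
```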
This implementation uses an eager trigger model for matches and a watermark-driven trigger for misses. When a new event arrives and matches something in the opposite buffer, the result is emitted immediately (lines 125–148). But miss detection — knowing that no match *will ever come* — requires the watermark to advance past the window boundary, which happens either through new event arrival or explicit advance_time calls.
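The split between eager matches and deferred misses can be sketched as follows. `EagerLeftJoin` is a hypothetical, heavily simplified left join: it keeps at most one event per key per side and tracks matched keys in a set. Neither simplification is claimed to reflect the real processor.

```python
class EagerLeftJoin:
    """Hypothetical left join: one buffered event per key per side."""

    def __init__(self, window: float) -> None:
        self.window = window
        self.watermark = float("-inf")
        self.buffers: dict[str, dict[str, float]] = {"L": {}, "R": {}}
        self.matched: set[str] = set()   # left keys that already matched

    def process(self, side: str, key: str, ts: float) -> list[tuple[str, str]]:
        other = self.buffers["R" if side == "L" else "L"]
        out: list[tuple[str, str]] = []
        if key in other and abs(ts - other[key]) <= self.window:
            out.append(("match", key))   # eager: emitted on arrival
            self.matched.add(key)
        self.buffers[side][key] = ts
        return out + self.advance_time(ts)

    def advance_time(self, ts: float) -> list[tuple[str, str]]:
        if ts <= self.watermark:
            return []
        self.watermark = ts
        cutoff = self.watermark - self.window
        out: list[tuple[str, str]] = []
        for side in ("L", "R"):
            buf = self.buffers[side]
            for key in [k for k, t in buf.items() if t < cutoff]:
                del buf[key]
                if side == "L":
                    if key in self.matched:
                        self.matched.discard(key)
                    else:
                        out.append(("miss", key))  # deferred: only at expiration
        return out


j = EagerLeftJoin(window=10.0)
r1 = j.process("L", "k1", 100.0)   # nothing to match yet
r2 = j.process("R", "k1", 103.0)   # match emitted immediately
r3 = j.process("L", "k2", 104.0)   # buffered, no partner
r4 = j.advance_time(120.0)         # cutoff 110: k2 expires as a miss
```

Here the match for k1 appears as soon as its right-side partner arrives, while the miss for k2 only appears once the watermark has moved more than one window past it.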
A processing-time system would replace all of this with wall-clock logic: "buffer events for 10 real-world seconds, then emit whatever matched." The TimeWindow.contains() check (line 53) would compare time.time() deltas instead of event timestamps. There would be no watermark, no allowed lateness, and no advance_time.
The advantage is simplicity: no out-of-order headaches, no watermark computation, no lateness decisions. The disadvantage is non-determinism and incorrectness under load. If the system slows down (GC pause, network delay, backpressure), events that belong together in event time can land in different processing-time windows. A 10-second network hiccup could cause a click that happened 3 seconds after an impression to appear 13 seconds later in processing time, falling outside a 10-second window and producing a false miss.
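A small numeric sketch makes the false miss concrete. All timestamps below are made up for illustration: the click happens 3 seconds after the impression in event time, and its delivery is then assumed to be delayed by a further 10 seconds. `within_window` is a hypothetical helper.

```python
WINDOW = 10.0  # join window in seconds


def within_window(a: float, b: float, window: float = WINDOW) -> bool:
    """True when two timestamps are at most `window` seconds apart."""
    return abs(a - b) <= window


# Event time: when things actually happened.
impression_event_ts = 100.0
click_event_ts = 103.0

# Processing time: when the system saw them (click delayed by 10 s, assumed).
impression_arrival = 100.0
click_arrival = 113.0

event_time_match = within_window(impression_event_ts, click_event_ts)
processing_time_match = within_window(impression_arrival, click_arrival)
```

With these numbers the event-time comparison sees a 3-second gap and matches, while the processing-time comparison sees a 13-second gap and reports a miss for the same pair of events.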
Notice how the tests in test_stream_join_processor.py never call time.time(). Every timestamp is explicit: make_event("L", "k1", 100.0). This is a direct benefit of the event-time model: the join logic is fully deterministic and testable regardless of how fast or slow the test machine runs. A processing-time implementation would need sleeps or clock mocking, and would still be flaky under CI load.
The event-time model requires buffering, and test_buffer_cleanup (line 158) validates that this buffering is bounded. After processing 1000 events with a 5-second window, the buffer contains at most ~10 events per side. The watermark-driven expiration at line 166 is what makes this possible; without it, buffers would grow without bound.
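The bounded-buffer property can be illustrated with a short simulation. The function `max_buffer_size` is hypothetical, and the sketch assumes in-order timestamps spaced one second apart. The real test may use a different spacing, which would change the exact bound.

```python
from collections import deque


def max_buffer_size(timestamps: list[float], window: float) -> int:
    """Peak buffer length when entries below the cutoff are expired."""
    buffer: deque[float] = deque()
    watermark = float("-inf")
    peak = 0
    for ts in timestamps:
        buffer.append(ts)
        watermark = max(watermark, ts)
        cutoff = watermark - window
        # Assumes in-order input, so the oldest entry is always at the left.
        while buffer and buffer[0] < cutoff:
            buffer.popleft()
        peak = max(peak, len(buffer))
    return peak


one_per_second = [float(i) for i in range(1000)]
peak_with_expiry = max_buffer_size(one_per_second, window=5.0)
peak_without_expiry = max_buffer_size(one_per_second, window=float("inf"))
```

Under these assumptions the peak stays at a handful of entries regardless of how many events are processed, whereas an infinite window (no expiration) retains all of them.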
The TumblingWindowAggregator (tested at line 141) extends this pattern by grouping join results into fixed time buckets. This is the "tumbling window" from DDIA Ch. 11 — non-overlapping, fixed-duration intervals where results accumulate until the window closes. The advance_time call triggers window closure, again driven by event-time watermarks rather than wall clocks.
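Tumbling-window bucketing can be sketched as below. `TumblingCounter` and `window_start` are hypothetical names, and the sketch simply counts results per bucket. What the real TumblingWindowAggregator accumulates, and its exact closing condition, are not shown in the source. Here a window [start, start + size) is assumed to close once the watermark reaches start + size.

```python
import math
from collections import defaultdict


def window_start(timestamp: float, size: float) -> float:
    """Start of the fixed, non-overlapping bucket containing `timestamp`."""
    return math.floor(timestamp / size) * size


class TumblingCounter:
    """Hypothetical aggregator: counts results per tumbling window."""

    def __init__(self, size: float) -> None:
        self.size = size
        self.open: dict[float, int] = defaultdict(int)

    def add(self, timestamp: float) -> None:
        self.open[window_start(timestamp, self.size)] += 1

    def advance_time(self, watermark: float) -> list[tuple[float, int]]:
        """Close and return every window that ends at or before the watermark."""
        closed = sorted(
            (start, count)
            for start, count in self.open.items()
            if start + self.size <= watermark
        )
        for start, _ in closed:
            del self.open[start]
        return closed


agg = TumblingCounter(size=10.0)
for ts in (100.0, 104.0, 109.9, 110.0):
    agg.add(ts)

still_open = agg.advance_time(109.0)   # window [100, 110) has not ended yet
first_closed = agg.advance_time(110.0)
second_closed = agg.advance_time(120.0)
```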
- stream-join-processor/stream_join_processor.py:_expire_events: The core watermark-driven expiration logic that decides when buffered events are finalized or emitted as misses
- partitioned-log/partitioned_log.py: The Kafka-style log this processor would consume from; understanding how Message.timestamp is assigned at produce time (line 143 uses time.time()) reveals the event-time/ingestion-time boundary
- watermark-computation-in-distributed-joins: This implementation derives watermarks from a single stream; in a partitioned system, the watermark must be the minimum across all partitions, which is significantly harder
- stream-join-processor/stream_join_processor.py:advance_time: How external watermark injection enables idle-stream handling and explicit trigger semantics
- unbundled-database/unbundled_database.py: The CDC-driven derived systems use LSN-based ordering (a form of processing-time ordering) rather than event-time watermarks; comparing the two approaches clarifies when each model is appropriate
- watermark-monotonic-advance: The watermark in StreamJoinProcessor is monotonically non-decreasing: it is set via max(self._watermark, event.timestamp), and advance_time rejects timestamps at or below the current watermark
- late-event-drop-is-hard-cutoff: Events with timestamp < watermark - allowed_lateness are unconditionally dropped and counted in stats.late_events_dropped; there is no secondary path to recover them
- miss-emission-requires-expiration: Outer join miss results (left-with-null-right or right-with-null-left) are only emitted during _expire_events, never at event arrival time; an unmatched event stays buffered until the watermark advances past the window boundary
- join-determinism-from-event-time: The join processor's output is fully determined by the sequence of (stream_name, key, value, timestamp) inputs and advance_time calls, with no dependency on wall-clock time, making it deterministically testable
- buffer-bounded-by-window-plus-lateness: Buffer size is bounded by the number of distinct keys with events within window.duration of the current watermark; _expire_events garbage-collects everything below that cutoff on every event or time advance