File: stream-join-processor/stream_join_processor.py

Date: 2026-05-29

Time: 09:06

stream_join_processor.py: Time-Windowed Stream Join

Purpose

This file implements a stream join processor, the core primitive for combining two real-time event streams by matching events on a shared key within a time window. It corresponds to the stream-stream join (window join) described in Chapter 11 of Designing Data-Intensive Applications (DDIA). In that chapter, Kleppmann discusses how stream processors join events from two input streams (e.g., matching ad impressions with clicks, or correlating user activity across services).

The module owns two responsibilities:

1. Stream joining — buffering events from two named streams and emitting joined results when matching keys arrive within the configured time window.

2. Tumbling window aggregation — grouping join results into fixed, non-overlapping time windows and applying an aggregation function when windows close.

Key Components

Data Classes

JoinType Enum

Three join semantics:

TimeWindow

A simple duration-based window. contains(t1, t2) returns True when abs(t1 - t2) <= duration, i.e. when the two timestamps are at most duration seconds apart. The check is symmetric, so it doesn't matter which event arrived first.
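The sketch below shows TimeWindow as a frozen dataclass with a single numeric duration field. That field layout is an assumption. Only contains(t1, t2) and its symmetric, inclusive comparison come from the description above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TimeWindow:
    # Assumed layout: one duration field, in the same units as event timestamps.
    duration: float

    def contains(self, t1: float, t2: float) -> bool:
        # Symmetric and inclusive: arrival order is irrelevant, and a gap of
        # exactly `duration` still counts as inside the window.
        return abs(t1 - t2) <= self.duration


window = TimeWindow(duration=10.0)
print(window.contains(100.0, 105.0))  # True
print(window.contains(105.0, 100.0))  # True
print(window.contains(100.0, 110.0))  # True (boundary is inclusive)
print(window.contains(100.0, 110.5))  # False
```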

StreamJoinProcessor

The central class. Key API:

| Method | Contract |
|--------|----------|
| process_event(event) | Drop the event if it is late; otherwise buffer it, probe the opposite buffer for matches, advance the watermark, and expire old events. Returns the join results produced by this event. |
| advance_time(timestamp) | Manually advance the watermark without an event. Triggers expiration and miss emission. Returns any miss results. |
| get_results() | Drain and return the accumulated results list (destructive read). |
| stats | Read-only access to JoinStats. |
| buffer_size | Returns a (left_count, right_count) tuple of currently buffered events. |

TumblingWindowAggregator

A post-join aggregation stage. It groups JoinResult objects into non-overlapping windows by (key, window_start) and applies a user-supplied aggregate_fn when advance_time closes a window. Window boundaries are computed by floor-dividing the timestamp by the window size, which is standard tumbling-window semantics.
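The window-boundary arithmetic can be sketched as a standalone function. tumbling_window is a hypothetical helper, not part of the module. It shows that floor division produces half-open [start, end) windows, so a timestamp exactly on a boundary belongs to the later window.

```python
def tumbling_window(timestamp: float, size: float) -> tuple[float, float]:
    """Hypothetical helper: return the [start, end) tumbling window for timestamp."""
    # Floor division yields the window index; multiplying back yields its start.
    start = (timestamp // size) * size
    return start, start + size


print(tumbling_window(0.0, 60.0))    # (0.0, 60.0)
print(tumbling_window(59.9, 60.0))   # (0.0, 60.0)
print(tumbling_window(60.0, 60.0))   # (60.0, 120.0)
print(tumbling_window(125.0, 60.0))  # (120.0, 180.0)
```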

Patterns

Event-time processing with watermarks. The processor tracks a watermark (the highest event timestamp seen across both streams) and uses it to decide when buffered events have expired and when arriving events are late enough to drop. This is conceptually the same model as Apache Flink/Beam watermarks, simplified to a single max-timestamp heuristic shared by both inputs.
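Here is a minimal sketch of the max-timestamp watermark combined with the late-event check described under Flow (timestamp < watermark - allowed_lateness). The Watermark class is hypothetical. In the module, this state presumably lives directly on the processor.

```python
class Watermark:
    """Hypothetical helper: a max-timestamp watermark with a lateness bound."""

    def __init__(self, allowed_lateness: float) -> None:
        self.allowed_lateness = allowed_lateness
        self.value = float("-inf")  # no events observed yet

    def is_late(self, timestamp: float) -> bool:
        # Late if the event trails the watermark by more than the allowed lateness.
        return timestamp < self.value - self.allowed_lateness

    def observe(self, timestamp: float) -> None:
        # Monotonic: max() means the watermark can never move backward.
        self.value = max(self.value, timestamp)


wm = Watermark(allowed_lateness=5.0)
accepted, dropped = [], []
for ts in [10.0, 20.0, 16.0, 14.0]:
    if wm.is_late(ts):
        dropped.append(ts)
    else:
        wm.observe(ts)
        accepted.append(ts)

print(accepted, dropped, wm.value)  # [10.0, 20.0, 16.0] [14.0] 20.0
```

The out-of-order event at 16.0 trails the watermark by 4. That is within the lateness bound of 5, so it is still accepted. The event at 14.0 trails by 6 and is dropped.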

Probe-on-arrival join. When an event arrives, it is buffered on its side and then probed against the opposite buffer. This is a nested loop join scoped to matching keys — efficient because the buffers are keyed by join key (defaultdict(list)), so only events with the same key are compared.
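The next sketch shows probe-on-arrival against key-partitioned buffers. Buffered and probe are hypothetical stand-ins for _BufferedEvent and the processor's internals. The window test inlines the same abs(t1 - t2) <= duration rule used by TimeWindow.contains.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Buffered:
    # Hypothetical stand-in for _BufferedEvent.
    key: str
    timestamp: float
    payload: str
    matched: bool = False


def probe(new: Buffered, opposite: dict[str, list[Buffered]],
          window: float) -> list[tuple[Buffered, Buffered]]:
    """Join `new` against opposite-side events that share its key."""
    pairs = []
    # Only the list stored under new.key is scanned; .get() avoids inserting
    # an empty entry into a defaultdict when the key is absent.
    for other in opposite.get(new.key, []):
        if abs(new.timestamp - other.timestamp) <= window:
            new.matched = other.matched = True  # mark both sides
            pairs.append((new, other))
    return pairs


left = defaultdict(list)
right = defaultdict(list)
for ev in (Buffered("ad-1", 100.0, "impression"),
           Buffered("ad-1", 130.0, "impression"),
           Buffered("ad-2", 101.0, "impression")):
    left[ev.key].append(ev)

click = Buffered("ad-1", 104.0, "click")
right[click.key].append(click)           # buffer on its own side first...
pairs = probe(click, left, window=10.0)  # ...then probe the opposite side
print([(n.payload, o.timestamp) for n, o in pairs])  # [('click', 100.0)]
```

The second ad-1 impression shares the key but lies 26 seconds away, so the window check rejects it. The ad-2 list is never scanned.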

Deferred miss emission. Outer-join misses aren't emitted eagerly. Instead, each buffered event carries a matched flag. When the event expires (its timestamp falls below the watermark minus the window duration), a miss result is emitted if matched is still False and the join type allows it. Deferring the decision this way avoids reporting a premature miss for an event whose match has not arrived yet but will still fall within the window.

Destructive drain on get_results. The results list accumulates across calls to process_event and advance_time, but get_results swaps it out and resets it to empty. This is a pull-based consumption model in which the caller decides when to consume.
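Below is a minimal sketch of the swap-and-reset drain. ResultSink and emit are hypothetical names. The point is that get_results hands over the existing list object and installs a new empty one, instead of copying the list or clearing it in place.

```python
class ResultSink:
    """Hypothetical stand-in for the processor's internal results list."""

    def __init__(self) -> None:
        self._results: list[str] = []

    def emit(self, result: str) -> None:
        self._results.append(result)

    def get_results(self) -> list[str]:
        # Hand the current list to the caller and start a fresh one, so each
        # result is returned by exactly one get_results() call.
        drained, self._results = self._results, []
        return drained


sink = ResultSink()
sink.emit("join:ad-1")
sink.emit("miss:ad-2")
print(sink.get_results())  # ['join:ad-1', 'miss:ad-2']
print(sink.get_results())  # []
```

Because the list is swapped rather than cleared in place, a list returned by an earlier get_results() call is not affected by later processing.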

Dependencies

Imports: Standard library only — typing, enum, dataclasses, collections.defaultdict. No external dependencies.

Imported by: The test file test_stream_join_processor.py and a tester validation file tester_test_stream_join_processor.py, both exercising the join processor's behavior.

Flow

Event Processing (process_event)


1. Identify side (left or right) → increment stats
2. Late-event check: if timestamp < watermark - allowed_lateness → drop, return []
3. Compute new watermark (max of current and event timestamp)
4. Wrap event in _BufferedEvent, append to appropriate buffer
5. Probe opposite buffer for same key:
   - For each candidate, check TimeWindow.contains()
   - On match: create JoinResult, mark both sides matched, emit
6. Advance watermark to new value
7. Call _expire_events() to clean old entries and emit misses
8. Return slice of results produced during this call

The "return slice" trick (capturing results_before = len(self._results) and then returning self._results[results_before:]) lets each process_event call return only its own results while still accumulating into the shared list. The slice copies only the tail produced during this call, not the whole accumulated list.
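Here is a minimal sketch of the return-slice pattern. The probing and expiration logic is replaced by a hypothetical loop that appends n_matches placeholder results. Accumulator and its process_event signature are illustrative only.

```python
class Accumulator:
    """Hypothetical processor shell illustrating the return-slice pattern."""

    def __init__(self) -> None:
        self._results: list[str] = []

    def process_event(self, name: str, n_matches: int) -> list[str]:
        results_before = len(self._results)
        # Stand-in for steps 5 and 7: probing and expiration both append to
        # the shared list; here we just append n_matches placeholder results.
        for i in range(n_matches):
            self._results.append(f"{name}-match{i}")
        # Copies only the tail appended during this call.
        return self._results[results_before:]


acc = Accumulator()
print(acc.process_event("e1", 2))  # ['e1-match0', 'e1-match1']
print(acc.process_event("e2", 0))  # []
print(acc.process_event("e3", 1))  # ['e3-match0']
print(len(acc._results))           # 3
```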

Expiration (_expire_events)


1. Compute cutoff = watermark - window_duration
2. For each buffer (left, right):
   - For each key, partition events into expired vs remaining
   - For expired + unmatched events: emit miss if join type allows
   - Replace buffer entry with remaining, or delete if empty
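The expiration steps above, including the deferred miss decision, can be sketched as follows. The text confirms only JoinType.LEFT and the absence of RIGHT, so INNER and FULL_OUTER are assumed member names. BufferedEvent, emits_miss, and the (side, key, timestamp) tuples used for misses are likewise hypothetical.

```python
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum


class JoinType(Enum):
    INNER = "inner"            # assumed name
    LEFT = "left"
    FULL_OUTER = "full_outer"  # assumed name


@dataclass
class BufferedEvent:
    key: str
    timestamp: float
    matched: bool = False


def emits_miss(join_type: JoinType, is_left_buf: bool) -> bool:
    if join_type is JoinType.FULL_OUTER:
        return True
    if join_type is JoinType.LEFT:
        return is_left_buf  # unmatched right-side events are discarded
    return False            # inner joins never emit misses


def expire_events(left, right, watermark, window_duration, join_type):
    cutoff = watermark - window_duration
    misses = []
    for buffer, is_left_buf in ((left, True), (right, False)):
        for key in list(buffer):  # snapshot keys: entries may be deleted below
            expired = [e for e in buffer[key] if e.timestamp < cutoff]
            remaining = [e for e in buffer[key] if e.timestamp >= cutoff]
            for e in expired:
                if not e.matched and emits_miss(join_type, is_left_buf):
                    side = "left" if is_left_buf else "right"
                    misses.append((side, e.key, e.timestamp))
            if remaining:
                buffer[key] = remaining
            else:
                del buffer[key]
    return misses


def make_buffers():
    left, right = defaultdict(list), defaultdict(list)
    left["a"].append(BufferedEvent("a", 0.0))                # unmatched, expires
    left["b"].append(BufferedEvent("b", 1.0, matched=True))  # matched, expires silently
    left["c"].append(BufferedEvent("c", 15.0))               # still inside the window
    right["d"].append(BufferedEvent("d", 2.0))               # unmatched, right side
    return left, right


left, right = make_buffers()
print(expire_events(left, right, 20.0, 10.0, JoinType.LEFT))
# [('left', 'a', 0.0)]
print(sorted(left), sorted(right))  # ['c'] []

left, right = make_buffers()
print(expire_events(left, right, 20.0, 10.0, JoinType.FULL_OUTER))
# [('left', 'a', 0.0), ('right', 'd', 2.0)]
```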

Tumbling Aggregation (TumblingWindowAggregator)


1. add(result) → floor-divide timestamp to window_start, append to (key, window_start) bucket
2. advance_time(t) → close all windows whose end (start + duration) ≤ current window_start
   - Apply aggregate_fn to each closed window's results
   - Return sorted list of (key, window_start, window_end, aggregated_value)
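The next sketch shows the add/advance_time cycle. To stay self-contained, it takes (key, timestamp, value) instead of a JoinResult object. The class and parameter names are hypothetical, and sorting by (key, window_start) is an assumed order.

```python
from collections import defaultdict
from typing import Any, Callable


class TumblingAggregator:
    def __init__(self, window_size: float, aggregate_fn: Callable[[list], Any]) -> None:
        self.window_size = window_size
        self.aggregate_fn = aggregate_fn
        self._windows = defaultdict(list)  # (key, window_start) -> values

    def _start(self, t: float) -> float:
        return (t // self.window_size) * self.window_size

    def add(self, key: str, timestamp: float, value: Any) -> None:
        self._windows[(key, self._start(timestamp))].append(value)

    def advance_time(self, t: float) -> list:
        current_start = self._start(t)
        # A window closes once its end is at or before the start of t's window.
        closed = [wk for wk in self._windows
                  if wk[1] + self.window_size <= current_start]
        out = []
        for key, start in closed:
            values = self._windows.pop((key, start))
            out.append((key, start, start + self.window_size, self.aggregate_fn(values)))
        return sorted(out, key=lambda row: (row[0], row[1]))


agg = TumblingAggregator(60.0, aggregate_fn=len)  # count joins per window
agg.add("ad-1", 5.0, "j1")
agg.add("ad-1", 30.0, "j2")
agg.add("ad-2", 59.0, "j3")
agg.add("ad-1", 70.0, "j4")
print(agg.advance_time(65.0))   # [('ad-1', 0.0, 60.0, 2), ('ad-2', 0.0, 60.0, 1)]
print(agg.advance_time(100.0))  # []
print(agg.advance_time(120.0))  # [('ad-1', 60.0, 120.0, 1)]
```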

Invariants

1. Watermark monotonicity. The watermark only advances forward: max(self._watermark, event.timestamp). advance_time explicitly checks for timestamp <= self._watermark, so a stale timestamp can never move the watermark backward.

2. Once matched, always matched. The _BufferedEvent.matched flag is only ever set from False to True, never reset. This means an event that matches at least once will never produce a miss.

3. Match symmetry. Both sides of a match are marked: buffered.matched = True and other.matched = True. A single event can match multiple events on the opposite side (one-to-many join).

4. Expiration cutoff = watermark - window_duration. Events expire when their timestamp is strictly less than this cutoff. The window check on arrival uses <= (inclusive), so an event at exactly the boundary can still match but will expire on the next watermark advance past it.

5. Left join only emits left-side misses. In _expire_events, JoinType.LEFT only triggers miss emission for left-buffer events (is_left_buf must be True). Right-side unmatched events are silently discarded.

6. No right-only join. The enum doesn't include RIGHT. To get right-join semantics, you'd swap the stream names.
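Here is a small worked check of invariant 4. The hypothetical helpers restate the two comparisons: expiry uses a strict < against watermark - window_duration, and the window check uses an inclusive <=. With window_duration = 10, an event buffered at t = 90 survives a watermark of exactly 100 and can still match an arrival at 100.

```python
WINDOW_DURATION = 10.0


def is_expired(ts: float, watermark: float) -> bool:
    # Strict comparison: an event exactly at the cutoff is kept.
    return ts < watermark - WINDOW_DURATION


def in_window(t1: float, t2: float) -> bool:
    # Inclusive comparison, as in TimeWindow.contains.
    return abs(t1 - t2) <= WINDOW_DURATION


buffered_ts = 90.0
print(is_expired(buffered_ts, 100.0))  # False: cutoff is 90, and 90 < 90 is False
print(in_window(buffered_ts, 100.0))   # True: |100 - 90| == 10 <= 10
print(is_expired(buffered_ts, 100.5))  # True: cutoff is 90.5
```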

Error Handling

There is essentially none — this is an in-memory, single-threaded processor with no I/O. The design relies on:

Topics to Explore

Beliefs