Date: 2026-05-29
Time: 08:59
This file implements three map-side join strategies from DDIA Chapter 10 (Batch Processing). Map-side joins avoid the shuffle/reduce phase of a traditional MapReduce join by exploiting knowledge about the input data (size, partitioning, sort order). The file provides BroadcastHashJoin, PartitionedHashJoin, and SortMergeJoin, plus a compare_join_strategies function that runs all three and verifies that they produce identical results.
Each strategy trades different assumptions about input data for different performance characteristics — the core pedagogical point of DDIA's treatment of joins.
JoinResult: A simple container for join output, holding a list of records (dicts) and a stats dict of operational metrics (hash lookups, comparisons, records read, etc.). The .count property is shorthand for len(records).
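
A minimal sketch of that container, assuming a plain class (the module imports only collections.defaultdict) with a (records, stats) constructor; the real constructor signature is not shown here, so treat it as hypothetical.

```python
class JoinResult:
    """Joined records plus operational counters (sketch; constructor signature assumed)."""

    def __init__(self, records=None, stats=None):
        # Fresh containers per instance, avoiding shared mutable defaults.
        self.records = records if records is not None else []
        self.stats = stats if stats is not None else {}

    @property
    def count(self):
        # Shorthand for the number of output records.
        return len(self.records)


result = JoinResult([{"id": 1}, {"id": 2}], {"hash_lookups": 2})
print(result.count)  # 2
```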
BroadcastHashJoin: assumes one dataset is small enough to fit in memory on every mapper.
- The constructor builds a hash table over the small dataset (build_hash_table).
- join() simulates distributing the large dataset across num_mappers via round-robin chunking; each "mapper" then probes the hash table.
- Supports "inner" and "left" join types (for "left", unmatched large-side records are preserved).
- Each output record carries a mapper_id showing which mapper produced it.

PartitionedHashJoin: assumes both datasets are partitioned on the join key with the same hash function, so matching records land in the same partition.
- join() partitions both datasets via partition_dataset(), then runs a local hash join within each partition.
- For left joins, it tracks matched_left_keys per partition so that unmatched left records can be emitted afterward.
- Output records are tagged with their partition number (mapper_id = part_id).

SortMergeJoin: assumes both datasets are sorted by the join key (or can be sorted).
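- join() checks sort order with is_sorted(), sorts if needed, then performs a linear merge.
- For left joins, unmatched left records (the lk < rk case) are emitted with None fills via left_unmatched().
- Whether the input was already sorted is reported in stats["sorted_input"].

| Function | Contract |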
|----------|----------|
| merge_records(left, right, left_key, right_key) | Merges two dicts. The join key appears once. Conflicting field names get "left_"/"right_" prefixes. |
| build_hash_table(dataset, key) | Returns (defaultdict(list), skipped_count). Records missing the key are silently skipped. |
| partition_dataset(dataset, key, n) | Hash-partitions into n buckets using Python's hash(). Records missing the key are dropped. |
| sort_dataset(dataset, key) | Returns a new sorted list. Records missing the key are appended at the end unsorted. |
| is_sorted(dataset, key) | Linear scan; skips records missing the key. |
| compare_join_strategies(left, right, join_key) | Runs all three strategies as inner joins, normalizes the results (strips mapper_id, sorts), and returns a boolean indicating whether the three results are equivalent. |
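
The merge_records contract from the table can be sketched as follows. This is an illustrative reimplementation, not the module's code; in particular, prefixing a right-side field whose name collides with the left key is an added assumption.

```python
def merge_records(left, right, left_key, right_key):
    """Sketch of the conflict-prefixing merge; not the module's exact code."""
    # The join key appears once, under the left side's key name.
    merged = {left_key: left[left_key]}
    # Copy every non-key field; both join-key fields are skipped here.
    left_fields = {k: v for k, v in left.items() if k != left_key}
    right_fields = {k: v for k, v in right.items() if k != right_key}
    for name, value in left_fields.items():
        # Prefix only when the right side has a field with the same name.
        merged["left_" + name if name in right_fields else name] = value
    for name, value in right_fields.items():
        # Assumption: a right field that collides with left_key is also prefixed,
        # so it cannot overwrite the join key.
        clash = name in left_fields or name == left_key
        merged["right_" + name if clash else name] = value
    return merged


print(merge_records({"id": 1, "name": "Ada"}, {"uid": 1, "name": "Lovelace", "age": 36}, "id", "uid"))
# {'id': 1, 'left_name': 'Ada', 'right_name': 'Lovelace', 'age': 36}
```

Passing a right-side record whose non-key fields are all None yields a None fill with the same prefixes, which is one way the left-join paths can stay consistent with the matched paths.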
Simulated parallelism. There are no threads or processes; mapper parallelism is modeled by chunking data and tagging output with mapper_id. This keeps the code deterministic and testable while still illustrating the concept.
Conflict-prefixed merge. When both sides share a field name (other than the join key), the merge disambiguates with "left_"/"right_" prefixes. This convention is applied consistently across all three join types, including the None-fill paths for left joins.
Stats instrumentation. Every join returns operational counters (hash lookups, comparisons, records read, skipped records). This supports the pedagogical goal of comparing strategies quantitatively via compare_join_strategies.
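
One way to implement the normalization that compare_join_strategies performs (strip mapper_id, then sort) is sketched below. normalize and results_equivalent are hypothetical names, and sorting by repr() is a choice made here so that records mixing None and non-None values can still share a sort key.

```python
def normalize(records):
    """Strip mapper_id and put records in a canonical order (hypothetical helper)."""
    stripped = [{k: v for k, v in r.items() if k != "mapper_id"} for r in records]
    # Dict keys are unique, so sorted(r.items()) only ever compares field names;
    # repr() then gives a string sort key even when values have mixed types.
    return sorted(stripped, key=lambda r: repr(sorted(r.items())))


def results_equivalent(*results):
    """True when every result normalizes to the same list (hypothetical helper)."""
    normalized = [normalize(r) for r in results]
    return all(n == normalized[0] for n in normalized[1:])
```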
Defensive key-missing handling. Every path that reads a join key checks for its presence first. Missing-key records are counted as skipped_records rather than raising exceptions.
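
A sketch of the skip-and-count pattern, following the build_hash_table contract from the table (it returns the table and a skipped count); the body is illustrative, not copied from the module.

```python
from collections import defaultdict


def build_hash_table(dataset, key):
    """Sketch of the (table, skipped_count) contract; not the module's exact code."""
    table = defaultdict(list)
    skipped = 0
    for record in dataset:
        if key not in record:
            # Missing key: count it and move on instead of raising KeyError.
            skipped += 1
            continue
        # Duplicate keys accumulate in the same bucket (needed for cartesian output).
        table[record[key]].append(record)
    return table, skipped


table, skipped = build_hash_table([{"k": 1}, {"other": 2}, {"k": 1, "v": 3}], "k")
print(len(table[1]), skipped)  # 2 1
```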
Imports: Only collections.defaultdict — no external dependencies.
Imported by: The test files test_map_side_joins.py and tester_test_map_side_joins.py exercise all three join strategies.
Taking BroadcastHashJoin as the canonical example:
1. Build phase (constructor): Iterate small dataset → populate defaultdict(list) keyed on small_key.
2. Distribute phase (join): Round-robin large dataset records into num_mappers chunks.
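3. Probe phase: For each large-side record, look up its key in the hash table and emit one merged record per match. For a left join, a large-side record with no match is emitted with None fills for the small-side fields.
4. Collect phase: Accumulate all merged records, compute stats, and return a JoinResult.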
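
The four phases can be sketched as a single function. broadcast_hash_join is a hypothetical stand-in for the BroadcastHashJoin class (constructor plus join()) that treats the large side as the left side of the merge; the merge helper repeats the conflict-prefix rule so the block runs on its own.

```python
from collections import defaultdict


def merge_records(left, right, left_key, right_key):
    # Join key once; shared non-key fields get left_/right_ prefixes.
    merged = {left_key: left[left_key]}
    lf = {k: v for k, v in left.items() if k != left_key}
    rf = {k: v for k, v in right.items() if k != right_key}
    for k, v in lf.items():
        merged["left_" + k if k in rf else k] = v
    for k, v in rf.items():
        merged["right_" + k if k in lf else k] = v
    return merged


def broadcast_hash_join(large, small, large_key, small_key, num_mappers=2, how="inner"):
    """Hypothetical function-style sketch of the four phases (the module uses a class)."""
    # 1. Build phase: hash the small side; records missing the key are skipped.
    table, skipped = defaultdict(list), 0
    for rec in small:
        if small_key in rec:
            table[rec[small_key]].append(rec)
        else:
            skipped += 1
    # Template for None fills: every small-side field name, set to None.
    null_small = {k: None for rec in small for k in rec}
    null_small[small_key] = None
    # 2. Distribute phase: round-robin, so record i goes to mapper i % num_mappers.
    chunks = [large[i::num_mappers] for i in range(num_mappers)]
    out, lookups = [], 0
    for mapper_id, chunk in enumerate(chunks):
        for rec in chunk:
            if large_key not in rec:
                skipped += 1
                continue
            # 3. Probe phase: one lookup per large-side record, one row per match.
            lookups += 1
            matches = table.get(rec[large_key], [])
            if not matches and how == "left":
                matches = [null_small]
            for small_rec in matches:
                row = merge_records(rec, small_rec, large_key, small_key)
                row["mapper_id"] = mapper_id
                out.append(row)
    # 4. Collect phase: return merged rows plus counters.
    return out, {"hash_lookups": lookups, "skipped_records": skipped}
```

Because mapper i receives records i, i + n, i + 2n, and so on, a record's mapper_id depends only on its position in the large dataset, which is why the comparison step has to strip it.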
PartitionedHashJoin replaces steps 1–2 with co-partitioning (both sides hashed into the same buckets), then runs the build/probe locally per partition.
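
The co-partitioned variant can be sketched the same way. partitioned_hash_join is a hypothetical function-style stand-in that uses Python's built-in hash(), as the document describes. hash() is consistent within one process, but str hashing is salted per process (see PYTHONHASHSEED), so a real multi-machine system would need a stable hash function for co-partitioning to hold.

```python
from collections import defaultdict


def merge_records(left, right, left_key, right_key):
    # Same conflict-prefixing rule as the other sketches.
    merged = {left_key: left[left_key]}
    lf = {k: v for k, v in left.items() if k != left_key}
    rf = {k: v for k, v in right.items() if k != right_key}
    for k, v in lf.items():
        merged["left_" + k if k in rf else k] = v
    for k, v in rf.items():
        merged["right_" + k if k in lf else k] = v
    return merged


def partition_dataset(dataset, key, n):
    # Hash-partition into n buckets; records missing the key are dropped.
    parts = [[] for _ in range(n)]
    for rec in dataset:
        if key in rec:
            parts[hash(rec[key]) % n].append(rec)
    return parts


def partitioned_hash_join(left, right, left_key, right_key, num_partitions=4, how="inner"):
    """Hypothetical sketch of co-partitioning plus a local hash join per partition."""
    # Co-partitioning: the same hash function sends equal keys to the same part_id.
    left_parts = partition_dataset(left, left_key, num_partitions)
    right_parts = partition_dataset(right, right_key, num_partitions)
    null_right = {k: None for rec in right for k in rec}
    null_right[right_key] = None
    out = []
    for part_id in range(num_partitions):
        # Local build: only this partition's right-side records go in the table.
        table = defaultdict(list)
        for rec in right_parts[part_id]:
            table[rec[right_key]].append(rec)
        matched_left_keys = set()
        for rec in left_parts[part_id]:
            for r in table.get(rec[left_key], []):
                matched_left_keys.add(rec[left_key])
                row = merge_records(rec, r, left_key, right_key)
                out.append({**row, "mapper_id": part_id})
        # Left join: emit this partition's unmatched left records with None fills.
        if how == "left":
            for rec in left_parts[part_id]:
                if rec[left_key] not in matched_left_keys:
                    row = merge_records(rec, null_right, left_key, right_key)
                    out.append({**row, "mapper_id": part_id})
    return out
```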
SortMergeJoin replaces the hash table entirely with a two-pointer merge, grouping each run of equal keys on both sides and emitting the cartesian product of the two groups.
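
A sketch of the two-pointer merge with equal-key grouping. sort_merge_join is hypothetical: unlike the described join(), it always sorts instead of checking is_sorted() first, and it uses a simplified merge without the "left_"/"right_" conflict prefixes.

```python
def sort_merge_join(left, right, left_key, right_key, how="inner"):
    """Hypothetical sketch of a two-pointer sort-merge join; not the module's code."""
    # Drop records missing the key, then sort each side (Python's sort is stable).
    ls = sorted((r for r in left if left_key in r), key=lambda r: r[left_key])
    rs = sorted((r for r in right if right_key in r), key=lambda r: r[right_key])
    right_fields = {k for r in rs for k in r if k != right_key}

    def merge(lrec, rrec):
        # Simplified merge: no left_/right_ conflict prefixes, to keep the sketch short.
        return {**lrec, **{k: v for k, v in rrec.items() if k != right_key}}

    def fill(lrec):
        # Left join: None for every right-side field.
        return {**lrec, **{k: None for k in right_fields}}

    out, comparisons, i, j = [], 0, 0, 0
    while i < len(ls) and j < len(rs):
        lk, rk = ls[i][left_key], rs[j][right_key]
        comparisons += 1
        if lk < rk:
            # This left key has no partner on the right.
            if how == "left":
                out.append(fill(ls[i]))
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on each side...
            i_end, j_end = i, j
            while i_end < len(ls) and ls[i_end][left_key] == lk:
                i_end += 1
            while j_end < len(rs) and rs[j_end][right_key] == rk:
                j_end += 1
            # ...and emit the cartesian product of the two groups.
            for lrec in ls[i:i_end]:
                for rrec in rs[j:j_end]:
                    out.append(merge(lrec, rrec))
            i, j = i_end, j_end
    # Right side exhausted: any remaining left records are unmatched.
    if how == "left":
        out.extend(fill(lrec) for lrec in ls[i:])
    return out, comparisons
```

Each pointer only moves forward, so after sorting the merge itself is linear in the input sizes plus the size of the output.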
Invariants:
- merge_records includes the join key once (under the left key's name) and skips both key fields when copying the remaining fields from their respective records.
- Conflicting fields get the "left_" prefix on the left side and "right_" on the right side, across all code paths including None fills.
- mapper_id is always set on every output record, even for SortMergeJoin (hardcoded to 0).
- compare_join_strategies normalizes away mapper_id before comparing, so differences in mapper assignment don't cause false verification failures.
- Records missing the join key are counted in skipped_records and dropped.

There is essentially no error handling. The code takes a silent-skip approach: records missing the join key are filtered out and counted, but no exceptions are raised. build_hash_table returns the skip count; SortMergeJoin accumulates it in a local variable. There is no validation of input types, no bounds checking on num_partitions/num_mappers, and no handling of unhashable key values (which raise TypeError from hash() or a dict lookup).
This is appropriate for a reference implementation — it prioritizes clarity of the algorithm over production robustness.
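
The unhashable-key gap mentioned above is easy to reproduce with a plain defaultdict, the same structure the hash table is built on; this snippet does not call the module itself.

```python
from collections import defaultdict

table = defaultdict(list)
record = {"tags": ["batch", "joins"]}  # a list-valued join key is unhashable

try:
    table[record["tags"]].append(record)
    outcome = "inserted"
except TypeError as exc:
    # The described code has no handler for this, so it would reach the caller.
    outcome = f"TypeError: {exc}"

print(outcome)
```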
- map-side-join/test_map_side_joins.py: See which edge cases are tested (left joins, missing keys, field conflicts, multi-mapper verification).
- map_side_joins.py:compare_join_strategies: The normalization logic reveals what the implementation considers semantically equivalent across strategies.
- batch-word-count/pipeline.py: Another Ch. 10 batch processing implementation; compare how it models mapper parallelism.
- ddia-ch10-reduce-side-joins: How reduce-side joins (shuffle + sort + group) differ from these map-side approaches, and when each is preferred.
- map_side_joins.py:merge_records: The conflict-resolution logic is the trickiest shared code; understanding it clarifies all three join types.
- map-side-join-three-strategies: The module implements exactly three join strategies (broadcast hash, partitioned hash, sort-merge) that all produce identical inner-join results when verified by compare_join_strategies.
- map-side-join-conflict-prefix: When left and right datasets share a non-key field name, the merged record uses "left_" and "right_" prefixes to disambiguate; this applies in all join types, including None-fill paths.
- map-side-join-no-real-parallelism: Mapper parallelism is simulated via round-robin chunking and mapper_id tagging; no threads or processes are used.
- map-side-join-missing-key-skip: Records missing the join key are silently dropped and counted in stats["skipped_records"] rather than raising exceptions.
- map-side-join-cartesian-on-duplicates: When multiple records share the same join key value, all strategies produce the cartesian product of the matching groups.