File: map-side-join/map_side_joins.py

Date: 2026-05-29

Time: 08:59

Purpose

This file implements three map-side join strategies from DDIA Chapter 10 (Batch Processing). Map-side joins avoid the shuffle/reduce phase of a traditional MapReduce join by exploiting knowledge about the input data (size, partitioning, sort order). The file provides BroadcastHashJoin, PartitionedHashJoin, and SortMergeJoin, plus a compare_join_strategies function that runs all three and verifies that they produce identical results.

Each strategy trades a different assumption about the input data for different performance characteristics; this trade-off is the core pedagogical point of DDIA's treatment of joins.

Key Components

JoinResult

Simple container for join output: a list of merged records (dicts) and a stats dict of operational metrics (hash lookups, comparisons, records read, etc.). The .count property is shorthand for len(records).
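A minimal sketch of such a container, assuming it can be written as a dataclass; the stats key shown (hash_lookups) is a hypothetical name, not necessarily the one the file uses:

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class JoinResult:
    """Sketch of a join-output container: merged records plus metrics."""

    records: list[dict[str, Any]] = field(default_factory=list)
    stats: dict[str, int] = field(default_factory=dict)

    @property
    def count(self) -> int:
        # Shorthand for the number of joined records.
        return len(self.records)


result = JoinResult(records=[{"id": 1}, {"id": 2}], stats={"hash_lookups": 2})
print(result.count)  # 2
```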

BroadcastHashJoin

Assumption: One dataset is small enough to fit in memory on every mapper.

PartitionedHashJoin

Assumption: Both datasets are partitioned on the join key using the same hash function, so matching records land in the same partition.
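The co-partitioning assumption can be checked directly. This sketch assumes partition_dataset returns a list of n lists (the return shape is not stated in this summary), and the users/orders data is invented for illustration. With int keys the bucket assignment is deterministic; str keys hash differently across interpreter runs unless PYTHONHASHSEED is fixed, but consistently within one run, which is all co-partitioning needs:

```python
def partition_dataset(dataset, key, n):
    """Hash-partition records into n buckets; records missing the key are dropped."""
    partitions = [[] for _ in range(n)]
    for record in dataset:
        if key not in record:
            continue  # dropped, per the documented contract
        partitions[hash(record[key]) % n].append(record)
    return partitions


users = [{"user_id": 1, "name": "ada"}, {"user_id": 2, "name": "bob"}]
orders = [{"user_id": 2, "item": "pen"}, {"user_id": 1, "item": "ink"}, {"item": "orphan"}]

n = 4
user_parts = partition_dataset(users, "user_id", n)
order_parts = partition_dataset(orders, "user_id", n)

# Co-partitioning invariant: each order lands in the same partition index as its user,
# because both sides use the same hash function and the same n.
for i in range(n):
    user_keys = {r["user_id"] for r in user_parts[i]}
    for order in order_parts[i]:
        assert order["user_id"] in user_keys
```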

SortMergeJoin

Assumption: Both datasets are sorted by the join key (or can be sorted).
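When the sort-order assumption may not hold, a caller can check it and sort on demand. This is a sketch of is_sorted and sort_dataset written to the contracts in the Helper Functions table; treating "sorted" as non-decreasing key order is an assumption:

```python
def is_sorted(dataset, key):
    """Linear scan over records that have the key; True if keys never decrease."""
    previous = None
    seen_any = False
    for record in dataset:
        if key not in record:
            continue  # records missing the key are skipped
        if seen_any and record[key] < previous:
            return False
        previous = record[key]
        seen_any = True
    return True


def sort_dataset(dataset, key):
    """Return a new list sorted by key; keyless records go at the end, unsorted."""
    with_key = [r for r in dataset if key in r]
    without_key = [r for r in dataset if key not in r]
    return sorted(with_key, key=lambda r: r[key]) + without_key


data = [{"k": 3}, {"x": 0}, {"k": 1}, {"k": 2}]
if not is_sorted(data, "k"):
    data = sort_dataset(data, "k")
print([r.get("k") for r in data])  # [1, 2, 3, None]
```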

Helper Functions

| Function | Contract |
|----------|----------|
| merge_records(left, right, left_key, right_key) | Merges two dicts. The join key appears once; conflicting field names get left/right prefixes. |
| build_hash_table(dataset, key) | Returns (defaultdict(list), skipped_count). Records missing the key are skipped without error and counted. |
| partition_dataset(dataset, key, n) | Hash-partitions records into n buckets using Python's hash(). Records missing the key are dropped. |
| sort_dataset(dataset, key) | Returns a new list sorted by key. Records missing the key are appended at the end, unsorted. |
| is_sorted(dataset, key) | Checks whether the dataset is sorted by key with a single linear scan; records missing the key are skipped. |
| compare_join_strategies(left, right, join_key) | Runs all three strategies as inner joins, normalizes the results (strips mapper_id, sorts), and returns a verification boolean indicating whether they are equivalent. |
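A sketch of build_hash_table that follows the documented contract (a defaultdict(list) plus a skip count); the body is an assumption, not the file's actual code:

```python
from collections import defaultdict


def build_hash_table(dataset, key):
    """Return (table, skipped_count); table maps a key value to its records."""
    table = defaultdict(list)
    skipped_count = 0
    for record in dataset:
        if key not in record:
            skipped_count += 1  # skipped silently, only counted
            continue
        table[record[key]].append(record)  # duplicates on the key share one bucket
    return table, skipped_count


table, skipped = build_hash_table(
    [{"id": 1, "v": "a"}, {"id": 1, "v": "b"}, {"v": "no key"}], "id"
)
print(len(table[1]), skipped)  # 2 1
```

Keeping a list per key is what lets a single probe return several matches, which the inner join then merges one by one.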

Patterns

Simulated parallelism. There are no threads or processes; mapper parallelism is modeled by chunking the data and tagging each output record with mapper_id. This keeps the code deterministic and testable while still illustrating the concept.
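The pattern might look like the following sketch; split_round_robin and run_mappers are hypothetical names, and the "mappers" run one after another in a plain loop:

```python
def split_round_robin(records, num_mappers):
    """Hypothetical helper: deal records into num_mappers chunks, round-robin."""
    chunks = [[] for _ in range(num_mappers)]
    for i, record in enumerate(records):
        chunks[i % num_mappers].append(record)
    return chunks


def run_mappers(records, num_mappers, map_fn):
    """Run map_fn over each chunk sequentially, tagging output with mapper_id."""
    output = []
    for mapper_id, chunk in enumerate(split_round_robin(records, num_mappers)):
        for record in chunk:
            output.append({**map_fn(record), "mapper_id": mapper_id})
    return output


out = run_mappers([{"n": i} for i in range(5)], 2, lambda r: {"sq": r["n"] ** 2})
print([(r["sq"], r["mapper_id"]) for r in out])
# [(0, 0), (4, 0), (16, 0), (1, 1), (9, 1)]
```

Because chunk assignment depends only on record position, repeated runs produce identical output, which is what makes the simulation easy to test.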

Conflict-prefixed merge. When both sides share a field name (other than the join key), the merge disambiguates with left/right prefixes. This convention is applied consistently across all three join types, including the None-fill paths for left joins.
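A sketch of the conflict-prefixed merge, assuming the prefixes are literally "left_" and "right_" and that the join key is kept under the left-side name; both details are assumptions rather than confirmed behavior of the file:

```python
def merge_records(left, right, left_key, right_key):
    """Merge two records; the join key appears once and shared fields get prefixes.

    The "left_"/"right_" prefix format is an assumption for illustration.
    """
    merged = {left_key: left[left_key]}  # join key kept once, under the left name
    right_fields = set(right) - {right_key}
    shared = (set(left) & right_fields) - {left_key}
    for name, value in left.items():
        if name == left_key:
            continue
        merged["left_" + name if name in shared else name] = value
    for name, value in right.items():
        if name == right_key:
            continue  # the right-side join key is dropped
        # Prefix shared fields, and any right field that would clobber the join key.
        if name in shared or name == left_key:
            name = "right_" + name
        merged[name] = value
    return merged


user = {"id": 7, "name": "ada", "created": "2024-01-01"}
order = {"id": 7, "item": "pen", "created": "2024-02-02"}
print(merge_records(user, order, "id", "id"))
# {'id': 7, 'name': 'ada', 'left_created': '2024-01-01', 'item': 'pen', 'right_created': '2024-02-02'}
```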

Stats instrumentation. Every join returns operational counters (hash lookups, comparisons, records read, skipped). This supports the pedagogical goal of comparing strategies quantitatively via compare_join_strategies.
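Comparing strategies only works after their output is normalized, since mapper_id tags and record order differ between strategies. A hypothetical sketch of that normalization step (normalize and results_match are illustrative names) sorts on repr'd values so that records with mixed value types, such as None fills, still sort without error:

```python
def normalize(records):
    """Drop mapper_id and sort so outputs from different strategies compare equal."""
    cleaned = [{k: v for k, v in r.items() if k != "mapper_id"} for r in records]
    return sorted(cleaned, key=lambda r: sorted((k, repr(v)) for k, v in r.items()))


def results_match(*outputs):
    """True if every strategy produced the same multiset of records."""
    normalized = [normalize(o) for o in outputs]
    return all(n == normalized[0] for n in normalized[1:])


broadcast_out = [{"id": 1, "x": "a", "mapper_id": 0}, {"id": 2, "x": "b", "mapper_id": 1}]
sort_merge_out = [{"id": 2, "x": "b"}, {"id": 1, "x": "a"}]
print(results_match(broadcast_out, sort_merge_out))  # True
```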

Defensive key-missing handling. Every path that reads a join key checks for its presence first. Missing-key records are counted as skipped_records rather than raising exceptions.

Dependencies

Imports: Only collections.defaultdict — no external dependencies.

Imported by: The test files test_map_side_joins.py and tester_test_map_side_joins.py exercise all three join strategies.

Flow

Taking BroadcastHashJoin as the canonical example:

1. Build phase (constructor): Iterate small dataset → populate defaultdict(list) keyed on small_key.

2. Distribute phase (join): Deal the large dataset's records round-robin into num_mappers chunks.

3. Probe phase: For each large-side record, look up its key in the hash table and merge the record with every matching small-side record.

4. Collect phase: Accumulate all merged records, compute stats, return JoinResult.
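The four phases above can be sketched as a single function. This is an illustrative inner join only: broadcast_hash_join is a hypothetical name, the stats keys are assumptions, and the merge is simplified to a dict union rather than the conflict-prefixed merge:

```python
from collections import defaultdict


def broadcast_hash_join(small, large, small_key, large_key, num_mappers=2):
    """Sketch of an inner broadcast hash join with simulated mappers."""
    # 1. Build: hash the small side once; in a real cluster every mapper gets a copy.
    table = defaultdict(list)
    for rec in small:
        if small_key in rec:
            table[rec[small_key]].append(rec)

    # 2. Distribute: deal large-side records round-robin into mapper chunks.
    chunks = [large[i::num_mappers] for i in range(num_mappers)]

    # 3. Probe: each mapper looks up its records' keys in the shared table.
    records, stats = [], {"hash_lookups": 0, "skipped_records": 0}
    for mapper_id, chunk in enumerate(chunks):
        for rec in chunk:
            if large_key not in rec:
                stats["skipped_records"] += 1
                continue
            stats["hash_lookups"] += 1
            for match in table.get(rec[large_key], []):
                # Simplified merge: large-side fields overwrite small-side ones.
                records.append({**match, **rec, "mapper_id": mapper_id})

    # 4. Collect: all mapper outputs are gathered into one result.
    return records, stats


users = [{"user_id": 1, "name": "ada"}, {"user_id": 2, "name": "bob"}]
clicks = [{"user_id": 1, "url": "/a"}, {"user_id": 3, "url": "/b"},
          {"user_id": 1, "url": "/c"}, {"url": "/no-key"}]
rows, stats = broadcast_hash_join(users, clicks, "user_id", "user_id")
print(len(rows), stats)  # 2 {'hash_lookups': 3, 'skipped_records': 1}
```

Probing with table.get(..., []) rather than indexing avoids inserting empty buckets into the defaultdict for keys that have no match.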

PartitionedHashJoin replaces steps 1–2 with co-partitioning (both sides hashed into the same buckets), then runs the build/probe locally per partition.
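A per-partition build/probe sketch under similar simplifications: the names are hypothetical, it is an inner join only, the left side is assumed to be the build side, and the partition index stands in for mapper_id:

```python
from collections import defaultdict


def partition(dataset, key, n):
    """Hash-partition records; records missing the key are dropped."""
    parts = [[] for _ in range(n)]
    for rec in dataset:
        if key in rec:
            parts[hash(rec[key]) % n].append(rec)
    return parts


def partitioned_hash_join(left, right, left_key, right_key, num_partitions=4):
    # Co-partition both sides with the same hash function and partition count.
    left_parts = partition(left, left_key, num_partitions)
    right_parts = partition(right, right_key, num_partitions)
    out = []
    for pid in range(num_partitions):
        # Build a small hash table from this partition's left records only...
        table = defaultdict(list)
        for rec in left_parts[pid]:
            table[rec[left_key]].append(rec)
        # ...and probe it with the same partition's right records.
        for rec in right_parts[pid]:
            for match in table.get(rec[right_key], []):
                out.append({**match, **rec, "mapper_id": pid})
    return out


users = [{"uid": 1, "name": "ada"}, {"uid": 2, "name": "bob"}, {"uid": 5, "name": "eve"}]
orders = [{"uid": 1, "item": "pen"}, {"uid": 2, "item": "ink"}, {"uid": 1, "item": "cap"}]
rows = partitioned_hash_join(users, orders, "uid", "uid")
print(sorted((r["name"], r["item"]) for r in rows))
# [('ada', 'cap'), ('ada', 'pen'), ('bob', 'ink')]
```

Each table only ever holds one partition's worth of the build side, which is why this strategy does not need either dataset to fit in memory as a whole.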

SortMergeJoin replaces the hash table entirely with a two-pointer merge over the sorted inputs: when the current keys match, it collects the run of equal keys on each side and emits their Cartesian product.
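A two-pointer sketch, assuming both inputs are already sorted by their join keys and every record has its key (the file instead skips and counts keyless records); sort_merge_join is a hypothetical name and the merge is a simplified dict union:

```python
def sort_merge_join(left, right, left_key, right_key):
    """Inner join of two lists already sorted by their join keys."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][left_key], right[j][right_key]
        if lk < rk:
            i += 1  # advance the side with the smaller key
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on each side...
            i_end = i
            while i_end < len(left) and left[i_end][left_key] == lk:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end][right_key] == rk:
                j_end += 1
            # ...and emit their Cartesian product.
            for l_rec in left[i:i_end]:
                for r_rec in right[j:j_end]:
                    out.append({**l_rec, **r_rec})
            i, j = i_end, j_end
    return out


left = [{"k": 1, "a": "x"}, {"k": 2, "a": "y"}, {"k": 2, "a": "z"}]
right = [{"k": 2, "b": 10}, {"k": 2, "b": 20}, {"k": 3, "b": 30}]
rows = sort_merge_join(left, right, "k", "k")
print([(r["a"], r["b"]) for r in rows])
# [('y', 10), ('y', 20), ('z', 10), ('z', 20)]
```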

Invariants

Error Handling

There is essentially none. The code takes a silent-skip approach: records missing the join key are filtered out and counted, but no exceptions are raised. build_hash_table returns the skip count; SortMergeJoin accumulates it in a local variable. There is no validation of input types, no bounds checking on num_partitions/num_mappers, and no handling of unhashable key values (which raise TypeError from hash() or the dict lookup).
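To make the unhashable-key case concrete, here is a minimal standalone snippet (not taken from the file) showing what happens when a join-key value is a list and is used as a dict key:

```python
from collections import defaultdict

table = defaultdict(list)
record = {"tags": ["a", "b"], "v": 1}  # the join-key value is a list

raised = False
try:
    # The dict lookup hashes the key, and lists are unhashable.
    table[record["tags"]].append(record)
except TypeError as exc:
    raised = True
    print(exc)  # unhashable type: 'list'
```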

This is appropriate for a reference implementation — it prioritizes clarity of the algorithm over production robustness.

Topics to Explore

Beliefs