Topic: How reduce-side joins (shuffle + sort + group) differ from these map-side approaches and when each is preferred

Date: 2026-05-29

Time: 13:30

# Reduce-Side Joins vs. Map-Side Joins

## The Core Difference: Shuffle or No Shuffle

The fundamental distinction comes down to whether you need a network shuffle to co-locate records by key.

### Reduce-Side Join (the MapReduce framework)

In `mapreduce-framework/mapreduce.py`, `MapReduceJob` implements the classic three-phase (map, shuffle, reduce) pipeline. In a reduce-side join, the "join" actually happens in the reducer (`_run_reducer`, line 131):


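```python
from itertools import groupby

# Excerpt from the reducer: all_pairs holds the (key, value) pairs
# read from this reducer's partition files.

# Sort by key so that equal keys become adjacent
all_pairs.sort(key=lambda x: x[0])

# Group by key and reduce
for key, group in groupby(all_pairs, key=lambda x: x[0]):
    values = [v for _, v in group]
    output = self.reducer(key, values)
```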

The sequence is:

1. Map: Each mapper tags records with the join key and a source identifier (e.g., "users" vs. "orders")

2. Shuffle: The framework partitions intermediate output by `hash(key) % num_reducers` (line 104) and writes partition files (`map-{mapper_id}-part-{p}.json`)

3. Sort + Group: Each reducer reads every partition file for its slice (lines 133–136), sorts the pairs by key, and groups them with itertools.groupby

4. Reduce: The reducer receives *all* values for a given key — from *both* datasets — and emits joined records
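The four steps above can be traced end to end with a minimal in-memory sketch. This is an illustration, not the `MapReduceJob` code. The function names (`map_phase`, `shuffle`, `join_reducer`, `reduce_side_join`), the `user_id` join key, and the `"users"`/`"orders"` tags are hypothetical, and partitions are kept in Python lists rather than written to JSON files.

```python
from itertools import groupby

def map_phase(records, key_field, tag):
    """Map: tag each record with its source and emit (join_key, (tag, record))."""
    for rec in records:
        if key_field in rec:
            yield rec[key_field], (tag, rec)

def shuffle(pairs, num_reducers):
    """Shuffle: route each pair to a reducer slice by hash(key) % num_reducers."""
    partitions = [[] for _ in range(num_reducers)]
    for key, value in pairs:
        partitions[hash(key) % num_reducers].append((key, value))
    return partitions

def join_reducer(key, values):
    """Reduce: split one key's values by source tag and emit their cross product."""
    user_recs = [rec for tag, rec in values if tag == "users"]
    order_recs = [rec for tag, rec in values if tag == "orders"]
    return [{**u, **o} for u in user_recs for o in order_recs]

def reduce_side_join(users, orders, key="user_id", num_reducers=2):
    pairs = list(map_phase(users, key, "users")) + list(map_phase(orders, key, "orders"))
    joined = []
    for partition in shuffle(pairs, num_reducers):
        # Sort + group: equal keys become adjacent, then groupby yields one group per key.
        partition.sort(key=lambda kv: kv[0])
        for k, group in groupby(partition, key=lambda kv: kv[0]):
            joined.extend(join_reducer(k, [v for _, v in group]))
    return joined

users = [{"user_id": 1, "name": "ada"}, {"user_id": 2, "name": "bob"}]
orders = [
    {"user_id": 1, "item": "book"},
    {"user_id": 1, "item": "pen"},
    {"user_id": 3, "item": "cup"},
]
rows = reduce_side_join(users, orders)
# rows holds two records: ada joined with "book" and with "pen"
```

Every record from both inputs passes through `shuffle` and the per-partition sort, even records like user 2 and order 3 that never find a partner. That is the overhead the map-side strategies try to avoid.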

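This is maximally general: it makes no assumptions about the relative sizes of the datasets, their sort order, or how they are partitioned. The price is that every record from both datasets must be serialized, written to disk, read back, and sorted before any joining happens. That I/O and sorting is exactly the cost the map-side joins below try to eliminate.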

### Map-Side Joins (no shuffle at all)

The three strategies in `map-side-join/map_side_joins.py` each rely on a precondition about the data layout that lets them skip the shuffle entirely:

#### 1. Broadcast Hash Join (BroadcastHashJoin, line ~68)

Precondition: One dataset fits in memory.

The constructor builds a hash table from the small dataset once (`_build_hash_table`, line 60), and every mapper then probes that same table independently:

```python
self.hash_table, self.skipped_small = _build_hash_table(small_dataset, small_key)
```

During `join()`, the large dataset is split into `num_mappers` chunks (lines 83–85), and each mapper performs one expected-O(1) hash lookup per record. No data moves between mappers, because the small dataset is broadcast to all of them. The stats reflect this: `hash_lookups` counts probes, not shuffled records.
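Here is a minimal sketch of the build-once, probe-everywhere pattern. It assumes records are dicts and simulates mappers by iterating over chunks in a single process. `build_hash_table`, `chunk`, and `broadcast_hash_join` are hypothetical names. The `hash_lookups` and `skipped_small` stats only mirror the names mentioned above. Treating `skipped_small` as a count of small-side records that lack the join key is an assumption.

```python
from collections import defaultdict

def build_hash_table(small_dataset, key):
    """Build phase: join key -> list of small-side records (lists keep duplicate keys)."""
    table = defaultdict(list)
    skipped = 0
    for rec in small_dataset:
        if key not in rec:
            skipped += 1  # assumption: records without the join key are skipped and counted
            continue
        table[rec[key]].append(rec)
    return dict(table), skipped

def chunk(records, num_mappers):
    """Split the large dataset into at most num_mappers contiguous chunks."""
    size = max(1, -(-len(records) // num_mappers))  # ceiling division, at least 1
    return [records[i:i + size] for i in range(0, len(records), size)]

def broadcast_hash_join(small, large, key, num_mappers=4):
    # Built once; in a real cluster this table would be shipped to every mapper.
    table, skipped = build_hash_table(small, key)
    joined, lookups = [], 0
    # Mappers are simulated sequentially; no chunk ever reads another chunk's records.
    for mapper_id, part in enumerate(chunk(large, num_mappers)):
        for rec in part:
            if key not in rec:
                continue
            lookups += 1  # one probe per large-side record
            for match in table.get(rec[key], []):
                joined.append({**match, **rec, "mapper_id": mapper_id})
    return joined, {"hash_lookups": lookups, "skipped_small": skipped}

users = [{"user_id": 1, "name": "ada"}, {"user_id": 2, "name": "bob"}, {"name": "no-id"}]
orders = [
    {"user_id": 1, "item": "book"},
    {"user_id": 1, "item": "pen"},
    {"user_id": 3, "item": "cup"},
]
rows, stats = broadcast_hash_join(users, orders, "user_id", num_mappers=2)
```

Storing a list per key handles a small side with duplicate join keys. The cost is that the entire table must fit in each mapper's memory.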

#### 2. Partitioned Hash Join (PartitionedHashJoin, line ~128)

Precondition: Both datasets are already partitioned by the join key (or can be cheaply partitioned).

Both sides are partitioned with `partition_dataset()`, and each partition is then joined independently with a local hash table (lines 142–145):

```python
left_parts = partition_dataset(left_dataset, self.left_key, self.num_partitions)
right_parts = partition_dataset(right_dataset, self.right_key, self.num_partitions)
```

Each partition builds a hash table from only its own slice of the left side, so peak memory is roughly O(left_size / num_partitions) instead of O(left_size), assuming join keys spread evenly across partitions. The `mapper_id` field on output records (line 161) records which partition produced each result, which makes the independence of the partitions visible.
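The sketch below illustrates co-partitioning under stated assumptions. Both sides are bucketed in-process by a hypothetical `partition` helper, which stands in for `partition_dataset()`. The helper uses `zlib.crc32` over `repr(key)` as a deterministic hash. Python's built-in `hash` of strings is salted per process, so it would not yield consistent partitions across separate workers. Because the hash is computed over `repr`, join keys must have the same type on both sides.

```python
import zlib
from collections import defaultdict

def stable_hash(value):
    # Deterministic across processes, unlike the built-in hash() of str.
    # Keys are hashed via repr(), so both sides must use the same key type.
    return zlib.crc32(repr(value).encode("utf-8"))

def partition(dataset, key, num_partitions):
    """Hypothetical stand-in for partition_dataset(): bucket records by stable_hash(key)."""
    parts = [[] for _ in range(num_partitions)]
    for rec in dataset:
        if key in rec:
            parts[stable_hash(rec[key]) % num_partitions].append(rec)
    return parts

def partitioned_hash_join(left, right, left_key, right_key, num_partitions=4):
    # Same hash function and partition count on both sides => co-partitioned.
    left_parts = partition(left, left_key, num_partitions)
    right_parts = partition(right, right_key, num_partitions)
    joined = []
    for pid, (lpart, rpart) in enumerate(zip(left_parts, right_parts)):
        # Local hash table over this partition's left slice only; discarded afterwards.
        table = defaultdict(list)
        for rec in lpart:
            table[rec[left_key]].append(rec)
        for rec in rpart:
            for match in table.get(rec[right_key], []):
                joined.append({**match, **rec, "mapper_id": pid})
    return joined

users = [{"user_id": 1, "name": "ada"}, {"user_id": 2, "name": "bob"}]
orders = [
    {"uid": 1, "item": "book"},
    {"uid": 1, "item": "pen"},
    {"uid": 3, "item": "cup"},
]
rows = partitioned_hash_join(users, orders, "user_id", "uid", num_partitions=3)
```

Correctness depends on both sides using the same hash function and the same partition count. Otherwise matching keys could land in different partitions and silently drop out of the result.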

#### 3. Sort-Merge Join (SortMergeJoin, referenced in tests)

Precondition: Both datasets are sorted by the join key (or sorting is cheap relative to shuffling).

The test at line 192 checks that it detects pre-sorted input (`result.stats["sorted_input"]` is True) and skips re-sorting. When both sides are sorted, a single linear pass merges them in O(n + m) time, plus the size of the output when a key repeats on both sides, with no hash table needed.
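Here is a minimal sort-merge join. It assumes dict records that all contain the join key and that key values are mutually comparable. `sort_merge_join` and `is_sorted` are hypothetical names, and the `sorted_input` flag only imitates the stat checked in the test. The inner block collects runs of equal keys, so duplicates on both sides produce their cross product.

```python
def is_sorted(records, key):
    return all(records[i][key] <= records[i + 1][key] for i in range(len(records) - 1))

def sort_merge_join(left, right, key):
    sorted_input = is_sorted(left, key) and is_sorted(right, key)
    if not sorted_input:
        # Fallback: sort both sides first (O(n log n + m log m)).
        left = sorted(left, key=lambda r: r[key])
        right = sorted(right, key=lambda r: r[key])
    joined, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1  # advance the side with the smaller key
        elif lk > rk:
            j += 1
        else:
            # Find the run of records sharing this key on each side.
            i_end = i
            while i_end < len(left) and left[i_end][key] == lk:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end][key] == rk:
                j_end += 1
            # Duplicates on both sides produce their cross product.
            for lrec in left[i:i_end]:
                for rrec in right[j:j_end]:
                    joined.append({**lrec, **rrec})
            i, j = i_end, j_end
    return joined, {"sorted_input": sorted_input}

users = [{"user_id": 1, "name": "ada"}, {"user_id": 2, "name": "bob"}]
orders = [
    {"user_id": 1, "item": "book"},
    {"user_id": 1, "item": "pen"},
    {"user_id": 3, "item": "cup"},
]
rows, stats = sort_merge_join(users, orders, "user_id")  # stats["sorted_input"] is True
```

Neither side is ever loaded into a hash table. Each pointer only moves forward, which is why pre-sorted inputs such as SSTables make this strategy cheap.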

## When to Prefer Each

| Strategy | Use When | Cost |
|----------|----------|------|
| Reduce-side | No precondition holds; arbitrary datasets; full generality needed | Full shuffle and sort of both datasets |
| Broadcast hash | One side is small enough to fit in memory on every mapper | Memory: the whole small dataset on each mapper; no network shuffle |
| Partitioned hash | Both sides already co-partitioned by the join key (e.g., output of a prior MapReduce stage) | Memory: one partition's slice; no cross-partition traffic |
| Sort-merge | Both sides already sorted by the join key (e.g., SSTable output) | O(n + m) linear scan; no hash table memory |

The reduce-side join in `mapreduce.py` is the fallback. It needs no assumptions about data layout, because the framework co-locates matching keys through the shuffle. Map-side joins are optimizations that trade generality for performance by pushing preconditions onto the data layout. The `compare_join_strategies` function (referenced in the test at line 211) exists to make this tradeoff visible: it runs all three map-side strategies and compares their stats side by side.
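The table can also be read as a decision rule. The helper below is a hypothetical encoding of it. The field names, the check order (sorted, then co-partitioned, then broadcast size), and the raw-size memory test are assumptions, not logic taken from `compare_join_strategies`.

```python
from dataclasses import dataclass

@dataclass
class JoinInputs:
    small_side_bytes: int   # size of the smaller dataset
    memory_per_mapper: int  # memory budget on each mapper, same unit as above
    co_partitioned: bool    # same partition function and count on the join key
    both_sorted: bool       # both sides already sorted by the join key

def choose_join_strategy(inputs: JoinInputs) -> str:
    """Return the first strategy whose precondition holds (ordering is an assumption)."""
    if inputs.both_sorted:
        return "sort-merge"
    if inputs.co_partitioned:
        return "partitioned-hash"
    if inputs.small_side_bytes <= inputs.memory_per_mapper:
        return "broadcast-hash"
    return "reduce-side"  # no precondition holds: fall back to shuffle + sort + group

print(choose_join_strategy(JoinInputs(10_000, 1_000_000, False, False)))  # → broadcast-hash
```

Checking the layout preconditions before the memory test is one reasonable ordering, not the only one. A real planner would also leave headroom for hash-table overhead beyond the raw data size.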

One thing worth noting: the stream join processor (`stream-join-processor/stream_join_processor.py`) belongs to a different paradigm entirely. It operates on unbounded event streams with time windows rather than bounded batch datasets, so it falls outside this batch comparison but is relevant for real-time scenarios.

## Topics to Explore

## Beliefs