File: mapreduce-framework/mapreduce.py

Date: 2026-05-29

Time: 09:00

mapreduce-framework/mapreduce.py — Single-Machine MapReduce Framework

Purpose

This file implements a pedagogical single-machine MapReduce framework that follows the programming model described in DDIA Chapter 10 (Batch Processing). It does not distribute work across machines. Instead it simulates the MapReduce contract (map → shuffle/sort → reduce) using the local filesystem for intermediate data, which preserves the computational model while stripping away the distributed-systems complexity.

The file owns two responsibilities: executing a single MapReduce job (MapReduceJob) and chaining multiple jobs into a pipeline (MapReducePipeline).

Key Components

JobStats (dataclass)

A plain data container tracking counters for a completed job: input/output record counts, worker counts, and wall-clock time. Callers should treat it as a read-only view: it is populated internally by MapReduceJob.run() and exposed via the stats property.
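As a rough illustration of the stats plumbing described above, the sketch below pairs a dataclass with a read-only property. The field names and the TinyJob class are hypothetical stand-ins; the source only states which kinds of counters are tracked, not what they are called.

```python
import time
from dataclasses import dataclass


@dataclass
class JobStats:
    # Hypothetical field names: the source lists the kinds of counters,
    # not their actual identifiers.
    input_records: int = 0
    output_records: int = 0
    num_mappers: int = 0
    num_reducers: int = 0
    elapsed_seconds: float = 0.0


class TinyJob:
    """Hypothetical stand-in for MapReduceJob, showing only stats handling."""

    def __init__(self, num_mappers: int = 2, num_reducers: int = 2) -> None:
        self._stats = JobStats(num_mappers=num_mappers, num_reducers=num_reducers)

    @property
    def stats(self) -> JobStats:
        # No setter is defined, so callers cannot rebind job.stats.
        return self._stats

    def run(self, records: list) -> list:
        start = time.perf_counter()
        output = list(records)  # placeholder for the real map/shuffle/reduce work
        self._stats.input_records = len(records)
        self._stats.output_records = len(output)
        self._stats.elapsed_seconds = time.perf_counter() - start
        return output
```

Note that a property without a setter only prevents rebinding the attribute; the dataclass fields themselves stay mutable unless the class is declared with frozen=True.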

MapReduceJob

The core abstraction. Accepts user-defined mapper and reducer functions with these contracts:

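Constructor parameters num_mappers and num_reducers control the parallelism topology (how many chunks the input is split into and how many partitions the intermediate keys are hashed into), though execution is sequential. fault_tolerant controls whether mapper/reducer exceptions are swallowed or propagated.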

MapReducePipeline

A lightweight chaining mechanism: it stores an ordered list of MapReduceJob stages and feeds the output of each into the next. add_stage returns self, so stages can be added with builder-style method chaining.
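The chaining idea can be sketched in a few lines. This is a simplified stand-in, not the actual MapReducePipeline: the Pipeline class is hypothetical, and plain callables take the place of MapReduceJob stages.

```python
from typing import Any, Callable, List

# A stage here is any callable from a list of records to a list of records.
Stage = Callable[[List[Any]], List[Any]]


class Pipeline:
    """Hypothetical stand-in for MapReducePipeline."""

    def __init__(self) -> None:
        self._stages: List[Stage] = []

    def add_stage(self, stage: Stage) -> "Pipeline":
        self._stages.append(stage)
        return self  # returning self is what allows chained add_stage calls

    def run(self, data: List[Any]) -> List[Any]:
        # Output of stage N becomes the input of stage N+1.
        for stage in self._stages:
            data = stage(data)
        return data


def double(xs: List[int]) -> List[int]:
    return [x * 2 for x in xs]


def keep_large(xs: List[int]) -> List[int]:
    return [x for x in xs if x > 2]


result = Pipeline().add_stage(double).add_stage(keep_large).run([1, 2, 3])
print(result)  # [4, 6]
```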

Patterns

Three-phase execution model. run() follows the canonical MapReduce pipeline:

1. Map: Split input → run mapper on each chunk → write partitioned intermediate files

2. Shuffle/Sort: Collect intermediate files by partition → sort by key → group by key

3. Reduce: Run reducer on each key-group → collect results
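The three phases above can be sketched entirely in memory. The mapper and reducer signatures used here (key and value in, iterable of pairs out) are an assumption for illustration, and run_job is a hypothetical helper rather than the framework's run().

```python
from itertools import groupby
from operator import itemgetter


def mapper(line_no, line):
    # Assumed contract: (key, value) in, zero or more (key, value) pairs out.
    for word in line.split():
        yield word.lower(), 1


def reducer(word, counts):
    # Assumed contract: (key, list of values) in, zero or more pairs out.
    yield word, sum(counts)


def run_job(records, mapper, reducer):
    # 1. Map: apply the mapper to every input record.
    intermediate = [pair for k, v in records for pair in mapper(k, v)]
    # 2. Shuffle/sort: groupby only merges adjacent items, so sort by key first.
    intermediate.sort(key=itemgetter(0))
    # 3. Reduce: call the reducer once per key group.
    results = []
    for key, group in groupby(intermediate, key=itemgetter(0)):
        results.extend(reducer(key, [v for _, v in group]))
    return results


records = [(1, "the quick fox"), (2, "the lazy dog")]
counts = run_job(records, mapper, reducer)
print(counts)  # [('dog', 1), ('fox', 1), ('lazy', 1), ('quick', 1), ('the', 2)]
```

The sort before groupby is the essential step: itertools.groupby starts a new group every time the key changes, so unsorted input would split one key across several reducer calls.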

Filesystem as intermediate storage. Intermediate data is serialized as JSON to a temp directory (map-{mapper_id}-part-{partition}.json). This loosely mirrors real MapReduce, where map output is materialized to files before reducers fetch their partitions (in Hadoop, on each mapper's local disk rather than on the distributed filesystem). The naming convention encodes both the source mapper and target partition.
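A minimal sketch of this storage scheme follows, using the file-name pattern from the text. The helper names write_partition and read_partition are hypothetical, and the real module may organize this differently.

```python
import json
import os
import shutil
import tempfile


def write_partition(tmp_dir, mapper_id, partition, pairs):
    # File name encodes the source mapper and the target partition.
    path = os.path.join(tmp_dir, f"map-{mapper_id}-part-{partition}.json")
    with open(path, "w", encoding="utf-8") as f:
        json.dump(pairs, f)
    return path


def read_partition(tmp_dir, partition):
    # Gather the files from every mapper that target this partition.
    suffix = f"-part-{partition}.json"
    pairs = []
    for name in sorted(os.listdir(tmp_dir)):
        if name.startswith("map-") and name.endswith(suffix):
            with open(os.path.join(tmp_dir, name), encoding="utf-8") as f:
                # JSON stores pairs as two-element lists; convert back to tuples.
                pairs.extend((k, v) for k, v in json.load(f))
    return pairs


tmp_dir = tempfile.mkdtemp(prefix="mapreduce-")
try:
    write_partition(tmp_dir, 0, 1, [("a", 1)])
    write_partition(tmp_dir, 1, 1, [("a", 2), ("b", 1)])
    write_partition(tmp_dir, 1, 0, [("c", 5)])
    partition_1 = read_partition(tmp_dir, 1)
finally:
    # Always remove the temp directory, even if a phase raised.
    shutil.rmtree(tmp_dir, ignore_errors=True)

print(partition_1)  # [('a', 1), ('a', 2), ('b', 1)]
```

One consequence of JSON as the intermediate format is that tuples come back as lists and only JSON-representable keys and values survive the round trip.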

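Hash partitioning. hash(k) % num_reducers assigns each intermediate key to a reducer partition. This is the same basic scheme as Hadoop's HashPartitioner, which takes the key's hash code modulo the number of reduce tasks. Because a key always hashes to the same value within one process, all values for a given key land in the same reducer.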
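The partitioning rule can be sketched as below. partition_for restates the expression from the text; stable_partition_for is a hypothetical alternative added for comparison and is not something the source describes.

```python
import zlib


def partition_for(key, num_reducers):
    # Python's % returns a non-negative result for a positive modulus,
    # so negative hash values need no special handling.
    return hash(key) % num_reducers


def stable_partition_for(key: str, num_reducers: int) -> int:
    # Hypothetical alternative: CRC32 gives the same partition on every run.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


keys = ["apple", "banana", "apple", "cherry", "banana"]
num_reducers = 3
buckets = {p: [] for p in range(num_reducers)}
for k in keys:
    buckets[partition_for(k, num_reducers)].append(k)

# Every occurrence of a key ends up in exactly one bucket.
for key in set(keys):
    holders = [p for p, members in buckets.items() if key in members]
    print(key, "->", holders)
```

Python randomizes string hashes per process unless PYTHONHASHSEED is set, so the partition a key maps to can change between runs. Within a single run the assignment is consistent, which is all a single-process job needs.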

Combiner as map-side optimization. The combiner runs in runmapper after all pairs for a chunk are collected, applying group-by-key + reduce locally before writing to disk. This reduces shuffle volume.
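A combiner step of this kind might look like the following sketch. The combine helper and the combiner signature (same shape as a reducer) are assumptions for illustration.

```python
from itertools import groupby
from operator import itemgetter


def combine(pairs, combiner):
    # Group one mapper's output by key and pre-aggregate it locally.
    pairs = sorted(pairs, key=itemgetter(0))
    combined = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        combined.extend(combiner(key, [v for _, v in group]))
    return combined


def sum_combiner(key, values):
    # Assumed to share the reducer's shape: (key, values) -> pairs.
    yield key, sum(values)


map_output = [("the", 1), ("fox", 1), ("the", 1), ("the", 1)]
combined = combine(map_output, sum_combiner)
print(combined)  # [('fox', 1), ('the', 3)]
```

Four pairs shrink to two before anything is written to disk. This is only safe when the aggregation is associative and commutative: sums and counts qualify, while a mean computed directly from partial means does not.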

Dependencies

Imports: All stdlib — json for intermediate serialization, os/shutil/tempfile for filesystem management, time for elapsed timing, groupby from itertools for the key-grouping step, and typing for signatures.

Imported by: test_mapreduce.py, which exercises the framework with word count, numeric aggregation, and pipeline scenarios.

Flow

MapReduceJob.run() orchestrates the full execution:

1. Input normalization: If input_data is a string, it's treated as a file path — lines are read and keyed by line number (1-indexed).

2. Splitting: splitinput distributes records across num_mappers chunks. Each chunk gets len(records) // num_mappers records, and the first len(records) % num_mappers chunks get one extra record.

3. Map phase: For each chunk, runmapper applies the mapper to every record, partitions the output by hash(key) % num_reducers, optionally applies the combiner per partition, then writes each partition's pairs to a JSON file.

4. Reduce phase: For each partition, runreducer reads all intermediate files matching that partition suffix, sorts all pairs by key, groups with itertools.groupby, and calls the reducer per group.

5. Final sort: Results are sorted by key before returning.

6. Cleanup: The temp directory is always removed via finally.
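The splitting arithmetic in step 2 can be sketched as follows. split_input is a hypothetical reimplementation based on the description, not the module's own function.

```python
def split_input(records, num_mappers):
    # base records per chunk; the first `remainder` chunks get one extra.
    base, remainder = divmod(len(records), num_mappers)
    chunks = []
    start = 0
    for i in range(num_mappers):
        size = base + (1 if i < remainder else 0)
        chunks.append(records[start:start + size])
        start += size
    return chunks


chunks = split_input(list(range(10)), 3)
print(chunks)  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```

With 10 records and 3 mappers, divmod gives a base of 3 and a remainder of 1, so only the first chunk receives a fourth record. When there are fewer records than mappers, this sketch returns some empty chunks; whether the real implementation keeps or drops them is not stated in the source.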

For MapReducePipeline.run(): output of stage N becomes input of stage N+1. Each stage's JobStats are collected in stagestats.

Invariants

Error Handling

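Two modes, controlled by fault_tolerant: when it is enabled, exceptions raised by a mapper or reducer are swallowed and the job continues; when it is disabled, they propagate to the caller.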

There's no retry logic — fault tolerance here means "skip and continue," not "retry until success." This is a simplification; real MapReduce frameworks would re-execute failed tasks on different workers.
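The "skip and continue" behavior can be sketched as below. The apply_mapper helper is hypothetical, and skipping at the granularity of a single record is an assumption; the source does not say whether a failure discards one record or a whole chunk.

```python
def apply_mapper(records, mapper, fault_tolerant):
    output, failures = [], 0
    for key, value in records:
        try:
            # Materialize first so a failing record contributes no partial output.
            output.extend(list(mapper(key, value)))
        except Exception:
            if not fault_tolerant:
                raise  # propagate to the caller
            failures += 1  # skip this record and continue; no retry
    return output, failures


def parse_int(key, value):
    yield key, int(value)


records = [(1, "10"), (2, "oops"), (3, "30")]
output, failures = apply_mapper(records, parse_int, fault_tolerant=True)
print(output, failures)  # [(1, 10), (3, 30)] 1
```

Calling the same helper with fault_tolerant=False re-raises the ValueError from the second record and aborts the loop.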

Topics to Explore

Beliefs