Date: 2026-05-29
Time: 13:31
This is the test suite for a MapReduce framework implementation — one of the DDIA reference modules. It validates that MapReduceJob and MapReducePipeline behave correctly across the full spectrum of MapReduce semantics: basic map/reduce, partitioning determinism, combiners, file I/O, fault tolerance, multi-stage pipelines, and job statistics. It serves as both a correctness harness and a living spec for the framework's API contract.
`word_count_mapper(key, value)`: the canonical MapReduce example. It takes a line of text and emits `(word, 1)` pairs, lowercasing the input and splitting it on whitespace. Most tests use it as the default mapper.
`word_count_reducer(key, values)`: sums all values for a given key and returns a single `(key, total)` pair. It also doubles as the combiner in `test_combiner`.
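The following is a minimal sketch of the two functions as described. It assumes the mapper is a generator of `(word, 1)` pairs and the reducer yields one `(key, total)` pair. Whether the real module returns lists or yields is not stated, so treat the return style as an assumption. The grouping loop at the bottom is a stand-in for the framework's shuffle, not its actual code.

```python
from collections import defaultdict


def word_count_mapper(key, value):
    """Emit (word, 1) for every whitespace-separated word in the line.

    `key` (e.g. a line number) is ignored; only the text in `value` matters.
    """
    for word in value.lower().split():
        yield (word, 1)


def word_count_reducer(key, values):
    """Sum all counts for one word and emit a single (word, total) pair."""
    yield (key, sum(values))


# Stand-in for the framework's shuffle: group mapper output by key.
lines = [(0, "The fox"), (1, "the dog")]
groups = defaultdict(list)
for k, v in lines:
    for word, count in word_count_mapper(k, v):
        groups[word].append(count)

result = sorted(pair for w, vals in groups.items() for pair in word_count_reducer(w, vals))
print(result)  # [('dog', 1), ('fox', 1), ('the', 2)]
```

The reducer's output has the same `(word, count)` shape as its input, so the same function can be reused as a combiner.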
| Test | What it validates |
|------|-------------------|
| `test_word_count` | Core map-shuffle-reduce on 3 input lines; asserts exact counts and the total number of unique keys (8) |
| `test_multi_worker_consistency` | Determinism: results are identical across 4 different `(num_mappers, num_reducers)` configurations |
| `test_combiner` | A combiner produces identical final results; intermediate record counts also match because they are counted before the combiner is applied |
| `test_file_input` | `job.run()` accepts a file path string, not just in-memory pairs |
| `test_empty_input` | Edge case: an empty list yields empty output and zero stats |
| `test_single_key` | All mapper output collapses to one key; the reducer sees the full value set |
| `test_fault_tolerant_mode` | `fault_tolerant=True` silently skips records for which the mapper raises |
| `test_strict_mode` | Default (strict) mode propagates mapper exceptions to the caller |
| `test_pipeline` | `MapReducePipeline` chains two stages; the output of stage 1 feeds into stage 2 |
| `test_statistics` | `job.stats` tracks input/output record counts, worker counts, and elapsed time |
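A toy in-memory job can illustrate the combiner row. This is a sketch under assumptions: `run`, `group`, and the round-robin input split are hypothetical and do not reproduce `mapreduce.py`. It shows two things. Counting map output before combining keeps the intermediate count unchanged. A summing combiner also cannot alter the final totals.

```python
from collections import defaultdict


def mapper(key, line):
    for word in line.lower().split():
        yield (word, 1)


def summing(key, values):
    # Used both as reducer and as combiner: sum is associative and commutative.
    yield (key, sum(values))


def group(pairs):
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return groups


def run(records, num_mappers, combiner=None):
    """Hypothetical mini job: returns (sorted results, map_output_records)."""
    chunks = [records[i::num_mappers] for i in range(num_mappers)]
    shuffled, map_output_records = [], 0
    for chunk in chunks:
        emitted = [kv for k, v in chunk for kv in mapper(k, v)]
        map_output_records += len(emitted)  # counted BEFORE the combiner runs
        if combiner is not None:
            # Combine locally, per mapper chunk, before the shuffle.
            emitted = [kv for k, vs in group(emitted).items() for kv in combiner(k, vs)]
        shuffled.extend(emitted)
    results = sorted(kv for k, vs in group(shuffled).items() for kv in summing(k, vs))
    return results, map_output_records


data = [(0, "a b a"), (1, "b c"), (2, "a c c")]
r1, n1 = run(data, num_mappers=2)
r2, n2 = run(data, num_mappers=2, combiner=summing)
```

If the count were taken after combining, `n2` would shrink. That is why a combiner that only saves shuffle volume can still leave the intermediate count stable.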
Self-contained test runner. The `if __name__ == '__main__':` block implements a minimal test harness. It iterates over a list of test functions, catches exceptions, prints PASS/FAIL, and exits with an appropriate status. This avoids a pytest dependency while staying pytest-compatible, since all tests are module-level `test_*` functions.
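A runner of this kind might look like the sketch below. The function name `run_tests`, the sample functions, and the output format are assumptions, not taken from the test file. The runner catches `Exception`, which includes `AssertionError`, so both failed assertions and unexpected errors count as FAIL.

```python
import traceback


def run_tests(tests):
    """Run each test function, print PASS/FAIL, and return an exit status."""
    failures = 0
    for test in tests:
        try:
            test()
        except Exception:
            failures += 1
            print(f"FAIL {test.__name__}")
            traceback.print_exc()
        else:
            print(f"PASS {test.__name__}")
    print(f"{len(tests) - failures}/{len(tests)} passed")
    return 1 if failures else 0


def sample_passing():
    assert 1 + 1 == 2


def sample_failing():
    assert [] == [0], "deliberate failure"


# In the real module this call would sit under the guard:
#   if __name__ == '__main__':
#       sys.exit(run_tests([...]))
status = run_tests([sample_passing, sample_failing])
```

Returning the status instead of calling `sys.exit` inside the loop keeps the runner itself testable.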
Fixture-free design. Each test constructs its own MapReduceJob with inline mapper/reducer lambdas or local functions. No shared state between tests, so they're safe to run in any order.
Parameterized by hand. `test_multi_worker_consistency` manually loops over worker configurations rather than using pytest's `pytest.mark.parametrize`, which keeps the test self-explanatory.
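The hand-rolled loop can be sketched against a hypothetical in-memory `run` function, not the real `MapReduceJob`. The four configurations here are illustrative, because the ones the actual test uses are not listed. In this sketch, partitioning uses `zlib.crc32` because Python's built-in `hash()` on strings is randomized per process. Sorting the merged reducer output makes the comparison independent of partition order.

```python
import zlib
from collections import defaultdict


def mapper(key, line):
    for word in line.lower().split():
        yield (word, 1)


def reducer(key, values):
    yield (key, sum(values))


def partition(key, num_reducers):
    # crc32 is stable across processes, unlike str hash() under hash randomization.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def run(records, num_mappers, num_reducers):
    """Hypothetical mini job used only to illustrate the determinism check."""
    buckets = [defaultdict(list) for _ in range(num_reducers)]
    for i in range(num_mappers):
        for k, v in records[i::num_mappers]:
            for word, count in mapper(k, v):
                buckets[partition(word, num_reducers)][word].append(count)
    out = [kv for b in buckets for k, vs in b.items() for kv in reducer(k, vs)]
    return sorted(out)


lines = [(0, "the quick brown fox"), (1, "the lazy dog"), (2, "The fox")]
configs = [(1, 1), (2, 3), (3, 2), (4, 4)]
results = [run(lines, m, r) for m, r in configs]
for (m, r), res in zip(configs, results):
    assert res == results[0], f"mismatch at num_mappers={m}, num_reducers={r}"
```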
Cleanup via try/finally. `test_file_input` uses `tempfile.NamedTemporaryFile` with `delete=False` and explicitly calls `os.unlink` in a `finally` block, ensuring cleanup even when an assertion fails.
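The cleanup pattern can be combined with the path-or-list input handling that `test_file_input` exercises. `load_input` is a hypothetical helper, and keying file lines by line number is an assumption. How `mapreduce.py` actually turns a file into records is not described here.

```python
import os
import tempfile


def load_input(source):
    """Hypothetical helper: accept a list of (key, value) pairs or a file path.

    A path is read as (line_number, line_text) pairs; this keying scheme is an
    assumption made for the sketch.
    """
    if isinstance(source, str):
        with open(source, encoding="utf-8") as f:
            return [(i, line.rstrip("\n")) for i, line in enumerate(f)]
    return list(source)


lines = ["the quick brown fox", "the lazy dog"]
tmp = tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False, encoding="utf-8")
try:
    tmp.write("\n".join(lines) + "\n")
    tmp.close()  # flush and close so the file can be reopened by name
    from_file = load_input(tmp.name)
finally:
    tmp.close()  # closing twice is harmless
    os.unlink(tmp.name)  # delete=False means we must remove the file ourselves

from_memory = load_input(list(enumerate(lines)))
```

`delete=False` is what allows the file to be closed and then reopened by name, which some platforms do not permit while the original handle is still open.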
Imports:
- `os`, `tempfile`: file I/O for `test_file_input`
- `sys`: `sys.exit()` in the manual runner
- `mapreduce.MapReduceJob`: the core job abstraction under test
- `mapreduce.MapReducePipeline`: multi-stage chaining

Imported by: nothing; this is a leaf test module.
Each test follows the same pattern:
1. Define mapper/reducer (or reuse the module-level word-count pair)
2. Construct a `MapReduceJob` with its configuration (`num_mappers`, `num_reducers`, an optional combiner, `fault_tolerant`)
3. Call `job.run(input_data)`, where `input_data` is either a list of `(key, value)` tuples or a file path string
4. Assert on the returned list of (key, value) result pairs and/or job.stats
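The four steps can be sketched against a hypothetical `TinyJob` class. It is single-threaded and ignores `num_mappers`, `num_reducers`, combiners, fault tolerance, and worker counts. The stats keys follow the `map_input_records` / `map_output_records` / `reduce_output_records` naming used in this document. `elapsed_seconds` is an invented name.

```python
import time
from collections import defaultdict


class TinyJob:
    """Hypothetical stand-in for MapReduceJob: same four-step usage, no workers."""

    def __init__(self, mapper, reducer):
        self.mapper, self.reducer = mapper, reducer
        self.stats = {}

    def run(self, input_data):
        start = time.perf_counter()
        groups = defaultdict(list)
        emitted = 0
        for k, v in input_data:
            for out_k, out_v in self.mapper(k, v):
                groups[out_k].append(out_v)
                emitted += 1
        results = sorted(kv for k, vs in groups.items() for kv in self.reducer(k, vs))
        self.stats = {
            "map_input_records": len(input_data),
            "map_output_records": emitted,
            "reduce_output_records": len(results),
            "elapsed_seconds": time.perf_counter() - start,
        }
        return results


# 1. Define mapper/reducer.
def mapper(key, line):
    for word in line.lower().split():
        yield (word, 1)


def reducer(key, values):
    yield (key, sum(values))


# 2. Construct the job.  3. Run it.  4. Assert on results and stats.
job = TinyJob(mapper, reducer)
results = job.run([(0, "a b"), (1, "b c b")])
assert dict(results) == {"a": 1, "b": 3, "c": 1}
assert job.stats["map_output_records"] == 5
```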
The pipeline test (test_pipeline) adds a second stage where the mapper swaps key/value from stage 1's output, creating an inverted index from count→words.
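The two-stage flow can be sketched as follows. `run_stage`, `run_pipeline`, and a stage-2 reducer that collects words into a sorted list are assumptions. The source only says that stage 2 swaps key and value to build a count→words index.

```python
from collections import defaultdict


def run_stage(records, mapper, reducer):
    """Hypothetical single stage: map, group by key, reduce, sort."""
    groups = defaultdict(list)
    for k, v in records:
        for out_k, out_v in mapper(k, v):
            groups[out_k].append(out_v)
    return sorted(kv for k, vs in groups.items() for kv in reducer(k, vs))


def run_pipeline(records, stages):
    # Each stage's output list becomes the next stage's input.
    for mapper, reducer in stages:
        records = run_stage(records, mapper, reducer)
    return records


def count_mapper(_key, line):
    for word in line.lower().split():
        yield (word, 1)


def count_reducer(word, ones):
    yield (word, sum(ones))


def swap_mapper(word, count):
    yield (count, word)  # stage 2: the count becomes the key


def collect_reducer(count, words):
    yield (count, sorted(words))


lines = [(0, "the fox"), (1, "the dog"), (2, "a fox")]
inverted = run_pipeline(lines, [(count_mapper, count_reducer), (swap_mapper, collect_reducer)])
```

Because each stage's output is a plain list of `(key, value)` pairs, any stage can consume another's output without adapters.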
Invariants:
- `test_multi_worker_consistency` asserts that varying `num_mappers` and `num_reducers` never changes the result set, so the shuffle/partition step must be deterministic for a given input.
- `test_combiner` asserts `r1 == r2` regardless of whether a combiner is present.
- `test_statistics` hardcodes expected record counts (3 input, 11 intermediate, 8 output for the fox/dog corpus), so any change to the shuffle or dedup logic will break it.
- `test_strict_mode` constructs a job without the `fault_tolerant` flag and expects exceptions to propagate.

Two error-handling modes are tested:
1. Strict (default): `test_strict_mode` expects `RuntimeError` to propagate out of `job.run()`. The test uses a manual try/except with `assert False` as the fallthrough, a pattern that makes the test fail if no exception is raised.
2. Fault-tolerant: `test_fault_tolerant_mode` sets `fault_tolerant=True` and verifies that a `ValueError` raised by the mapper for `key=2` is swallowed. The bad record is skipped, and only the good records appear in the output.
The test file itself does not catch or wrap errors beyond these two tests. All other tests expect clean execution and fail on any unhandled exception, including assertion errors.
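Both modes, plus the `assert False` fallthrough, can be illustrated with a hypothetical map loop. `apply_mapper` is not the framework's code. Catching bare `Exception` is an assumption, because the real framework may catch a narrower set. Each record's output is materialized inside the `try` so that a mapper failing partway through contributes nothing.

```python
def apply_mapper(records, mapper, fault_tolerant=False):
    """Hypothetical map loop illustrating strict vs fault-tolerant handling."""
    out = []
    for key, value in records:
        try:
            # Materialize first so a mid-record failure leaves no partial output.
            emitted = list(mapper(key, value))
        except Exception:
            if not fault_tolerant:
                raise  # strict (default): propagate the original exception
            continue  # fault-tolerant: skip the bad record
        out.extend(emitted)
    return out


def flaky_mapper(key, value):
    if key == 2:
        raise ValueError("bad record")
    return [(value, 1)]


def failing_mapper(key, value):
    raise RuntimeError("mapper failed")


records = [(1, "a"), (2, "b"), (3, "c")]
tolerant = apply_mapper(records, flaky_mapper, fault_tolerant=True)

# The test_strict_mode pattern: `assert False` only runs if nothing was raised,
# and its AssertionError is not caught by `except RuntimeError`, so the test fails.
try:
    apply_mapper(records, failing_mapper)
    assert False, "expected RuntimeError"
except RuntimeError:
    strict_raised = True
```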
See also:
- `mapreduce-framework/mapreduce.py`: the implementation under test; shows how shuffle/partition, combiners, stats tracking, and fault tolerance actually work
- `mapreduce-framework/mapreduce.py:MapReduceJob.run`: the core execution engine; understanding it explains why worker count doesn't affect results
- `mapreduce-framework/mapreduce.py:MapReducePipeline.run`: how stage outputs are wired to stage inputs, and how `stage_stats` is accumulated
- combiner-vs-reducer-semantics: why the combiner must be associative/commutative and why `map_output_records` is counted before the combiner runs
- `batch-word-count/pipeline.py`: a related batch processing implementation; compare its pipeline abstraction with `MapReducePipeline`

Key facts:
- mapreduce-results-deterministic: `MapReduceJob.run()` produces identical output regardless of the `num_mappers`/`num_reducers` configuration for the same input data
- mapreduce-combiner-transparent: applying a combiner never changes final results; it only reduces intermediate record volume
- mapreduce-strict-mode-default: `MapReduceJob` defaults to strict mode, where mapper/reducer exceptions propagate to the caller
- mapreduce-run-accepts-filepath: `MapReduceJob.run()` accepts either a list of `(key, value)` tuples or a file path string as input
- mapreduce-stats-tracks-records: `job.stats` accurately counts `map_input_records`, `map_output_records`, `reduce_output_records`, worker counts, and elapsed time after each run