File: batch-word-count/pipeline.py

Date: 2026-05-29

Time: 08:53

Purpose

pipeline.py implements a composable batch processing framework modeled after Unix pipes. Each processing step is a Stage that consumes an iterator and yields records, and the Pipeline class chains them together into a lazy, pull-based dataflow. The canonical use case is word counting (ReadLines → Tokenize → Count → Sort), but the framework is general-purpose — it corresponds to the batch processing concepts from DDIA Chapter 10 (MapReduce and beyond), distilled into a single-process, iterator-based design.

Key Components

Stage (base class)

The abstract unit of work. Every stage implements process(input_iter) as a generator that consumes input records and yields output records. The base class provides two pieces of infrastructure:

Concrete Stages

| Stage | Input | Output | Materialization |
|-------|-------|--------|-----------------|
| ReadLines | file path or list of strings | (line_number, text) | Streaming |
| Tokenize | (key, text) | (word, 1) | Streaming |
| Count | (key, value) | (key, total) | Full: accumulates all keys in a defaultdict before yielding |
| Sort | any tuple | same tuples, sorted | Full (in-memory) or external (spills to disk) |
| Filter | any record | same record (if predicate passes) | Streaming |
| TopN | any tuple | top N tuples by key or value | Partial: maintains a min-heap of size N |
| Partition | any record | same record (pass-through), plus side-effect writes | Streaming with file I/O side effects |
| FlatMap | any record | zero or more records from fn(record) | Streaming |
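
The "Partial" materialization of TopN can be illustrated with a bounded min-heap. This is a minimal sketch, not the module's code: the function name top_n, the choice to rank by the second tuple element, and the tie-breaking rule (earlier records win) are all assumptions made for the example.

```python
import heapq
from typing import Iterable, Iterator, Tuple


def top_n(records: Iterable[Tuple[str, int]], n: int) -> Iterator[Tuple[str, int]]:
    """Yield the n records with the largest value, largest first.

    Hypothetical helper: memory use is bounded by n, not by the input size.
    """
    if n <= 0:
        return
    heap = []  # min-heap of (value, -seq, record); heap[0] is the weakest kept entry
    for seq, record in enumerate(records):
        value = record[1]
        if len(heap) < n:
            heapq.heappush(heap, (value, -seq, record))
        elif value > heap[0][0]:
            # The new record beats the weakest kept one, so swap it in.
            heapq.heapreplace(heap, (value, -seq, record))
    # Only the final n survivors are sorted, which is cheap.
    for _, _, record in sorted(heap, reverse=True):
        yield record
```

Because the heap never grows past n entries, a stage built this way still has to read all of its input before yielding, but it holds far less than Count or Sort do.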

Pipeline

The orchestrator. add_stage() appends stages and returns self for chaining. Three execution modes:

Patterns

Pull-based iterator chaining. Each stage wraps the previous stage's output iterator — the pipeline is a nested generator chain evaluated lazily from the tail. This is the Unix pipe model: data flows record-by-record, and only materializing stages (Count, Sort) buffer data.
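
The nesting described above can be sketched with plain generator functions standing in for Stage classes. The names read_lines, tokenize, count, and build_chain are hypothetical, and numbering lines from 1 is an assumption; the real stages may differ in detail.

```python
from collections import defaultdict


def read_lines(lines):
    # Streaming: emits (line_number, text) one record at a time.
    for number, text in enumerate(lines, start=1):
        yield number, text


def tokenize(records):
    # Streaming: emits (word, 1) for every whitespace-separated word.
    for _, text in records:
        for word in text.lower().split():
            yield word, 1


def count(records):
    # Materializing: must see every record before it can emit a total.
    totals = defaultdict(int)
    for key, value in records:
        totals[key] += value
    yield from totals.items()


def build_chain(source, stages):
    """Wrap each stage around the previous iterator; nothing runs yet."""
    iterator = source
    for stage in stages:
        iterator = stage(iterator)
    return iterator
```

Calling build_chain only constructs generator objects. Work starts when the caller iterates the returned object, for example with dict(build_chain(["a b a", "b a"], [read_lines, tokenize, count])).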

External merge sort. Sort implements a textbook external sort: when the in-memory buffer reaches memory_limit, it flushes a sorted chunk to a temp file as JSONL. After all input is consumed, it merges the chunks with heapq.merge. The KeyedRecord wrapper class, with a custom __lt__, handles both ascending and descending order, and a seq tiebreaker keeps the sort stable.
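
A minimal sketch of the spill-and-merge idea follows. It is not the module's implementation: it uses the key argument of heapq.merge in place of the KeyedRecord wrapper, supports ascending order only, and the function name external_sort and its parameters are assumptions for the example.

```python
import heapq
import json
import os
import tempfile


def external_sort(records, memory_limit=1000, key=lambda rec: rec[0]):
    """Yield records in ascending key order, spilling sorted chunks to disk."""
    buffer = []
    chunk_paths = []
    chunk_iters = []

    def flush():
        # Sort the in-memory buffer and write it out as one JSONL chunk.
        buffer.sort(key=key)
        fd, path = tempfile.mkstemp(suffix=".jsonl")
        chunk_paths.append(path)
        with os.fdopen(fd, "w", encoding="utf-8") as out:
            for rec in buffer:
                out.write(json.dumps(rec) + "\n")
        buffer.clear()

    def read_chunk(path):
        # JSON has no tuple type, so rebuild the tuple on the way back in.
        with open(path, encoding="utf-8") as src:
            for line in src:
                yield tuple(json.loads(line))

    try:
        for rec in records:
            buffer.append(rec)
            if len(buffer) >= memory_limit:
                flush()
        if not chunk_paths:
            # Everything fit in memory, so no spill files are needed.
            yield from sorted(buffer, key=key)
            return
        if buffer:
            flush()
        chunk_iters.extend(read_chunk(p) for p in chunk_paths)
        # heapq.merge reads each chunk lazily, holding one record per chunk.
        yield from heapq.merge(*chunk_iters, key=key)
    finally:
        # Close chunk readers first, then delete the spill files.
        for it in chunk_iters:
            it.close()
        for path in chunk_paths:
            os.remove(path)
```

Each chunk is sorted with a stable sort and heapq.merge prefers earlier iterables on equal keys, so this sketch stays stable without an explicit sequence number. A descending sort would need something like the wrapper class the text describes.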

Downstream time subtraction. _tracked_process measures elapsed time but subtracts the time spent suspended at each yield, which is time the downstream stage spends processing. This gives accurate per-stage attribution in a lazy pipeline where stages interleave execution.
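
One way to get the same effect is to time only the calls that resume the stage, so that time spent suspended at a yield is never counted. The sketch below takes that route; the function name tracked, the stats dictionary, and its keys are hypothetical and not taken from the module.

```python
import time


def tracked(process, input_iter, stats):
    """Wrap a stage generator, attributing only its own run time to stats."""
    stats["elapsed"] = 0.0
    stats["records_out"] = 0
    inner = process(input_iter)
    while True:
        started = time.perf_counter()
        try:
            record = next(inner)  # the stage runs only inside this call
        except StopIteration:
            return
        finally:
            # Runs on success, exhaustion, and errors alike.
            stats["elapsed"] += time.perf_counter() - started
        stats["records_out"] += 1
        # While suspended here the consumer is running; that time is not measured.
        yield record
```

In this sketch the measured time still includes whatever the stage spends pulling from its upstream iterator, so the figures are exclusive of downstream work only.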

Builder pattern. Pipeline.add_stage() returns self, enabling p.add_stage(A()).add_stage(B()) fluent construction.

Dependencies

Imports: All stdlib — heapq for merge sort and TopN, json for serialization in Sort/Partition, tempfile/os for external sort spill files, re for punctuation stripping in Tokenize, time for stats, defaultdict for Count.

Imported by: test_pipeline.py and tester_test_pipeline.py, the test suite and its meta-test harness.

Flow

A typical word-count pipeline executes as:

1. Pipeline.run() calls _build_iterator(), which nests the stages: Sort._tracked_process(Count._tracked_process(Tokenize._tracked_process(ReadLines._tracked_process(...)))).

2. list(...) pulls from the outermost iterator.

3. Sort pulls from Count, which pulls from Tokenize, which pulls from ReadLines — all lazily.

4. Count is a barrier: it consumes its entire input before yielding anything, so everything upstream of Count runs to completion before Sort sees any records.

5. After the iterator is exhausted (or raises), _build_stats collects per-stage metrics.
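
The ordering in steps 3 and 4 can be observed by logging events from each stage. This is an illustrative sketch with hypothetical stand-in generators, not the module's stages; the event strings are invented for the demonstration.

```python
from collections import defaultdict


def demo_barrier(lines):
    """Run a tiny tokenize -> count -> sort chain and record who ran when."""
    events = []

    def tokenize(source):
        for line in source:
            for word in line.split():
                events.append(f"tokenize:{word}")
                yield word, 1

    def count(pairs):
        totals = defaultdict(int)
        for word, n in pairs:
            totals[word] += n
        # Reached only after the upstream iterator is fully drained.
        events.append("count:input exhausted")
        yield from totals.items()

    def sort(pairs):
        for pair in sorted(pairs):
            events.append(f"sort:{pair[0]}")
            yield pair

    result = list(sort(count(tokenize(lines))))
    return result, events
```

For the input ["b a", "b"], every tokenize event is logged before "count:input exhausted", and the sort events come last, which is the barrier behavior the list describes.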

The _build_iterator method has special-case handling for the first stage: if it is a ReadLines, it uses its own source; otherwise, if input_data is provided, a temporary ReadLines is prepended (but *not* added to self.stages, so its stats aren't tracked).

Invariants

Error Handling

Minimal and deliberate. There are no try/except blocks that catch and swallow errors — exceptions propagate up the generator chain naturally. The finally blocks in Sort, Partition, and the Pipeline.run* methods ensure cleanup (temp files, open file handles, stats collection) happens even on failure. ReadLines will raise FileNotFoundError if given a nonexistent path. The pipeline does not validate stage compatibility at construction time.
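
The cleanup-without-swallowing behavior can be sketched with a pass-through generator that owns a file handle. The names write_through and failing_source, and the state dictionary used to observe cleanup, are hypothetical and exist only for this example.

```python
import os
import tempfile


def write_through(records, path, state):
    """Pass records through unchanged while also writing them to path."""
    handle = open(path, "w", encoding="utf-8")
    state["closed"] = False
    try:
        for record in records:
            handle.write(f"{record}\n")
            yield record
    finally:
        # Runs on normal exhaustion, on upstream errors, and on generator close.
        handle.close()
        state["closed"] = True


def failing_source():
    yield "ok"
    raise ValueError("bad record")


def run_demo():
    fd, path = tempfile.mkstemp(suffix=".txt")
    os.close(fd)
    state, seen, caught = {}, [], None
    try:
        for record in write_through(failing_source(), path, state):
            seen.append(record)
    except ValueError as exc:
        # The error is not swallowed by the stage; it reaches the caller.
        caught = str(exc)
    finally:
        os.remove(path)
    return seen, caught, state["closed"]
```

The upstream exception travels through the generator untouched, and the finally block still closes the handle before the caller sees the error.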

Topics to Explore

Beliefs