```python
from mrjob.job import MRJob

class OneStepMean(MRJob):

    def mapper(self, _, line):
        if line.startswith('id'):
            return  # skip the header row
        try:
            fields = line.split()
            r2_value = float(fields[3])  # R2 is at index 3
            yield 'sum', r2_value
            yield 'count', 1
        except (ValueError, IndexError):
            pass  # skip malformed records
```

Emits `sum → R2_value` and `count → 1` per record.
```python
    def reducer(self, key, values):
        total = sum(values)
        yield key, total
```

The reducer emits one total per key; the mean is the `sum` output divided by the `count` output.

Storage: O(N) in shuffle — every record sends its pairs across the network.
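The whole one-step flow can be simulated in plain Python, without Hadoop. This is a sketch with made-up rows; the column layout (`id time R1 R2 …`, so R2 at index 3) mirrors the example above:

```python
from collections import defaultdict

# Synthetic input: a header row plus three data rows.
lines = [
    "id time R1 R2",
    "1 0.0 10.0 4.0",
    "2 0.5 11.0 6.0",
    "3 1.0 12.0 8.0",
]

def mapper(line):
    if line.startswith('id'):
        return  # skip the header row
    try:
        fields = line.split()
        yield 'sum', float(fields[3])  # R2 at index 3
        yield 'count', 1
    except (ValueError, IndexError):
        pass

# "Shuffle": group every emitted pair by key -- this is the O(N) traffic.
groups = defaultdict(list)
for line in lines:
    for key, value in mapper(line):
        groups[key].append(value)

# "Reduce": one total per key; the mean is assembled from the two outputs.
totals = {key: sum(values) for key, values in groups.items()}
mean = totals['sum'] / totals['count']
print(mean)  # (4.0 + 6.0 + 8.0) / 3 = 6.0
```

Note that six pairs were shuffled for three records — the shuffle volume grows linearly with the input.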
Problem: same mean calculation, but now we want to reduce the shuffle volume.
Key insight: first collapse the records into √N partial (sum, count) aggregates, then merge those in a second step.
Strategy:
```python
    # Requires `import math` at module level; TOTAL_RECORDS must be set
    # on the class (e.g. a known or estimated input size).
    def mapper_step1(self, _, line):
        if line.startswith('id'):
            return
        try:
            fields = line.split()
            r2_value = float(fields[3])  # R2 is at index 3
            num_partitions = int(math.sqrt(self.TOTAL_RECORDS))
            # Any spreading of records over buckets works here; we only
            # need roughly even partitions, not a stable assignment.
            bucket_id = hash(line) % num_partitions
            yield f'bucket_{bucket_id}', r2_value
        except (ValueError, IndexError):
            pass
```

Distributes records across √N buckets.
```python
    def reducer_step1(self, bucket_id, values):
        values_list = list(values)
        bucket_sum = sum(values_list)
        bucket_count = len(values_list)
        yield 'result', (bucket_sum, bucket_count)
```

Output: √N partial (sum, count) aggregates — O(√N) shuffle.
```python
    def mapper_step2(self, key, value):
        yield key, value
```

Pass-through (Step 1 output → Step 2 input).
```python
    def reducer_step2(self, key, values):
        total_sum = total_count = 0
        for bucket_sum, bucket_count in values:
            total_sum += bucket_sum
            total_count += bucket_count
        if total_count > 0:
            yield 'mean', total_sum / total_count
```

Same result, O(√N) shuffle total.
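Both steps can be checked end-to-end in plain Python. This is a sketch with synthetic data; the bucket routing uses a deterministic index instead of `hash(line)` (which Python randomizes per process), since any even spreading works:

```python
import math
import random
from collections import defaultdict

random.seed(0)
TOTAL_RECORDS = 100
values = [random.uniform(0.0, 10.0) for _ in range(TOTAL_RECORDS)]

num_partitions = int(math.sqrt(TOTAL_RECORDS))  # 10 buckets

# Step 1 map: route each value to one of the sqrt(N) buckets.
buckets = defaultdict(list)
for i, v in enumerate(values):
    buckets[i % num_partitions].append(v)

# Step 1 reduce: collapse each bucket to a single (sum, count) pair.
partials = [(sum(vs), len(vs)) for vs in buckets.values()]
assert len(partials) == num_partitions  # only sqrt(N) pairs reach Step 2

# Step 2 reduce: merge the partial aggregates into the global mean.
total_sum = sum(s for s, _ in partials)
total_count = sum(c for _, c in partials)
mean = total_sum / total_count

# Same answer as summing everything in one step (up to float rounding).
assert abs(mean - sum(values) / len(values)) < 1e-9
```

Only 10 pairs cross the second shuffle for 100 input records, yet the final mean matches the direct calculation.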
| Phase | Task 1 | Task 2 |
|---|---|---|
| Step 1 shuffle | O(N) | O(√N) |
| Step 2 shuffle | — | O(√N) |
| Total | O(N) | O(√N) |
Trade-off: Add one extra job → save network bandwidth
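The totals in the table can be made concrete with a quick order-of-magnitude check (the input sizes below are illustrative, and constant factors are ignored):

```python
import math

# Approximate number of key-value pairs crossing the network per approach.
rows = []
for n in (10_000, 1_000_000, 100_000_000):
    one_step = n                      # Task 1: ~one shuffled pair per record
    two_step = 2 * int(math.sqrt(n))  # Task 2: sqrt(N) pairs in each of two shuffles
    rows.append((n, one_step, two_step))
    print(f"N={n:>11,}  one-step={one_step:>11,}  two-step={two_step:>7,}")
```

At N = 100 million, the two-step job moves about 20,000 pairs instead of 100 million — the extra job pays for itself quickly.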
Problem: Calculate the mean for all numeric sensor columns (R1, R2, ..., R8)
Key idea: Emit a (column_name, value) pair for EVERY column in a single pass over each record
Why efficient? One job and one shuffle instead of eight single-column jobs
```python
class AllColumnsMean(MRJob):

    def mapper(self, _, line):
        if line.startswith('id'):
            return
        try:
            fields = line.split()
            # R1-R8 are at indices 2-9 (fields 3-10)
            for col_idx in range(2, 10):
                col_value = float(fields[col_idx])
                col_name = f'R{col_idx - 1}'
                yield col_name, col_value
        except (ValueError, IndexError):
            pass
```

Emits (column_name, value) for R1-R8.
```python
    def reducer(self, col_name, values):
        values_list = list(values)
        count = len(values_list)
        total = sum(values_list)
        if count > 0:
            yield col_name, total / count
```

All 8 sensor means in one job.
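The per-column grouping can be simulated in plain Python to see why one shuffle suffices. A sketch with two made-up rows following the assumed layout (`id time R1 ... R8`):

```python
from collections import defaultdict

# Synthetic rows: id, time, then R1-R8 at indices 2-9.
lines = [
    "id time R1 R2 R3 R4 R5 R6 R7 R8",
    "1 0.0 " + " ".join(str(float(c)) for c in range(1, 9)),   # R1..R8 = 1.0..8.0
    "2 0.5 " + " ".join(str(float(c + 1)) for c in range(1, 9)),  # R1..R8 = 2.0..9.0
]

# Map + shuffle: one (column_name, value) pair per column per record,
# grouped by column name -- all eight columns share a single shuffle.
groups = defaultdict(list)
for line in lines:
    if line.startswith('id'):
        continue
    fields = line.split()
    for col_idx in range(2, 10):
        groups[f'R{col_idx - 1}'].append(float(fields[col_idx]))

# Reduce: one pass yields all eight means at once.
means = {name: sum(vs) / len(vs) for name, vs in groups.items()}
print(means['R1'], means['R8'])  # 1.5 8.5
```

Each reducer receives exactly the values for its column, so adding more columns adds keys to the same shuffle rather than extra jobs.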
Trade-offs:
Format: Pairs
Task: Choose one of these problems and sketch the mapper + reducer:
Deliverable: Pseudocode for mapper and reducer
Next Session: Assignment #1
Speaker context: This session teaches students how to think algorithmically about MapReduce problems. We take a simple task (calculating mean) and explore three different approaches with increasing efficiency. Students learn to translate "what computation do I need?" into "what does my mapper do?" and "what does my reducer do?". Using real data (gas sensors dataset) keeps it grounded.
Speaker notes: Frame this as demystifying MapReduce. Once students see how to break down a real problem (mean calculation), they can apply the pattern to any aggregation, join, or transformation task.
Speaker notes:
- Emphasize "translation", not "magic". The mapper asks "what facts do I extract?"; the reducer asks "how do I combine them?".
- Trade-offs: Task 1 = simple but big shuffle; Task 2 = more complex but O(√N) shuffle; Task 3 = all columns efficiently.
- This is the mental model students apply to Spark, Hive, and beyond.