Converting Algorithms to MapReduce

CS 6500 — Week 4, Session 1

Why This Matters

  • MapReduce isn't a magic black box—it's a computational pattern
  • Most aggregation problems map onto: parse → group → aggregate
  • The same problem can be solved different ways with different trade-offs
  • Understanding trade-offs makes you a better engineer

Today's Targets

You will be able to:

  • Translate an algorithm into mapper and reducer logic
  • Identify what gets emitted at each phase
  • Reason about storage and computation trade-offs
  • Implement solutions with increasing efficiency

The Dataset

Gas Sensors for Home Activity Monitoring

Our task: Calculate mean (average) of column R2

Task 1: One-Step Mean

The Simple Approach

Problem: Calculate mean of R2 column

Formula: mean(R2) = (Σ R2ᵢ) / N

Key idea: Emit partial sums and counts, then divide at the end

Task 1: Mapper (mrjob)

from mrjob.job import MRJob

class OneStepMean(MRJob):
    def mapper(self, _, line):
        if line.startswith('id'):        # skip the header row
            return
        try:
            fields = line.split()
            r2_value = float(fields[3])  # R2 at index 3
            yield 'sum', r2_value
            yield 'count', 1
        except (IndexError, ValueError): # skip malformed rows
            pass

Emits: sum → R2_value and count → 1 per record

Task 1: Reducer (mrjob)

    def reducer(self, key, values):
        total = sum(values)   # one call for 'sum', one for 'count'
        yield key, total

The final division (sum / count) happens after the job, since 'sum' and 'count' arrive as separate reducer calls

Storage: O(N) in shuffle
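
The whole Task 1 pipeline can be simulated locally in plain Python (no Hadoop or mrjob required). The sample lines below are invented to mimic the dataset's whitespace-separated layout, with R2 at field index 3:

```python
from collections import defaultdict

def mapper(line):
    if line.startswith('id'):          # skip the header row
        return
    try:
        fields = line.split()
        yield 'sum', float(fields[3])  # R2 at index 3
        yield 'count', 1
    except (IndexError, ValueError):
        pass

def reducer(key, values):
    yield key, sum(values)

lines = [
    "id time R1  R2",                  # made-up header
    "1  0.0  5.1 10.0",
    "2  0.1  5.2 14.0",
]

# Simulated shuffle: group every emitted value by key
grouped = defaultdict(list)
for line in lines:
    for k, v in mapper(line):
        grouped[k].append(v)

totals = dict(kv for k, vs in grouped.items() for kv in reducer(k, vs))
mean = totals['sum'] / totals['count']
print(mean)  # 12.0
```

The defaultdict plays the role of the shuffle phase: it groups every emitted value under its key before any reducer runs.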

Task 2: Two-Step Mean (Efficient)

Partition → Aggregate → Merge

Problem: Same, but we want to reduce shuffle volume

Key insight: Pre-aggregate in mapper, then merge at the end

Strategy:

  1. Step 1: Partition R2 into √N groups, sum each group
  2. Step 2: Merge partial sums to compute final mean

Task 2, Step 1: Mapper (mrjob)

def mapper_step1(self, _, line):
    if line.startswith('id'):
        return
    try:
        fields = line.split()
        r2_value = float(fields[3])   # R2 at index 3
        # TOTAL_RECORDS is a class constant known ahead of time;
        # requires `import math` at the top of the file
        num_partitions = int(math.sqrt(self.TOTAL_RECORDS))
        bucket_id = hash(line) % num_partitions
        yield f'bucket_{bucket_id}', r2_value
    except (IndexError, ValueError):  # skip malformed rows
        pass

Distributes records across √N buckets

Task 2, Step 1: Reducer (mrjob)

def reducer_step1(self, bucket_id, values):
    values_list = list(values)
    bucket_sum = sum(values_list)
    bucket_count = len(values_list)
    yield 'result', (bucket_sum, bucket_count)

Output: √N partial aggregates — O(√N) shuffle

Task 2, Step 2: Mapper (mrjob)

def mapper_step2(self, key, value):
    yield key, value

Pass-through (Step 1 output → Step 2 input)

Task 2, Step 2: Reducer (mrjob)

def reducer_step2(self, key, values):
    total_sum = total_count = 0
    for bucket_sum, bucket_count in values:
        total_sum += bucket_sum
        total_count += bucket_count
    if total_count > 0:
        yield 'mean', total_sum / total_count

Same result, O(√N) shuffle total
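
The two-step flow can also be checked locally. This sketch uses 100 synthetic values and a round-robin bucket assignment in place of hash(line), an assumption made so the example is deterministic:

```python
import math
from collections import defaultdict

values = [float(i) for i in range(1, 101)]    # 100 synthetic R2 readings
num_partitions = int(math.sqrt(len(values)))  # √N = 10 buckets

# Step 1: scatter values across buckets, then pre-aggregate each bucket
buckets = defaultdict(list)
for i, v in enumerate(values):
    buckets[i % num_partitions].append(v)     # round-robin stand-in for hash()
partials = [(sum(vs), len(vs)) for vs in buckets.values()]

# Only √N (sum, count) pairs cross the second shuffle.
# Step 2: merge the partials into the final mean
total_sum = sum(s for s, _ in partials)
total_count = sum(c for _, c in partials)
mean = total_sum / total_count
print(len(partials), mean)  # 10 50.5
```

The key observation: only the 10 (sum, count) pairs move between the two steps, not the 100 raw values.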

Task 2 Storage Comparison

Phase            Task 1   Task 2
Step 1 shuffle   O(N)     O(√N)
Step 2 shuffle   n/a      O(√N)
Total            O(N)     O(√N)

Trade-off: Add one extra job → save network bandwidth

Task 3: All Columns Mean (Stretch Goal)

Efficient Multi-Column Aggregation

Problem: Calculate mean for all sensor columns (R1, R2, ..., R8)

Key idea: Emit a (column_name, value) pair for every column in a single pass

Why efficiency? One job (one shuffle) instead of eight single-column jobs

Task 3: Mapper (mrjob)

class AllColumnsMean(MRJob):
    def mapper(self, _, line):
        if line.startswith('id'):        # skip the header row
            return
        try:
            fields = line.split()
            # R1-R8 are at indices 2-9 (fields 3-10)
            for col_idx in range(2, 10):
                col_value = float(fields[col_idx])
                col_name = f'R{col_idx - 1}'
                yield col_name, col_value
        except (IndexError, ValueError): # skip malformed rows
            pass

Emits (column_name, value) for R1-R8

Task 3: Reducer (mrjob)

    def reducer(self, col_name, values):
        values_list = list(values)
        count = len(values_list)
        total = sum(values_list)
        if count > 0:
            yield col_name, total / count

All 8 sensor means in one job
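
A variant worth showing students: the per-column emissions can be packed into a single key-value pair carrying the whole vector of column values, so each record crosses the shuffle exactly once. A local sketch with a toy 3-column layout (the column positions here are invented for the example):

```python
lines = [
    "id R1  R2  R3",       # toy header: three sensor columns
    "1  1.0 2.0 3.0",
    "2  3.0 4.0 5.0",
]

def mapper(line):
    if line.startswith('id'):              # skip the header row
        return
    fields = line.split()
    # One emission per record: the full vector of column values plus a count
    yield 'stats', ([float(f) for f in fields[1:4]], 1)

def reducer(values):
    sums, count = [0.0] * 3, 0
    for vec, c in values:
        sums = [s + v for s, v in zip(sums, vec)]
        count += c
    return [s / count for s in sums]       # element-wise means

emitted = [kv for line in lines for kv in mapper(line)]
means = reducer(v for _, v in emitted)
print(means)  # [2.0, 3.0, 4.0]
```

Trade-off: one emission per record instead of one per column, at the cost of a reducer that must know the vector width.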

Key Takeaways

  1. Mapper = "what facts do I extract from each input record?"
  2. Reducer = "how do I combine facts to answer the question?"
  3. Shuffle = "what data moves between mapper and reducer?"

Trade-offs:

  • Task 1: Simple, but O(N) shuffle
  • Task 2: Two jobs, but O(√N) shuffle
  • Task 3: One job, handles multiple columns

Activity: Redesign a Problem (15 min)

Format: Pairs

Task: Choose one of these problems and sketch the mapper + reducer:

  • Calculate median of R2 column
  • Count occurrences of each R2 value
  • Find max and min of R2 simultaneously

Deliverable: Pseudocode for mapper and reducer

Debrief Prompts

  • What gets emitted from the mapper?
  • How does the reducer know where one key's values end and the next key's begin?
  • What trade-offs did you make?

Wrap-Up & Next Steps

Next Session: Assignment #1

Speaker context: This session teaches students how to think algorithmically about MapReduce problems. We take a simple task (calculating mean) and explore three different approaches with increasing efficiency. Students learn to translate "what computation do I need?" into "what does my mapper do?" and "what does my reducer do?". Using real data (gas sensors dataset) keeps it grounded.

Speaker notes: Frame this as demystifying MapReduce. Once students see how to break down a real problem (mean calculation), they can apply the pattern to any aggregation, join, or transformation task.

Speaker notes:

  • Emphasize "translation", not "magic". Mapper asks "what facts do I extract?"; reducer asks "how do I combine them?"
  • Trade-offs: Task 1 = simple but big storage; Task 2 = complex but O(√N) storage; Task 3 = all columns efficiently.
  • This is the mental model students apply to Spark, Hive, and beyond.