Writing MapReduce Programs with mrjob

CS 6500 — Week 3, Session 2

Today's Learning Journey

Session 1 Recap: MapReduce architecture, data flow, shuffle/sort mechanics

Today's Goal: Write, test, and deploy your first MapReduce programs

What You'll Master:

  • mrjob framework (Pythonic MapReduce development)
  • Class-based mapper and reducer patterns
  • Local testing strategies (catch bugs before cluster submission)
  • Cluster job submission and monitoring
  • Debugging techniques for distributed failures

Why mrjob?

The Challenge: Raw Hadoop requires Java OR complex stdin/stdout scripts

The Solution: mrjob = Python library for elegant MapReduce

  • Write MapReduce as Python classes (object-oriented, clean)
  • Same code runs locally, on Hadoop, or on AWS EMR
  • Automatic job configuration and dependency management
  • Built-in testing and debugging tools

Perfect for: Python developers, data scientists, analysts

Bonus: Industry-proven (developed by Yelp for production analytics)

mrjob Philosophy

Key Insight: MapReduce job = Python class with methods

from mrjob.job import MRJob

class MyJob(MRJob):
    def mapper(self, key, value):
        # Your map logic here
        yield new_key, new_value
    
    def reducer(self, key, values):
        # Your reduce logic here
        yield key, aggregated_value

SQL Analogy: Like defining a stored procedure or view

  • mapper = SELECT statement
  • reducer = GROUP BY aggregation

Installation

Install mrjob via pip:

pip install mrjob

That's it! No complex Hadoop configuration needed for local testing

Check installation:

python -c "import mrjob; print(mrjob.__version__)"

Live Coding Demo: Word Count with mrjob

Problem: Count word frequency in text files

mrjob Approach:

  1. Create a class that inherits from MRJob
  2. Define mapper() method to process each line
  3. Define reducer() method to aggregate counts

Let's code wordcount.py together...

Word Count: Mapper

from mrjob.job import MRJob
import re

class WordCount(MRJob):
    def mapper(self, _, line):
        words = re.findall(r'\w+', line.lower())
        for word in words:
            yield word, 1

Emits: ("word", 1) for each word

Word Count: Reducer

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == '__main__':
    WordCount.run()

Total: ~15 lines for complete MapReduce!

mrjob Mapper: Key Details

Method signature:

def mapper(self, _, line):
  • _ = key (unused, None for text files)
  • line = input line value
  • yield = Pythonic generator (cleaner than print)

SQL Analogy: SELECT word, 1 FROM input

mrjob Reducer Pattern

def reducer(self, word, counts):
    yield word, sum(counts)
  • word = key (auto-grouped by framework)
  • counts = iterator of all values for key
  • Framework handles sorting and grouping!

SQL Analogy: GROUP BY word
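The sort-and-group step can be simulated in plain Python to see exactly what the reducer receives — a minimal sketch with no mrjob dependency, where an in-memory dict stands in for Hadoop's shuffle:

```python
import re
from collections import defaultdict

def mapper(line):
    # Emits (word, 1) for each word, like the WordCount mapper
    for word in re.findall(r'\w+', line.lower()):
        yield word, 1

def reducer(word, counts):
    yield word, sum(counts)

# Simulated shuffle/sort: group all mapper outputs by key
groups = defaultdict(list)
for line in ["Hello world", "hello MapReduce"]:
    for key, value in mapper(line):
        groups[key].append(value)

# Each key reaches the reducer exactly once, with all its values
results = dict(pair for key in sorted(groups) for pair in reducer(key, groups[key]))
print(results)  # {'hello': 2, 'mapreduce': 1, 'world': 1}
```

On a real cluster the grouping happens across machines, but the contract is the same: the reducer sees each key once, with an iterator of every value emitted for it.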

Local Testing Workflow

Test before cluster submission:

# Single file
python wordcount.py input.txt

# Multiple files
python wordcount.py file1.txt file2.txt

# From stdin
cat shakespeare.txt | python wordcount.py

Why: Local runs finish in seconds and catch most logic bugs before they cost you a cluster round-trip

SQL Analogy: Test query before production

mrjob Output Formats

Default: Tab-separated, with key and value JSON-encoded

"apple"	5
"banana"	3
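You can reproduce what a default output line looks like with the standard json module — a sketch, assuming mrjob's default JSONProtocol (JSON-encoded key, tab, JSON-encoded value):

```python
import json

def json_protocol_line(key, value):
    # Mimics mrjob's default JSONProtocol output: json(key) <TAB> json(value)
    return f"{json.dumps(key)}\t{json.dumps(value)}"

print(json_protocol_line("apple", 5))   # "apple"	5
print(json_protocol_line("banana", 3))  # "banana"	3
```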

JSON output (set a class-level protocol; note JSONValueProtocol writes only the value, so put everything you need in it):

from mrjob.protocol import JSONValueProtocol

class WordCount(MRJob):
    OUTPUT_PROTOCOL = JSONValueProtocol

    def reducer(self, word, counts):
        yield None, {"word": word, "count": sum(counts)}

Common Local Testing Tips

# Test with sample input
echo "hello world hello" | python wordcount.py

# View top results
python wordcount.py input.txt | sort -t $'\t' -k2 -nr | head -10

Debugging: Add print(f"DEBUG: {var}", file=sys.stderr) to mapper/reducer

Pro Tip: Test with small files first, then scale up
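The stderr trick works because mrjob reserves stdout for the job's (key, value) output; anything printed to stderr is just logging. A standalone sketch (the `noisy_mapper` function here is hypothetical, not part of mrjob):

```python
import sys

def noisy_mapper(line):
    words = line.split()
    # stderr keeps debug output out of the job's output stream on stdout
    print(f"DEBUG: {len(words)} words in line", file=sys.stderr)
    for word in words:
        yield word, 1

pairs = list(noisy_mapper("hello world hello"))
print(pairs)  # [('hello', 1), ('world', 1), ('hello', 1)]
```

On a cluster, those DEBUG lines end up in the per-task stderr logs you can open from the YARN UI.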

Running on Hadoop Cluster: Step 1a

Copy job script to Docker:

# Option 1: Use docker cp
docker cp wordcount.py hadoop-namenode:/tmp/

# Option 2: Copy to shared volume
cp wordcount.py /path/to/docker/share/

Enter container:

docker exec -it hadoop-namenode bash
cd /tmp  # or /shared

Running on Hadoop Cluster: Step 1b

Upload input to HDFS:

# Create directory in HDFS
hdfs dfs -mkdir -p /user/student/wordcount/input

# Upload from local/Docker to HDFS
hdfs dfs -put shakespeare.txt \
  /user/student/wordcount/input/

# Verify
hdfs dfs -ls /user/student/wordcount/input

Running on Hadoop Cluster: Step 2

Submit mrjob to Hadoop:

python wordcount.py \
  -r hadoop \
  hdfs:///user/student/wordcount/input/*

That's it! mrjob uploads your script, configures Hadoop Streaming, and (with no --output-dir) streams the results back to your terminal

Running on Hadoop Cluster: Step 2b

Specify output directory and reducers:

python wordcount.py -r hadoop \
  --output-dir hdfs:///user/student/wordcount/output \
  hdfs:///user/student/wordcount/input/*

Control parallelism (via Hadoop job configuration):

python wordcount.py -r hadoop \
  --jobconf mapreduce.job.reduces=4 \
  hdfs:///user/student/wordcount/input/*

mrjob Runner Modes

Mode     Command                                  Use Case
inline   python job.py file.txt                   Default; single-process testing
local    python job.py -r local file.txt          Multi-process local simulation
hadoop   python job.py -r hadoop hdfs://...       Hadoop cluster
emr      python job.py -r emr s3://...            AWS Elastic MapReduce

Same code, different runners = true portability!

Monitoring Jobs: YARN ResourceManager UI

Access: http://localhost:8088

Key Screens:

  • Applications: Running/completed jobs with status
  • Application Details: Click job ID → see tasks, progress, logs
  • Task Logs: Click individual tasks → view stdout/stderr

What to Monitor:

  • Map/reduce tasks completion percentage
  • Counters (input records, output records, shuffle bytes)
  • Failed tasks (red indicators → click for error logs)

Live Demo: End-to-End mrjob Execution

Watch over my shoulder as we:

  1. Write word count job ✅
  2. Test locally with sample file ✅
  3. Upload data to HDFS ✅
  4. Submit to Hadoop cluster with -r hadoop ✅
  5. Monitor in YARN UI ✅
  6. View results in HDFS ✅

Follow along on your laptops!

Viewing Results

# List output files (mrjob creates part-* files)
hdfs dfs -ls /user/student/wordcount/output

# View results
hdfs dfs -cat /user/student/wordcount/output/part-* | head -20

# Sort by frequency (descending)
hdfs dfs -cat /user/student/wordcount/output/part-* \
  | sort -t $'\t' -k2 -nr | head -20

Expected output: Most frequent words with counts

SQL Analogy: SELECT * FROM results ORDER BY count DESC LIMIT 20

Debugging mrjob Jobs

Job fails immediately?

  • Check Python syntax: python wordcount.py --help
  • Verify mrjob installed: pip show mrjob
  • Check imports: python -c "from mrjob.job import MRJob"

Job runs but produces wrong output?

  • Add debug logging (goes to stderr):
    import sys
    print(f"DEBUG: word={word}", file=sys.stderr)
    
  • View task logs in YARN UI

Job hangs or times out?

  • Check HDFS paths are correct
  • Verify input files exist and are readable
  • Look for exceptions in YARN application logs

Common mrjob Debugging Scenarios

Scenario 1: "ImportError: No module named mrjob"

  • Solution: pip install mrjob (or activate correct virtualenv)

Scenario 2: "Output is empty!"

  • Cause: Forgot yield in mapper/reducer (used return instead)
  • Solution: Always use yield, never return

Scenario 3: "TypeError: reducer() takes 2 positional arguments but 3 were given"

  • Cause: Wrong method signature (forgot self parameter)
  • Solution: Use def reducer(self, key, values):
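Scenario 2 is easy to reproduce outside mrjob: a function that uses return produces a single value and stops, while yield builds a generator of (key, value) pairs — which is what the framework iterates over. A quick standalone check (both mapper names are invented for illustration):

```python
def return_mapper(line):
    return line.split()[0], 1      # a single tuple -- not a stream of pairs

def yield_mapper(line):
    for word in line.split():
        yield word, 1              # a generator: zero or more (key, value) pairs

print(return_mapper("hello world"))        # ('hello', 1)
print(list(yield_mapper("hello world")))   # [('hello', 1), ('world', 1)]
```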

Challenge: Convert SQL to MapReduce

SQL Problem:

SELECT customer_id, SUM(amount) AS total
FROM purchases
GROUP BY customer_id;

Question: How would you implement this in mrjob?

(Think about which part becomes mapper, which becomes reducer...)

Solution: Customer Purchase Analysis

mrjob Version:

class CustomerTotal(MRJob):
    def mapper(self, _, line):
        customer_id, amount = line.split(',')
        yield customer_id, float(amount)
    
    def reducer(self, customer_id, amounts):
        yield customer_id, sum(amounts)

Key Insight: MapReduce = distributed SQL GROUP BY!
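The same GROUP BY logic can be sanity-checked locally with itertools.groupby, which — like Hadoop — requires sorted input before grouping (the sample rows below are invented for illustration):

```python
from itertools import groupby
from operator import itemgetter

lines = ["c1,10.00", "c2,5.50", "c1,2.50"]

# Map phase: emit (customer_id, amount) pairs
pairs = []
for line in lines:
    customer_id, amount = line.split(',')
    pairs.append((customer_id, float(amount)))

# Shuffle/sort phase: Hadoop sorts by key; groupby needs sorted input too
pairs.sort(key=itemgetter(0))

# Reduce phase: sum each customer's amounts
totals = {cid: sum(amt for _, amt in grp)
          for cid, grp in groupby(pairs, key=itemgetter(0))}
print(totals)  # {'c1': 12.5, 'c2': 5.5}
```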

Activity: Pair Programming Challenge

Task: Maximum temperature by city using mrjob

Input data (temps.txt):

2024-01-15,NYC,32
2024-01-15,LA,68
2024-01-16,NYC,28

Goal: Find max temperature per city

Create max_temp.py:

from mrjob.job import MRJob

class MaxTempByCity(MRJob):
    def mapper(self, _, line):
        date, city, temp = line.split(',')
        yield city, int(temp)
    
    def reducer(self, city, temps):
        yield city, max(temps)

if __name__ == '__main__':
    MaxTempByCity.run()

Activity: Solution Review

Discussion: How would you find BOTH min and max?

Answer: Return a dictionary with both values:

def reducer(self, city, temps):
    temps_list = list(temps)
    yield city, {
        "min": min(temps_list), 
        "max": max(temps_list)
    }

Key Insight: mrjob handles serialization; you can yield dicts, lists, or complex objects!
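One subtlety worth flagging: temps is a one-shot iterator, so calling min() and then max() on it directly would exhaust it on the first call; list() materializes the values first. A standalone check of the reducer logic (plain function, no MRJob subclass):

```python
def min_max_reducer(city, temps):
    temps_list = list(temps)   # materialize: an iterator can be consumed only once
    yield city, {"min": min(temps_list), "max": max(temps_list)}

result = dict(min_max_reducer("NYC", iter([32, 28, 35])))
print(result)  # {'NYC': {'min': 28, 'max': 35}}
```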

Challenge: Calculate Average Temperature by City

Scenario: You have temperature readings for multiple cities over a year:

NYC,32
LA,68
NYC,28
LA,72
NYC,35

Question: How would you calculate the average temperature for each city using MapReduce?

(Hint: Why can't you just average the averages?)

Solution: Computing Averages in MapReduce

Key Insight: Emit (sum, count) pairs; let reducer combine them

class CalculateAverage(MRJob):
    def mapper(self, _, line):
        city, temp = line.split(',')
        yield city, (int(temp), 1)

    def reducer(self, city, temp_counts):
        # temp_counts is an iterator: consume it in a single pass
        # (two separate sum() generator expressions would exhaust it)
        total_temp = total_count = 0
        for temp, count in temp_counts:
            total_temp += temp
            total_count += count
        yield city, total_temp / total_count

Result: Correct per-city averages!
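The hint on the previous slide — why can't you just average the averages? — comes down to unequal group sizes. Toy numbers (invented readings) make it concrete:

```python
nyc = [32, 28, 35]   # 3 readings, mean ~31.67
la  = [68, 72]       # 2 readings, mean 70.0

# Average of averages weights both cities equally -- wrong for the overall mean:
avg_of_avgs = (sum(nyc) / len(nyc) + sum(la) / len(la)) / 2

# Combining (sum, count) pairs weights each reading equally -- correct:
pairs = [(sum(nyc), len(nyc)), (sum(la), len(la))]
true_mean = sum(s for s, _ in pairs) / sum(c for _, c in pairs)

print(round(avg_of_avgs, 2), true_mean)  # 50.83 47.0
```

This is exactly why the mapper emits (sum, count) pairs rather than partial averages: sums and counts combine safely; averages do not.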

Speaker context: This session transitions from MapReduce theory to hands-on coding practice using mrjob, a Pythonic library that simplifies MapReduce development. The audience includes students from CS, business, and math backgrounds—all with SQL experience but varying programming comfort levels. We'll leverage their SQL intuition (GROUP BY, aggregation) while introducing mrjob's elegant class-based approach. Emphasize local testing workflows to build confidence before cluster submission.

Speaker notes: Many students with business/math backgrounds are more comfortable with Python than Java. mrjob provides a Pythonic API that feels natural while hiding Hadoop complexity. Emphasize portability—write once, run anywhere.

Speaker notes: Show live installation. Mention that Docker environment has mrjob pre-installed. For students' personal machines, they can use pip install --user if they lack admin rights.

Speaker notes: Live code wordcount.py step-by-step. Start with basic class structure, add mapper, then reducer. Emphasize how much cleaner this is than stdin/stdout scripts. For business students, relate class methods to defining functions in Excel VBA or SQL procedures.

Speaker notes: Live demo YARN UI navigation. Show completed job counters. For business students, relate to SQL query execution plans or ETL pipeline dashboards.

Speaker notes: Critical hands-on demo. Go slowly, explain each step before executing. Pause to let students catch up. Point out expected timing (small file = 30-60 seconds). Show mrjob's progress output in terminal.