RDD Programming with PySpark

CS 6500 — Week 5, Session 2

Warm-Up: RDD Lineage & Fault Tolerance

What is lineage?
The complete record of transformations used to build an RDD from its source.

textFile("data.txt") ──→ flatMap(split) ──→ map(lower) ──→ filter(len > 3)
      parent                parent              parent          current

🤔 Think about it:

  • Why does Spark record every transformation instead of just saving intermediate results?
  • How is this different from MapReduce's approach to fault tolerance?

RDD Lineage & Fault Tolerance

How lineage enables fault tolerance:

  • If a partition is lost (node crash), Spark replays transformations for that partition only
  • No replication needed for intermediate data (unlike MapReduce disk writes)
  • Trade-off: recomputation time vs. replication storage

MapReduce vs. Spark fault tolerance:

                    MapReduce                      Spark
Intermediate data   Written to disk (HDFS)         Recorded as lineage graph
Recovery method     Re-read from replicated disk   Recompute lost partitions
Cost                Storage + I/O overhead         CPU time for recomputation
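
The replay idea can be sketched in plain Python (not Spark): treat lineage as a list of transformations that rebuilds a lost partition from its source data.

```python
# Plain-Python simulation: lineage is a replayable list of transformations.
source_partition = ["The Quick fox", "jumped OVER the lazy dog"]

lineage = [
    lambda part: [w for line in part for w in line.split()],  # flatMap(split)
    lambda part: [w.lower() for w in part],                   # map(lower)
    lambda part: [w for w in part if len(w) > 3],             # filter(len > 3)
]

def recompute(source, lineage):
    data = source
    for transform in lineage:
        data = transform(data)
    return data

# If the node holding this partition crashes, replay the lineage:
rebuilt = recompute(source_partition, lineage)
print(rebuilt)  # ['quick', 'jumped', 'over', 'lazy']
```

Only the lost partition's source data is re-read; healthy partitions are untouched.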

Narrow vs. Wide Dependencies

Narrow dependency: Each parent partition → at most one child partition

  • map, filter, flatMap, union
  • Can be pipelined in a single stage
  • No shuffle required

Wide dependency: Each parent partition → multiple child partitions

  • reduceByKey, groupByKey, join, repartition
  • Requires shuffle (data movement across network)
  • Creates a new stage boundary

Narrow: map ──→ filter ──→ map      (same stage, pipelined)
Wide:   map ──→ reduceByKey         (stage boundary at reduceByKey)

Key-Value Pair RDDs

Most real-world Spark programs use Pair RDDs: RDD[(Key, Value)]

Creating Pair RDDs:

# From text: split line into (key, value) tuples
lines = sc.textFile("sales.csv")
pairs = lines.map(lambda l: (l.split(",")[0], float(l.split(",")[2])))
#                            ^^^^ key (product)  ^^^^ value (price)

SQL Analogy:

  • Pair RDD ≈ table with two columns (key, value)
  • Pair operations ≈ GROUP BY, JOIN, ORDER BY

Essential Pair RDD Operations

Operation        Description                  SQL Analogy
reduceByKey(f)   Aggregate values per key     GROUP BY + SUM
groupByKey()     Group all values per key     GROUP BY
mapValues(f)     Transform values only        SELECT key, f(value)
keys()           Extract all keys             SELECT key
values()         Extract all values           SELECT value
sortByKey()      Sort by key                  ORDER BY key
join(other)      Inner join on key            INNER JOIN
countByKey()     Count per key (action!)      GROUP BY + COUNT

Critical: reduceByKey vs. groupByKey

           reduceByKey ✅                    groupByKey ⚠️
Behavior   Combines locally, then shuffles   Shuffles ALL values, then groups
Network    Minimal (pre-aggregated)          Maximal (every value sent)
Memory     Low (only aggregated values)      High (all values in memory)
Use for    Aggregation (sum, count, max)     When you need all values per key

Bottom line: If you're aggregating, always reach for reduceByKey.

reduceByKey vs. groupByKey: Visual

Step            reduceByKey ✅               groupByKey ⚠️
Partition 1     (a,1) (a,1) (b,1)            (a,1) (a,1) (b,1)
Local combine   (a,2) (b,1)                  (no combine)
↓ Shuffle       2 records sent               3 records sent (1.5× more)
Final result    (a, 5)                       (a, [1,1,1,1,1]) → sum → (a, 5)

On large datasets, reduceByKey can be 10× faster
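
The shuffle savings can be simulated in plain Python (not Spark) by counting how many records each strategy would send across the network:

```python
from collections import defaultdict

# Two partitions of (word, 1) pairs, as in the table above.
partitions = [
    [("a", 1), ("a", 1), ("b", 1)],
    [("a", 1), ("a", 1), ("a", 1)],
]

# reduceByKey-style: combine locally inside each partition, then shuffle.
shuffled_reduce = []
for part in partitions:
    local = defaultdict(int)
    for k, v in part:
        local[k] += v
    shuffled_reduce.extend(local.items())

# groupByKey-style: every single record crosses the network.
shuffled_group = [kv for part in partitions for kv in part]

print(len(shuffled_reduce), len(shuffled_group))  # 3 6
```

Three records shuffled instead of six here; with millions of repeated keys per partition, the gap is what makes reduceByKey dramatically faster.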

Live Coding: Word Count in Spark

Compare to your 50+ line MapReduce version from Week 3:

lines = sc.textFile("hdfs://namenode:9000/user/student/pg77849.txt")
words = lines.flatMap(lambda line: line.split())
pairs = words.map(lambda word: (word.lower(), 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
top_20 = counts.sortBy(lambda x: x[1], ascending=False).take(20)

5 lines of logic. Same result, radically less code.

Let's run this live and inspect the Spark UI...

Word Count: Line-by-Line (1/2)

lines = sc.textFile("...")

→ RDD of strings, one per line. Transformation (lazy).

words = lines.flatMap(lambda line: line.split())

→ Split each line → flatten into single RDD of words. Transformation.

pairs = words.map(lambda word: (word.lower(), 1))

→ Create pair RDD: (word, 1). Transformation.

All three are narrow dependencies — pipelined in a single stage.

Word Count: Line-by-Line (2/2)

counts = pairs.reduceByKey(lambda a, b: a + b)

→ Sum counts per word. Transformation (but triggers shuffle — wide dependency).

top_20 = counts.sortBy(lambda x: x[1], ascending=False).take(20)

→ Sort descending, return top 20 to driver. Action — NOW everything executes!

Key takeaway: Nothing runs until an action (take, collect, count) is called.
Spark builds the entire plan first, then optimizes and executes.
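
An everyday Python analogy (not Spark): generators are lazy in exactly this way — building the pipeline does no work, consuming it does.

```python
log = []

def doubled(xs):
    for x in xs:
        log.append(x)         # record that work actually happened
        yield x * 2

pipeline = doubled(range(3))  # like a transformation: nothing runs yet
assert log == []              # no work has been done

result = list(pipeline)       # like an action: consuming triggers the work
print(result)  # [0, 2, 4]
```

Spark's transformations behave like `pipeline` before the `list()` call; `take`/`collect`/`count` play the role of `list()`.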

Spark UI: What Just Happened?

Job 0 (triggered by take):

  • Stage 0: textFile → flatMap → map (narrow transforms, pipelined)
  • Stage 1: reduceByKey → sortBy → take (after shuffle)

Look for:

  • Shuffle Read/Write bytes (how much data moved)
  • Task count per stage (= partition count)
  • Task duration (any skew?)

Demo: Let's look at the Spark UI together...

MapReduce vs. Spark: Side-by-Side

Aspect              MapReduce (mrjob)              Spark (PySpark)
Code lines          ~50                            ~5
Paradigm            Class-based (mapper/reducer)   Functional chain
Intermediate data   Disk                           Memory
Execution           Submit → wait → check          Interactive / notebook
Iteration           New job per iteration          In-memory loop
Debugging           Log files                      Spark UI + REPL

Same computation model, radically different developer experience.

Hands-On: Temperature Analysis Challenge

Task: Calculate average temperature per station using Spark RDDs

Dataset: weather_data.csv (same as Week 3!)

station_id,date,temperature,humidity
KNYC,2024-01-15,32,65
KLAX,2024-01-15,68,45
KORD,2024-01-15,18,72

Your mission (15 minutes):

  1. Load the CSV file as an RDD
  2. Skip the header line
  3. Parse each line and extract (station_id, temperature)
  4. Calculate average temperature per station
  5. Print results

Hint: To compute average, you need both sum and count per key.

Temperature Analysis: Starter Code

lines = sc.textFile("hdfs://namenode:9000/datasets/weather/weather_data.csv")

# TODO:
# 1. Skip header   2. Parse CSV   3. Extract (station, temp)
# 4. Average per station   5. Print results

Work individually or with a partner. Time: 15 minutes ⏱️

Hint: You need (sum, count) per key to compute an average with reduceByKey.

Temperature Analysis: Solution

data = lines.filter(lambda line: not line.startswith("station_id"))

averages = (data
    .map(lambda line: line.split(','))
    .map(lambda p: (p[0], (float(p[2]), 1)))
    .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))
    .mapValues(lambda x: x[0] / x[1]))

for station, avg in averages.collect():
    print(f"{station}: {avg:.1f}°F")

Key insight: Pack (sum, count) into value → reduceByKey aggregates both → mapValues computes average.

Solution Walkthrough

Why (temp, count) as value?

  • Average = sum / count
  • reduceByKey can only combine values — needs both sum and count

The reduceByKey trick for averages:

# (sum1, count1) + (sum2, count2) = (sum1+sum2, count1+count2)
lambda a, b: (a[0] + b[0], a[1] + b[1])

This pattern appears everywhere: averages, weighted sums, running statistics.
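
The merge step can be checked in plain Python with functools.reduce (assumed sample temperatures, for illustration):

```python
from functools import reduce

# One station's readings, each packed as (temp, 1) just like in the solution.
values = [(32.0, 1), (28.0, 1), (36.0, 1)]

# Pairwise merge: (sum1, count1) + (sum2, count2) = (sum1+sum2, count1+count2)
total, count = reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]), values)
average = total / count
print(average)  # 32.0
```

The merge function is associative and commutative, which is exactly what reduceByKey requires to combine values in any order across partitions.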

Why Not groupByKey?

The same average computed with groupByKey:

averages = (data
    .map(lambda line: line.split(','))
    .map(lambda p: (p[0], float(p[2])))
    .groupByKey()
    .mapValues(lambda temps: sum(temps) / len(temps)))

Compared to the reduceByKey version:

averages = (data
    .map(lambda line: line.split(','))
    .map(lambda p: (p[0], (float(p[2]), 1)))
    .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1]))
    .mapValues(lambda x: x[0] / x[1]))

⚠️ groupByKey shuffles every temperature value across the network.
✅ reduceByKey sends only (sum, count) per partition — same combiner concept from MapReduce.

The Recomputation Problem

Each action re-executes the entire lineage from scratch

logs = sc.textFile("hdfs:///logs/")     # 500GB
errors = logs.filter(lambda l: "ERROR" in l)

# Each action re-reads 500GB from HDFS!
print(errors.count())          # Read 500GB, filter, count
print(errors.take(10))         # Read 500GB again!

⚠️ With lazy evaluation, Spark has no memory of previous results.
Every count(), take(), or collect() triggers a full re-read.

Caching: The Solution

Cache the RDD in memory to avoid redundant computation

errors = logs.filter(lambda l: "ERROR" in l)
errors.cache()                 # Mark for caching

print(errors.count())          # Read 500GB, filter, cache, count
print(errors.take(10))         # Read from cache! (milliseconds)

Result: ~12 sec first query → ~0.1 sec subsequent queries ⚡

Rule of thumb: If you call 2+ actions on the same RDD, cache it.

Storage Levels

Level             Memory   Disk             Serialized   Copies
MEMORY_ONLY       ✅       ❌               ❌           1
MEMORY_AND_DISK   ✅       ✅ (spillover)   ❌           1
MEMORY_ONLY_SER   ✅       ❌               ✅           1
DISK_ONLY         ❌       ✅               ✅           1
MEMORY_ONLY_2     ✅       ❌               ❌           2

from pyspark import StorageLevel

rdd.cache()                                   # = MEMORY_ONLY
rdd.persist(StorageLevel.MEMORY_AND_DISK)     # Spill to disk if needed
rdd.unpersist()                               # Free memory

Default .cache() = MEMORY_ONLY — good for most cases.

When to Cache (and When Not To)

✅ Cache when:

  • RDD is used in multiple actions (queries, iterations)
  • RDD is expensive to recompute (complex transformations)
  • Interactive analysis (exploring same dataset)

❌ Don't cache when:

  • RDD is used only once (no benefit, wastes memory)
  • RDD is too large for memory (will spill or evict other data)
  • Data is cheap to recompute (simple filter on local file)

Rule of thumb: If you call two or more actions on the same RDD, cache it.

Example: Caching Performance

errors = logs.filter(lambda l: "ERROR" in l)

errors.count()   # ~12 sec (reads from HDFS)
errors.count()   # ~12 sec (reads from HDFS again!)

errors.cache()
errors.count()   # ~12 sec (reads + caches)
errors.count()   # ~0.1 sec ⚡ (from memory!)

Expected: 100× speedup on second query after caching.
Verify: Spark UI → Storage tab shows cached RDD.

Partitioning: Controlling Parallelism

Partitions = units of parallelism. Each partition → one task.

rdd = sc.textFile("data.txt")
print(rdd.getNumPartitions())  # Usually = number of HDFS blocks

Too few partitions: Underutilized cluster (idle cores)
Too many partitions: Excessive task scheduling overhead

Rule of thumb: 2–4 partitions per CPU core
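
The rule of thumb is simple arithmetic; a back-of-the-envelope sizing (cluster numbers below are assumed, purely for illustration):

```python
# Hypothetical cluster: 4 executors x 8 cores each.
executors, cores_per_executor = 4, 8
total_cores = executors * cores_per_executor     # 32 cores

# Rule of thumb: 2-4 partitions per core.
low, high = 2 * total_cores, 4 * total_cores
print(low, high)  # 64 128
```

So for this hypothetical cluster, aim for roughly 64–128 partitions for a CPU-bound job.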

Controlling Partitions

Increase partitions:

rdd2 = rdd.repartition(100)   # Triggers shuffle!

Decrease partitions:

rdd2 = rdd.coalesce(4)        # No shuffle (merge)

Key difference:

  • repartition: shuffle, balanced
  • coalesce: no shuffle, may be unbalanced
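
The difference can be simulated in plain Python (not Spark): coalesce merges existing partitions in place, while repartition rehashes every record.

```python
partitions = [[1], [2], [3], [4], [5], [6], [7], [8]]

def coalesce(parts, n):
    # Merge adjacent partitions; no record leaves its group of machines.
    out = [[] for _ in range(n)]
    for i, part in enumerate(parts):
        out[i * n // len(parts)].extend(part)
    return out

def repartition(records, n):
    # Hash every record into a fresh partition: balanced, but all data moves.
    out = [[] for _ in range(n)]
    for r in records:
        out[hash(r) % n].append(r)
    return out

merged = coalesce(partitions, 4)
print(merged)  # [[1, 2], [3, 4], [5, 6], [7, 8]]

flat = [r for part in partitions for r in part]
rebalanced = repartition(flat, 4)   # every record shuffled to a new home
```

In real Spark, `repartition(n)` is implemented as `coalesce(n, shuffle=True)` — the shuffle flag is the entire difference.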

Partitioning Impact on Shuffles

# Default partitioning: hash(key) % numPartitions
pairs = words.map(lambda w: (w, 1))

# Explicit hash partitioning (no import needed)
partitioned = pairs.partitionBy(10)  # 10 partitions, hash-based

# Custom partitioner: provide number + partition function
partitioned = pairs.partitionBy(10, lambda key: hash(key) % 10)

Why this matters:

  • Join two RDDs with same partitioner → no shuffle (co-located keys)
  • Different partitioners → full shuffle on both sides

Advanced: Custom partitioner for skewed data (same concept as MapReduce)
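
A plain-Python sketch of co-partitioning (hypothetical keys and values, not real Spark code): the same hash partitioner puts a given key at the same partition index in both datasets, so each partition pair joins locally.

```python
def partition_by(pairs, n):
    # Same rule Spark's hash partitioner follows: index = hash(key) % n
    out = [[] for _ in range(n)]
    for k, v in pairs:
        out[hash(k) % n].append((k, v))
    return out

sales = partition_by([("apple", 3), ("pear", 5)], 4)
prices = partition_by([("apple", 0.5), ("pear", 0.8)], 4)

joined = []
for i in range(4):              # join partition i with partition i only
    left = dict(sales[i])
    for k, price in prices[i]:
        if k in left:
            joined.append((k, left[k], price))

print(sorted(joined))  # [('apple', 3, 0.5), ('pear', 5, 0.8)]
```

No record ever needed to move to another partition, which is exactly why Spark skips the shuffle when both sides of a join already share a partitioner.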

Putting It All Together

logs = sc.textFile("hdfs:///user/student/access_logs.txt")
logs.cache()                            # reused below

# Query 1: errors by status code
errors = (logs.filter(lambda l: l.split()[8][0] in "45")   # 4xx/5xx status codes
    .map(lambda l: (l.split()[8], 1)).reduceByKey(lambda a, b: a + b))

# Query 2: top 10 URLs
urls = (logs.map(lambda l: (l.split()[6], 1))
    .reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], ascending=False))
top_10 = urls.take(10)

logs.unpersist()                        # free the cache when done

Pattern: Cache → multiple queries on same RDD → unpersist when done.

Common Mistakes to Avoid

Mistake                      ❌ Wrong                           ✅ Right
collect() on large RDD       huge_rdd.collect() → OOM           huge_rdd.take(100) or saveAsTextFile()
groupByKey for aggregation   rdd.groupByKey().mapValues(sum)    rdd.reduceByKey(lambda a, b: a + b)
Forgetting lazy eval         rdd.map(lambda x: x*2) → no output rdd.map(lambda x: x*2).collect()

🎯 Project Proposal — Due Sunday 11:59 PM

Submit: 3–4 page PDF with problem statement, dataset, approach, team roster, timeline

Key Takeaways

  1. Pair RDDs enable key-value operations: reduceByKey, join, sortByKey
  2. reduceByKey >> groupByKey for aggregation (local combine before shuffle)
  3. Caching eliminates redundant computation across multiple actions
  4. Partitioning controls parallelism — 2–4 partitions per core
  5. Spark word count: 5 lines vs. MapReduce: 50+ lines — same result

What's Next: Week 6

Spark DataFrames and SQL

  • Higher-level API with schema (like a SQL table)
  • Catalyst optimizer (automatic query optimization)
  • SparkSQL for SQL queries on big data
  • Moving from RDDs to DataFrames (and why)

To Do This Weekend:

  • ✅ Complete Spark RDD homework
  • ✅ Submit Project Proposal
  • 📖 Optional: Review SQL joins and window functions

References

  • Zaharia et al. (2012): "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing"
  • Learning Spark, 2nd Edition (Chapters 3–4)
  • Apache Spark Documentation: RDD Programming Guide
  • PySpark API Reference: pyspark.RDD

Speaker context: This session is 80% hands-on coding. Students have the architectural understanding from Session 1—now they need to build muscle memory with RDD operations. The centerpiece is comparing Spark word count to their MapReduce version: 5 lines vs. 50+. We'll then progress to pair RDDs, caching, and partitioning. By end of session, students should feel confident writing basic Spark programs independently. Remind them: Project Proposal is due Sunday!

Speaker notes: Draw on whiteboard. "Think of narrow as a one-to-one pipe. Wide is a many-to-many shuffle—every partition needs to send data to every other partition. That's why wide dependencies are expensive and create stage boundaries."

Speaker notes: This is the #1 performance mistake Spark beginners make. Hammer this point. "Every time you write groupByKey, ask yourself: can I use reduceByKey instead? The answer is almost always yes."

Speaker notes: Switch to Spark UI at localhost:4040. Walk through Jobs tab, click into the job, show stages. Click into Stage 0 to show tasks. Point out shuffle write in Stage 0 and shuffle read in Stage 1. "This shuffle is the reduceByKey—same concept as MapReduce shuffle, but Spark keeps results in memory."

Speaker notes: Ask if anyone used groupByKey. If so, use this slide to compare. "This is the same combiner concept from MapReduce—pre-aggregate locally before shuffling."

Speaker notes: Run this live with time.time() around each call. Check Spark UI Storage tab to confirm data is cached. The dramatic difference sells the concept.

Speaker notes: This is pure Spark execution mechanics—if you understand this, you control performance. Start with the mental model: a partition is the unit of parallelism. One task per partition. More partitions → more parallel tasks; fewer partitions → fewer tasks.

`repartition(n)`: Spark performs a **full shuffle**, redistributing ALL data across the cluster to create evenly-sized partitions. This is expensive (network + disk I/O) but gives you balanced parallelism. Use when: you need more partitions, you need to fix skew, or you're about to do heavy computation like joins.

`coalesce(n)`: Spark merges existing partitions WITHOUT a shuffle—data stays mostly where it already is. This is cheap but may create uneven partition sizes. Use when: reducing partitions before writing output files, or after a filter that drastically shrank your data.

Critical insight: `coalesce()` cannot increase partitions effectively—if you have 10 partitions and call `coalesce(100)`, you'd just get empty partitions because there's no data movement. That's why Spark requires `repartition()` for expansion. Under the hood, `repartition(n)` is just `coalesce(n, shuffle=True)`.

Draw on whiteboard: show 8 partitions [P1][P2][P3][P4][P5][P6][P7][P8]. With `coalesce(4)`, adjacent partitions merge: [P1+P2][P3+P4][P5+P6][P7+P8]—no data crosses machines. With `repartition(4)`, everything gets shuffled into a balanced mix across 4 new partitions.

Pragmatic rule: most people under-partition and over-coalesce. Keep partitions at 2–4× total cluster cores, repartition before heavy joins/aggregations, and coalesce before writing small output files.