RDD Programming with PySpark
CS 6500 — Week 5, Session 2
Warm-Up: RDD Lineage & Fault Tolerance
What is lineage?
The complete record of transformations used to build an RDD from its source.
textFile("data.txt") ──→ flatMap(split) ──→ map(lower) ──→ filter(len > 3)
      (parent)              (parent)           (parent)        (current)
Think about it:
Why does Spark record every transformation instead of just saving intermediate results?
How is this different from MapReduce's approach to fault tolerance?
RDD Lineage & Fault Tolerance
How lineage enables fault tolerance:
If a partition is lost (node crash), Spark replays transformations for that partition only
No replication needed for intermediate data (unlike MapReduce disk writes)
Trade-off: recomputation time vs. replication storage
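The replay idea can be sketched in a few lines of plain Python. This is a toy model of the concept, not Spark's internals: each "RDD" remembers only its parent and the function that produced it, so a lost result can always be rebuilt from the source.

```python
# Toy model of lineage-based recovery (NOT Spark internals): an RDD
# stores its parent plus the transformation, never the materialized data.
class SketchRDD:
    def __init__(self, parent, fn):
        self.parent = parent   # source list at the root, SketchRDD otherwise
        self.fn = fn           # transformation applied to the parent's output

    def compute(self):
        # Replay the whole lineage from the source — no stored copy needed.
        data = self.parent.compute() if isinstance(self.parent, SketchRDD) else self.parent
        return self.fn(data)

source = ["The quick fox", "jumped OVER the dog"]
words  = SketchRDD(source, lambda d: [w for line in d for w in line.split()])
lower  = SketchRDD(words,  lambda d: [w.lower() for w in d])
longw  = SketchRDD(lower,  lambda d: [w for w in d if len(w) > 3])

# "Losing" a result is harmless: compute() just replays the chain.
print(longw.compute())  # ['quick', 'jumped', 'over']
```

Losing a partition in real Spark triggers exactly this kind of replay, but only for the lost partition.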
MapReduce vs. Spark fault tolerance:
|                   | MapReduce                    | Spark                      |
|-------------------|------------------------------|----------------------------|
| Intermediate data | Written to disk (HDFS)       | Recorded as lineage graph  |
| Recovery method   | Re-read from replicated disk | Recompute lost partitions  |
| Cost              | Storage + I/O overhead       | CPU time for recomputation |
Narrow vs. Wide Dependencies
Narrow dependency: Each parent partition → at most one child partition
map, filter, flatMap, union
Can be pipelined in a single stage
No shuffle required
Wide dependency: Each parent partition → multiple child partitions
reduceByKey, groupByKey, join, repartition
Requires shuffle (data movement across network)
Creates new stage boundary
Narrow: map ──→ filter ──→ map (same stage, pipelined)
Wide: map ──→ reduceByKey (stage boundary at reduceByKey)
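The shuffle behind a wide dependency can be sketched in plain Python (a conceptual simulation, not Spark's shuffle machinery): every output partition may need records from every input partition, so records are first routed by key.

```python
# Sketch of why reduceByKey is a WIDE dependency: all records for a key
# must end up in one partition, so records are routed ("shuffled") by key.
input_parts = [[("a", 1), ("b", 1)], [("a", 1), ("c", 1)]]  # two map-side partitions
num_out = 2

# The shuffle: send each record to the partition its key hashes to.
out_parts = [[] for _ in range(num_out)]
for part in input_parts:
    for k, v in part:
        out_parts[hash(k) % num_out].append((k, v))

# After the shuffle, each partition reduces its own keys independently.
reduced = {}
for part in out_parts:
    for k, v in part:
        reduced[k] = reduced.get(k, 0) + v

print(reduced)  # {'a': 2, 'b': 1, 'c': 1} (insertion order may differ)
```

A narrow chain like map → filter never needs this routing step, which is why it stays in one stage.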
Key-Value Pair RDDs
Most real-world Spark programs use Pair RDDs: RDD[(Key, Value)]
Creating Pair RDDs:
lines = sc.textFile("sales.csv")
pairs = lines.map(lambda l: (l.split(",")[0], float(l.split(",")[2])))
SQL Analogy:
Pair RDD ≈ table with two columns (key, value)
Pair operations ≈ GROUP BY, JOIN, ORDER BY
Essential Pair RDD Operations
| Operation      | Description              | SQL Analogy          |
|----------------|--------------------------|----------------------|
| reduceByKey(f) | Aggregate values per key | GROUP BY + SUM       |
| groupByKey()   | Group all values per key | GROUP BY             |
| mapValues(f)   | Transform values only    | SELECT key, f(value) |
| keys()         | Extract all keys         | SELECT key           |
| values()       | Extract all values       | SELECT value         |
| sortByKey()    | Sort by key              | ORDER BY key         |
| join(other)    | Inner join on key        | INNER JOIN           |
| countByKey()   | Count per key (action!)  | GROUP BY + COUNT     |
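A few of these operations can be mimicked in plain Python on small lists of (key, value) pairs — same semantics, no cluster needed. The station data below is made up for illustration.

```python
# Plain-Python stand-ins for three Pair RDD operations (illustrative data).
sales  = [("KNYC", 10.0), ("KLAX", 4.0), ("KNYC", 6.0)]
cities = [("KNYC", "New York"), ("KLAX", "Los Angeles")]

# reduceByKey(lambda a, b: a + b): aggregate values per key
totals = {}
for k, v in sales:
    totals[k] = totals.get(k, 0.0) + v

# mapValues(f): transform values only (here: dollars → cents)
cents = {k: int(v * 100) for k, v in totals.items()}

# join(other): inner join on key
joined = [(k, (totals[k], name)) for k, name in cities if k in totals]

print(totals)  # {'KNYC': 16.0, 'KLAX': 4.0}
```

The PySpark versions are one-liners on an RDD of the same pairs, but they distribute the work across partitions.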
Critical: reduceByKey vs. groupByKey
|          | reduceByKey                     | groupByKey                       |
|----------|---------------------------------|----------------------------------|
| Behavior | Combines locally, then shuffles | Shuffles ALL values, then groups |
| Network  | Minimal (pre-aggregated)        | Maximal (every value sent)       |
| Memory   | Low (only aggregated values)    | High (all values in memory)      |
| Use for  | Aggregation (sum, count, max)   | When you need all values per key |
Bottom line: If you're aggregating, always reach for reduceByKey.
reduceByKey vs. groupByKey: Visual
| Step          | reduceByKey       | groupByKey                      |
|---------------|-------------------|---------------------------------|
| Partition 1   | (a,1) (a,1) (b,1) | (a,1) (a,1) (b,1)               |
| Local combine | (a,2) (b,1)       | (no combine)                    |
| Shuffle       | 2 records sent    | 3 records sent (3× more!)       |
| Final result  | (a, 5)            | (a, [1,1,1,1,1]) → sum → (a, 5) |
On large datasets, reduceByKey can be 10× faster
Live Coding: Word Count in Spark
Compare to your 50+ line MapReduce version from Week 3:
lines = sc.textFile("hdfs://namenode:9000/user/student/pg77849.txt")
words = lines.flatMap(lambda line: line.split())
pairs = words.map(lambda word: (word.lower(), 1))
counts = pairs.reduceByKey(lambda a, b: a + b)
top_20 = counts.sortBy(lambda x: x[1], ascending=False).take(20)
5 lines of logic. Same result, radically less code.
Let's run this live and inspect the Spark UI...
Word Count: Line-by-Line (1/2)
lines = sc.textFile("...")
→ RDD of strings, one per line. Transformation (lazy).
words = lines.flatMap(lambda line: line.split())
→ Split each line → flatten into single RDD of words. Transformation.
pairs = words.map(lambda word: (word.lower(), 1))
→ Create pair RDD: (word, 1). Transformation.
All three are narrow dependencies — pipelined in a single stage.
Word Count: Line-by-Line (2/2)
counts = pairs.reduceByKey(lambda a, b: a + b)
→ Sum counts per word. Transformation (but triggers shuffle — wide dependency).
top_20 = counts.sortBy(lambda x: x[1], ascending=False).take(20)
→ Sort descending, return top 20 to driver. Action — NOW everything executes!
Key takeaway: Nothing runs until an action (take, collect, count) is called.
Spark builds the entire plan first, then optimizes and executes.
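Python generators give a small analogy for this (an analogy only, not Spark's machinery): building the chain runs nothing; consuming it — the "action" — runs everything at once.

```python
# Lazy evaluation in miniature: a side-effect counter proves when work happens.
executed = []

def read_lines():
    for line in ["The quick fox", "lazy dog"]:
        executed.append("read")        # records each "read" as it happens
        yield line

words = (w for line in read_lines() for w in line.split())   # like flatMap
pairs = ((w.lower(), 1) for w in words)                      # like map

assert executed == []    # nothing has run — the chain is just a plan
result = list(pairs)     # the "action": the whole chain executes now
print(result)            # [('the', 1), ('quick', 1), ('fox', 1), ('lazy', 1), ('dog', 1)]
```

Just like Spark, the pipeline here is a description of work; `list()` plays the role of `collect()`.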
Spark UI: What Just Happened?
Job 0 (triggered by take):
Stage 0: textFile → flatMap → map (narrow transforms, pipelined)
Stage 1: reduceByKey → sortBy → take (after shuffle)
Look for:
Shuffle Read/Write bytes (how much data moved)
Task count per stage (= partition count)
Task duration (any skew?)
Demo: Let's look at the Spark UI together...
MapReduce vs. Spark: Side-by-Side
| Aspect            | MapReduce (mrjob)            | Spark (PySpark)        |
|-------------------|------------------------------|------------------------|
| Code lines        | ~50                          | ~5                     |
| Paradigm          | Class-based (mapper/reducer) | Functional chain       |
| Intermediate data | Disk                         | Memory                 |
| Execution         | Submit → wait → check        | Interactive / notebook |
| Iteration         | New job per iteration        | In-memory loop         |
| Debugging         | Log files                    | Spark UI + REPL        |
Same computation model, radically different developer experience.
Hands-On: Temperature Analysis Challenge
Task: Calculate average temperature per station using Spark RDDs
Dataset: weather_data.csv (same as Week 3!)
station_id,date,temperature,humidity
KNYC,2024-01-15,32,65
KLAX,2024-01-15,68,45
KORD,2024-01-15,18,72
Your mission (15 minutes):
Load the CSV file as an RDD
Skip the header line
Parse each line and extract (station_id, temperature)
Calculate average temperature per station
Print results
Hint: To compute average, you need both sum and count per key.
Temperature Analysis: Starter Code
lines = sc.textFile("hdfs://namenode:9000/datasets/weather/observations.csv")
Work individually or with a partner. Time: 15 minutes
Hint: You need (sum, count) per key to compute an average with reduceByKey.
Temperature Analysis: Solution
data = lines.filter(lambda line: not line.startswith("station_id"))
averages = (data
    .map(lambda line: line.split(','))
    .map(lambda p: (p[0], (float(p[2]), 1)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .mapValues(lambda x: x[0] / x[1]))
for station, avg in averages.collect():
    print(f"{station}: {avg:.1f} °F")
Key insight: Pack (sum, count) into value → reduceByKey aggregates both → mapValues computes average.
Solution Walkthrough
Why (temp, count) as value?
Average = sum / count
reduceByKey can only combine values — needs both sum and count
The reduceByKey trick for averages:
lambda a, b: (a[0] + b[0], a[1] + b[1])
This pattern appears everywhere: averages, weighted sums, running statistics.
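The same packing trick extends further: put (count, sum, sum of squares) in the value, and a single merge function — exactly the kind you would pass to reduceByKey — yields mean and variance. A plain-Python sketch for a single key, with made-up readings:

```python
# Extended packing trick: one associative merge yields mean AND variance.
def merge(a, b):
    # associative + commutative, so it is safe as a reduceByKey function
    return (a[0] + b[0], a[1] + b[1], a[2] + b[2])

temps = [32.0, 28.0, 36.0]           # illustrative readings for one station
acc = (0, 0.0, 0.0)                  # (count, sum, sum of squares)
for t in temps:
    acc = merge(acc, (1, t, t * t))  # each reading contributes (1, t, t²)

n, s, ss = acc
mean = s / n                  # 32.0
variance = ss / n - mean**2   # population variance
```

In Spark you would map each reading to `(station, (1, t, t*t))` and reduceByKey with the same `merge`.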
Why Not groupByKey?
The same average computed with groupByKey:
averages = (data
    .map(lambda line: line.split(','))
    .map(lambda p: (p[0], float(p[2])))
    .groupByKey()
    .mapValues(lambda temps: sum(temps) / len(temps)))
Compared to the reduceByKey version:
averages = (data
    .map(lambda line: line.split(','))
    .map(lambda p: (p[0], (float(p[2]), 1)))
    .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    .mapValues(lambda x: x[0] / x[1]))
groupByKey shuffles every temperature value across the network.
reduceByKey sends only (sum, count) per partition — same combiner concept from MapReduce.
The Recomputation Problem
Each action re-executes the entire lineage from scratch
logs = sc.textFile("hdfs:///logs/")
errors = logs.filter(lambda l: "ERROR" in l)
print(errors.count())
print(errors.take(10))
With lazy evaluation, Spark has no memory of previous results.
Every count(), take(), or collect() triggers a full re-read.
Caching: The Solution
Cache the RDD in memory to avoid redundant computation
errors = logs.filter(lambda l: "ERROR" in l)
errors.cache()
print(errors.count())
print(errors.take(10))
Result: ~12 sec first query → ~0.1 sec subsequent queries
Rule of thumb: If you call 2+ actions on the same RDD, cache it.
When to Cache (and When Not To)
Cache when:
RDD is used in multiple actions (queries, iterations)
RDD is expensive to recompute (complex transformations)
Interactive analysis (exploring same dataset)
Don't cache when:
RDD is used only once (no benefit, wastes memory)
RDD is too large for memory (will spill or evict other data)
Data is cheap to recompute (simple filter on local file)
Rule of thumb: If you call two or more actions on the same RDD, cache it.
errors = logs.filter(lambda l: "ERROR" in l)
errors.count()
errors.count()
errors.cache()
errors.count()
errors.count()
Expected: 100× speedup on second query after caching.
Verify: Spark UI → Storage tab shows cached RDD.
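What caching buys you can be simulated in plain Python (a sketch of the idea, not Spark's storage layer): pay the computation cost once, then serve every later "action" from the stored result. The counter stands in for re-reading and re-filtering the log file.

```python
# Sketch: a counter shows how many times the "lineage" is replayed.
reads = {"n": 0}
LOG = ["ERROR disk full", "INFO ok", "ERROR timeout"]   # made-up log lines

def compute_errors():
    reads["n"] += 1                          # one full lineage replay
    return [l for l in LOG if "ERROR" in l]

# Uncached: each action replays the lineage from scratch.
len(compute_errors())       # replay 1
compute_errors()[:1]        # replay 2

# "Cached": materialize once, reuse for any number of actions.
cached = compute_errors()   # replay 3 — the last one
n_errors, first = len(cached), cached[0]

print(reads["n"], n_errors, first)  # 3 2 ERROR disk full
```

In real Spark the first action after `cache()` does the replay and stores the partitions; later actions read them from executor memory.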
Partitioning: Controlling Parallelism
Partitions = units of parallelism. Each partition → one task.
rdd = sc.textFile("data.txt")
print(rdd.getNumPartitions())
Too few partitions: Underutilized cluster (idle cores)
Too many partitions: Excessive task scheduling overhead
Rule of thumb: 2–4 partitions per CPU core
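Turning the rule of thumb into a number is simple arithmetic. The core count below is a made-up example; on a live cluster you might start from `sc.defaultParallelism` instead.

```python
# The 2–4 partitions-per-core rule as arithmetic (assumed cluster size).
total_cores = 16               # illustrative value, not from the source
low, high = 2 * total_cores, 4 * total_cores
target = 3 * total_cores       # middle of the 2–4× band

print(low, target, high)       # 32 48 64
```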
Controlling Partitions
Increase partitions:
rdd2 = rdd.repartition(100)
Decrease partitions:
rdd2 = rdd.coalesce(4)
Key difference:
repartition: shuffle, balanced
coalesce: no shuffle, may be unbalanced
Partitioning Impact on Shuffles
pairs = words.map(lambda w: (w, 1))
partitioned = pairs.partitionBy(10)  # default hash partitioner
partitioned = pairs.partitionBy(10, lambda key: hash(key) % 10)  # or an explicit partition function
Why this matters:
Join two RDDs with same partitioner → no shuffle (co-located keys)
Different partitioners → full shuffle on both sides
Advanced: Custom partitioner for skewed data (same concept as MapReduce)
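Why a shared partitioner removes the shuffle can be shown in plain Python. This is a conceptual sketch: `part` is a made-up deterministic stand-in for Spark's hash partitioner, and the station data is illustrative.

```python
# Co-partitioned join sketch: same partition function on both sides means
# matching keys already sit at the same partition index — no reshuffle.
NUM_PARTS = 4

def part(key):
    return sum(map(ord, key)) % NUM_PARTS   # deterministic stand-in for hash

left  = [("KNYC", 32), ("KLAX", 68)]
right = [("KNYC", "New York"), ("KLAX", "Los Angeles")]

lp = [[kv for kv in left  if part(kv[0]) == i] for i in range(NUM_PARTS)]
rp = [[kv for kv in right if part(kv[0]) == i] for i in range(NUM_PARTS)]

# Each output partition joins only its two local slices — no cross-partition traffic.
joined = []
for i in range(NUM_PARTS):
    for k, v in lp[i]:
        joined += [(k, (v, w)) for k2, w in rp[i] if k2 == k]

print(sorted(joined))  # [('KLAX', (68, 'Los Angeles')), ('KNYC', (32, 'New York'))]
```

With different partition functions, neither side can assume where its partner's keys live, so both must be reshuffled first.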
Putting It All Together
logs = sc.textFile("hdfs:///user/student/access_logs.txt" )
logs.cache()
errors = (logs.filter(lambda l: " 4" in l or " 5" in l)
    .map(lambda l: (l.split()[8], 1)).reduceByKey(lambda a, b: a + b))
urls = (logs.map(lambda l: (l.split()[6], 1))
    .reduceByKey(lambda a, b: a + b).sortBy(lambda x: x[1], False))
Pattern: Cache → multiple queries on same RDD → unpersist when done.
Common Mistakes to Avoid
| Mistake                    | Wrong                              | Right                                  |
|----------------------------|------------------------------------|----------------------------------------|
| collect() on large RDD     | huge_rdd.collect() → OOM           | huge_rdd.take(100) or saveAsTextFile() |
| groupByKey for aggregation | rdd.groupByKey().mapValues(sum)    | rdd.reduceByKey(lambda a, b: a + b)    |
| Forgetting lazy eval       | rdd.map(lambda x: x*2) → no output | rdd.map(lambda x: x*2).collect()       |
Project Proposal — Due Sunday 11:59 PM
Submit: 3–4 page PDF with problem statement, dataset, approach, team roster, timeline
Key Takeaways
Pair RDDs enable key-value operations: reduceByKey, join, sortByKey
reduceByKey >> groupByKey for aggregation (local combine before shuffle)
Caching eliminates redundant computation across multiple actions
Partitioning controls parallelism — 2–4 partitions per core
Spark word count: 5 lines vs. MapReduce: 50+ lines — same result
What's Next: Week 6
Spark DataFrames and SQL
Higher-level API with schema (like a SQL table)
Catalyst optimizer (automatic query optimization)
SparkSQL for SQL queries on big data
Moving from RDDs to DataFrames (and why)
To Do This Weekend:
Complete Spark RDD homework
Submit Project Proposal
Optional: Review SQL joins and window functions
References
Zaharia et al. (2012): "Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing"
Learning Spark , 2nd Edition (Chapters 3–4)
Apache Spark Documentation: RDD Programming Guide
PySpark API Reference: pyspark.RDD
Speaker context: This session is 80% hands-on coding. Students have the architectural understanding from Session 1—now they need to build muscle memory with RDD operations. The centerpiece is comparing Spark word count to their MapReduce version: 5 lines vs. 50+. We'll then progress to pair RDDs, caching, and partitioning. By end of session, students should feel confident writing basic Spark programs independently. Remind them: Project Proposal is due Sunday!
Speaker notes: Draw on whiteboard. "Think of narrow as a one-to-one pipe. Wide is a many-to-many shuffle—every partition needs to send data to every other partition. That's why wide dependencies are expensive and create stage boundaries."
Speaker notes: This is the #1 performance mistake Spark beginners make. Hammer this point. "Every time you write groupByKey, ask yourself: can I use reduceByKey instead? The answer is almost always yes."
Speaker notes: Switch to Spark UI at localhost:4040. Walk through Jobs tab, click into the job, show stages. Click into Stage 0 to show tasks. Point out shuffle write in Stage 0 and shuffle read in Stage 1. "This shuffle is the reduceByKey—same concept as MapReduce shuffle, but Spark keeps results in memory."
Speaker notes: Ask if anyone used groupByKey. If so, use this slide to compare. "This is the same combiner concept from MapReduce—pre-aggregate locally before shuffling."
Speaker notes: Run this live with time.time() around each call. Check Spark UI Storage tab to confirm data is cached. The dramatic difference sells the concept.
Speaker notes: This is pure Spark execution mechanics—if you understand this, you control performance. Start with the mental model: A partition is the unit of parallelism. One task per partition. More partitions → more parallel tasks. Fewer partitions → fewer tasks. Now explain `repartition(n)`: Spark performs a **full shuffle**, redistributing ALL data across the cluster to create evenly-sized partitions. This is expensive (network + disk I/O) but gives you balanced parallelism. Use when: you need more partitions, you need to fix skew, or you're about to do heavy computation like joins. For `coalesce(n)`: Spark merges existing partitions WITHOUT a shuffle—data stays mostly where it already is. This is cheap but may create uneven partition sizes. Use when: reducing partitions before writing output files, or after a filter that drastically shrank your data. Critical insight: `coalesce()` cannot increase partitions effectively—if you have 10 partitions and call `coalesce(100)`, you'd just get empty partitions because there's no data movement. That's why Spark requires `repartition()` for expansion. Draw on whiteboard: show 8 partitions [P1][P2][P3][P4][P5][P6][P7][P8]. With `coalesce(4)`, adjacent partitions merge: [P1+P2][P3+P4][P5+P6][P7+P8]—no data crosses machines. With `repartition(4)`, everything gets shuffled into a balanced mix across 4 new partitions. Under the hood, `repartition(n)` is just `coalesce(n, shuffle=True)`. Pragmatic rule: most people under-partition and over-coalesce. Keep partitions at 2–4× total cluster cores, repartition before heavy joins/aggregations, and coalesce before writing small output files.