Spark DataFrames & the Structured API

CS 6500 — Week 6, Session 1

Week 5 Recap

RDD Essentials:

  • Transformations (lazy) vs. Actions (eager)
  • Pair RDDs: prefer reduceByKey over groupByKey
  • Caching for repeated queries
  • Spark UI for debugging

How did the Spark RDD homework go? Any questions on RDD programming?

Project proposals submitted? Feedback coming next week.

The Problem with RDDs

# RDD: Calculate average price per category
rdd.map(lambda l: l.split(",")) \
   .map(lambda p: (p[3], (float(p[5]), 1))) \
   .reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])) \
   .mapValues(lambda x: x[0]/x[1])

What's wrong here?

  • Spark has no idea what's in those tuples
  • Can't optimize — doesn't know column types or intent
  • Errors only caught at runtime (wrong index → crash)
  • Verbose and hard to read

The DataFrame Solution

# DataFrame: Same computation
df.groupBy("category").agg(avg("price"))

What changed?

  • Spark knows the schema (column names, types)
  • Catalyst optimizer rewrites your query for best performance
  • Errors caught at planning time (misspelled column → immediate error)
  • Concise and readable

Why DataFrames?

Dimension         RDD                         DataFrame
Schema            None (opaque objects)       Named, typed columns
Optimization      Manual (you optimize)       Automatic (Catalyst)
Code length       Verbose lambdas             Concise, declarative
Error detection   Runtime                     Planning time
Cross-language    API differs per language    Same API everywhere

Rule of thumb: Use DataFrames by default. Use RDDs when you need low-level control.

Spark's Evolution

2012: RDDs          → Low-level, full control
2015: DataFrames    → Schema + optimization
2016: Datasets      → Type-safe DataFrames (Scala/Java only)
Today: DataFrames   → Recommended for most use cases

DataFrames aren't replacing RDDs — they're built on top of them.

Every DataFrame operation compiles down to optimized RDD code under the hood.

What Is a DataFrame?

A distributed collection of Row objects organized into named columns.

  • Like a SQL table — columns with names and types
  • Like a Pandas DataFrame — but distributed across a cluster
  • Like an RDD — immutable, lazy, partitioned
+-------+-----+-------------+
| name  | age | department  |
+-------+-----+-------------+
| Alice | 28  | Engineering |
| Bob   | 35  | Sales       |
+-------+-----+-------------+

Schema: name: String, age: Integer, department: String

SparkSession: The New Entry Point

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("DataFrame Demo") \
    .getOrCreate()

SparkSession replaces SparkContext for structured APIs.

  • Combines SQLContext + HiveContext + SparkContext
  • One unified entry point for all Spark operations
  • Access SparkContext via spark.sparkContext when needed

Creating DataFrames

From a collection:

data = [("Alice", 28, "Engineering"), ("Bob", 35, "Sales")]
df = spark.createDataFrame(data, ["name", "age", "department"])

From a CSV file:

df = spark.read.csv("hdfs:///datasets/data.csv",
                     header=True, inferSchema=True)

From JSON / Parquet:

df = spark.read.json("hdfs:///datasets/data.json")
df = spark.read.parquet("hdfs:///datasets/data.parquet")

Schema: Inferred vs. Explicit

                  Inferred                    Explicit
How               inferSchema=True            Pass a StructType object
Speed             Slower (extra data pass)    Faster (no scan needed)
Safety            May guess wrong types       Guaranteed correct types
Use for           Exploration                 Production

# Explicit schema (preferred for production)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([StructField("name", StringType()),
                     StructField("age", IntegerType())])
df = spark.read.csv("data.csv", header=True, schema=schema)

Inspecting a DataFrame

df.printSchema()     # Column names and types
df.show(5)           # First 5 rows (formatted table)
df.columns           # List of column names
df.count()           # Total row count (action!)
df.describe().show() # Summary statistics

These are your first tools when exploring any dataset.

Live Demo: Loading Transaction Data

transactions = spark.read.csv(
    "hdfs:///datasets/ecommerce/transactions.csv",
    header=True, inferSchema=True)

transactions.printSchema()
transactions.show(5)
print(f"Rows: {transactions.count()}")

Basic Operations: Select & Filter

Select columns:

transactions.select("user_id", "category", "price").show(5)

Filter rows:

# Column object syntax
transactions.filter(transactions["price"] > 100).show(5)

# SQL string syntax (same result)
transactions.filter("price > 100").show(5)

# col() function (preferred for complex expressions)
from pyspark.sql.functions import col
transactions.filter(col("price") > 100).show(5)

Adding & Transforming Columns

# Add / compute a column
df = transactions.withColumn("tax", col("price") * 0.08)
df = df.withColumn("total", col("price") * col("quantity"))

# Rename / drop
df = df.withColumnRenamed("price", "unit_price")
df = df.drop("unnecessary_column")

Key pattern: withColumn adds or replaces; returns a new DataFrame (immutable).

Sorting and Limiting

Sort (ascending):

transactions.orderBy("price").show(5)

Sort (descending):

transactions.orderBy(col("price").desc()).show(5)

Top N results:

transactions.orderBy(col("price").desc()).limit(10).show()

All transformations — lazy until an action triggers execution.

Column Selection: Three Syntaxes

# 1. String (simplest)
df.select("name", "age")

# 2. Bracket notation
df.select(df["name"], df["age"])

# 3. col() function (best for expressions)
from pyspark.sql.functions import col
df.select(col("name"), (col("age") + 1).alias("next_age"))

Recommendation: Use col() for anything beyond simple selects. It handles expressions, aliases, and complex logic cleanly.

Activity: DataFrame Exploration (15 min)

Using the weather dataset from Week 5:

  1. Read hdfs:///datasets/weather/observations.csv into a DataFrame
  2. Display schema and first 10 rows
  3. Filter for temperatures below freezing (< 32°F)
  4. Add a column temp_celsius = (temperature − 32) × 5/9
  5. Count observations with precipitation > 0

Compare: How does this feel vs. the RDD approach from last week?

Deliverable: Working code snippet

Activity Solution (1/2) — Load, Inspect & Filter

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.appName("Weather Exploration").getOrCreate()

# 1. Read into a DataFrame
weather = spark.read.csv("hdfs:///datasets/weather/observations.csv",
                         header=True, inferSchema=True)

# 2. Display schema and first 10 rows
weather.printSchema()
weather.show(10)

# 3. Filter for temperatures below freezing
freezing = weather.filter(col("temperature") < 32)
freezing.show(5)
print(f"Below-freezing observations: {freezing.count()}")

Activity Solution (2/2) — Transform & Aggregate

# 4. Add Celsius column
weather_c = weather.withColumn("temp_celsius",
    (col("temperature") - 32) * 5 / 9)
weather_c.select("station_id", "date",
                 "temperature", "temp_celsius").show(5)

# 5. Count observations with precipitation > 0
rainy = weather.filter(col("precipitation") > 0).count()
print(f"Observations with precipitation: {rainy}")

Compare with RDDs: No tuples, no lambdas, no manual indexing.

Ask: "How many lines would this be with RDDs?" — much longer and harder to read.

Activity Debrief

Common observations:

  • printSchema() immediately shows column names — no guessing indexes
  • Filter syntax reads like English: col("temp") < 32
  • No tuples, no lambdas for basic operations
  • Errors are more descriptive (column name vs. index out of range)

Key takeaway: DataFrames let you focus on what you want, not how to get it.

RDD vs. DataFrame: Temperature Average

RDD (Week 5):

data.map(lambda l: l.split(',')) \
    .map(lambda p: (p[0], (float(p[2]), 1))) \
    .reduceByKey(lambda a,b: (a[0]+b[0], a[1]+b[1])) \
    .mapValues(lambda x: x[0]/x[1])

DataFrame (today):

df.groupBy("station_id").agg(avg("temperature"))

Same result. One line. Automatically optimized.

Key Takeaways

  1. DataFrames = RDDs + schema + optimizer — not a replacement, an upgrade
  2. SparkSession is the unified entry point for structured APIs
  3. Explicit schemas beat inference in production (faster, safer)
  4. Declarative API — say what you want, Catalyst figures out how
  5. Multiple syntaxes for column access; col() is most versatile

Preview: Session 2

Spark SQL and advanced operations:

  • Register DataFrames as SQL tables — query with familiar SQL
  • Aggregations: groupBy, agg, multi-function summaries
  • Joins: inner, left, right, cross
  • Window functions: ranking, running totals, lag/lead
  • The Catalyst optimizer — how Spark optimizes your queries

Come ready to write SQL on big data!

References

  • Armbrust et al. (2015): "Spark SQL: Relational Data Processing in Spark"
  • Learning Spark, 2nd Edition (Chapter 4: Structured APIs)
  • Apache Spark Documentation: SQL Programming Guide
  • PySpark API: pyspark.sql.DataFrame

Speaker context: Students have spent a week wrestling with RDDs—writing lambdas, managing tuples, and manually optimizing. Now we introduce DataFrames: same distributed engine, but with schema awareness and automatic optimization. The "aha" moment is when they see Catalyst produce better plans than their hand-tuned RDD code. Frame DataFrames not as replacing RDDs but as a higher-level abstraction *built on* RDDs.

Speaker notes: Run live in Jupyter. After printSchema(), point out the inferred types. Ask: "What if price was inferred as String instead of Double? What would happen?" → Aggregations would fail. This motivates explicit schemas.

Speaker notes: Steps 1-3 cover reading data and filtering. Point out how printSchema() immediately tells you the column types — no guessing indexes. The filter reads like plain English.

Speaker notes: Steps 4-5 show withColumn for derived columns and filter+count for aggregation. This is a good moment to reinforce that DataFrames let you focus on *what* you want, not *how* to get it.