Spark SQL & Advanced DataFrame Operations

CS 6500 — Week 6, Session 2

What Is Spark SQL?

Module for structured data processing with SQL syntax.

  • Write standard SQL against distributed data
  • Same Catalyst optimizer as DataFrame API
  • Supports Hive-compatible queries
  • Unified engine: batch + interactive queries

Key idea: Register a DataFrame as a table → query it with SQL.

Temporary Views

transactions.createOrReplaceTempView("transactions")

result = spark.sql("""
    SELECT category, COUNT(*) as cnt
    FROM transactions
    GROUP BY category
    ORDER BY cnt DESC
""")
result.show()

View scope:

  • createOrReplaceTempView — session-scoped (disappears when session ends)
  • createOrReplaceGlobalTempView — shared across all sessions in the application (query it via the reserved global_temp database)

SQL vs. DataFrame API: Same Result

SQL:

spark.sql("""
    SELECT category, AVG(price) as avg_price
    FROM transactions GROUP BY category
""")

DataFrame API:

transactions.groupBy("category") \
    .agg(avg("price").alias("avg_price"))

Both compile to the same execution plan. Choose whichever reads better for your use case.

Live Demo: Aggregation Queries

# Register view
transactions.createOrReplaceTempView("transactions")

# Revenue by category
spark.sql("""
    SELECT category,
           COUNT(*) as num_txns,
           ROUND(AVG(price), 2) as avg_price,
           SUM(price) as total_revenue
    FROM transactions
    GROUP BY category
    ORDER BY total_revenue DESC
""").show()

If you know SQL, you already know Spark SQL.

Multi-Function Aggregation (DataFrame API)

from pyspark.sql.functions import *

summary = transactions.groupBy("category").agg(
    count("*").alias("count"),
    min("price").alias("min_price"),
    max("price").alias("max_price"),
    avg("price").alias("avg_price"),
    sum("price").alias("total_revenue")
)
summary.show()

Tip: import pyspark.sql.functions as F to access avg, sum, count, etc. The wildcard import above also works, but it shadows Python's built-in min, max, and sum.

Joins in Spark SQL

users.createOrReplaceTempView("users")

spark.sql("""
    SELECT u.name, u.state,
           COUNT(t.transaction_id) as purchases,
           SUM(t.price) as total_spent
    FROM users u
    LEFT JOIN transactions t ON u.user_id = t.user_id
    GROUP BY u.name, u.state
    ORDER BY total_spent DESC
""").show()

Supported join types: inner, left, right, full, cross, semi, anti

Joins with DataFrame API

from pyspark.sql.functions import col, count, sum

result = users.join(transactions, "user_id", "left") \
    .groupBy("name", "state") \
    .agg(
        count("transaction_id").alias("purchases"),
        sum("price").alias("total_spent")
    ).orderBy(col("total_spent").desc())

result.show()

Common pitfall: Ambiguous column names after join.
Fix: use df["col"] or rename before joining.

The Catalyst Optimizer

Four-phase query optimization:

Your Code → Analysis → Logical Optimization → Physical Planning → Code Gen
              ↓              ↓                      ↓               ↓
         Resolve names   Predicate pushdown    Join strategy    JVM bytecode
         & types         Constant folding      Broadcast vs.   Whole-stage
                         Projection pruning    shuffle join     code gen

You write what you want. Catalyst decides how to execute it efficiently.

Catalyst in Action

What you write:

df.filter(col("price") > 100).select("category", "price")

What Catalyst does:

  1. Projection pruning — reads only category and price columns from source
  2. Predicate pushdown — applies filter at data source level (skips irrelevant data)
  3. Reorders operations for minimum data movement

See the plan:

df.filter(col("price") > 100).select("category", "price").explain(True)

Why Catalyst Matters

Approach          Developer effort      Performance
Hand-tuned RDD    High (you optimize)   Good (if you're skilled)
Naive DataFrame   Low (declarative)     Good (Catalyst optimizes)
Tuned DataFrame   Medium                Excellent

Key insight: Declarative code + optimizer often beats hand-optimized imperative code.

Write readable code. Let Catalyst do the heavy lifting.

Window Functions: The Power Tool

Window functions compute values across a "window" of rows related to the current row.

Use cases:

  • Ranking — top N per group
  • Running totals — cumulative sum
  • Moving averages — sliding window stats
  • Lag/Lead — compare to previous/next row

SQL equivalent: OVER (PARTITION BY ... ORDER BY ...)

Window Function Syntax

from pyspark.sql.functions import col, rank
from pyspark.sql.window import Window

# Define the window
w = Window.partitionBy("category") \
           .orderBy(col("price").desc())

# Apply a window function
ranked = transactions.withColumn(
    "price_rank", rank().over(w))

# Top 3 most expensive per category
ranked.filter(col("price_rank") <= 3).show()

Three parts: partition (group), order (sort), function (compute).

Running Total Example

w = Window.partitionBy("user_id") \
           .orderBy("timestamp") \
           .rowsBetween(Window.unboundedPreceding,
                        Window.currentRow)

result = transactions.withColumn(
    "running_total", sum("price").over(w))

user_id  timestamp  price  running_total
U501     01-15      50     50
U501     02-10      30     80
U501     03-05      45     125

Common Window Functions

Function       Description
rank()         Rank with gaps (1, 2, 2, 4)
dense_rank()   Rank without gaps (1, 2, 2, 3)
row_number()   Unique sequential number
lag(col, n)    Value n rows before current
lead(col, n)   Value n rows after current
sum().over(w)  Running/cumulative sum
avg().over(w)  Moving average

All take a window spec; ranking and lag/lead require an orderBy, while partitionBy is optional (omit it to treat the whole dataset as one window).

Converting: RDD ↔ DataFrame

DataFrame → RDD:

rdd = df.rdd                  # RDD of Row objects
rdd.first()                   # Row(name='Alice', age=28)

RDD → DataFrame:

df = spark.createDataFrame(rdd, schema)
# or with column names:
df = rdd.toDF(["name", "age"])

When to drop to RDD: unstructured text, custom partitioning, complex control flow not expressible in DataFrame API.

When to Use DataFrames vs. RDDs

Use DataFrames                            Use RDDs
Structured/semi-structured data           Raw text, binary data
Standard analytics (filter, group, join)  Custom aggregation logic
SQL-like queries                          Fine-grained partition control
Production pipelines                      Legacy RDD codebases
When performance matters most             zipWithIndex, glom, etc.

Default choice: DataFrame. Drop to RDD only when necessary.

Activity: E-Commerce Analytics (10 min)

Using transactions and users views, answer any 2:

  1. Which state generates the most revenue?
  2. Average order value per category?
  3. Users with more than 10 purchases?
  4. Month-over-month revenue growth? (window function)

Use SQL or DataFrame API — your choice.

Activity Debrief

Key observations:

  • SQL syntax felt familiar for aggregation + join queries
  • Window functions are powerful but syntax takes practice
  • Both SQL and DataFrame API produce the same execution plan

Discussion: Which API do you prefer and why?

Activity Solutions (1/4)

Q1 — State with most revenue:

spark.sql("""
    SELECT u.state,
           ROUND(SUM(t.price * t.quantity), 2) AS revenue
    FROM transactions t
    JOIN users u ON t.user_id = u.user_id
    GROUP BY u.state
    ORDER BY revenue DESC
    LIMIT 5
""").show()

Activity Solutions (2/4)

Q2 — Average order value per category:

spark.sql("""
    SELECT category,
           ROUND(AVG(price * quantity), 2) AS avg_order_value
    FROM transactions
    GROUP BY category
    ORDER BY avg_order_value DESC
""").show()

Activity Solutions (3/4)

Q3 — Users with more than 10 purchases:

spark.sql("""
    SELECT u.name, COUNT(*) AS purchases
    FROM transactions t
    JOIN users u ON t.user_id = u.user_id
    GROUP BY u.name
    HAVING COUNT(*) > 10
    ORDER BY purchases DESC
""").show()

Activity Solutions (4/4)

Q4 — Month-over-month revenue growth (window function):

spark.sql("""
    SELECT month, revenue,
           ROUND(revenue - LAG(revenue) OVER (ORDER BY month), 2)
               AS growth
    FROM (
        SELECT MONTH(timestamp) AS month,
               SUM(price * quantity) AS revenue
        FROM transactions
        GROUP BY MONTH(timestamp)
    )
    ORDER BY month
""").show()

Saving Results

Write as Parquet (recommended):

df.write.parquet("hdfs:///user/student/output.parquet")

Write as CSV:

df.write.csv("hdfs:///user/student/output.csv", header=True)

Partitioned writes (for large datasets):

df.write.partitionBy("category") \
    .parquet("hdfs:///user/student/output_partitioned/")

Parquet >> CSV: columnar format, compressed, schema preserved, 10× faster reads.

Key Takeaways

  1. Spark SQL — query DataFrames with familiar SQL syntax
  2. SQL and DataFrame API compile identically — choose for readability
  3. Catalyst optimizer — predicate pushdown, projection pruning, join selection
  4. Window functions — ranking, running totals, lag/lead without self-joins
  5. DataFrames by default, RDDs when needed — structured wins

What's Next: Week 7

Dataflow Languages: Hive & Pig

  • HiveQL for data warehousing on Hadoop
  • Pig Latin for ETL pipelines
  • Comparing Spark SQL vs. Hive vs. Pig — when to use each
  • Assignment 2 released (Spark + Dataflow, due end of Week 7)

To Do:

  • 📖 Read: Hive Language Manual (basics)
  • 📖 Read: "Pig Latin: A Not-So-Foreign Language for Data Processing"
  • 📖 Optional: Review SQL joins and window functions

References

  • Armbrust et al. (2015): "Spark SQL: Relational Data Processing in Spark"
  • Learning Spark, 2nd Edition (Chapter 4: Structured APIs)
  • Apache Spark Documentation: SQL Programming Guide
  • PySpark API: pyspark.sql.functions

Speaker context: Students can now create DataFrames, select, filter, and add columns. This session elevates their skills: SQL queries on distributed data, multi-function aggregations, joins, and window functions. The Catalyst optimizer section shows *why* declarative code wins—Spark rewrites their queries automatically. End with RDD↔DataFrame conversion so they know when to drop down to RDDs. Remind: Assignment 2 preview next week.

Speaker notes: Run both live, then show .explain() for each. The plans will be identical. "This is Catalyst at work—it doesn't care how you wrote the query, it optimizes the same way."

Speaker notes: Run .explain(True) live and walk through the physical plan. Point out "PushedFilters" in the scan node. "Spark read fewer bytes from disk because it pushed your filter down to the data source. You didn't ask for this—Catalyst did it automatically."

Speaker notes: Poll the class. Typically SQL-experienced students prefer SQL strings; CS students prefer DataFrame API. Emphasize: there's no wrong answer, both compile identically.