Spark Structured Streaming

CS 6500 — Week 14, Session 1

CS 6500 — Big Data Analytics | Week 14

A Card Swipes in a Store — 200ms Later, Approved or Denied

"A stolen credit card makes a purchase at 9:00:00 AM. Your fraud detection batch job runs at 9:30 AM. By then, the fraudster has made eleven more transactions. How do you catch the first one before it clears?"


Today's Answer: Process Events as They Arrive

Batch jobs are excellent for historical analysis — but some decisions can't wait 30 minutes.

Session 1 — The Foundation

  • Why batch processing fails for latency-sensitive use cases
  • Micro-batch vs. continuous processing models
  • The Structured Streaming unified API
  • Input sources and output sinks
  • Live demo: Kafka → Spark → console
  • Tumbling, sliding, and session windows

Session 2 — Putting It to Work

  • Watermarks: handling late-arriving data
  • Output modes: append, update, complete
  • Stream-static and stream-stream joins
  • Checkpointing and fault-tolerant restarts
  • Hands-on lab with IoT sensor data

Structured Streaming is the same DataFrame API you already know — extended to an unbounded, continuously arriving dataset.


Week 12 Recap

What you built in Week 12:

  • Kafka topics, partitions, replication
  • Python producers publishing JSON events
  • Python consumers with manual offset control
  • Consumer group rebalancing

The connection to this week:

  • Kafka = the pipe that carries events
  • Structured Streaming = the processor at the end of the pipe
  • Offsets you tracked manually → Spark tracks automatically
  • Consumer groups → Spark manages its own group

Assignment 3 (NoSQL Design + Query Federation) was due at the end of Week 12. Check Canvas if you have not yet submitted.


Why Batch Isn't Enough

Some decisions expire before the job finishes


The Batch Problem

What happens when the answer arrives too late?

Scenario            | Batch lag        | Real cost
Fraud detection     | 30-minute batch  | 11 more fraudulent transactions cleared
Ride-sharing demand | Hourly batch     | Surge prediction arrives after the rush
E-commerce trending | Daily batch      | Viral tweet at 2:45 PM missed until tomorrow
Security intrusion  | Nightly log scan | Attacker has been inside for 8 hours

The fundamental mismatch: Batch processing treats data as a bounded snapshot. The real world produces events continuously.


Streaming vs. Batch — The Core Difference

Batch

  • Bounded dataset (finite file or table)
  • Process → produce output → stop
  • Latency: minutes to hours
  • Best for: nightly ETL, historical reports, ML training

Streaming

  • Unbounded dataset (never stops arriving)
  • Process continuously → produce incremental output
  • Latency: milliseconds to seconds
  • Best for: fraud signals, live dashboards, IoT telemetry

The practical boundary is blurry: Micro-batch at 5-second intervals covers most "near real-time" needs without the complexity of true continuous processing.


Micro-Batch vs. Continuous Processing

Micro-batch (Spark default)

  • Collect events for a trigger interval (e.g., 5 sec)
  • Run a Spark job on the mini-batch
  • Emit results, advance offsets
  • Minimum latency: ~100ms
  • Exactly-once with Kafka + idempotent sinks
  • Supports all aggregations and joins

Continuous processing (experimental)

  • Process each row as it arrives
  • Sub-millisecond latency
  • Supports only stateless map/filter — no aggregations
  • At-least-once semantics only
  • Rarely used in production

Rule of thumb: Use micro-batch for 99% of production workloads. The latency is low enough and the semantics are much stronger.


When to Choose Streaming

Use Structured Streaming when:

  • Sub-minute latency is required
  • Events must trigger real-time actions
  • Data volume is too high to store before processing
  • Windowed aggregations over live event streams

Stick with batch when:

  • Processing historical data
  • Training ML models
  • Full-table joins across large datasets
  • Nightly ETL with no latency requirement

The question to ask: "What is the business cost of a 5-minute delay?" If the answer is "nothing significant" — batch is simpler.


Structured Streaming Architecture

The same DataFrame API — now applied to an endless table


The Unified API

The key insight: A streaming DataFrame is just a table that keeps growing.

Batch

df = spark.read.csv("input/")
result = df.groupBy("user_id").count()
result.write.parquet("output/")

Streaming

df = spark.readStream.format("kafka")...
result = df.groupBy("user_id").count()
result.writeStream.format("console").start()

Same groupBy, same count, same Catalyst optimizer. The only differences: readStream instead of read, writeStream instead of write, and .start() to begin continuous execution.


Input Sources

Source         | format(...)              | Use Case
Kafka          | "kafka"                  | Event streams — the production standard
File (HDFS/S3) | "parquet", "csv", "json" | Incremental file landing (ETL pipelines)
Socket         | "socket"                 | Development and testing only (no fault tolerance)
Rate           | "rate"                   | Benchmarking and demos (generates synthetic rows)

Kafka is the correct answer for production. Socket should never leave your laptop.

raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker:9092")
    .option("subscribe", "clicks")
    .option("startingOffsets", "latest")
    .load())

Output Sinks

Sink          | format(...)      | Use Case
Console       | "console"        | Debugging — never in production
Parquet / CSV | "parquet", "csv" | Persistent data lake output
Kafka         | "kafka"          | Downstream stream consumers
Memory        | "memory"         | Unit testing only
Delta Lake    | "delta"          | Production ACID streaming sink

query = (result.writeStream
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="5 seconds")
    .start())

The Streaming Query Lifecycle

  1. Driver defines the plan: readStream → transformations → writeStream
  2. query.start() — query runs in a background thread
  3. Each trigger interval:
    • Read new data from source (Kafka offsets, new files)
    • Run an incremental Spark job on the micro-batch
    • Write output to sink
    • Commit offsets and checkpoint state
  4. On failure: restart from last checkpoint, replay Kafka offsets from last committed position — no data lost, no duplicates
query.lastProgress   # stats for the most recent micro-batch
query.status         # "is the query running / waiting / error?"
query.stop()         # graceful shutdown
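Step 4 is easiest to see in miniature. Here is a plain-Python sketch (no Spark; `run_trigger`, `source`, and `sink` are illustrative stand-ins, not Spark APIs) of why the output write happens before the offset commit:

```python
# Miniature of one micro-batch trigger loop (plain Python, no Spark).
# The offset advances only AFTER the sink write succeeds, so a crash
# between "write" and "commit" replays the batch instead of dropping it.

def run_trigger(source, committed_offset, sink, batch_size=3):
    """Process one micro-batch; return the new committed offset."""
    batch = source[committed_offset:committed_offset + batch_size]
    if not batch:
        return committed_offset                # nothing new this interval
    sink.extend(e.upper() for e in batch)      # 1) write output to the sink
    return committed_offset + len(batch)       # 2) only then commit offsets

source = ["a", "b", "c", "d", "e"]             # events already in the topic
sink, offset = [], 0
offset = run_trigger(source, offset, sink)     # batch 0 processes a, b, c
offset = run_trigger(source, offset, sink)     # batch 1 processes d, e
print(sink, offset)
```

If the process died after the sink write but before the commit, a restart with the old offset would rewrite the same rows — which is why exactly-once results also require an idempotent or transactional sink (e.g., Delta).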

Demo — Kafka → Spark

Building the first streaming pipeline


Demo: Setup

# Verify Kafka and Spark are running
docker-compose ps

# From docker/ directory: reset Week 14 topics and produce one smoke-test event
make week14-reset

# Terminal 2: run automated click producer inside Docker
docker exec -it jupyter env KAFKA_BOOTSTRAP=kafka-broker:9092 \
    python /home/jovyan/week14_clicks/produce_clicks.py --rate 2

# Optional: send exactly 100 events then stop
docker exec -it jupyter env KAFKA_BOOTSTRAP=kafka-broker:9092 \
    python /home/jovyan/week14_clicks/produce_clicks.py --rate 5 --count 100

Keep the producer terminal open — Spark will continuously ingest events while it runs.


Demo: Session Setup

# streaming_demo.py — run in PySpark shell
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder \
    .appName("ClickStreamDemo") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.7") \
    .getOrCreate()

Run this either in the PySpark shell (pyspark --packages ...) or in a Jupyter Notebook cell at http://localhost:8888 (token: bigdata).


Demo: Define Schema

schema = StructType([
    StructField("user_id",    StringType()),
    StructField("page",       StringType()),
    StructField("event_time", StringType()),
])

raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker:9092")
    .option("subscribe", "clicks")
    .option("startingOffsets", "latest")
    .load())

Demo: Parse JSON and Start Query

clicks = (raw
    .select(from_json(col("value").cast("string"), schema).alias("d"))
    .select("d.*")
    .withColumn("event_time", to_timestamp("event_time")))

query = (clicks.writeStream
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="5 seconds")
    .start())

query.awaitTermination()

Sample events the producer publishes (or type them yourself via a Kafka console producer):

{"user_id": "u1", "page": "/home",    "event_time": "2026-03-17 09:00:01"}
{"user_id": "u2", "page": "/product", "event_time": "2026-03-17 09:00:03"}
{"user_id": "u1", "page": "/cart",    "event_time": "2026-03-17 09:00:07"}

Output appears ~5 seconds after producing. Each micro-batch shows a batch number.
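Locally, `from_json` and `to_timestamp` do the same work as the Python standard library; a quick sanity check of the first demo event:

```python
import json
from datetime import datetime

line = '{"user_id": "u1", "page": "/home", "event_time": "2026-03-17 09:00:01"}'
event = json.loads(line)                      # dict of fields, like from_json
ts = datetime.strptime(event["event_time"],   # parse string, like to_timestamp
                       "%Y-%m-%d %H:%M:%S")
print(event["user_id"], event["page"], ts)
```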


Windowing on Event Time

Aggregating streams over time intervals


Event Time vs. Processing Time

Processing time

  • When Spark receives the event
  • Easy — just the cluster's wall clock
  • Problem: mobile apps buffer events offline
    • User clicks at 9:00 AM
    • Phone reconnects at 9:15 AM
    • Processing time: 9:15 AM ❌

Event time

  • When the event actually occurred (in the payload)
  • Requires a timestamp field in the data
  • Correctly places the click in the 9:00–9:05 window ✓
  • Requires watermarks to handle late arrivals

Rule: Always window on event time when the timestamp is available. Processing time is only for synthetic/benchmarking streams.
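The mobile-buffering case above, as a plain-Python sketch (`floor_to_window` is a hypothetical helper, not a Spark API):

```python
from datetime import datetime, timedelta

def floor_to_window(ts, minutes=5):
    """Floor a timestamp to the start of its 5-minute tumbling window."""
    return ts - timedelta(minutes=ts.minute % minutes,
                          seconds=ts.second, microseconds=ts.microsecond)

event_time      = datetime(2026, 3, 17, 9, 0, 30)   # user clicked at 9:00:30
processing_time = datetime(2026, 3, 17, 9, 15, 12)  # phone reconnected at 9:15

print(floor_to_window(event_time))       # 9:00-9:05 window — correct
print(floor_to_window(processing_time))  # 9:15-9:20 window — wrong
```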


Tumbling Windows

Fixed-size, non-overlapping intervals. Each event belongs to exactly one window.

|---5min---|---5min---|---5min---|
[9:00–9:05][9:05–9:10][9:10–9:15]
from pyspark.sql.functions import window

windowed_counts = clicks.groupBy(
    window(col("event_time"), "5 minutes"),
    col("page")
).count()

Use case: periodic, non-overlapping summaries (billing intervals, per-minute snapshots).
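A plain-Python mirror of the groupBy above (a `Counter` stands in for Spark's streaming state store; the events are the demo clicks):

```python
from collections import Counter
from datetime import datetime

def bucket(ts, minutes=5):
    """Assign an event to exactly one tumbling window (its start time)."""
    return ts.replace(minute=ts.minute - ts.minute % minutes,
                      second=0, microsecond=0)

clicks = [
    (datetime(2026, 3, 17, 9, 0, 1), "/home"),
    (datetime(2026, 3, 17, 9, 0, 3), "/product"),
    (datetime(2026, 3, 17, 9, 0, 7), "/home"),
    (datetime(2026, 3, 17, 9, 6, 0), "/home"),   # falls in the 9:05 window
]
counts = Counter((bucket(ts), page) for ts, page in clicks)
for (win, page), n in sorted(counts.items()):
    print(win, page, n)
```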


Run Tumbling

tumbling_query = (windowed_counts.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="5 seconds")
    .start())

Output appears every trigger interval with updated window counts.


Sliding Windows

Overlapping intervals. Each event may belong to multiple windows.

|---10min--|
    |---10min--|
        |---10min--|
← slide: 5 min →
sliding_counts = clicks.groupBy(
    window(col("event_time"), "10 minutes", "5 minutes"),
    col("page")
).count()

sliding_query = (sliding_counts.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="5 seconds")
    .start())
# window(time_column, window_duration, slide_duration)

Use case: "Rolling 10-minute click rate, updated every 5 minutes" — smoothed metrics, trend detection, moving averages. Expect more output rows than tumbling (each event counted in multiple windows).
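Window membership can be sketched in plain Python (`containing_windows` is an illustrative helper, not a Spark API): with a 10-minute window sliding every 5 minutes, each event belongs to 10/5 = 2 windows.

```python
from datetime import datetime, timedelta

def containing_windows(ts, window_min=10, slide_min=5):
    """Start times of every sliding window that contains ts."""
    minute_of_day = ts.hour * 60 + ts.minute
    start = minute_of_day - minute_of_day % slide_min  # newest aligned start
    starts = []
    while start > minute_of_day - window_min:          # window still covers ts
        starts.append(ts.replace(hour=start // 60, minute=start % 60,
                                 second=0, microsecond=0))
        start -= slide_min
    return sorted(starts)

ts = datetime(2026, 3, 17, 9, 7, 0)
for s in containing_windows(ts):
    print(s, "to", s + timedelta(minutes=10))   # 9:00-9:10 and 9:05-9:15
```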


Session Windows

Gap-based intervals — window closes after a period of inactivity.

from pyspark.sql.functions import session_window

session_counts = clicks.groupBy(
    session_window(col("event_time"), "10 minutes"),
    col("user_id")
).count()

session_query = (session_counts.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="5 seconds")
    .start())
# window closes if no event arrives within 10 minutes

Before starting the next example, stop the previous one (tumbling_query.stop(), sliding_query.stop(), or session_query.stop()) to avoid mixed console output.

user1: [9:00]---[9:02]---[9:04]     [gap > 10min]     [9:25]---[9:27]
       |_______ session 1 _________|                   |__ session 2 __|

Use case: User session analytics, variable-length activity windows. Each user gets their own dynamically-sized session.
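The gap rule in the diagram, as a plain-Python sketch (`sessionize` is an illustrative helper; Spark's `session_window` maintains this state per key, incrementally):

```python
from datetime import datetime, timedelta

def sessionize(times, gap=timedelta(minutes=10)):
    """Split a sorted list of event times into sessions at gaps > `gap`."""
    sessions, current = [], [times[0]]
    for t in times[1:]:
        if t - current[-1] > gap:        # inactivity gap closes the session
            sessions.append(current)
            current = [t]
        else:
            current.append(t)
    sessions.append(current)
    return sessions

# user1's clicks from the diagram: 9:00, 9:02, 9:04, then 9:25, 9:27
clicks = [datetime(2026, 3, 17, 9, m) for m in (0, 2, 4, 25, 27)]
print([len(s) for s in sessionize(clicks)])   # two sessions: 3 events, then 2
```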


Choosing the Right Window

Window Type | Shape               | Event Membership     | When to Use
Tumbling    | Fixed, aligned      | Exactly one window   | Periodic reports, billing intervals
Sliding     | Fixed, overlapping  | Multiple windows     | Smoothed metrics, rolling averages
Session     | Variable, gap-based | One session per user | UX analytics, user journey analysis

Quick decision rule:

  • "Every N minutes, give me a fresh count" → Tumbling
  • "Every 5 minutes, give me the last 15 minutes" → Sliding
  • "How long was each user active?" → Session

Activity: Window Design Challenge

Time: 8 minutes | Individual

For each scenario below, choose the best window type and explain why:

  1. A payment processor wants to count transactions per merchant per hour to detect unusual volume spikes
  2. A streaming platform wants a 30-second rolling average of concurrent viewers, updated every 10 seconds
  3. An e-commerce site wants to track each customer's shopping session — from first page view until 20 minutes of inactivity
  4. A data center monitors server CPU every 10 seconds and wants peak CPU per 5-minute interval

Write your answers — we'll discuss as a class.


Key Takeaways

  • Structured Streaming = batch DataFrame API + readStream/writeStream — the same transformations, same Catalyst, same SQL
  • Micro-batch collects events over a trigger interval, runs a Spark job, emits results — ~100ms minimum latency, exactly-once semantics
  • Event time is when the event occurred (in the payload); processing time is when Spark saw it — always prefer event time
  • Tumbling: one window per interval, non-overlapping
  • Sliding: overlapping windows — each event counted in multiple windows
  • Session: gap-based, dynamic per-entity — closes after inactivity

Session 2: Watermarks, output modes, stream joins, checkpointing — and 60 minutes of hands-on lab


What's Missing?

Spark Structured Streaming processes live events at scale — but it wasn't built for everything


The Gaps Structured Streaming Leaves Open

  • Sub-millisecond latency — micro-batch minimum is ~100ms; financial trading and telemetry alerting need true per-record processing (covered in Week 13)
  • Complex batch transformations — SQL-based data modeling, lineage tracking, and test-driven transformation pipelines are not a streaming concern → dbt
  • Pipeline orchestration — who schedules the streaming job, retries it on failure, coordinates it with batch ETL, and alerts on SLA violations? → Airflow
  • Cross-source federated queries — joining a live stream with historical S3 data at query time → Trino / Athena

What Comes Next

Gap                                       | Solution             | When
Batch SQL transformations with lineage    | dbt                  | Week 15
Pipeline scheduling and retry logic       | Apache Airflow       | Week 15
Production monitoring and SLA enforcement | Prometheus + Grafana | (beyond course scope)
Cloud-managed streaming at scale          | AWS Kinesis / MSK    | (reference)

Spark Structured Streaming is the right tool for micro-batch real-time processing — but a production data platform wraps it with orchestration (Airflow), batch transformation (dbt), and observability (Grafana). That full stack is Week 15.


Instructor Notes

Warm-up question: "What's the difference between a Kafka consumer and a streaming processor?" Answer: a consumer reads events and handles them one at a time in application code; a streaming processor applies transformations, aggregations, and joins over a continuous stream using a query engine.

Ask: "Which of these would your company lose money over in a real-time vs. 30-minute scenario?"

Note: continuous processing was added in Spark 2.3 but has seen limited adoption. Flink owns the true-streaming market.

Point out: if students already know the DataFrame API from weeks 5-6, they already know ~80% of Structured Streaming. The new concepts are sources, sinks, triggers, output modes, and watermarks.

Trigger options: processingTime="5 seconds" (most common), once=True (process all available, stop), availableNow=True (Spark 3.3+), continuous="1 second" (experimental).

Emphasize: the checkpoint is what makes streaming reliable. Without it, a restart would re-process from the beginning or skip events.

5 minutes. Get this running before class. If Kafka is slow to start, have a backup socket-source version ready.

Point out: raw.isStreaming == True. The DataFrame API is identical — just readStream instead of read.

Watch for: students not seeing output. Usually: wrong topic name, Kafka not running, or startingOffsets="latest" when data was already there. Switch to "earliest" if needed.

Classic misconception: "why can't I just use when Spark sees it?" — because network delays, mobile buffering, and batch uploads mean events arrive out of order. Event time is ground truth.

Session windows were added in Spark 3.2. Require watermarks in production — otherwise state grows unboundedly.

Answers: 1=Tumbling 1hr, 2=Sliding 30s/10s, 3=Session 20min, 4=Tumbling 5min. Ask students to justify their choice — the "why" matters more than the label.