Spark Structured Streaming
CS 6500 — Week 14, Session 1
A Card Swipes in a Store — 200ms Later, Approved or Denied
"A stolen credit card makes a purchase at 9:00:00 AM. Your fraud detection batch job runs at 9:30 AM. By then, the fraudster has made eleven more transactions. How do you catch the first one before it clears?"
Today's Answer: Process Events as They Arrive
Batch jobs are excellent for historical analysis — but some decisions can't wait 30 minutes.
Session 1 — The Foundation
Why batch processing fails for latency-sensitive use cases
Micro-batch vs. continuous processing models
The Structured Streaming unified API
Input sources and output sinks
Live demo: Kafka → Spark → console
Tumbling, sliding, and session windows
Session 2 — Putting It to Work
Watermarks: handling late-arriving data
Output modes: append, update, complete
Stream-static and stream-stream joins
Checkpointing and fault-tolerant restarts
Hands-on lab with IoT sensor data
Structured Streaming is the same DataFrame API you already know — extended to an unbounded, continuously-arriving dataset.
Week 12 Recap
What you built in Week 12:
Kafka topics, partitions, replication
Python producers publishing JSON events
Python consumers with manual offset control
Consumer group rebalancing
The connection to this week:
Kafka = the pipe that carries events
Structured Streaming = the processor at the end of the pipe
Offsets you tracked manually → Spark tracks automatically
Consumer groups → Spark manages its own group
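A rough sketch of the shift, assuming a Week 12-style kafka-python consumer (your actual script's details may differ): the manual loop becomes a declarative query, and Spark takes over offset tracking and group management.

# Week 12 style: manual consumer loop with explicit offset commits (kafka-python)
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "clicks",
    bootstrap_servers="kafka-broker:9092",
    group_id="click-reader",        # hypothetical group id
    enable_auto_commit=False,
)
for msg in consumer:
    handle(msg.value)               # placeholder for your per-event application code
    consumer.commit()               # you track progress yourself

# Week 14 style: declare the source once; Spark tracks offsets in its checkpoint
# (assumes an existing SparkSession `spark`; see the demo later in the deck)
clicks = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker:9092")
    .option("subscribe", "clicks")
    .load())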
Assignment 3 (NoSQL Design + Query Federation) was due at the end of Week 12 — check Canvas if you haven't submitted yet.
Why Batch Isn't Enough
Some decisions expire before the job finishes
The Batch Problem
What happens when the answer arrives too late?
Scenario | Batch lag | Real cost
Fraud detection | 30-minute batch | 11 more fraudulent transactions cleared
Ride-sharing demand | Hourly batch | Surge prediction arrives after the rush
E-commerce trending | Daily batch | Viral tweet at 2:45 PM missed until tomorrow
Security intrusion | Nightly log scan | Attacker has been inside for 8 hours
The fundamental mismatch: Batch processing treats data as a bounded snapshot. The real world produces events continuously.
Streaming vs. Batch — The Core Difference
Batch
Bounded dataset (finite file or table)
Process → produce output → stop
Latency: minutes to hours
Best for: nightly ETL, historical reports, ML training
Streaming
Unbounded dataset (never stops arriving)
Process continuously → produce incremental output
Latency: milliseconds to seconds
Best for: fraud signals, live dashboards, IoT telemetry
The practical boundary is blurry: micro-batch at 5-second intervals covers most "near real-time" needs without the complexity of true continuous processing.
Micro-Batch vs. Continuous Processing
Micro-batch (Spark default)
Collect events for a trigger interval (e.g., 5 sec)
Run a Spark job on the mini-batch
Emit results, advance offsets
Minimum latency: ~100ms
Exactly-once with Kafka + idempotent sinks
Supports all aggregations and joins
Continuous processing (experimental)
Process each row as it arrives
Sub-millisecond latency
Supports only stateless map/filter — no aggregations
At-least-once semantics only
Rarely used in production
Rule of thumb: Use micro-batch for 99% of production workloads. The latency is low enough and the semantics are much stronger.
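In code, the two models differ only in the trigger. A minimal sketch — `events` stands in for any streaming DataFrame built with spark.readStream (introduced on the next slides); continuous mode additionally requires a Kafka or rate source and a stateless query.

# Micro-batch (default): buffer arriving data, run an incremental job every 5 seconds
micro_q = (events.writeStream
    .format("console")
    .trigger(processingTime="5 seconds")
    .start())

# Continuous (experimental): long-running tasks process rows as they arrive;
# "1 second" is the checkpoint interval, not a batching interval
cont_q = (events.writeStream
    .format("console")
    .trigger(continuous="1 second")
    .start())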
When to Choose Streaming
Use Structured Streaming when:
Sub-minute latency is required
Events must trigger real-time actions
Data volume is too high to store before processing
Windowed aggregations over live event streams
Stick with batch when:
Processing historical data
Training ML models
Full-table joins across large datasets
Nightly ETL with no latency requirement
The question to ask: "What is the business cost of a 5-minute delay?" If the answer is "nothing significant" — batch is simpler.
Structured Streaming Architecture
The same DataFrame API — now applied to an endless table
The Unified API
The key insight: A streaming DataFrame is just a table that keeps growing.
Batch
df = spark.read.csv("input/")
result = df.groupBy("user_id").count()
result.write.parquet("output/")
Streaming
df = spark.readStream.format("kafka")...
result = df.groupBy("user_id").count()
result.writeStream.format("console").start()
Same groupBy, same count, same Catalyst optimizer. The only differences: readStream instead of read, writeStream instead of write, and .start() to begin continuous execution.
Input Sources
Source | format(...) | Use Case
Kafka | "kafka" | Event streams — the production standard
File (HDFS/S3) | "parquet", "csv", "json" | Incremental file landing (ETL pipelines)
Socket | "socket" | Development and testing only (no fault tolerance)
Rate | "rate" | Benchmarking and demos (generates synthetic rows)
Kafka is the correct answer for production. Socket should never leave your laptop.
raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker:9092")
    .option("subscribe", "clicks")
    .option("startingOffsets", "latest")
    .load())
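For experiments without Kafka, the rate source is a convenient stand-in — a minimal sketch (the 10 rows/second setting is arbitrary):

# Rate source: synthesizes rows with a `timestamp` and an incrementing `value` column
demo_stream = (spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load())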
Output Sinks
Sink | format(...) | Use Case
Console | "console" | Debugging — never in production
Parquet / CSV | "parquet", "csv" | Persistent data lake output
Kafka | "kafka" | Downstream stream consumers
Memory | "memory" | Unit testing only
Delta Lake | "delta" | Production ACID streaming sink
query = (result.writeStream
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="5 seconds")
    .start())
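The console sink above is for debugging only; a persistent sink also needs an output path and a checkpoint location. A sketch under assumed paths — `clicks` stands for a parsed, non-aggregated stream (defined in the demo that follows); an aggregated result like the groupBy count above needs a watermark before it can be appended, which is Session 2 material.

# Parquet sink: each micro-batch appends new files; the checkpoint directory
# records committed offsets so a restart resumes exactly where it left off.
lake_query = (clicks.writeStream
    .format("parquet")
    .option("path", "s3a://datalake/clicks/")                      # hypothetical output path
    .option("checkpointLocation", "s3a://datalake/_chk/clicks/")   # hypothetical checkpoint path
    .outputMode("append")
    .start())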
The Streaming Query Lifecycle
Driver defines the plan — readStream → transformations → writeStream
query.start() — query runs in a background thread
Each trigger interval:
Read new data from source (Kafka offsets, new files)
Run an incremental Spark job on the micro-batch
Write output to sink
Commit offsets and checkpoint state
On failure: restart from the last checkpoint and replay Kafka offsets from the last committed position — no data lost, and no duplicates as long as the sink is idempotent or transactional
query.lastProgress — metrics for the most recent micro-batch
query.status — current query state (trigger active, data available)
query.stop() — graceful shutdown
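A short sketch of how those handles are used from the driver — `query` is the handle returned by .start(), and the wait time is arbitrary:

import time

time.sleep(10)               # let a couple of micro-batches complete
print(query.status)          # dict: current message, isDataAvailable, isTriggerActive
print(query.lastProgress)    # dict of metrics for the most recent micro-batch (rows/sec, duration, offsets)
query.stop()                 # graceful shutdown; a later restart resumes from the checkpoint, if one was configured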
Demo — Kafka → Spark
Building the first streaming pipeline
Demo: Setup
docker-compose ps
make week14-reset
docker exec -it jupyter env KAFKA_BOOTSTRAP=kafka-broker:9092 \
python /home/jovyan/week14_clicks/produce_clicks.py --rate 2
docker exec -it jupyter env KAFKA_BOOTSTRAP=kafka-broker:9092 \
python /home/jovyan/week14_clicks/produce_clicks.py --rate 5 --count 100
Keep the producer terminal open — Spark will continuously ingest events while it runs.
Demo: Session Setup
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType
spark = SparkSession.builder \
    .appName("ClickStreamDemo") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.7") \
    .getOrCreate()
Run this either in the PySpark shell (pyspark --packages ...) or in a Jupyter Notebook cell at http://localhost:8888 (token: bigdata).
Demo: Define Schema
schema = StructType([
    StructField("user_id", StringType()),
    StructField("page", StringType()),
    StructField("event_time", StringType()),
])

raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker:9092")
    .option("subscribe", "clicks")
    .option("startingOffsets", "latest")
    .load())
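It helps to inspect what the Kafka source delivers before parsing — every row carries the fixed Kafka schema below, which is why the next step casts `value` to a string and applies the JSON schema:

# The event payload lives in `value` as raw bytes
raw.printSchema()
# root
#  |-- key: binary (nullable = true)
#  |-- value: binary (nullable = true)
#  |-- topic: string (nullable = true)
#  |-- partition: integer (nullable = true)
#  |-- offset: long (nullable = true)
#  |-- timestamp: timestamp (nullable = true)
#  |-- timestampType: integer (nullable = true)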
Demo: Parse JSON and Start Query
clicks = (raw
    .select(from_json(col("value").cast("string"), schema).alias("d"))
    .select("d.*")
    .withColumn("event_time", to_timestamp("event_time")))

query = (clicks.writeStream
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="5 seconds")
    .start())
query.awaitTermination()
Type these events into the producer terminal:
{"user_id": "u1", "page": "/home", "event_time": "2026-03-17 09:00:01"}
{"user_id": "u2", "page": "/product", "event_time": "2026-03-17 09:00:03"}
{"user_id": "u1", "page": "/cart", "event_time": "2026-03-17 09:00:07"}
Output appears ~5 seconds after producing. Each micro-batch shows a batch number.
Windowing on Event Time
Aggregating streams over time intervals
Event Time vs. Processing Time
Processing time
When Spark receives the event
Easy — just the cluster's wall clock
Problem: mobile apps buffer events offline
User clicks at 9:00 AM
Phone reconnects at 9:15 AM
Processing time: 9:15 AM
Event time
When the event actually occurred (in the payload)
Requires a timestamp field in the data
Correctly places the click in the 9:00–9:05 window ✓
Requires watermarks to handle late arrivals
Rule: Always window on event time when the timestamp is available. Processing time is only for synthetic/benchmarking streams.
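In code, the difference is just which timestamp column the window is built on — a sketch reusing the `clicks` DataFrame from the demo:

from pyspark.sql.functions import col, current_timestamp, window

# Event time: group on the timestamp carried inside the event payload (preferred)
by_event_time = clicks.groupBy(window(col("event_time"), "5 minutes")).count()

# Processing time: stamp each row with the wall clock when Spark sees it (demos/benchmarks only)
by_processing_time = (clicks
    .withColumn("arrival_time", current_timestamp())
    .groupBy(window(col("arrival_time"), "5 minutes"))
    .count())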
Tumbling Windows
Fixed-size, non-overlapping intervals. Each event belongs to exactly one window.
|---5min---|---5min---|---5min---|
[9:00–9:05][9:05–9:10][9:10–9:15]
from pyspark.sql.functions import window
windowed_counts = clicks.groupBy(
    window(col("event_time"), "5 minutes"),
    col("page")
).count()
Use case: periodic, non-overlapping summaries (billing intervals, per-minute snapshots).
Run Tumbling
tumbling_query = (windowed_counts.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="5 seconds")
    .start())
Output appears every trigger interval with updated window counts.
Sliding Windows
Overlapping intervals. Each event may belong to multiple windows.
|---10min--|
|---10min--|
|---10min--|
← slide: 5 min →
sliding_counts = clicks.groupBy(
    window(col("event_time"), "10 minutes", "5 minutes"),
    col("page")
).count()

sliding_query = (sliding_counts.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="5 seconds")
    .start())
Use case: "Rolling 10-minute click rate, updated every 5 minutes" — smoothed metrics, trend detection, moving averages. Expect more output rows than tumbling (each event counted in multiple windows).
Session Windows
Gap-based intervals — the window closes after a period of inactivity.
from pyspark.sql.functions import session_window
session_counts = clicks.groupBy(
    session_window(col("event_time"), "10 minutes"),
    col("user_id")
).count()

session_query = (session_counts.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="5 seconds")
    .start())
Before starting the next example, stop the previous one (tumbling_query.stop(), sliding_query.stop(), or session_query.stop()) to avoid mixed console output.
user1: [9:00]---[9:02]---[9:04] [gap > 10min] [9:25]---[9:27]
|_______ session 1 _________| |__ session 2 __|
Use case: User session analytics, variable-length activity windows. Each user gets their own dynamically-sized session.
Choosing the Right Window
Window Type | Shape | Event Membership | When to Use
Tumbling | Fixed, aligned | Exactly one window | Periodic reports, billing intervals
Sliding | Fixed, overlapping | Multiple windows | Smoothed metrics, rolling averages
Session | Variable, gap-based | One session per user | UX analytics, user journey analysis
Quick decision rule:
"Every N minutes, give me a fresh count" → Tumbling
"Every 5 minutes, give me the last 15 minutes" → Sliding
"How long was each user active?" → Session
Activity: Window Design Challenge
Time: 8 minutes | Individual
For each scenario below, choose the best window type and explain why:
A payment processor wants to count transactions per merchant per hour to detect unusual volume spikes
A streaming platform wants a 30-second rolling average of concurrent viewers, updated every 10 seconds
An e-commerce site wants to track each customer's shopping session — from first page view until 20 minutes of inactivity
A data center monitors server CPU every 10 seconds and wants peak CPU per 5-minute interval
Write your answers — we'll discuss as a class.
Key Takeaways
Structured Streaming = batch DataFrame API + readStream/writeStream — the same transformations, same Catalyst, same SQL
Micro-batch collects events over a trigger interval, runs a Spark job, emits results — ~100ms minimum latency, exactly-once semantics
Event time is when the event occurred (in the payload); processing time is when Spark saw it — always prefer event time
Tumbling: one window per interval, non-overlapping
Sliding: overlapping windows — each event counted in multiple windows
Session: gap-based, dynamic per-entity — closes after inactivity
Session 2: Watermarks, output modes, stream joins, checkpointing — and 60 minutes of hands-on lab
What's Missing?
Spark Structured Streaming processes live events at scale — but it wasn't built for everything
The Gaps Structured Streaming Leaves Open
Sub-millisecond latency — micro-batch minimum is ~100ms; financial trading and telemetry alerting need true per-record processing (covered in Week 13)
Complex batch transformations — SQL-based data modeling, lineage tracking, and test-driven transformation pipelines are not a streaming concern → dbt
Pipeline orchestration — who schedules the streaming job, retries it on failure, coordinates it with batch ETL, and alerts on SLA violations? → Airflow
Cross-source federated queries — joining a live stream with historical S3 data at query time → Trino / Athena
What Comes Next
Gap | Solution | When
Batch SQL transformations with lineage | dbt | Week 15
Pipeline scheduling and retry logic | Apache Airflow | Week 15
Production monitoring and SLA enforcement | Prometheus + Grafana | (beyond course scope)
Cloud-managed streaming at scale | AWS Kinesis / MSK | (reference)
Spark Structured Streaming is the right tool for micro-batch real-time processing — but a production data platform wraps it with orchestration (Airflow), batch transformation (dbt), and observability (Grafana). That full stack is Week 15.
Presenter Notes
Warm-up question: "What's the difference between a Kafka consumer and a streaming processor?" Answer: a consumer reads events and handles them one at a time in application code; a streaming processor applies transformations, aggregations, and joins over a continuous stream using a query engine.
Ask: "Which of these would your company lose money over in a real-time vs. 30-minute scenario?"
Note: continuous processing was added in Spark 2.3 but has seen limited adoption. Flink owns the true-streaming market.
Point out: if students already know the DataFrame API from weeks 5-6, they already know ~80% of Structured Streaming. The new concepts are sources, sinks, triggers, output modes, and watermarks.
Trigger options: processingTime="5 seconds" (most common), once=True (process all available, stop), availableNow=True (Spark 3.3+), continuous="1 second" (experimental).
Emphasize: the checkpoint is what makes streaming reliable. Without it, a restart would re-process from the beginning or skip events.
5 minutes. Get this running before class. If Kafka is slow to start, have a backup socket-source version ready.
Point out: raw.isStreaming == True. The DataFrame API is identical — just readStream instead of read.
Watch for: students not seeing output. Usually: wrong topic name, Kafka not running, or startingOffsets="latest" when data was already there. Switch to "earliest" if needed.
Classic misconception: "why can't I just use when Spark sees it?" — because network delays, mobile buffering, and batch uploads mean events arrive out of order. Event time is ground truth.
Session windows were added in Spark 3.2. Require watermarks in production — otherwise state grows unboundedly.
Answers: 1=Tumbling 1hr, 2=Sliding 30s/10s, 3=Session 20min, 4=Tumbling 5min. Ask students to justify their choice — the "why" matters more than the label.