Watermarks, Joins, and Fault Tolerance
CS 6500 — Week 14, Session 2
CS 6500 — Big Data Analytics | Week 14
What Happens to Your 5-Minute Click Count When a Phone Dies?
"A user clicks at 9:02 AM, but their phone loses signal and buffers the event. It arrives at Spark at 9:14 AM — eight minutes late. Your 9:00–9:05 window already closed. Does Spark update the count, drop the event, or keep state forever waiting for stragglers?"
Today's Answer: Watermarks Bound the Trade-Off
You declare a maximum lateness tolerance. Spark uses it to:
What watermarks enable
Accept events up to N minutes late
Update the correct window when they arrive
Evict finalized window state from memory
Enable append output mode (emit once, final)
Today's lab
Watermark configuration + late event injection
Tumbling and sliding window pipelines to HDFS
Stream-static join (sensor enrichment)
Stream-stream join (ad impression matching)
Checkpoint recovery demonstration
Everything runs in Docker against the Kafka + Spark stack you used last week.
Lab Goals + Environment Check
By the end of this session you will have built:
A windowed sensor aggregation pipeline writing Parquet to HDFS
A stream-static join enriching live events with a dimension table
A stream-stream join matching ad impressions to clicks
A recovered streaming query that resumes from a checkpoint
Check your environment:
docker-compose ps
make week14-reset
docker exec -it kafka-broker kafka-topics --list --bootstrap-server localhost:9092
Part 1 — Watermarks and Output Modes
15 minutes of theory before the lab
The Late Data Problem
Without watermarks, Spark must keep all window state forever.
Event at 9:02 AM → arrives at 9:14 AM (8 min late)
Tumbling windows:
[9:00–9:05] ← should this window update?
[9:05–9:10] ← or this one?
[9:10–9:15] ← the event's processing time falls here
Without watermark: Spark retains all windows indefinitely — unbounded memory growth
With watermark: Spark tracks the maximum event time seen, and discards events/state older than max_event_time − watermark_delay
The trade-off: larger delay → more late events accepted, more memory; smaller delay → less memory, more events dropped
How Watermarks Work
watermarked = clicks.withWatermark("event_time", "10 minutes")

counts = watermarked.groupBy(
    window(col("event_time"), "5 minutes"),
    col("page")
).count()
Watermark value = max(event_time seen so far) − watermark_delay
Event arrives | Max event_time | Watermark value | 9:00–9:05 window state
9:08 event    | 9:08           | 8:58            | Still open
9:12 event    | 9:12           | 9:02            | Still open
9:16 event    | 9:16           | 9:06            | Finalized and evicted
After 9:16, any event for 9:00–9:05 is silently dropped.
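The accept/drop rule in the table can be simulated in plain Python, no Spark required. This is a toy sketch of the watermark logic, using the same 10-minute delay as the `withWatermark` example above:

```python
from datetime import datetime, timedelta

# Toy re-implementation of the watermark rule; the delay matches the
# withWatermark("event_time", "10 minutes") example above.
WATERMARK_DELAY = timedelta(minutes=10)

max_event_time = datetime.min

def process(event_time):
    """Return 'accepted' or 'dropped' for an incoming event."""
    global max_event_time
    max_event_time = max(max_event_time, event_time)
    watermark = max_event_time - WATERMARK_DELAY
    return "dropped" if event_time < watermark else "accepted"

day = datetime(2026, 3, 17)
for hh, mm in [(9, 8), (9, 12), (9, 16), (9, 2)]:
    print(f"{hh}:{mm:02d}", process(day.replace(hour=hh, minute=mm)))
# 9:08 accepted, 9:12 accepted, 9:16 accepted,
# 9:02 dropped — the watermark already advanced to 9:06
```

Note that the watermark only ever moves forward: the late 9:02 event does not pull it back.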
Output Modes
Mode     | What Gets Written                                      | Valid When
Append   | Only new, finalized rows — written once, never updated | Aggregations with watermark; append-only sinks
Update   | Only rows that changed since the last trigger          | Any aggregation; sinks that support UPSERT
Complete | The entire result table, every trigger                 | Small global aggregations only (total counts, top-N)
.writeStream.outputMode("append")
.writeStream.outputMode("update")
.writeStream.outputMode("complete")
Output Mode Selection Rules
Query Type                       | Allowed Modes
Select / filter (no aggregation) | Append
Aggregation without watermark    | Complete, Update
Aggregation with watermark       | Append, Update, Complete
Stream-stream join               | Append
Stream-static join               | Append, Update
The most common production choice: aggregation + watermark + append mode + Parquet sink
This writes each finalized result exactly once — making the output idempotent and easy to consume downstream.
Lab Part 1 — Sensor Stream Pipeline
Tumbling and sliding window aggregations to HDFS
Lab Setup — Start the Sensor Producer
docker exec -it spark-master \
python /datasets/sensors/produce_sensors.py
Lab Setup — Run Spark
Open a second terminal for your PySpark job:
docker exec -it spark-master pyspark \
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.7
Lab Setup — Jupyter Option
Or open a Jupyter Notebook at http://localhost:8888 (token: bigdata) and use the PySpark kernel.
We'll build sensor_pipeline.py step by step.
Activity 1
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window, avg, max as spark_max
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("SensorPipeline").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("temperature", DoubleType()),
    StructField("ts", TimestampType()),
])

raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker:9092")
    .option("subscribe", "sensors")
    .option("startingOffsets", "earliest")
    .load())

readings = (raw
    .select(from_json(col("value").cast("string"), schema).alias("d"))
    .select("d.*")
    .withWatermark("ts", "2 minutes"))
Activity 1
tumbling = (readings
    .groupBy(window("ts", "5 minutes"), "sensor_id")
    .agg(avg("temperature").alias("avg_temp"),
         spark_max("temperature").alias("max_temp")))

sliding = (readings
    .groupBy(window("ts", "10 minutes", "5 minutes"), "sensor_id")
    .agg(avg("temperature").alias("avg_temp")))

q1_view = (tumbling.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="10 seconds")
    .queryName("tumbling_view")
    .start())

q1 = (tumbling.writeStream
    .outputMode("append")
    .format("parquet")
    .option("checkpointLocation", "hdfs:///checkpoints/tumbling/")
    .option("path", "hdfs:///output/sensor_tumbling/")
    .trigger(processingTime="10 seconds")
    .queryName("tumbling_5min")
    .start())

q2 = (sliding.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="10 seconds")
    .queryName("sliding_10min")
    .start())

spark.streams.awaitAnyTermination()
Demo: Inject a Late Event
After the pipeline has been running for ~3 minutes:
echo '{"sensor_id":"s1","temperature":99.9,"ts":"2026-03-17T08:58:30"}' | \
docker exec -i kafka-broker kafka-console-producer \
--topic sensors --bootstrap-server localhost:9092
Inject Dropped Event
echo '{"sensor_id":"s1","temperature":99.9,"ts":"2026-03-17T08:55:00"}' | \
docker exec -i kafka-broker kafka-console-producer \
--topic sensors --bootstrap-server localhost:9092
Observe Results
First event may update an open window.
Second event is dropped silently.
Check q1.lastProgress["numInputRows"] after each batch.
Checkpoint: Lab Part 1
Before moving to Part 2, verify:
docker exec -it spark-master \
hdfs dfs -ls /output/sensor_tumbling/
docker exec -it spark-master \
hdfs dfs -ls /checkpoints/tumbling/
Verify Output
/output/sensor_tumbling/ should contain .parquet files.
/checkpoints/tumbling/offsets/ should contain numbered files.
If empty, inspect q1.lastProgress before continuing.
Lab Part 2 — Joins and Fault Tolerance
Stream-static, stream-stream, and checkpoint recovery
Activity 2: Static Join
Enrich the live sensor stream with a static metadata table
sensor_meta = spark.read.csv(
    "hdfs:///datasets/sensors/sensor_metadata.csv",
    header=True, inferSchema=True)

enriched = readings.join(sensor_meta, on="sensor_id", how="left")
Run Static Join
query = (enriched.writeStream
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="5 seconds")
    .start())

query.awaitTermination()
Key point: The static DataFrame is loaded once and broadcast to executors. It does not need a watermark. New rows added to the CSV after the query starts will not be picked up automatically.
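Conceptually, the broadcast side behaves like an in-memory lookup table replicated to every executor. A minimal plain-Python analogy (the metadata values here are hypothetical, standing in for sensor_metadata.csv):

```python
# Hypothetical static dimension table, as Spark would broadcast it
# to every executor as an in-memory lookup.
sensor_meta = {
    "s1": {"location": "roof", "model": "TMP-100"},
    "s2": {"location": "lab", "model": "TMP-200"},
}

def enrich(reading):
    """Left join: the reading survives even when no metadata row matches."""
    meta = sensor_meta.get(reading["sensor_id"])  # None mimics an unmatched left join
    return {**reading, **(meta or {"location": None, "model": None})}

print(enrich({"sensor_id": "s1", "temperature": 21.5}))
# {'sensor_id': 's1', 'temperature': 21.5, 'location': 'roof', 'model': 'TMP-100'}
```

The dict is frozen at build time, which mirrors the slide's caveat: rows added to the CSV after the query starts are invisible until you restart the query.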
Activity 3: Stream-Stream Join
Match ad impressions to click events within a 5-minute window
from pyspark.sql.functions import expr

impression_schema = StructType([
    StructField("ad_id", StringType()),
    StructField("user_id", StringType()),
    StructField("shown_at", TimestampType()),
])

click_schema = StructType([
    StructField("ad_id", StringType()),
    StructField("user_id", StringType()),
    StructField("clicked_at", TimestampType()),
])
Activity 3: Read Streams
def read_topic(topic, schema):
    return (spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "kafka-broker:9092")
        .option("subscribe", topic).option("startingOffsets", "latest")
        .load()
        .select(from_json(col("value").cast("string"), schema).alias("d"))
        .select("d.*"))

impressions = read_topic("impressions", impression_schema) \
    .withWatermark("shown_at", "10 minutes")
clicks = read_topic("clicks_ads", click_schema) \
    .withWatermark("clicked_at", "10 minutes")
Activity 3: Join Run
matched = (impressions.alias("impressions")
    .join(clicks.alias("clicks"),
          expr("""
              impressions.ad_id = clicks.ad_id AND
              impressions.user_id = clicks.user_id AND
              clicked_at BETWEEN shown_at AND shown_at + INTERVAL 5 MINUTES
          """),
          "leftOuter"))  # DataFrame.join's third argument is `how`

query = (matched.writeStream
    .format("console")
    .option("truncate", False)
    .outputMode("append")
    .option("checkpointLocation", "hdfs:///checkpoints/ad_join/")
    .trigger(processingTime="10 seconds")
    .start())

query.awaitTermination()
Why watermarks are required here: Both streams buffer rows waiting for a match. Without watermarks, Spark retains the full history of both streams — unbounded memory. Watermarks define when a row can no longer find a match and should be evicted.
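The eviction behavior can be sketched in plain Python. This is a toy model of one join side's state store, not Spark's actual state-store implementation; the `StreamBuffer` class and payload names are illustrative:

```python
from datetime import datetime, timedelta

DELAY = timedelta(minutes=10)  # matches the withWatermark calls above

class StreamBuffer:
    """Toy model of one join side's state store: rows wait for a match
    until the watermark proves no match can still arrive."""
    def __init__(self):
        self.rows = []                      # buffered (event_time, payload)
        self.max_event_time = datetime.min

    def add(self, event_time, payload):
        self.max_event_time = max(self.max_event_time, event_time)
        self.rows.append((event_time, payload))

    def evict(self):
        """Drop rows older than the watermark; return how many were evicted."""
        watermark = self.max_event_time - DELAY
        kept = [(t, p) for t, p in self.rows if t >= watermark]
        evicted = len(self.rows) - len(kept)
        self.rows = kept
        return evicted

buf = StreamBuffer()
buf.add(datetime(2026, 3, 17, 9, 0), "impression-a")
buf.add(datetime(2026, 3, 17, 9, 20), "impression-b")
print(buf.evict())  # 1 — the 9:00 row is older than 9:20 minus 10 minutes
```

Without the `evict` step, `rows` grows forever — that is exactly the unbounded-memory failure the slide describes.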
Activity 4: Stop and Check
Stop the tumbling pipeline from Activity 1 and inspect offsets
q1.stop()
Activity 4: Restart
q1_restart = (tumbling.writeStream
    .outputMode("append")
    .format("parquet")
    .option("checkpointLocation", "hdfs:///checkpoints/tumbling/")
    .option("path", "hdfs:///output/sensor_tumbling/")
    .trigger(processingTime="10 seconds")
    .start())

# Wait for at least one batch to complete, then inspect:
print(q1_restart.lastProgress)
Inspect Checkpoint
docker exec -it spark-master \
hdfs dfs -ls /checkpoints/tumbling/
docker exec -it spark-master \
hdfs dfs -cat /checkpoints/tumbling/offsets/5
Look for numbered files in offsets/ and commits/ — the highest commit number is where Spark will resume.
Checkpoint Anatomy
/checkpoints/tumbling/
metadata ← query ID, schema, configuration
offsets/ ← Kafka partition offsets per batch (what was READ)
0, 1, 2, 3 ...
commits/ ← which batches completed successfully (what was WRITTEN)
0, 1, 2, 3 ...
state/ ← stateful operator state (window aggregates)
Exactly-once guarantee: On restart, Spark reads the highest commits/N, finds the corresponding offsets/N, and starts reading Kafka from the next offset. Any partial writes from the failed batch are discarded.
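The restart rule can be written down in a few lines. This is a toy in-memory stand-in for the offsets/ and commits/ directories, not Spark's on-disk format; the topic name and offset values are illustrative:

```python
# Toy stand-in for the checkpoint directory: offsets[N] holds the ending
# Kafka offsets recorded for batch N; commits holds batches that finished
# writing their output.
offsets = {0: {"sensors": 120}, 1: {"sensors": 240}, 2: {"sensors": 360}}
commits = {0, 1}  # batch 2 logged its offsets but crashed before committing

def resume_point(offsets, commits):
    """Replicates the restart rule: rerun the first uncommitted batch,
    re-reading Kafka from where the last committed batch stopped."""
    last_committed = max(commits)
    return last_committed + 1, offsets[last_committed]

print(resume_point(offsets, commits))  # (2, {'sensors': 240})
```

Batch 2 is rerun from offset 240, and its earlier partial output is discarded — that replay-plus-discard pair is what makes the end-to-end result exactly-once.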
Debrief
What did we build and why does it matter?
Discussion Questions
Take 5 minutes — discuss with a neighbor:
Output mode: Your pipeline aggregates clicks per user per hour and writes to a live dashboard that refreshes every 30 seconds. Should you use append, update, or complete mode? Why?
Watermark sizing: Your mobile app can buffer events for up to 30 minutes when offline. What watermark delay do you set? What's the memory cost of that choice?
Stream-stream vs. stream-static: You're joining live transactions to a fraud blocklist. The blocklist is updated every 10 minutes. Which join type do you use and why?
Checkpointing: You upgrade your Spark version. Can you restart the streaming job from the existing checkpoint? What risks exist?
Key Takeaways
Watermarks bound stateful memory — declare the maximum lateness you'll tolerate; Spark evicts older state
Output modes: Append (once, final, requires watermark) → Update (re-emits changed rows) → Complete (entire table, small cardinality only)
Stream-static join: dimension enrichment, broadcast once, no watermark needed
Stream-stream join: both sides buffer state; watermarks on both sides are required to prevent unbounded memory
Checkpointing = fault tolerance: restart resumes from last committed Kafka offset with no duplicates and no data loss
What's Missing?
Spark Structured Streaming processes live events at scale — but it wasn't built for everything
The Gaps Structured Streaming Leaves Open
Pipeline orchestration — who schedules the streaming job, retries it on failure, coordinates it with nightly batch ETL, and pages you at 3 AM when it stalls? → Apache Airflow
SQL-based batch transformations with lineage — transforming historical data with documented, tested, versioned SQL models is a batch concern → dbt
Production monitoring and SLA enforcement — query lag, state size, throughput, and watermark drift need dashboards and alerts → Prometheus + Grafana (Week 15)
True sub-millisecond latency — micro-batch floor is ~100ms; for <10ms requirements, Apache Flink (covered in Week 13) is the answer
What Comes Next
Gap                                        | Solution            | When
Schedule streaming + batch jobs together   | Apache Airflow DAGs | Week 15
SQL-based data transformation with testing | dbt models          | Week 15
Cluster health, query lag, cost visibility | Prometheus + Grafana | (beyond course scope)
SLA contracts on pipeline freshness        | SLO design patterns | (beyond course scope)
Spark Structured Streaming is the right tool for micro-batch real-time processing — but production requires orchestration (Airflow to schedule it), transformation (dbt to model the historical side), and observability (Grafana to know when it breaks). That full stack is Week 15.
Homework Reminder
Streaming Pipeline homework — due Sunday 11:59 PM
Four tasks:
Sensor tumbling pipeline — 1-minute windows to Parquet, 30-second watermark, append mode
Sliding window comparison — 5-minute/1-minute sliding to console; written explanation of why windows repeat
Late data experiment — inject events inside and past the watermark; explain what happened and why
Checkpoint recovery — stop, restart, screenshot lastProgress before and after; explain how Spark knows where to resume
Submit: 4 Python scripts + 1 PDF with screenshots and written explanations
If sensors topic is missing: docker exec -it kafka-broker kafka-topics --create --topic sensors --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
Draw the timeline on the board: advancing watermark value as events arrive. The watermark is a low-water mark — it advances as new events arrive, never goes backward.
Common mistake: complete mode on a high-cardinality key (millions of user IDs) writes millions of rows every 5 seconds. Always ask: "how many distinct keys are in my result?"
This table is worth memorizing for the quiz. The key insight: Append requires the system to know a row is "final" — which requires a watermark for aggregations. Without a watermark, aggregations can always be updated by late events, so Append is illegal.
Give students 10 minutes to run this and observe output. Then do the late event demo before they move on.
This is the most impactful demo of the session. Students often assume dropped events produce an error. The silent drop surprises them — which is exactly the point: you must choose your watermark carefully.
Stream-static join: always supported, no output mode restriction. The static side is replicated to all executors as a broadcast. If the static table changes, you must restart the query.
The key observation: the batch number in lastProgress picks up where it left off, not from 0. And the Kafka startingOffsets are determined by the checkpoint, not by the query option.
Answers to the discussion prompts: 1 = update (windows keep changing until finalized; complete would rewrite every user's row each refresh). 2 = a 30-minute watermark; memory cost scales with key cardinality × watermark duration. 3 = if a 10-minute refresh lag is acceptable, stream-static is simpler (restart the query to pick up updates); if you need immediate blocklist updates, stream-stream. 4 = checkpoints are not guaranteed compatible across Spark versions — you may need to discard state and restart from scratch.