Watermarks, Joins, and Fault Tolerance

CS 6500 — Week 14, Session 2

CS 6500 — Big Data Analytics | Week 14

What Happens to Your 5-Minute Click Count When a Phone Dies?

"A user clicks at 9:02 AM, but their phone loses signal and buffers the event. It arrives at Spark at 9:14 AM — eight minutes late. Your 9:00–9:05 window already closed. Does Spark update the count, drop the event, or keep state forever waiting for stragglers?"


Today's Answer: Watermarks Bound the Trade-Off

You declare a maximum lateness tolerance. Spark uses it to:

What watermarks enable

  • Accept events up to N minutes late
  • Update the correct window when they arrive
  • Evict finalized window state from memory
  • Enable append output mode (emit once, final)

Today's lab

  • Watermark configuration + late event injection
  • Tumbling and sliding window pipelines to HDFS
  • Stream-static join (sensor enrichment)
  • Stream-stream join (ad impression matching)
  • Checkpoint recovery demonstration

Everything runs in Docker against the Kafka + Spark stack you used last week.


Lab Goals + Environment Check

By the end of this session you will have built:

  1. A windowed sensor aggregation pipeline writing Parquet to HDFS
  2. A stream-static join enriching live events with a dimension table
  3. A stream-stream join matching ad impressions to clicks
  4. A recovered streaming query that resumes from a checkpoint

Check your environment:

docker-compose ps   # kafka, spark-master, spark-worker should be Running

# Optional (from docker/): reset Week 14 topics + one smoke-test click event
make week14-reset

# Verify topics
docker exec -it kafka-broker kafka-topics --list --bootstrap-server localhost:9092

Part 1 — Watermarks and Output Modes

15 minutes of theory before the lab


The Late Data Problem

Without watermarks, Spark must keep all window state forever.

Event at 9:02 AM → arrives at 9:14 AM (8 min late)

Tumbling windows:
  [9:00–9:05]  ← should this window update?
  [9:05–9:10]  ← or this one?
  [9:10–9:15]  ← the event's processing time falls here
  • Without watermark: Spark retains all windows indefinitely — unbounded memory growth
  • With watermark: Spark tracks the maximum event time seen, and discards events/state older than max_event_time − watermark_delay

The trade-off: larger delay → more late events accepted, more memory; smaller delay → less memory, more events dropped


How Watermarks Work

# "Accept events up to 10 minutes late. Drop anything older."
watermarked = clicks.withWatermark("event_time", "10 minutes")

counts = watermarked.groupBy(
    window(col("event_time"), "5 minutes"),
    col("page")
).count()

Watermark value = max(event_time seen so far) − watermark_delay

Event arrives Max event_time Watermark value 9:00–9:05 window state
9:08 event 9:08 8:58 Still open
9:12 event 9:12 9:02 Still open
9:16 event 9:16 9:06 Finalized and evicted

After 9:16, any event for 9:00–9:05 is silently dropped.
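The arithmetic in the table can be sketched in a few lines of plain Python — a toy model of Spark's bookkeeping, not its actual implementation:

```python
# Toy model of the watermark arithmetic (plain Python, not Spark internals):
# watermark = max event time seen so far minus the delay, and it only advances.
from datetime import datetime, timedelta

def simulate(events, delay):
    """For each arriving event time, return (current watermark, accepted?)."""
    wm = max_et = None
    out = []
    for et in events:
        max_et = et if max_et is None else max(max_et, et)
        candidate = max_et - delay
        wm = candidate if wm is None else max(wm, candidate)  # never regresses
        out.append((wm, et >= wm))  # events older than the watermark drop
    return out

t = lambda h, m: datetime(2026, 3, 17, h, m)
steps = simulate([t(9, 8), t(9, 12), t(9, 16), t(9, 1)],
                 timedelta(minutes=10))
# Watermark advances 8:58 -> 9:02 -> 9:06; the trailing 9:01 event is dropped.
```

Running the same event sequence as the table reproduces its watermark column, and shows the 9:01 straggler rejected once the watermark has passed it.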


Output Modes

Mode What Gets Written Valid When
Append Only new, finalized rows — written once, never updated Aggregations with watermark; append-only sinks
Update Only rows that changed since the last trigger Any aggregation; sinks that support UPSERT
Complete The entire result table, every trigger Small global aggregations only (total counts, top-N)
# Append: each window result written once after watermark passes
.writeStream.outputMode("append")

# Update: updated rows re-emitted each trigger (good for live dashboards)
.writeStream.outputMode("update")

# Complete: full table every trigger (only for small result sets!)
.writeStream.outputMode("complete")

Output Mode Selection Rules

Query Type Allowed Modes
Select / filter (no aggregation) Append
Aggregation without watermark Complete, Update
Aggregation with watermark Append, Update, Complete
Stream-stream join Append
Stream-static join Append, Update

The most common production choice: aggregation + watermark + append mode + Parquet sink

This writes each finalized result exactly once — making the output idempotent and easy to consume downstream.


Lab Part 1 — Sensor Stream Pipeline

Tumbling and sliding window aggregations to HDFS


Lab Setup — Start the Sensor Producer

# Terminal 1: start continuous sensor event producer
docker exec -it spark-master \
  python /datasets/sensors/produce_sensors.py

Lab Setup — Run Spark

Open a second terminal for your PySpark job:

docker exec -it spark-master pyspark \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.7

Lab Setup — Jupyter Option

Or open a Jupyter Notebook at http://localhost:8888 (token: bigdata) and use the PySpark kernel.

We'll build sensor_pipeline.py step by step.


Activity 1

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, window, avg, max as spark_max
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

spark = SparkSession.builder.appName("SensorPipeline").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

schema = StructType([
    StructField("sensor_id",   StringType()),
    StructField("temperature", DoubleType()),
    StructField("ts",          TimestampType()),
])

raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker:9092")
    .option("subscribe", "sensors")
    .option("startingOffsets", "earliest")
    .load())

readings = (raw
    .select(from_json(col("value").cast("string"), schema).alias("d"))
    .select("d.*")
    .withWatermark("ts", "2 minutes"))

Activity 1

# Tumbling 5-minute window: avg + max temperature per sensor
tumbling = (readings
    .groupBy(window("ts", "5 minutes"), "sensor_id")
    .agg(avg("temperature").alias("avg_temp"),
         spark_max("temperature").alias("max_temp")))

# Sliding 10-minute window, 5-minute slide
sliding = (readings
    .groupBy(window("ts", "10 minutes", "5 minutes"), "sensor_id")
    .agg(avg("temperature").alias("avg_temp")))

# Query A: show tumbling output in console (easy to verify in class)
q1_view = (tumbling.writeStream
  .outputMode("update")
  .format("console")
  .option("truncate", False)
  .trigger(processingTime="10 seconds")
  .queryName("tumbling_view")
  .start())

# Query B: persist tumbling to HDFS Parquet for checkpoint recovery later
q1 = (tumbling.writeStream
  .outputMode("append")
  .format("parquet")
  .option("checkpointLocation", "hdfs:///checkpoints/tumbling/")
  .option("path", "hdfs:///output/sensor_tumbling/")
  .trigger(processingTime="10 seconds")
  .queryName("tumbling_5min")
  .start())

# Query C: sliding to console (update mode — re-emits changed windows)
q2 = (sliding.writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="10 seconds")
    .queryName("sliding_10min")
    .start())

spark.streams.awaitAnyTermination()
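To see why the tumbling and sliding aggregations above produce different numbers of rows per event, here is a plain-Python sketch (not Spark's implementation) of which windows a single timestamp lands in:

```python
# Window assignment sketch: a timestamp falls into exactly one tumbling
# window, but into size/slide overlapping sliding windows.
from datetime import datetime, timedelta

def window_starts(ts, size, slide=None):
    slide = slide or size                   # tumbling: slide equals size
    epoch = datetime(1970, 1, 1)
    latest = epoch + ((ts - epoch) // slide) * slide  # last boundary <= ts
    starts, s = [], latest
    while s + size > ts:                    # every window still covering ts
        starts.append(s)
        s -= slide
    return sorted(starts)

ts = datetime(2026, 3, 17, 9, 3)
window_starts(ts, timedelta(minutes=5))        # one 5-min window: [9:00]
window_starts(ts, timedelta(minutes=10),
              timedelta(minutes=5))            # two windows: [8:55, 9:00]
```

A reading at 9:03 updates one tumbling window but two sliding windows — which is exactly why the sliding console output repeats each sensor across overlapping windows.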

Demo: Inject a Late Event

After the pipeline has been running for ~3 minutes:

# Late by 90 seconds — WITHIN the 2-minute watermark → accepted
echo '{"sensor_id":"s1","temperature":99.9,"ts":"2026-03-17T08:58:30"}' | \
  docker exec -i kafka-broker kafka-console-producer \
    --topic sensors --bootstrap-server localhost:9092

Inject Dropped Event

# Late by 5 minutes — PAST the watermark → silently dropped
echo '{"sensor_id":"s1","temperature":99.9,"ts":"2026-03-17T08:55:00"}' | \
  docker exec -i kafka-broker kafka-console-producer \
    --topic sensors --bootstrap-server localhost:9092

Observe Results

  • First event may update an open window.
  • Second event is dropped silently.
  • Check q1.lastProgress["numInputRows"] after each batch.

Checkpoint: Lab Part 1

Before moving to Part 2, verify:

# HDFS output should have Parquet files
docker exec -it spark-master \
  hdfs dfs -ls /output/sensor_tumbling/

# Checkpoint directory should have offsets/ and commits/
docker exec -it spark-master \
  hdfs dfs -ls /checkpoints/tumbling/

Verify Output

  • /output/sensor_tumbling/ should contain .parquet files.
  • /checkpoints/tumbling/offsets/ should contain numbered files.
  • If empty, inspect q1.lastProgress before continuing.

Lab Part 2 — Joins and Fault Tolerance

Stream-static, stream-stream, and checkpoint recovery


Activity 2: Static Join

Enrich the live sensor stream with a static metadata table

# sensor_metadata.csv: sensor_id, location, building, floor
sensor_meta = spark.read.csv(
    "hdfs:///datasets/sensors/sensor_metadata.csv",
    header=True, inferSchema=True)

enriched = readings.join(sensor_meta, on="sensor_id", how="left")

Run Static Join

query = (enriched.writeStream
    .format("console")
    .option("truncate", False)
    .trigger(processingTime="5 seconds")
    .start())
query.awaitTermination()

Key point: The static DataFrame is loaded once and, when small enough, broadcast to the executors. It does not need a watermark. New rows added to the CSV after the query starts will not be picked up automatically.
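The lookup semantics can be modeled in plain Python — a toy sketch with hypothetical metadata, not Spark's join machinery:

```python
# Toy model of the stream-static left join: the static table is loaded once;
# each micro-batch of streaming rows is looked up against it.
STATIC_META = {  # stands in for sensor_metadata.csv (hypothetical values)
    "s1": {"location": "lab-a", "building": "ENG", "floor": 2},
}
NULLS = {"location": None, "building": None, "floor": None}

def enrich(batch):
    """Left-join semantics: unknown sensor_ids keep null metadata columns."""
    return [{**row, **STATIC_META.get(row["sensor_id"], NULLS)}
            for row in batch]

enrich([{"sensor_id": "s1", "temperature": 21.5},
        {"sensor_id": "s9", "temperature": 19.0}])
```

Because `how="left"` keeps unmatched rows, a sensor missing from the metadata still flows through with null columns — the same behavior you should see in the console output.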


Activity 3: Stream-Stream Join

Match ad impressions to click events within a 5-minute window

from pyspark.sql.functions import expr

impression_schema = StructType([
    StructField("ad_id",    StringType()),
    StructField("user_id",  StringType()),
    StructField("shown_at", TimestampType()),
])
click_schema = StructType([
    StructField("ad_id",      StringType()),
    StructField("user_id",    StringType()),
    StructField("clicked_at", TimestampType()),
])

Activity 3: Read Streams

def read_topic(topic, schema):
    return (spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", "kafka-broker:9092")
        .option("subscribe", topic).option("startingOffsets", "latest")
        .load()
        .select(from_json(col("value").cast("string"), schema).alias("d"))
        .select("d.*"))

impressions = read_topic("impressions", impression_schema) \
    .withWatermark("shown_at",   "10 minutes")
clicks      = read_topic("clicks_ads",  click_schema) \
    .withWatermark("clicked_at", "10 minutes")

Activity 3: Join Run

# Join: same ad, same user, click within 5 minutes of impression.
# Alias both sides so the shared column names can be qualified in the
# join condition; the third positional argument of join() is `how`.
matched = impressions.alias("i").join(
    clicks.alias("c"),
    expr("""
        i.ad_id   = c.ad_id   AND
        i.user_id = c.user_id AND
        clicked_at BETWEEN shown_at AND shown_at + INTERVAL 5 MINUTES
    """),
    "leftOuter")

query = (matched.writeStream
    .format("console")
    .option("truncate", False)
    .outputMode("append")
    .option("checkpointLocation", "hdfs:///checkpoints/ad_join/")
    .trigger(processingTime="10 seconds")
    .start())
query.awaitTermination()

Why watermarks are required here: Both streams buffer rows waiting for a match. Without watermarks, Spark retains the full history of both streams — unbounded memory. Watermarks define when a row can no longer find a match and should be evicted.
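The buffering-and-eviction logic this paragraph describes can be sketched as a toy Python class — an illustration of the idea, not Spark's state store:

```python
# One side's join buffer: impressions wait for clicks; the watermark
# bounds how long they wait.
from datetime import datetime, timedelta

JOIN_WINDOW = timedelta(minutes=5)

class ImpressionBuffer:
    def __init__(self):
        self.rows = []  # (ad_id, user_id, shown_at)

    def add(self, impression):
        self.rows.append(impression)

    def match(self, ad_id, user_id, clicked_at):
        """A click matches an impression for the same ad/user within 5 min."""
        return [r for r in self.rows
                if r[0] == ad_id and r[1] == user_id
                and r[2] <= clicked_at <= r[2] + JOIN_WINDOW]

    def evict(self, watermark):
        """Clicks older than the watermark are dropped upstream, so an
        impression with shown_at + JOIN_WINDOW < watermark can never match."""
        keep = [r for r in self.rows if r[2] + JOIN_WINDOW >= watermark]
        evicted, self.rows = len(self.rows) - len(keep), keep
        return evicted
```

Without `evict()`, `rows` grows forever — exactly the unbounded-memory problem the watermark exists to prevent.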


Activity 4: Stop and Check

Stop the tumbling pipeline from Activity 1 and inspect offsets

# Step 1: stop the running query
q1.stop()

# Step 2: check the last committed batch number
# (lastProgress keeps its final value after stop())
print(q1.lastProgress["batchId"])

# Or list the commit files in HDFS:
# hdfs dfs -ls /checkpoints/tumbling/commits/

Activity 4: Restart

# Step 3: restart with the SAME checkpoint location
q1_restart = (tumbling.writeStream
    .outputMode("append")
    .format("parquet")
    .option("checkpointLocation", "hdfs:///checkpoints/tumbling/")  # same!
    .option("path", "hdfs:///output/sensor_tumbling/")
    .trigger(processingTime="10 seconds")
    .start())

# Spark reads the checkpoint, finds the last committed Kafka offset,
# and resumes from the next unconsumed message.
# lastProgress is None until the first restarted batch completes:
import time; time.sleep(15)
print(q1_restart.lastProgress)

Inspect Checkpoint

# Inspect the checkpoint directory structure
docker exec -it spark-master \
  hdfs dfs -ls /checkpoints/tumbling/

# offsets/5 — the Kafka offset record for batch 5
docker exec -it spark-master \
  hdfs dfs -cat /checkpoints/tumbling/offsets/5

Look for numbered files in offsets/ and commits/ — the highest commit number is where Spark will resume.


Checkpoint Anatomy

/checkpoints/tumbling/
  metadata          ← query ID, schema, configuration
  offsets/          ← Kafka partition offsets per batch (what was READ)
    0, 1, 2, 3 ...
  commits/          ← which batches completed successfully (what was WRITTEN)
    0, 1, 2, 3 ...
  state/            ← stateful operator state (window aggregates)

Exactly-once guarantee: On restart, Spark reads the highest commits/N, finds the corresponding offsets/N, and starts reading Kafka from the next offset. Any partial writes from the failed batch are discarded.
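That resume decision can be written as a short hedged sketch (directory names as in the diagram above; this is not Spark's actual recovery code):

```python
# Resume decision from a checkpoint directory laid out as offsets/N, commits/N.
import os

def resume_batch(checkpoint_dir):
    """Next batch to run = highest id in commits/ + 1 (or 0 if none).
    An offsets/N without a matching commits/N marks a batch that was read
    but never committed — it gets re-run from the recorded offsets."""
    def batch_ids(sub):
        path = os.path.join(checkpoint_dir, sub)
        if not os.path.isdir(path):
            return set()
        return {int(name) for name in os.listdir(path) if name.isdigit()}
    commits = batch_ids("commits")
    return max(commits) + 1 if commits else 0
```

So a checkpoint with offsets/0..2 but commits/0..1 resumes at batch 2: the offsets tell Spark what batch 2 should read, and the missing commit tells it the batch never finished.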


Debrief

What did we build and why does it matter?


Discussion Questions

Take 5 minutes — discuss with a neighbor:

  1. Output mode: Your pipeline aggregates clicks per user per hour and writes to a live dashboard. Users are refreshed every 30 seconds. Should you use append, update, or complete mode? Why?

  2. Watermark sizing: Your mobile app can buffer events for up to 30 minutes when offline. What watermark delay do you set? What's the memory cost of that choice?

  3. Stream-stream vs. stream-static: You're joining live transactions to a fraud blocklist. The blocklist is updated every 10 minutes. Which join type do you use and why?

  4. Checkpointing: You upgrade your Spark version. Can you restart the streaming job from the existing checkpoint? What risks exist?


Key Takeaways

  • Watermarks bound stateful memory — declare the maximum lateness you'll tolerate; Spark evicts older state
  • Output modes: Append (once, final, requires watermark) → Update (re-emits changed rows) → Complete (entire table, small cardinality only)
  • Stream-static join: dimension enrichment, broadcast once, no watermark needed
  • Stream-stream join: both sides buffer state; watermarks on both sides are required to prevent unbounded memory
  • Checkpointing = fault tolerance: restart resumes from last committed Kafka offset with no duplicates and no data loss

What's Missing?

Spark Structured Streaming processes live events at scale — but it wasn't built for everything


The Gaps Structured Streaming Leaves Open

  • Pipeline orchestration — who schedules the streaming job, retries it on failure, coordinates it with nightly batch ETL, and pages you at 3 AM when it stalls? → Apache Airflow
  • SQL-based batch transformations with lineage — transforming historical data with documented, tested, versioned SQL models is a batch concern → dbt
  • Production monitoring and SLA enforcement — query lag, state size, throughput, and watermark drift need dashboards and alerts → Prometheus + Grafana (Week 15)
  • True sub-millisecond latency — micro-batch floor is ~100ms; for <10ms requirements, Apache Flink (covered in Week 13) is the answer

What Comes Next

Gap Solution When
Schedule streaming + batch jobs together Apache Airflow DAGs Week 15
SQL-based data transformation with testing dbt models Week 15
Cluster health, query lag, cost visibility Prometheus + Grafana Week 15
SLA contracts on pipeline freshness SLO design patterns (beyond course scope)

Spark Structured Streaming is the right tool for micro-batch real-time processing — but production requires orchestration (Airflow to schedule it), transformation (dbt to model the historical side), and observability (Grafana to know when it breaks). That full stack is Week 15.


Homework Reminder

Streaming Pipeline homework — due Sunday 11:59 PM

Four tasks:

  1. Sensor tumbling pipeline — 1-minute windows to Parquet, 30-second watermark, append mode
  2. Sliding window comparison — 5-minute/1-minute sliding to console; written explanation of why windows repeat
  3. Late data experiment — inject events inside and past the watermark; explain what happened and why
  4. Checkpoint recovery — stop, restart, screenshot lastProgress before and after; explain how Spark knows where to resume

Submit: 4 Python scripts + 1 PDF with screenshots and written explanations


Instructor Notes

If the sensors topic is missing: docker exec -it kafka-broker kafka-topics --create --topic sensors --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092

Draw the timeline on the board: advancing watermark value as events arrive. The watermark is a low-water mark — it advances as new events arrive, never goes backward.

Common mistake: complete mode on a high-cardinality key (millions of user IDs) writes millions of rows every 5 seconds. Always ask: "how many distinct keys are in my result?"

This table is worth memorizing for the quiz. The key insight: Append requires the system to know a row is "final" — which requires a watermark for aggregations. Without a watermark, aggregations can always be updated by late events, so Append is illegal.

Give students 10 minutes to run this and observe output. Then do the late event demo before they move on.

This is the most impactful demo of the session. Students often assume dropped events produce an error. The silent drop surprises them — which is exactly the point: you must choose your watermark carefully.

Stream-static join: supported in append and update output modes, with no watermark requirement. The static side is typically shipped to all executors as a broadcast when it is small enough. If the static table changes, you must restart the query.

The key observation: the batch number in lastProgress picks up where it left off, not from 0. And the Kafka startingOffsets are determined by the checkpoint, not by the query option.

Answers to the discussion prompts:
  1. Update — windows keep changing until finalized; complete would rewrite every user's row every 30 seconds.
  2. A 30-minute watermark; the memory cost scales with key cardinality × watermark duration.
  3. If a 10-minute refresh lag is acceptable, stream-static is simpler (restart to pick up changes); if you need immediate blocklist updates, stream-stream.
  4. Checkpoints are not guaranteed compatible across Spark versions — you may need to restart from scratch.