Flink Stateful Apps

CS 6500 — Week 13, Session 2

CS 6500 — Big Data Analytics | Week 13

Crash and Recover

"Your fraud pipeline has processed 900 million transactions. The cluster crashes mid-job and restarts — claiming exactly-once semantics. No double-blocked cards. No missed fraudulent charges. How is that mathematically possible across distributed workers?"

CS 6500 — Big Data Analytics | Week 13

Checkpoints Deliver

Flink takes periodic consistent snapshots of all operator state and Kafka offsets simultaneously. On recovery, the entire system rewinds to the last successful snapshot — as if the crash never happened.

Today's Topics

  • Keyed state: ValueState, ListState, MapState
  • ProcessFunction: custom logic + timers
  • Tumbling, sliding, and session windows
  • Checkpointing and exactly-once semantics

Today's Lab

  • Fraud detection with per-user ValueState
  • Transaction velocity alerting with timers
  • Kafka source + Flink exactly-once
  • Checkpoint recovery demonstration

Everything runs in Docker against the Kafka + Flink stack from Session 1.

CS 6500 — Big Data Analytics | Week 13

Environment Check

# Verify Flink cluster and Kafka are running
docker-compose ps
curl http://localhost:8081/overview

# Verify the transactions topic exists
docker exec -it kafka kafka-topics.sh --list \
  --bootstrap-server localhost:9092

# If missing, create it
docker exec -it kafka kafka-topics.sh --create \
  --topic transactions --partitions 3 --replication-factor 1 \
  --bootstrap-server localhost:9092

Start the transaction producer:

docker exec -it flink-jobmanager \
  python /datasets/transactions/produce_transactions.py
CS 6500 — Big Data Analytics | Week 13

Stateful Operators

Maintaining per-key context across events

CS 6500 — Big Data Analytics | Week 13

What Is State?

Stateless processing: each record is transformed independently — no memory of past records.

Stateful processing: the output depends on the current record and previously seen records.
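A minimal sketch of the contrast — here stream is assumed to be a parsed transaction stream, and FraudDetector is the stateful operator built later in the lab:

# Stateless: each record is transformed on its own — no memory of past records
taxed = stream.map(lambda tx: {**tx, "amount": tx["amount"] * 1.08})

# Stateful: the operator remembers earlier records for the same key
flagged = stream.key_by(lambda tx: tx["user_id"]).process(FraudDetector())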

Use Case                          State Needed
Count transactions per user       Integer count per user_id
Running average spend             (sum, count) per user_id
Detect 3 failures in 60 seconds   List of recent failure timestamps
Session-level revenue             Map of item → quantity in session

Flink state is always keyed — partitioned by key, local to the subtask that owns that key.

State in Flink is stored in a configurable backend. The default is memory (heap), which is fast but limited. RocksDB is the production choice for large state — it stores state on disk with an in-memory write buffer.
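A minimal configuration sketch of the RocksDB option, assuming the unified backend API of Flink ≥ 1.13:

from pyflink.datastream import StreamExecutionEnvironment, EmbeddedRocksDBStateBackend

env = StreamExecutionEnvironment.get_execution_environment()
# Working state lives on local disk with an in-memory write buffer
env.set_state_backend(EmbeddedRocksDBStateBackend())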

CS 6500 — Big Data Analytics | Week 13

Keyed State Types

Type              Stores                         Use When
ValueState[T]     A single value per key         Counters, flags, last-seen values
ListState[T]      A list of values per key       Collecting events before aggregating
MapState[K, V]    A map per key                  Session item counts, feature vectors
ReducingState[T]  Pre-aggregated value per key   Running sum, min, max

# Declare state in open() — Flink restores it from checkpoint on recovery
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.common.typeinfo import Types

class FraudDetector(KeyedProcessFunction):
    def open(self, runtime_context):
        desc = ValueStateDescriptor("tx_count", Types.INT())
        self.tx_count = runtime_context.get_state(desc)

State descriptors are registered in open(), not in __init__. This allows Flink to restore the state from the last checkpoint before the first process_element call, giving seamless recovery semantics.
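For context, a minimal sketch (not in the lab template) of how the restored state is then read and written once events arrive — value() returns None the first time a key is seen:

    def process_element(self, value, ctx):
        # Read the per-key count (None on first event), increment, write back
        seen = self.tx_count.value() or 0
        self.tx_count.update(seen + 1)
        yield value["user_id"], seen + 1   # emit running count per user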

CS 6500 — Big Data Analytics | Week 13

ProcessFunction

The low-level API: state access + event-time timers + flexible output.

from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.common.typeinfo import Types

class VelocityAlert(KeyedProcessFunction):
    """Alert if a user makes > 5 transactions in 60 seconds."""

    def open(self, runtime_context):
        desc = ValueStateDescriptor("count_in_window", Types.INT())
        self.count = runtime_context.get_state(desc)

open() runs once per subtask on startup — and again on checkpoint recovery, restoring state automatically before the first process_element call.

CS 6500 — Big Data Analytics | Week 13

Timer and Alert

    def process_element(self, value, ctx):
        cur = self.count.value() or 0
        cur += 1
        self.count.update(cur)
        if cur == 1:
            # Set ONE timer 60 s from the first tx in this window
            ctx.timer_service().register_event_time_timer(
                ctx.timestamp() + 60_000)
        if cur > 5:
            yield f"ALERT: user {value['user_id']} hit {cur} tx in 60s"

    def on_timer(self, timestamp, ctx):
        self.count.clear()   # window expired — reset for next window
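
Wiring the function into a job — a hedged sketch, where keyed_stream stands in for the key_by(user_id) stream built in the lab:

alerts = keyed_stream.process(VelocityAlert(), output_type=Types.STRING())
alerts.print()   # ALERT lines appear on TaskManager stdout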
CS 6500 — Big Data Analytics | Week 13

Windows in Flink

Grouping a stream into finite, processable chunks

CS 6500 — Big Data Analytics | Week 13

Window Types

Tumbling

stream \
  .key_by(lambda x: x["user_id"]) \
  .window(TumblingEventTimeWindows
          .of(Time.minutes(5))) \
  .aggregate(CountAggregate())

Fixed, non-overlapping 5-minute buckets.

Sliding

.window(SlidingEventTimeWindows
        .of(Time.minutes(10),
            Time.minutes(5)))

10-min window, advances every 5 min.

Session

.window(EventTimeSessionWindows
        .with_gap(Time.minutes(10)))

Window closes after 10 min of inactivity.

Global (no time boundary)

.count_window(100)
# fire every 100 events

Fires when N events accumulate.
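These snippets assume the window helpers are imported; a sketch, with CountAggregate as a hypothetical count-per-window helper (not defined on the slide):

from pyflink.common.time import Time
from pyflink.datastream.window import (
    TumblingEventTimeWindows, SlidingEventTimeWindows, EventTimeSessionWindows)
from pyflink.datastream.functions import AggregateFunction

class CountAggregate(AggregateFunction):
    """Counts elements per window (hypothetical helper assumed above)."""
    def create_accumulator(self):
        return 0
    def add(self, value, accumulator):
        return accumulator + 1
    def get_result(self, accumulator):
        return accumulator
    def merge(self, acc_a, acc_b):
        return acc_a + acc_b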

Flink's window API is nearly identical in concept to Spark Structured Streaming's windows (covered in Week 14). The key difference: Flink fires window results immediately when the watermark passes the window end, with sub-millisecond latency rather than Spark's micro-batch interval.

CS 6500 — Big Data Analytics | Week 13

Exactly-Once Kafka

The checkpoint protocol explained

CS 6500 — Big Data Analytics | Week 13

Checkpoint Protocol

Flink's fault tolerance uses consistent distributed snapshots:

  1. JobManager injects a barrier into each source partition
  2. Barriers flow through the job graph with the data
  3. When an operator receives a barrier on all inputs → it snapshots its state
  4. When all operators have snapshotted → checkpoint is complete
  5. JobManager records the Kafka offsets at this barrier

On recovery:

  • Restore state from the last completed checkpoint
  • Reset Kafka consumer offsets to the checkpoint's committed offset
  • Re-read events since the checkpoint — idempotent sinks ensure no duplicates

The barrier mechanism snapshots state without pausing the stream. Production systems checkpoint every 1–5 minutes with negligible latency impact.

CS 6500 — Big Data Analytics | Week 13

Checkpoint Config

from pyflink.datastream import (
    StreamExecutionEnvironment, CheckpointingMode, HashMapStateBackend)
from pyflink.datastream.checkpoint_storage import (
    FileSystemCheckpointStorage)

env = StreamExecutionEnvironment.get_execution_environment()

# Checkpoint every 60 seconds, exactly-once
env.enable_checkpointing(60_000)
env.get_checkpoint_config().set_checkpointing_mode(
    CheckpointingMode.EXACTLY_ONCE
)
env.get_checkpoint_config().set_min_pause_between_checkpoints(30_000)
env.get_checkpoint_config().set_checkpoint_timeout(120_000)

# State backend (in-memory) + durable checkpoint storage on HDFS
env.set_state_backend(HashMapStateBackend())
env.get_checkpoint_config().set_checkpoint_storage(
    FileSystemCheckpointStorage(
        "hdfs:///flink-checkpoints/fraud-detector/")
)
CS 6500 — Big Data Analytics | Week 13

State Backends

State backend options:

Backend          Storage                        Use When
Memory           JVM heap                       Dev / tiny state
FsStateBackend   Heap + HDFS snapshots          Medium state
RocksDB          Disk (incremental snapshots)   Large state (GBs+)

Since Flink 1.13 the first two are configured as HashMapStateBackend (heap) plus a checkpoint-storage choice, and the third as EmbeddedRocksDBStateBackend — the trade-offs in the table are unchanged.
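A hedged sketch of the RocksDB option with incremental checkpoints — the keyword argument mirrors the Java constructor's boolean flag and is an assumption here:

from pyflink.datastream import EmbeddedRocksDBStateBackend

# Upload only changed files at each checkpoint instead of a full snapshot
env.set_state_backend(
    EmbeddedRocksDBStateBackend(enable_incremental_checkpointing=True))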
CS 6500 — Big Data Analytics | Week 13

Fraud Detection Lab

Build a stateful Flink pipeline end to end

CS 6500 — Big Data Analytics | Week 13

Lab Setup

# In Docker container
docker exec -it flink-jobmanager bash
cd /opt/flink_lab

# Start the transaction producer (keep running)
python /datasets/transactions/produce_transactions.py &

# Transaction format:
# {"user_id": "u42", "amount": 99.95,
#  "merchant": "gas_station", "ts": "2026-03-17T09:00:01.234Z"}

Lab goals:

Activity   What You Build
1          Parse Kafka stream + count per user (ValueState)
2          Alert on 5+ transactions in 60 seconds
3          Add exactly-once checkpointing
4          Kill and restart — verify no duplicate alerts
CS 6500 — Big Data Analytics | Week 13

Lab: Read Kafka

Challenge: The transactions topic is live and filling up. Before writing any fraud logic, you need to prove the data is flowing and formatted correctly — bad assumptions here waste 30 minutes of debugging later.

Think before coding:

  • How do you connect a Flink source to a Kafka topic? What three things does the source need to know?
  • key_by(user_id) routes all records for a user to the same subtask. Why does the fraud rule require this?
  • What does success look like? How many records per second do you expect?

Open fraud_detector_template.py, uncomment the Activity 1 block, run it, and confirm transactions stream continuously before moving on.

CS 6500 — Big Data Analytics | Week 13

Activity 1 — Step 1

# fraud_detector.py — Step 1 of 3: environment
from pyflink.common import WatermarkStrategy
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaSource, KafkaOffsetsInitializer)
import json

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)  # 2 subtasks split the topic's 3 partitions

set_parallelism(2) — the two subtasks split the topic's three partitions between them; parallelism above the partition count would leave subtasks idle. The env object holds the entire job graph.

CS 6500 — Big Data Analytics | Week 13

Activity 1 — Step 2

# Step 2 of 3 — add after the env block ↑
source = (KafkaSource.builder()
    .set_bootstrap_servers("kafka:9092")  # the broker container from Session 1
    .set_topics("transactions")
    .set_group_id("fraud-detector")
    .set_starting_offsets(KafkaOffsetsInitializer.earliest())
    .set_value_only_deserializer(SimpleStringSchema())
    .build())

earliest() reads from offset 0 on first run — on restart from checkpoint, Flink ignores this and resumes from the committed offset instead.
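Other starting-offset choices the builder accepts — a hedged sketch (the timestamp value is just an example):

# Start at the newest records, ignoring history
KafkaOffsetsInitializer.latest()

# Start at the first record at or after an epoch-millisecond timestamp
KafkaOffsetsInitializer.timestamp(1710000000000)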

CS 6500 — Big Data Analytics | Week 13

Activity 1 — Step 3

# Step 3 of 3 — add after the source block ↑
stream = (env
    .from_source(source,
                 WatermarkStrategy.for_monotonous_timestamps(),
                 "Kafka transactions")
    .map(json.loads,
         # amount is a float, so MAP(STRING, STRING) can't hold it —
         # keep the mixed-type dict as a pickled Python object
         output_type=Types.PICKLED_BYTE_ARRAY())
    .key_by(lambda x: x["user_id"]))

stream.print()
env.execute("Fraud Detector v1")

key_by(user_id) routes all records for a user to the same subtask — required for per-key state. Verify transactions are printing before moving to Activity 2.

CS 6500 — Big Data Analytics | Week 13

Lab: Add Alerts

Challenge: The fraud rule is: flag any user who makes more than 5 transactions within a 60-second window. This rule depends on the history of past events — a map or filter can't do it alone.

Design before coding:

  • Which Flink operator gives you access to per-key state and a timestamp?
  • A ValueState(count) would count transactions, but couldn't expire old ones. What state type stores individual timestamps so you can prune the ones outside 60 seconds?
  • Where do you declare state, and why does it have to be in open() instead of __init__?

Implement FraudDetector in the template. Users u901 and u902 burst every 30 seconds — you should see ALERT lines within a minute of running.

CS 6500 — Big Data Analytics | Week 13

Activity 2 — Step 1

# Step 1 of 3 — class definition and state registration
from pyflink.datastream.functions import KeyedProcessFunction
from pyflink.datastream.state import ListStateDescriptor

class FraudDetector(KeyedProcessFunction):
    THRESHOLD = 5       # alert when strictly exceeding this
    WINDOW_MS = 60_000  # 60-second sliding look-back

    def open(self, runtime_context):
        self.ts_list = runtime_context.get_list_state(
            ListStateDescriptor("timestamps", Types.LONG()))

open() runs before the first event — Flink restores ts_list from the last checkpoint here, before process_element is ever called.

CS 6500 — Big Data Analytics | Week 13

Activity 2 — Step 2

    # Step 2 of 3 — add process_element with timestamp pruning
    def process_element(self, tx, ctx):
        now_ms = ctx.timer_service().current_processing_time()

        self.ts_list.add(now_ms)          # record this event

        # Discard timestamps outside the 60-second window
        cutoff = now_ms - self.WINDOW_MS
        recent = [t for t in self.ts_list.get() if t >= cutoff]
        self.ts_list.update(recent)       # overwrite with pruned list

Pruning on every event keeps state bounded — ListState does not auto-expire old entries.
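An alternative to manual pruning is Flink's state TTL, which expires ListState entries individually; a hedged sketch assuming PyFlink's StateTtlConfig API:

from pyflink.common.time import Time
from pyflink.datastream.state import StateTtlConfig, ListStateDescriptor

# Entries expire ~60 s after being written; expiry is refreshed on each write
ttl = (StateTtlConfig.new_builder(Time.minutes(1))
       .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
       .build())
desc = ListStateDescriptor("timestamps", Types.LONG())
desc.enable_time_to_live(ttl)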

CS 6500 — Big Data Analytics | Week 13

Activity 2 — Step 3

        # Step 3 of 3 — alert check (inside process_element, after pruning ↑)
        if len(recent) > self.THRESHOLD:
            yield (f"ALERT user={tx['user_id']} "
                   f"count={len(recent)} "
                   f"last_amount={tx.get('amount', '?')}")

# Wire into the pipeline (replace stream.print() from Activity 1)
stream.process(
    FraudDetector(), output_type=Types.STRING()).print()
env.execute("Fraud Detector v2")

Users u901 and u902 burst every 30 s — ALERT lines should appear within a minute of running.


CS 6500 — Big Data Analytics | Week 13

Lab: Checkpointing

Challenge: Your fraud detector is running. The cluster crashes after processing 10 million events. When it restarts, does it re-read from offset 0 — re-alerting on transactions it already processed? Or does it pick up exactly where it left off?

Predict before running:

  • Without checkpointing, where does Flink's Kafka consumer start on restart?
  • What two things must Flink snapshot simultaneously to guarantee no duplicates?
  • How long is your worst-case reprocessing window if you checkpoint every 30 seconds?

Add the checkpoint block, run the job, let at least 2 checkpoints complete in the Web UI, then kill and restart. Watch the Kafka offset in the Web UI to verify it resumes mid-stream.

~15 minutes

CS 6500 — Big Data Analytics | Week 13

Activity 3 — Step 1

# Step 1 of 3 — add immediately after env = ...get_execution_environment()
env.enable_checkpointing(30_000)  # checkpoint every 30 seconds
Without this, a restart re-reads from Kafka offset 0 — every ALERT fires a second time for already-processed events.

CS 6500 — Big Data Analytics | Week 13

Activity 3 — Step 2

# Step 2 of 3 — add after enable_checkpointing ↑
from pyflink.datastream import HashMapStateBackend
from pyflink.datastream.checkpoint_storage import (
    FileSystemCheckpointStorage)

env.set_state_backend(HashMapStateBackend())
env.get_checkpoint_config().set_checkpoint_storage(
    FileSystemCheckpointStorage(
        "file:///opt/flink/checkpoints/fraud-detector/"))
env.get_checkpoint_config().set_min_pause_between_checkpoints(10_000)

Swap file:// for hdfs:// in production. HashMapStateBackend keeps working state on the JVM heap; only checkpoint snapshots go to the storage path.

CS 6500 — Big Data Analytics | Week 13

Activity 3 — Step 3

# Step 3 of 3 — let 2+ checkpoints complete, then crash and restart

# Web UI → Running Jobs → Checkpoints tab: wait for "Completed" entries

# Cancel the running job
curl -X PATCH http://localhost:8081/jobs/<job-id>?mode=cancel

# Restart
python fraud_detector.py

# Verify: Kafka consumer offset resumes mid-stream (not from 0)
# No duplicate ALERT messages for already-processed events

The Web UI's consumer offset counter is your proof of exactly-once recovery. Note: a cancelled job only resumes from its checkpoint if checkpoint retention on cancellation is enabled — killing the TaskManager container instead simulates a true crash and lets Flink's restart strategy recover automatically.

CS 6500 — Big Data Analytics | Week 13

Debrief

What did we build and what does it mean?

CS 6500 — Big Data Analytics | Week 13

Debrief Questions

Take 5 minutes — discuss with a neighbor:

  1. State backend: Your fraud model tracks a 30-day window per user — millions of users. Which state backend do you choose and why?

  2. Checkpoint frequency: You checkpoint every 60 seconds. The cluster crashes. What is the worst-case reprocessing cost?

  3. Flink vs. Spark: Your team uses PySpark. A stakeholder wants "real-time" fraud detection. What questions do you ask before deciding whether to add Flink?

  4. Exactly-once: Exactly-once with Kafka requires an idempotent sink. What happens if your downstream database doesn't support idempotent writes?

CS 6500 — Big Data Analytics | Week 13

Key Takeaways

  • Keyed state (ValueState, ListState, MapState) maintains per-key context — always declare it in open() for checkpoint-aware recovery
  • ProcessFunction provides full control: state access + event-time timers + flexible output
  • Windows — tumbling, sliding, session — fire when the watermark passes the window boundary
  • Chandy-Lamport checkpointing: barrier-based consistent snapshots enable exactly-once recovery without pausing the stream

Quick reference:

  • ValueState — single value per key, restored on checkpoint recovery
  • ProcessFunction — custom logic + timers + state
  • Checkpoint barrier — marker that triggers a consistent state snapshot across all operators
  • Exactly-once — checkpointing + idempotent sink + committed Kafka offsets
  • RocksDB — disk-backed state backend for state larger than heap memory
CS 6500 — Big Data Analytics | Week 13

What's Missing?

Apache Flink handles stateful true-streaming — but it wasn't built for everything

CS 6500 — Big Data Analytics | Week 13

Flink's Gaps

  • Micro-batch simplicity — Flink's DataStream/ProcessFunction API is powerful but complex; teams whose latency budgets are looser than ~100 ms usually prefer Spark Structured Streaming's DataFrame API (Week 14)
  • SQL-first data modeling — Flink SQL exists, but it lacks dbt's schema tests, incremental materializations, and versioned lineage documentation
  • Pipeline orchestration — Flink jobs run indefinitely but don't schedule alongside batch ETL, retry across system boundaries, or send failure alerts
  • Historical batch reprocessing — large-scale backfills over months of data are more efficient with Spark's Catalyst optimizer than re-streaming through Flink
CS 6500 — Big Data Analytics | Week 13

What Comes Next

Gap                                   Solution                     When
Simpler micro-batch streaming         Spark Structured Streaming   Week 14
SQL transforms with lineage + tests   dbt                          Week 15
Pipeline scheduling and alerting      Apache Airflow               Week 15

Apache Flink is the right tool for sub-10ms stateful streaming — but production platforms also need Spark for micro-batch (Week 14), dbt for tested batch transforms, and Airflow for orchestration (Week 15).

CS 6500 — Big Data Analytics | Week 13

Homework

Flink Stateful Streaming — due Sunday 11:59 PM

Four tasks:

  1. Velocity counter — click event stream; count per user per 5-minute tumbling window using ValueState; output: (user_id, window_end, count)
  2. Session detector — detect user sessions using EventTimeSessionWindows with 10-minute gap; output session duration in seconds
  3. Checkpoint recovery — add FsStateBackend; stop and restart the job; screenshot the Checkpoints tab before and after; explain what offset Flink resumed from
  4. Flink vs. Spark memo — 300–400 words: given a use case requiring 50ms fraud detection, justify your choice of Flink over Spark Structured Streaming

Submit: 3 Python scripts + 1 PDF with screenshots and memo

CS 6500 — Big Data Analytics | Week 13

Run this before class. Have the Web UI open at http://localhost:8081

Walk through this slowly — it's the most complex code of the week. One timer per window: registering on the FIRST tx (cur==1) means exactly one timer fires 60s later and resets the count. The lab's FraudDetector uses ListState of timestamps for a true sliding window — this single-timer pattern is simpler but counts from the first event in each window, not a rolling 60-second lookback.

~10 minutes

~20 minutes

Key observation: the Kafka consumer offset in the Web UI picks up where it left off, not from 0.

Answers: 1=RocksDB (disk-backed, handles large state); 2=60 seconds of reprocessing; 3=what latency is required? < 100ms → Flink, otherwise Spark may suffice; 4=you get at-least-once; must make the sink idempotent (upsert by id, dedup table)