Apache Flink

CS 6500 — Week 13, Session 1

CS 6500 — Big Data Analytics | Week 13

50ms or Blocked?

"A stolen card swipes at a point-of-sale terminal. Your fraud model needs a decision — approve or deny — in under 50 milliseconds. Kafka delivered the event. What processor is fast enough to beat the transaction?"


Streaming Answers

Batch jobs answer this question in minutes. Spark micro-batch answers in ~100ms. Apache Flink answers in under 10ms — because it processes each record the moment it arrives.

Session 1 — The Foundation

  • Why micro-batch has a latency floor
  • Flink's runtime: JobManager + TaskManagers
  • DataStream API: sources, transforms, sinks
  • Event time and watermarks
  • Demo: streaming word count

Session 2 — Putting It to Work

  • Keyed state: ValueState, ListState
  • Windows in Flink
  • Exactly-once with Kafka
  • Lab: fraud detection pipeline

Flink is the engine behind real-time fraud detection at PayPal, clickstream analytics at Alibaba, and recommendation updates at Netflix.


Week 12 Recap

Kafka: the pipe

  • Topics, partitions, replication
  • Python producers publishing JSON events
  • Consumer groups and offset management
  • At-least-once delivery by default

The gap Kafka leaves

  • Kafka stores and delivers events
  • Kafka does not compute on them
  • Aggregations, joins, and alerts need a stream processor
  • This week: Apache Flink fills that role

The pattern: Kafka is the durable log that buffers events from producers. Flink reads from that log and processes events with millisecond latency.

Kafka and Flink form the most common true-streaming stack in production. Students should think of Kafka as a durable queue and Flink as the computation engine that consumes from it.


Why Latency Matters

The case for true stream processing


The Latency Spectrum

Latency class  | Range         | Examples
Real-time      | < 10ms        | Fraud block, trading, IoT control
Near real-time | 100ms–1s      | Live dashboards, alerting, recommendations
Micro-batch    | 1s–30s        | Spark Structured Streaming
Mini-batch     | 1–5 min       | Some Flink applications
Batch          | Minutes–hours | Nightly ETL, ML training

The question to ask: What is the business cost of a 100ms delay? A 1-second delay?

Spark Structured Streaming's minimum trigger interval is about 100ms in practice. Flink processes each record as it arrives — latency is bounded by network and deserialization time, typically 1–10ms end-to-end.

For credit card fraud, ~80% of fraud damage is done in the first 10 minutes. A 100ms decision vs. a 500ms decision can be the difference between blocking the first transaction and missing the fifth.


Batch vs Streaming

Micro-batch (Spark)

  • Collect events for trigger interval (e.g., 5s)
  • Run a Spark job on the mini-batch
  • Emit results, advance offsets
  • Minimum latency: ~100ms
  • Simpler: same DataFrame API

True streaming (Flink)

  • Process each record the moment it arrives
  • No batching step — pipelined execution
  • Latency: 1–10ms typical
  • More complex: DataStream API
  • Stateful operators with native checkpointing

Rule of thumb: Use Spark Structured Streaming (Week 14) for near-real-time needs. Use Flink when latency must fall below the ~100ms micro-batch floor or when stateful operator complexity requires fine-grained control.

Micro-batch is easier to reason about and covers ~90% of "streaming" requirements. True streaming with Flink is needed when the business genuinely requires sub-100ms latency or when Flink's richer stateful API (ProcessFunction, timers, RocksDB state) is required.


Flink Architecture

JobManagers, TaskManagers, and data flow


Architecture Diagram

Flink Architecture


Flink's Runtime

JobManager (master)

  • Receives the job graph from the client
  • Schedules tasks onto TaskManagers
  • Coordinates checkpoints
  • High-availability: ZooKeeper or Kubernetes

TaskManagers (workers)

  • Execute operator subtasks
  • Each has a fixed number of slots
  • Slots share network + memory buffers
  • Communicate via network shuffle channels
Client → JobManager → [TaskManager 1: slots 1-4]
                    → [TaskManager 2: slots 1-4]
                    → [TaskManager 3: slots 1-4]

A slot is the unit of resource allocation. One slot can run one pipeline of operators (a "slot-sharing group"). The number of slots per TaskManager is set at startup and determines maximum parallelism.
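A quick back-of-the-envelope sketch of what slots imply. The cluster sizes below are illustrative, matching the 3-TaskManager diagram above; note that the source's Kafka partition count caps effective source parallelism regardless of available slots:

```python
# Slot arithmetic: total slots cap job parallelism.
task_managers = 3
slots_per_tm = 4

max_parallelism = task_managers * slots_per_tm   # total slots in the cluster
assert max_parallelism == 12

# A Kafka source can't usefully run more subtasks than there are
# partitions — extra source subtasks simply sit idle.
kafka_partitions = 3
effective_source_parallelism = min(max_parallelism, kafka_partitions)
assert effective_source_parallelism == 3
```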


Parallelism

Flink scales by running multiple subtask copies of each operator.

Source (p=3)  →  Map (p=3)  →  KeyBy  →  Reduce (p=3)
  [s1][s2][s3]    [m1][m2][m3]            [r1][r2][r3]
  • Parallelism = number of subtask instances per operator
  • Each subtask runs in one slot on one TaskManager
  • Flink redistributes records between operators via network channels
# Set global parallelism
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

# Override per operator
stream.map(my_fn).set_parallelism(2)

keyBy is a shuffling operation — all records with the same key go to the same subtask. This enables per-key state to be local to a single subtask, which is the foundation of Flink's stateful streaming model.
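The routing guarantee can be sketched in plain Python. Flink actually hashes keys into key groups and maps groups to subtasks; the modulo hashing below is a simplified model that shows the same property, same key, same subtask:

```python
# Simplified model of keyBy routing: every record with the same key
# lands on the same downstream subtask, so per-key state stays local.
def route(key, parallelism):
    """Pick the subtask index for a key (simplified: hash modulo)."""
    return hash(key) % parallelism

parallelism = 3
events = [("user_a", 1), ("user_b", 1), ("user_a", 2), ("user_c", 1)]

subtasks = {i: [] for i in range(parallelism)}
for key, value in events:
    subtasks[route(key, parallelism)].append((key, value))

# Both user_a events landed on the same subtask:
a_slots = {route(k, parallelism) for k, _ in events if k == "user_a"}
assert len(a_slots) == 1
```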


Backpressure

When a downstream operator is slow, Flink signals upstream operators to slow down — rather than drop records or crash.

How it works

  • Each operator pair shares a network buffer
  • When the buffer fills, the upstream operator blocks
  • The signal propagates all the way back to the source
  • Source reads from Kafka slower → no data loss

Why it matters

  • Without it: fast source overwhelms slow operator → OOM
  • With it: the pipeline self-regulates automatically
  • Flink Web UI shows backpressure per operator (red = problem)
  • Fix: increase parallelism on the bottleneck operator

Backpressure is Flink's built-in flow control mechanism. It is implemented via TCP flow control on the network channels between TaskManagers. The Web UI's backpressure tab samples operators periodically — a ratio above 0.5 indicates the operator is a bottleneck and is holding up the entire pipeline.

Backpressure prevents cascading failures. Without it, a spike in traffic or a slow external sink (e.g., a saturated database) would cause the in-memory queue to overflow and the job to crash. Monitoring backpressure in production is the first diagnostic step when pipeline latency increases.
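A toy model of the mechanism, using a bounded queue as the network buffer between an operator pair (the real implementation rides on network-level flow control, as described above):

```python
# Toy backpressure model: a bounded buffer between two operators.
# When the buffer fills, the upstream push fails/blocks — that is the
# signal that propagates back toward the source. No records are dropped.
import queue

channel = queue.Queue(maxsize=2)   # "network buffer" between operators

channel.put("event-1")
channel.put("event-2")             # buffer is now full

try:
    channel.put_nowait("event-3")  # upstream cannot push: backpressure
    overflowed = False
except queue.Full:
    overflowed = True

assert overflowed                  # upstream must slow down, not drop data

# Downstream consumes one record, freeing a slot; upstream proceeds.
channel.get()
channel.put_nowait("event-3")
assert channel.qsize() == 2
```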


The DataStream API

Sources, transformations, and sinks


DataStream Basics

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.typeinfo import Types

env = StreamExecutionEnvironment.get_execution_environment()

# 1. Source — read from a collection (demo) or Kafka (production)
stream = env.from_collection(
    [(1, "click"), (2, "view"), (3, "click")],
    type_info=Types.TUPLE([Types.INT(), Types.STRING()])
)

# 2. Transform — pure Python functions
filtered = stream.filter(lambda x: x[1] == "click")
mapped   = filtered.map(lambda x: (x[0], x[1].upper()))

# 3. Sink — print (demo) or Kafka/HDFS (production)
mapped.print()

# 4. Execute — nothing runs until this line
env.execute("My First Flink Job")

PyFlink wraps the Java DataStream API. The execution model is lazy: from_collection, filter, map all just build the job graph. env.execute() submits the graph to the JobManager and blocks until the job finishes (or runs forever for streaming).


Transformations

Operation | Description                      | Example
map       | 1-to-1 record transform          | parse JSON → typed tuple
flatMap   | 1-to-N (or 0)                    | tokenize sentence into words
filter    | Keep matching records            | keep only status == "error"
keyBy     | Partition by key                 | group by user_id
reduce    | Stateful aggregation per key     | running sum per user
process   | Full control with state + timers | custom stateful logic

Stateless vs Stateful

Stateless (map, flatMap, filter)

  • No memory of past records
  • Each record processed independently
  • Scales linearly — no coordination

Stateful (keyBy → reduce/process)

  • Maintains per-key state across records
  • Requires keyBy before state access
  • State backed by memory or RocksDB
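A minimal pure-Python model of the stateful path: because keyBy makes state local to one subtask, a keyed reduce is just a per-key accumulator. In Flink the dict below would be a ValueState in the configured state backend; this sketch only illustrates the semantics:

```python
# Per-key running sum, mimicking keyBy → reduce.
state = {}  # key -> running sum (stand-in for keyed ValueState)

def keyed_reduce(key, value):
    """Fold each record into the state for its key, emit the new total."""
    state[key] = state.get(key, 0) + value
    return key, state[key]

outputs = [keyed_reduce(k, v)
           for k, v in [("user_a", 5), ("user_b", 3), ("user_a", 2)]]

# State persists across records of the same key:
assert outputs[-1] == ("user_a", 7)
```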

Sources and Sinks

Source                    | Description                      | Production?
from_collection           | In-memory list                   | Dev/testing only
KafkaSource (builder API) | Kafka topic with offset tracking | ✅ Yes
FileSystem                | HDFS/S3 file discovery           | ✅ Yes
SocketTextStream          | TCP socket                       | Dev/testing only

Sinks

Sink                 | Description         | Production?
print()              | Console output      | Dev/testing only
FlinkKafkaProducer   | Kafka topic         | ✅ Yes
FileSink (streaming) | Parquet/ORC to HDFS | ✅ Yes
JDBC                 | Relational database | ✅ Yes

Production rule: always Kafka or FileSink as the sink. Never print() at scale.


Demo — Word Count

Building the first Flink pipeline


Demo: Setup

# Start the Flink cluster in Docker
docker-compose up -d flink-jobmanager flink-taskmanager

# Verify cluster is ready
curl http://localhost:8081/overview
# {"taskmanagers":1,"slots-total":4,"slots-available":4,...}

# Open the Flink Web UI
# http://localhost:8081  — shows job list, task manager status, checkpoints

Flink Web UI walkthrough:

  • Job Manager tab: memory, uptime
  • Task Managers: slot count, memory usage
  • Submit New Job: upload a JAR or Python script
  • Running Jobs: live throughput, latency metrics per operator

Demo: Word Count

# word_count_stream.py
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common.typeinfo import Types

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

# Source: socket (for live demo — type sentences into terminal)
stream = env.socket_text_stream("localhost", 9999)

# Transform: tokenize → count per word
word_counts = (stream
    .flat_map(lambda line: [(w.lower(), 1) for w in line.split()],
              output_type=Types.TUPLE([Types.STRING(), Types.INT()]))
    .key_by(lambda x: x[0])
    .reduce(lambda a, b: (a[0], a[1] + b[1])))

word_counts.print()
env.execute("Streaming Word Count")

Demo: Running in Docker

# Terminal 1 — open a socket inside the container (data source)
docker exec -it flink-taskmanager nc -lk 9999

# Terminal 2 — submit the Flink job inside the same container
docker exec -it flink-taskmanager \
  python /opt/flink_lab/word_count_stream.py

# Type sentences in Terminal 1; counts appear instantly in Terminal 2

Both commands run inside flink-taskmanager so localhost:9999 resolves correctly. The examples/13_Flink/ directory is mounted read-only at /opt/flink_lab — no copying needed.


Event Time in Flink

Processing records where they belong in time


Event vs Processing

Processing time

  • When Flink receives the record
  • Simplest — no timestamp needed
  • Problem: mobile events buffer offline
    • Click at 9:00 AM, arrives at 9:15 AM
    • Processing time: 9:15 ❌ (wrong window)

Event time

  • When the event actually occurred
  • Timestamp embedded in the payload
  • Click at 9:00 → 9:00–9:05 window ✓
  • Requires watermarks to handle lateness
# Event time is declared per source via WatermarkStrategy
# (global TimeCharacteristic was removed in Flink 1.17)
from pyflink.common import WatermarkStrategy, Duration

strategy = WatermarkStrategy.for_bounded_out_of_orderness(
    Duration.of_seconds(10)
)
stream = env.from_source(my_source, strategy, "my-source")

The same reasoning applies as in Spark Structured Streaming (Week 14). Event time is always preferred when the payload contains a timestamp. Processing time should only be used for benchmarking or synthetic data.


Watermarks in Flink

A watermark is a signal that says: "all events with timestamp ≤ W have arrived."

from pyflink.common import WatermarkStrategy, Duration

# Accept events up to 10 seconds late
strategy = (WatermarkStrategy
    .for_bounded_out_of_orderness(Duration.of_seconds(10))
    .with_timestamp_assigner(MyTimestampAssigner()))

stream = env.from_source(kafka_source, strategy, "Kafka")

How the watermark advances:

  • Flink tracks the maximum event timestamp seen so far
  • Watermark = max_event_time − out_of_orderness_bound
  • When watermark passes a window's end → window fires (results emitted)
  • Events arriving after the window fires are late → configurable behavior

Without watermarks, Flink would hold window state forever waiting for late records. The watermark tells the system when it's safe to finalize a window and release its memory.
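The advancement rule above can be sketched in a few lines of plain Python. Timestamps are in seconds; the 10-second bound and the single [0, 60) window are illustrative:

```python
# Bounded-out-of-orderness watermarking: the watermark trails the max
# event timestamp seen so far by a fixed bound; a window fires once the
# watermark passes the window's end.
BOUND = 10        # allowed out-of-orderness, in seconds
WINDOW_END = 60   # the window covers event times [0, 60)

def watermark(max_event_time):
    return max_event_time - BOUND

events = [5, 42, 58, 55, 71]  # event timestamps, slightly out of order
max_ts = 0
fired_at = None
for ts in events:
    max_ts = max(max_ts, ts)                 # track max event time seen
    if fired_at is None and watermark(max_ts) >= WINDOW_END:
        fired_at = ts                        # window [0, 60) finalizes here

assert watermark(71) == 61
assert fired_at == 71  # only ts=71 pushes the watermark past 60
```

Note that the ts=55 record arrives after ts=58 yet is still accepted into its window, because the watermark (48 at that point) has not passed it.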


Ordered Stream

Watermark in an ordered stream

The watermark advances with every record — no gaps, no waiting.


Late Arrivals

Watermark in an out-of-order stream

The watermark lags behind max_event_time by the out-of-orderness bound — late events land inside the lag and are accepted.


Parallel Watermarks

Watermarks in parallel streams

Each source subtask generates its own watermark. Downstream operators take the minimum watermark across all inputs before advancing.

This is the key reason why low-parallelism sources with slow partitions can stall watermark progress for the entire pipeline. If one Kafka partition goes idle (no new events), its subtask's watermark freezes — and the minimum across all subtasks stops advancing. Always monitor per-subtask watermarks in the Flink Web UI when debugging late firing windows.
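The minimum rule in miniature (the per-partition watermark values are illustrative):

```python
# Downstream watermark = minimum across all input subtasks.
subtask_watermarks = {"partition-0": 901, "partition-1": 858, "partition-2": 903}

downstream_watermark = min(subtask_watermarks.values())
assert downstream_watermark == 858  # the slowest input wins

# If partition-1 goes idle and stops advancing, the downstream watermark
# freezes at 858 no matter how far the other partitions progress.
subtask_watermarks["partition-0"] = 950
assert min(subtask_watermarks.values()) == 858
```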


Choosing the Delay

The out-of-orderness bound = the maximum time an event can arrive late.

Source type                | Typical delay | Reasoning
Internal service (same DC) | 0–1 s         | Sub-millisecond network; rare reordering
Mobile / IoT device        | 10–60 s       | Offline buffering, reconnect bursts
Cross-region pipeline      | 5–30 s        | Variable WAN latency
Kafka (same cluster)       | 0–5 s         | Partition rebalance or broker pause

Rule: set the bound to the 99th-percentile late-arrival time for your source — not the maximum, or you waste memory; not the median, or you drop valid events.

You can measure the actual out-of-orderness of your source by plotting processing_time − event_time per record over several hours. The 99th percentile of that distribution is your bound. Start conservative (10–30 s) and tighten after observing production data.
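A sketch of that measurement with synthetic lateness values. The exponential distribution is an assumption standing in for real processing_time - event_time samples; the point is the percentile computation, not the shape:

```python
# Estimate the watermark bound as the 99th percentile of lateness.
import random

random.seed(42)
# Synthetic lateness in seconds: mostly near zero, occasional stragglers
lateness = [random.expovariate(1.0) for _ in range(10_000)]

samples = sorted(lateness)
p99 = samples[int(0.99 * len(samples))]      # 99th-percentile late arrival
median = samples[len(samples) // 2]

# The p99 sits well above the median — using the median as the bound
# would drop valid events; using the max would waste memory.
assert p99 > median
```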


Window Types

Type     | Shape                                  | Use case
Tumbling | Fixed, non-overlapping                 | "Totals per 5-minute interval"
Sliding  | Fixed size, advances by step           | "5-min avg, updated every 1 min"
Session  | Closes after N minutes of inactivity   | "Events in one user session"
Global   | No time boundary; fires every N events | "Micro-batch by count"
  • Tumbling: every event belongs to exactly one window
  • Sliding: one event can belong to multiple overlapping windows
  • Session: window size is data-driven — no fixed duration

Session 2 covers the PyFlink API for each type with code examples.

Tumbling windows are the most common and the easiest to reason about. Sliding windows are used for moving averages and rolling metrics. Session windows are the right choice when user activity has natural pauses — e-commerce browsing, gaming sessions, support chat.
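Window assignment for the two fixed-size types can be sketched in plain Python (sizes in seconds; session windows are data-driven and need per-key gap tracking, so they are omitted here):

```python
# Which window(s) does an event with timestamp ts belong to?
def tumbling(ts, size):
    """Tumbling: exactly one window per event."""
    start = (ts // size) * size
    return [(start, start + size)]

def sliding(ts, size, step):
    """Sliding: every window of width `size`, advancing by `step`,
    that covers ts — an event lands in size/step windows."""
    windows = []
    start = (ts // step) * step
    while start + size > ts:
        windows.append((start, start + size))
        start -= step
    return [w for w in windows if w[0] <= ts < w[1]]

assert tumbling(130, 60) == [(120, 180)]                 # one bucket
assert sliding(130, 60, 30) == [(120, 180), (90, 150)]   # two overlapping
```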


Window Shapes

Flink window types — tumbling, sliding, session


Watermark Activity

8 minutes | Individual

Consider a stream of sensor readings with timestamps embedded in each event. Answer the following:

  1. A sensor buffers events for up to 30 seconds when offline. What watermark delay do you set?
  2. You want "average temperature per sensor per 1-minute interval." Which window type and size?
  3. With a 30-second watermark and 1-minute tumbling windows, when does the 9:00–9:01 window fire?
  4. A reading arrives 45 seconds late. Is it accepted or dropped?

Write your answers — we'll discuss as a class.


Key Takeaways

  • Flink = true streaming: each record processed as it arrives, 1–10ms latency
  • Architecture: JobManager (scheduler) + TaskManagers (workers with slots)
  • Parallelism: each operator runs as multiple subtasks; keyBy routes records to the correct subtask
  • DataStream API: source → map / flatMap / filter / keyBy / reduce / process → sink
  • Event time is when the event occurred; watermarks signal when late events can be dropped

Session 2: Keyed state (ValueState, ListState), windows, exactly-once with Kafka, and a 60-minute fraud detection lab


What's Missing?

Apache Flink handles stateful streaming at low latency — but it wasn't built for everything


Flink's Gaps

  • Micro-batch simplicity — Flink's DataStream API is more complex than Spark's DataFrame API; for most near-real-time needs (>100ms), Spark Structured Streaming (Week 14) is simpler and sufficient
  • SQL transformation pipelines — Flink SQL exists, but dbt's lineage tracking, schema tests, and version-controlled models are not part of Flink's scope
  • Pipeline orchestration — Flink jobs run continuously but don't schedule themselves alongside batch ETL, handle retries across systems, or coordinate with upstream data landing
  • Historical reprocessing — Flink is optimized for live streams; reprocessing months of data efficiently is Spark's domain

What Comes Next

Gap                               | Solution                   | When
Micro-batch with simpler API      | Spark Structured Streaming | Week 14
SQL transformations with testing  | dbt                        | Week 15
Pipeline scheduling + retry logic | Apache Airflow             | Week 15

Apache Flink is the right tool for sub-10ms stateful streaming — but a production data platform also needs Spark for micro-batch workloads, dbt for tested SQL transforms, and Airflow for end-to-end orchestration. That stack comes together in Weeks 14–15.


Welcome. Today introduces true stream processing — the hardest topic in the course conceptually. Pace slowly through architecture; students will need Session 2's lab to cement it.

Let this hang. Ask: "How fast is 50ms? Blink takes about 150ms." Don't answer the question yet — carry it through the whole session.

Tell students the fraud scenario from slide 2 is real — PayPal runs Flink at ~1M events/sec. That's the scale we're building toward in Session 2's lab.

Quick show of hands: "Who got their Kafka producer working in last week's lab?" Use this to gauge where to slow down if needed.

Ask: "Where does your current job's data pipeline fall on this table?" Most students have only seen batch. Use this to make the spectrum feel real.

Common student reaction: "Why not always use Flink?" Answer: operational complexity. Flink requires understanding state backends, watermarks, and checkpoint tuning. Spark Structured Streaming gives 90% of the value with 30% of the complexity.

Walk through the diagram left-to-right: client submits a job → JobManager turns it into a task graph → TaskManagers execute subtasks in slots. Point out that each colored box is an operator, each row is a TaskManager.

Analogy: JobManager is the conductor, TaskManagers are section players, slots are individual musicians. Each slot can play one part of the pipeline simultaneously.

Ask: "If you have 3 TaskManagers each with 4 slots, what's your maximum parallelism?" Answer: 12. Then: "If your source Kafka topic has only 3 partitions, what's the effective source parallelism ceiling?" Answer: 3 — partitions limit source parallelism.

Demo point: during Session 2's lab, open the Web UI backpressure tab while the job is running. Point out the green/yellow/red indicators. This makes an abstract concept visible.

Emphasize the lazy execution model — identical to PySpark. Nothing runs until `execute()`. Ask: "What does this remind you of from Week 8?" Students should recognize the RDD/DataFrame lazy evaluation pattern.

Ask students to match each operation to something from MapReduce or Spark they already know. map=map, flatMap=flatMap, filter=filter, keyBy=groupBy/partitionBy, reduce=reduceByKey. process has no clean Spark equivalent — that's what makes Flink unique.

Key point: you cannot access state without first calling keyBy. Flink enforces this at the API level — calling get_state() outside a keyed context throws at runtime. Session 2 will drill this in the lab.

Note: FlinkKafkaConsumer is what students will use in Session 2's lab. It integrates with Flink's checkpoint protocol — when a checkpoint completes, committed Kafka offsets are part of the snapshot. That's how exactly-once works end-to-end.

Common mistake: students use print() in their homework and wonder why performance is terrible. print() is synchronous and serialized — it becomes the bottleneck immediately. The autograder accepts it for correctness, but flag this explicitly.

Start this before class. Have the Web UI open on the projector. Walk through each tab briefly — students will need to navigate the UI in Session 2's lab. Point out the "Running Jobs" throughput graph; it updates in real time.

Type: "big data big data streaming" — watch word counts increment live. Then type the same phrase again and point out counts accumulate across inputs: that's the stateful reduce. Ask: "How is this different from a batch word count?" The job never terminates — it waits for the next line.

Ask: "When would processing time be acceptable?" Answer: when you control the producer and events arrive within milliseconds of occurrence — e.g., internal metrics pipelines, not mobile or IoT. Any time there's a network hop that could buffer, use event time.

Draw on the board: a timeline with events arriving out of order. Mark the watermark as a cursor that advances. Show visually how the window fires when the watermark crosses the window boundary. This is the concept students most often get wrong — the watermark is not a timeout, it's a progress signal derived from the data itself.

In an ordered stream the watermark equals max_event_time at every step. This is the ideal case: every window fires the moment its last event arrives. Point out the W(t) label advancing in lockstep with events.

Walk through the diagram: events arrive out of order; the watermark (dashed line) trails behind. Events that arrive BEFORE the watermark line crosses their window boundary are accepted. Events that arrive AFTER the watermark has passed their window are dropped (or sent to a side output). The bound is the gap between the event line and the watermark line.

Ask: "If subtask 1 has watermark W=9:01 and subtask 2 has W=8:58, what watermark does the downstream operator see?" Answer: 8:58 — the minimum. This is why a single slow partition can hold up an entire pipeline.

Ask: "What happens if you set the bound too small?" → valid late events are dropped. "Too large?" → windows hold state for longer, increasing memory usage and output latency. The tradeoff is memory vs. correctness.

Quick poll: "Which window would you use for hourly revenue totals?" (tumbling 1h). "For a 15-minute moving average updated every 5 minutes?" (sliding 15m/5m). "For counting clicks within a single page visit?" (session with gap = idle timeout). This primes the activity.

Point out: tumbling = no overlap (each event in exactly one bucket); sliding = overlap (event counted in multiple windows); session = variable-width gap-based. The diagram makes the "overlap" property of sliding windows immediately visible.

Answers: 1=30 seconds, 2=tumbling 1 minute, 3=when watermark reaches 9:01:00 → at processing time when max_event_time ≥ 9:01:30, 4=dropped (45s > 30s watermark). For Q3: walk through the math slowly — max_event_time must reach 9:01:30 before Flink considers the 9:00–9:01 window complete.

Give students 2 minutes to write down one concept they're still unclear on. Collect on a sticky note or poll — use the responses to prioritize what to revisit at the start of Session 2.