Kafka in Practice
CS 6500 — Week 12, Session 2
CS 6500 — Big Data Analytics | Week 12
Crash Question
"Your consumer processes 50 payment events and writes them to the billing database. Then it crashes — before committing the Kafka offset. When it restarts, does it charge customers again for those 50 events, or silently skip them?"
It Depends on You
The answer depends entirely on how you write the consumer. Today you'll build both the right way and the wrong way — and watch the difference happen live.
Session 2 covers:
Kafka CLI: topics, producers, consumers, and consumer group inspection
Python producer: publishing JSON events with delivery callbacks
Python consumer: manual offset control and batch commit patterns
Consumer group scaling: partition rebalancing in real time
Failure simulation: at-least-once semantics made concrete
What you'll build:
A producer that publishes 100 synthetic clickstream events to Kafka
A consumer with manual offset control that tracks event-type counts
A consumer group scaling demo and a crash simulation
All exercises run in your Docker environment. Kafka and ZooKeeper should already be running from Session 1.
Environment Check
Verify your environment before the lab starts:
python3 -m pip install confluent-kafka
cd examples/12_Kafka_Labs
docker ps | grep kafka
docker exec -it kafka-broker kafka-topics \
--bootstrap-server localhost:9092 --list
docker exec -it kafka-broker kafka-topics \
--bootstrap-server localhost:9092 \
--create --topic clickstream \
--partitions 3 --replication-factor 1
Everything you need from the terminal
Creating Topics
docker exec -it kafka-broker kafka-topics \
--bootstrap-server localhost:9092 \
--create --topic orders --partitions 3 --replication-factor 1
docker exec -it kafka-broker kafka-topics \
--bootstrap-server localhost:9092 --describe --topic orders
Interactive Mode
docker exec -it kafka-broker kafka-console-producer \
--bootstrap-server localhost:9092 \
--topic orders \
--property "parse.key=true" \
--property "key.separator=:"
Type these messages at the > prompt:
user_123:{"order_id": "A1", "amount": 49.99, "status": "placed"}
user_456:{"order_id": "A2", "amount": 12.50, "status": "placed"}
user_123:{"order_id": "A3", "amount": 89.00, "status": "placed"}
Press Ctrl+C when done. Note: user_123 messages always land on the same partition (same key hash).
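Why the same key always lands on the same partition: Kafka's default partitioner hashes the message key (murmur2) modulo the partition count. A minimal sketch of the principle, using Python's `hashlib` as a stand-in hash (not Kafka's actual murmur2, so partition numbers won't match the broker's):

```python
import hashlib

def pick_partition(key: str, num_partitions: int) -> int:
    # Stand-in for Kafka's murmur2-based default partitioner:
    # a stable hash of the key, taken modulo the partition count.
    digest = hashlib.md5(key.encode()).digest()
    return int.from_bytes(digest[:4], 'big') % num_partitions

# Same key -> same partition, every time.
assert pick_partition('user_123', 3) == pick_partition('user_123', 3)

# Different keys may (or may not) land on different partitions.
print(pick_partition('user_123', 3), pick_partition('user_456', 3))
```

Because the mapping is deterministic, all of a user's events are totally ordered within one partition, which is exactly what order processing needs.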
Consuming Topics
docker exec -it kafka-broker kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic orders \
--from-beginning \
--property "print.key=true" \
--property "print.partition=true" \
--property "print.offset=true"
Offset Inspection
docker exec -it kafka-broker kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic orders \
--group lab-group \
--from-beginning
docker exec -it kafka-broker kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group lab-group --describe
Key observation: --from-beginning replays all messages every time. --group with the same group.id resumes from the last committed offset.
Python Producer
Publishing events with delivery confirmation
Producer Setup
from confluent_kafka import Producer
import json, time, random
conf = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',
    'retries': 3,
    'linger.ms': 5
}
producer = Producer(conf)
Delivery Callback
def delivery_report(err, msg):
    """Called by the Kafka client for every record that is produced."""
    if err:
        print(f'Delivery FAILED: {err}')
    else:
        print(f'Delivered → topic={msg.topic()} '
              f'partition={msg.partition()} offset={msg.offset()}')
The delivery_report callback is called asynchronously after the broker acknowledges (or rejects) each message. Use producer.flush() to wait for all outstanding callbacks before exiting.
Publishing Events
event_types = ['page_view', 'add_to_cart', 'purchase', 'search']

for i in range(100):
    user_id = f'user_{random.randint(1, 20)}'
    event = {
        'event_id': i,
        'user_id': user_id,
        'event_type': random.choice(event_types),
        'timestamp': time.time(),
        'amount': round(random.uniform(10, 500), 2)
                  if random.random() > 0.7 else None
    }
    producer.produce(
        topic='clickstream',
        key=user_id,
        value=json.dumps(event),
        callback=delivery_report
    )
    producer.poll(0)
Key Points
producer.flush()
print('Done. Published 100 events.')
Key points:
key=user_id ensures all events for the same user land on the same partition
producer.poll(0) drains the internal callback queue without waiting
producer.flush() is essential before program exit — otherwise undelivered messages are dropped
Python Consumer
Manual offset control and batch processing
Offset Control
from confluent_kafka import Consumer, KafkaError
import json
conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'analytics-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
}
consumer = Consumer(conf)
consumer.subscribe(['clickstream' ])
batch = []
BATCH_SIZE = 10
Poll Loop
try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise Exception(msg.error())
        event = json.loads(msg.value())
        batch.append((msg, event))
        if len(batch) >= BATCH_SIZE:
            # process the batch first (next slide), then commit
            consumer.commit()
            print(f'Committed batch of {BATCH_SIZE}.')
            batch = []
finally:
    consumer.close()
Batch Processing
if len(batch) >= BATCH_SIZE:
    event_counts = {}
    for _, event in batch:
        etype = event.get('event_type', 'unknown')
        event_counts[etype] = event_counts.get(etype, 0) + 1
        if etype == 'purchase' and event.get('amount'):
            print(f"  Purchase: user={event['user_id']} "
                  f"amount=${event['amount']:.2f}")
    print(f'Batch summary: {event_counts}')
Commit Last
consumer.commit()
batch = []
Why commit last: if an exception occurs during processing, consumer.commit() is never reached. On restart, the consumer reads from the last committed offset — redelivering this batch. This is at-least-once semantics by design.
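The crash-and-redeliver behavior can be simulated without a broker. A toy model (in-memory, not the confluent-kafka API): the committed offset is a bookmark, and a crash before commit means the next run re-reads the same batch.

```python
class ToyPartition:
    """In-memory stand-in for one Kafka partition plus its committed offset."""
    def __init__(self, messages):
        self.log = list(messages)
        self.committed = 0          # next offset to read on (re)start

    def poll_batch(self, start, size):
        return self.log[start:start + size]

def run_consumer(part, batch_size, crash_before_commit=False):
    """Process one batch; advance the committed offset only on success."""
    pos = part.committed
    batch = part.poll_batch(pos, batch_size)
    processed = list(batch)          # "process" the events
    if crash_before_commit:
        return processed             # crash: committed offset unchanged
    part.committed = pos + len(batch)
    return processed

part = ToyPartition(range(20))
first = run_consumer(part, 10, crash_before_commit=True)   # run 1: crash
second = run_consumer(part, 10)      # run 2: same 10 events redelivered
assert first == second               # at-least-once: duplicates, not loss
```

This is the same behavior Task 4 demonstrates against the real broker: the batch, not the individual message, is the unit of commit.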
In-Class Exercise
Four tasks building on each other
Task 1: Producer
Goal: Publish 100 events and observe partition distribution.
python3 producer.py
docker exec -it kafka-broker kafka-console-consumer \
--bootstrap-server localhost:9092 \
--topic clickstream \
--from-beginning \
--property print.partition=true \
--property print.offset=true | head -20
Task 1: Verify
docker exec -it kafka-broker kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group analytics-service --describe
Discuss with a neighbor:
What offset did each partition reach after 100 events? (They won't be equal — why not?)
Did all user_1 events land on the same partition? Verify by checking the output.
Task 2: Consumer
Goal: Add event-type counting; verify that restarting resumes from the last committed offset.
Modify consumer.py to:
Maintain a running total_counts dictionary across all batches (not just per batch)
Print the running totals every 20 events: {'page_view': 47, 'purchase': 12, ...}
Change BATCH_SIZE to 20
Task 2: Replay
python3 consumer.py
docker exec -it kafka-broker kafka-consumer-groups \
--bootstrap-server localhost:9092 --group analytics-service --describe
python3 consumer.py
Discuss: What was the committed offset after Run 1? Did Run 2 reprocess any messages? Why or why not?
Task 3: Scaling
Goal: Observe partition rebalancing when consumers join and leave.
python3 consumer_group_a.py
python3 consumer_group_b.py
watch -n 2 "docker exec kafka-broker kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group scale-group --describe"
Task 3: Observe
What to observe:
When only Consumer A is running: all 3 partitions assigned to A
When Consumer B joins: Kafka triggers rebalancing — partitions split between A and B (2 + 1 or similar)
Kill Consumer A (Ctrl+C in Terminal 1): watch B inherit A's partitions within ~10 seconds
Discuss: What partition assignment did each consumer get after B joined? How long did rebalancing take after A was killed?
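The 2 + 1 split you observe matches a range-style assignment, Kafka's default strategy. A simplified sketch of the idea (an illustration, not the broker's actual assignor code): split the partitions as evenly as possible, with earlier group members taking the extras.

```python
def assign_partitions(partitions, consumers):
    """Rough sketch of range-style assignment: divide partitions as evenly
    as possible, earlier consumers (sorted by member id) taking the extras."""
    consumers = sorted(consumers)
    base, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, member in enumerate(consumers):
        count = base + (1 if i < extra else 0)
        assignment[member] = partitions[start:start + count]
        start += count
    return assignment

print(assign_partitions([0, 1, 2], ['A']))        # {'A': [0, 1, 2]}
print(assign_partitions([0, 1, 2], ['A', 'B']))   # {'A': [0, 1], 'B': [2]}
print(assign_partitions([0, 1, 2], ['B']))        # {'B': [0, 1, 2]}  (A killed)
```

Re-running the function with a changed membership list is exactly what a rebalance does: recompute the mapping, then hand each member its new partitions.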
Task 4: Crash Test
Goal: See at-least-once redelivery in action.
Run the dedicated crash simulator — no modification needed:
python3 consumer_crash.py
Read the output: how many messages were processed before the crash? What was the committed offset?
docker exec -it kafka-broker kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group crash-demo-group --describe
Task 4: Run Steps
Run sequence:
Run 1: python3 consumer_crash.py — crashes after 15 messages without committing
Check lag: kafka-consumer-groups ... --group crash-demo-group --describe
Run 2: python3 consumer_crash.py again — the same 15 messages are redelivered
Count: how many total messages were processed across both runs?
Discuss: Why were those 15 messages processed twice? What would make processing idempotent?
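One answer to the idempotency question: key your writes on `event_id`, so replaying a redelivered event overwrites the same record instead of double-counting it. A minimal sketch using a dict as a stand-in for the billing table:

```python
def apply_events(store, events):
    """Idempotent upsert: keyed on event_id, so replaying a duplicate
    rewrites the same record instead of creating a second charge."""
    for event in events:
        store[event['event_id']] = event
    return store

batch = [{'event_id': 1, 'amount': 49.99},
         {'event_id': 2, 'amount': 12.50}]
store = apply_events({}, batch)
store = apply_events(store, batch)   # redelivered after a crash
assert len(store) == 2               # no double-charging
```

At-least-once delivery plus idempotent processing gives effectively-once results, which is the standard production pattern.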
Checkpoint
Before moving to the debrief, verify you have completed:
[ ] Task 1: producer.py ran successfully; partition offsets discussed
[ ] Task 2: consumer.py modified with running totals; restart behavior confirmed
[ ] Task 3: Partition rebalancing observed; discussed partition assignment before/after
[ ] Task 4: Crash simulation run; at-least-once redelivery observed
Debrief
What you built and what it means
Key Observations
What you built:
A producer that guarantees delivery via callbacks and flush()
A consumer that controls exactly when offsets advance
A consumer group that dynamically rebalances on failure
A crash scenario that proves at-least-once redelivery
The core pattern:
poll → process → commit → poll
(commit only after the batch has been processed successfully)
What it means in production:
Your processing logic must handle duplicate events gracefully — use an idempotency key (event_id) to detect replays
Consumer group rebalancing is automatic — no operator action needed when a pod crashes in Kubernetes
Consumer lag (log-end-offset - committed-offset) is your key health metric — high lag = consumer falling behind
Partition count is a deployment-time decision — plan for your maximum consumer count before you launch
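The lag metric is simple arithmetic per partition. A tiny helper (hypothetical function name, offsets hard-coded for illustration) mirroring what the LAG column of `kafka-consumer-groups --describe` reports:

```python
def consumer_lag(log_end_offsets, committed_offsets):
    """Per-partition lag = log-end-offset minus committed offset,
    i.e. how many messages the consumer has yet to process."""
    return {p: log_end_offsets[p] - committed_offsets.get(p, 0)
            for p in log_end_offsets}

lag = consumer_lag({0: 40, 1: 35, 2: 25},   # broker's log-end offsets
                   {0: 40, 1: 20, 2: 25})   # group's committed offsets
print(lag)                       # {0: 0, 1: 15, 2: 0}
assert sum(lag.values()) == 15   # partition 1 is 15 messages behind
```

A lag that grows without bound means the consumer cannot keep up; the fix is usually more consumers in the group (up to the partition count) or faster processing.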
What's Missing?
Kafka stores and delivers events at scale — but it wasn't built for everything
The Gaps (1/2)
No stateful computations — Kafka delivers raw events; computing a windowed revenue total, a session count, or a stream-stream join requires a separate processing layer (Flink or Spark Structured Streaming) reading from Kafka topics
No query interface — you can't ask Kafka "how many purchase events happened in the last 10 minutes for user_42"; you can only consume the stream sequentially and maintain that state yourself
The Gaps (2/2)
Operational surface area — the partition count chosen at topic creation is effectively permanent: you can add partitions later, but existing data is not redistributed and key-to-partition mappings change for new writes; tuning broker configs for production throughput requires deep expertise
Exactly-once is non-trivial — the Task 4 simulation showed at-least-once; achieving true end-to-end exactly-once requires Kafka transactions + idempotent producers + careful downstream coordination
What Comes Next
Gap | Solution | When
Sub-100ms event processing and richer stateful operators | Apache Flink (true streaming) | Week 13
Spark-native streaming with DataFrame API, watermarks, and windows | Spark Structured Streaming | Week 14
Tested SQL transformations and pipeline orchestration | dbt + Apache Airflow | Week 15
Kafka is the right tool for durable, high-throughput event ingestion, fan-out, and replay — but the moment you need aggregations, joins, or alerts over that stream, you need a processing layer consuming from Kafka.
Give 2 minutes for environment check. Identify anyone with issues before starting the CLI demo. The most common issue is ZooKeeper not being ready — docker compose logs zookeeper will show the status.
5 minutes. Emphasize: user_123 appears twice — both records go to the same partition because the partition key is deterministic. Ask students: why would that matter for order processing?
5 minutes. The consumer groups describe output shows the LAG column — how far behind a consumer is. This is the production metric for monitoring consumer health.
linger.ms=5 enables micro-batching — the producer waits 5ms to collect more messages before sending a batch. This improves throughput at the cost of a tiny latency increase.
Walk through each argument to produce(). Students often forget flush() and wonder why some deliveries never get confirmed.
The critical line is enable.auto.commit=False. Without it, Kafka advances offsets on a timer regardless of processing success. With it, we own the commit decision.
This is the core pattern. Draw the timeline: poll → process → commit → poll. Ask: "What if the database write in the processing step fails? What do we want to happen?" They should say: reprocess the batch.
15 minutes. Students often expect 33-33-34 distribution across 3 partitions. In practice, the hash of each user_id is not perfectly balanced — one partition usually gets more. This is a real-world lesson about partition key selection.
20 minutes. The key moment is Run 2 — students see that the consumer picks up exactly where it left off. This is the "committed offset as a bookmark" concept made tangible.
15 minutes. Set session.timeout.ms=10000 in both consumer configs so rebalancing happens in 10 seconds instead of 45 — makes the demo visible.
10 minutes. Expected answer: the batch is redelivered on restart because commit never happened. The LAG column will confirm uncommitted messages. This is the core insight of at-least-once: the batch is the unit of commit, not the individual message.
2-minute checkpoint before debrief. If most students are still on Task 3, skip Task 4 — explain at-least-once redelivery in the debrief instead. If stuck, check: docker compose logs kafka-broker --tail=50
5 minutes. Ask: "If you were building the billing service, how would you make it safe against the Task 4 failure scenario?" They should say: upsert by event_id so duplicate processing produces the same result.