Kafka in Practice

CS 6500 — Week 12, Session 2

CS 6500 — Big Data Analytics | Week 12

Crash Question

"Your consumer processes 50 payment events and writes them to the billing database. Then it crashes — before committing the Kafka offset. When it restarts, does it charge customers again for those 50 events, or silently skip them?"


It Depends on You

The answer depends entirely on how you write the consumer. Today you'll build both the right way and the wrong way — and watch the difference happen live.

Session 2 covers:

  • Kafka CLI: topics, producers, consumers, and consumer group inspection
  • Python producer: publishing JSON events with delivery callbacks
  • Python consumer: manual offset control and batch commit patterns
  • Consumer group scaling: partition rebalancing in real time
  • Failure simulation: at-least-once semantics made concrete

What you'll build:

  • A producer that publishes 100 synthetic clickstream events to Kafka
  • A consumer with manual offset control that tracks event-type counts
  • A consumer group scaling demo and a crash simulation

All exercises run in your Docker environment. Kafka and ZooKeeper should already be running from Session 1.


Environment Check

Verify your environment before the lab starts:

# Install Python client once (host machine)
python3 -m pip install confluent-kafka

# Move to the Kafka lab scripts
cd examples/12_Kafka_Labs

# Confirm Kafka is running
docker ps | grep kafka

# Confirm topics exist (should see clickstream and orders from Session 1)
docker exec -it kafka-broker kafka-topics \
  --bootstrap-server localhost:9092 --list

# Create the lab topic if needed
docker exec -it kafka-broker kafka-topics \
  --bootstrap-server localhost:9092 \
  --create --topic clickstream \
  --partitions 3 --replication-factor 1

Kafka CLI Tools

Everything you need from the terminal


Creating Topics

# Create a topic with 3 partitions
docker exec -it kafka-broker kafka-topics \
  --bootstrap-server localhost:9092 \
  --create --topic orders --partitions 3 --replication-factor 1

# Describe topic: show leader, replicas, ISR for each partition
docker exec -it kafka-broker kafka-topics \
  --bootstrap-server localhost:9092 --describe --topic orders

Interactive Mode

# Interactive producer — one message per line
docker exec -it kafka-broker kafka-console-producer \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --property "parse.key=true" \
  --property "key.separator=:"

Type these messages at the > prompt:

user_123:{"order_id": "A1", "amount": 49.99, "status": "placed"}
user_456:{"order_id": "A2", "amount": 12.50, "status": "placed"}
user_123:{"order_id": "A3", "amount": 89.00, "status": "placed"}

Press Ctrl+C when done. Note: user_123 messages always land on the same partition (same key hash).
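The "same key, same partition" behavior falls out of hashing the key and taking it modulo the partition count. A minimal Python illustration (MD5 is used here only as a stand-in hash; Kafka's real partitioners differ, murmur2 in the Java client and a CRC32-based scheme in librdkafka, so the partition numbers below won't match your broker):

```python
import hashlib

def toy_partition(key: str, num_partitions: int) -> int:
    """Deterministic key -> partition mapping (illustration only)."""
    digest = hashlib.md5(key.encode('utf-8')).digest()
    return int.from_bytes(digest[:4], 'big') % num_partitions

# The same key always maps to the same partition:
for key in ['user_123', 'user_456', 'user_123']:
    print(key, '->', toy_partition(key, 3))
```

Because the mapping is a pure function of the key, user_123 lands on one partition forever, which is what preserves per-user ordering.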


Consuming Topics

# Consume from the beginning (replay all messages)
docker exec -it kafka-broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --from-beginning \
  --property "print.key=true" \
  --property "print.partition=true" \
  --property "print.offset=true"

Offset Inspection

# Consume as a named group (tracks offset between runs)
docker exec -it kafka-broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --group lab-group \
  --from-beginning

# Inspect consumer group: current offset, log-end offset, LAG
docker exec -it kafka-broker kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --group lab-group --describe

Key observation: --from-beginning replays all messages every time. --group with the same group.id resumes from the last committed offset.
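If you later want a named group to replay from the start without changing its group.id, you can reset its committed offsets. A sketch, assuming the same container and topic names as above (the group must have no active consumers or the reset is rejected):

```shell
# Rewind lab-group to the earliest offset on every partition of 'orders'
# (stop any running consumers in the group first)
docker exec -it kafka-broker kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --group lab-group --topic orders \
  --reset-offsets --to-earliest --execute
```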


Python Producer

Publishing events with delivery confirmation


Producer Setup

# producer.py
from confluent_kafka import Producer
import json, time, random

conf = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',             # wait for all ISR replicas to acknowledge
    'retries': 3,              # retry on transient failures
    'linger.ms': 5             # batch messages for up to 5ms before sending
}
producer = Producer(conf)

Delivery Callback

def delivery_report(err, msg):
    """Called by Kafka client for every record that is produced."""
    if err:
        print(f'Delivery FAILED: {err}')
    else:
        print(f'Delivered → topic={msg.topic()} '
              f'partition={msg.partition()} offset={msg.offset()}')

The delivery_report callback is called asynchronously after the broker acknowledges (or rejects) each message. Use producer.flush() to wait for all outstanding callbacks before exiting.


Publishing Events

event_types = ['page_view', 'add_to_cart', 'purchase', 'search']

for i in range(100):
    user_id = f'user_{random.randint(1, 20)}'
    event = {
        'event_id':   i,
        'user_id':    user_id,
        'event_type': random.choice(event_types),
        'timestamp':  time.time(),
        'amount':     round(random.uniform(10, 500), 2)
                      if random.random() > 0.7 else None
    }
    producer.produce(
        topic='clickstream',
        key=user_id,               # same user → same partition → ordered events per user
        value=json.dumps(event),
        callback=delivery_report
    )
    producer.poll(0)               # serve delivery callbacks without blocking the loop

Key Points

producer.flush()                   # block until all outstanding messages are delivered
print('Done. Published 100 events.')

Key points:

  • key=user_id ensures all events for the same user land on the same partition
  • producer.poll(0) drains the internal callback queue without waiting
  • producer.flush() is essential before program exit — otherwise undelivered messages are dropped

Python Consumer

Manual offset control and batch processing


Offset Control

# consumer.py
from confluent_kafka import Consumer, KafkaError
import json

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id':          'analytics-service',
    'auto.offset.reset': 'earliest',    # start from beginning if no prior offset
    'enable.auto.commit': False         # WE control when offsets advance
}
consumer = Consumer(conf)
consumer.subscribe(['clickstream'])

batch = []
BATCH_SIZE = 10

Poll Loop

try:
    while True:
        msg = consumer.poll(timeout=1.0)    # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue                    # reached end of partition — keep polling
            raise Exception(msg.error())

        event = json.loads(msg.value())
        batch.append((msg, event))

        if len(batch) >= BATCH_SIZE:
            # [processing happens here — see next slide]
            consumer.commit()               # advance ALL offsets in this batch atomically
            print(f'Committed batch of {BATCH_SIZE}.')
            batch = []
finally:
    consumer.close()

Batch Processing

if len(batch) >= BATCH_SIZE:
    event_counts = {}   # accumulate counts for this batch

    for _, event in batch:
        etype = event.get('event_type', 'unknown')
        event_counts[etype] = event_counts.get(etype, 0) + 1

        if etype == 'purchase' and event.get('amount'):
            print(f"  Purchase: user={event['user_id']} "
                  f"amount=${event['amount']:.2f}")

    print(f'Batch summary: {event_counts}')

Commit Last

    # Only commit AFTER all processing is confirmed successful
    # If anything above raises an exception, commit is never called
    # → Kafka will redeliver this batch on restart (at-least-once)
    consumer.commit()
    batch = []

Why commit last: if an exception occurs during processing, consumer.commit() is never reached. On restart, the consumer reads from the last committed offset — redelivering this batch. This is at-least-once semantics by design.
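At-least-once plus idempotent processing is effectively exactly-once from the database's point of view. A minimal sketch of dedup by event_id (the in-memory set and the apply_charge function are illustrative stand-ins; in production this would be a unique key or upsert in the billing database):

```python
processed_ids = set()   # in production: a UNIQUE key / upsert in the billing DB

def apply_charge(event):
    """Hypothetical stand-in for the real side effect (a DB write)."""
    print(f"charged {event['user_id']} for event {event['event_id']}")

def process_once(event):
    """Apply the side effect at most once per event_id."""
    if event['event_id'] in processed_ids:
        return False              # duplicate from a redelivered batch
    apply_charge(event)
    processed_ids.add(event['event_id'])
    return True

evt = {'event_id': 7, 'user_id': 'user_3'}
print(process_once(evt))   # True  (first delivery: the charge happens)
print(process_once(evt))   # False (redelivery after a crash: no second charge)
```

With this guard in place, redelivered batches are harmless: the side effect runs once, no matter how many times Kafka hands you the message.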


In-Class Exercise

Four tasks building on each other


Task 1: Producer

Goal: Publish 100 events and observe partition distribution.

# Run the producer
python3 producer.py

# Verify delivery: consume from beginning and show partition + offset
docker exec -it kafka-broker kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic clickstream \
  --from-beginning \
  --property print.partition=true \
  --property print.offset=true | head -20

Task 1: Verify

# Inspect partition distribution
docker exec -it kafka-broker kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --group analytics-service --describe

Discuss with a neighbor:

  • What offset did each partition reach after 100 events? (They won't be equal — why not?)
  • Did all user_1 events land on the same partition? Verify by checking the output.

Task 2: Consumer

Goal: Add event-type counting; verify that restarting resumes from the last committed offset.

Modify consumer.py to:

  1. Maintain a running total_counts dictionary across all batches (not just per batch)
  2. Print the running totals every 20 events: {'page_view': 47, 'purchase': 12, ...}
  3. Change BATCH_SIZE to 20

Task 2: Replay

# Run 1: start the consumer, let it process all 100 events, stop it (Ctrl+C)
python3 consumer.py

# Check committed offset
docker exec -it kafka-broker kafka-consumer-groups \
  --bootstrap-server localhost:9092 --group analytics-service --describe

# Run 2: restart the consumer — does it reprocess old messages?
python3 consumer.py

Discuss: What was the committed offset after Run 1? Did Run 2 reprocess any messages? Why or why not?


Task 3: Scaling

Goal: Observe partition rebalancing when consumers join and leave.

# From examples/12_Kafka_Labs/

# Terminal 1: start Consumer A (group.id = 'scale-group')
python3 consumer_group_a.py

# Terminal 2: start Consumer B (same group.id = 'scale-group')
python3 consumer_group_b.py

# Terminal 3: watch partition assignments in real time
watch -n 2 "docker exec kafka-broker kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --group scale-group --describe"

Task 3: Observe

What to observe:

  1. When only Consumer A is running: all 3 partitions assigned to A
  2. When Consumer B joins: Kafka triggers rebalancing — partitions split between A and B (2 + 1 or similar)
  3. Kill Consumer A (Ctrl+C in Terminal 1): watch B inherit A's partitions within ~10 seconds

Discuss: What partition assignment did each consumer get after B joined? How long did rebalancing take after A was killed?
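You can also watch rebalancing from inside the consumer rather than via the CLI: confluent_kafka accepts on_assign/on_revoke callbacks on subscribe(). A sketch (the logging format is our own; with a live broker, plug it into the consumer.py config from earlier):

```python
def fmt_assignment(partitions):
    """Render an assignment like 'clickstream[0, 2]' for logging."""
    ids = sorted(p.partition for p in partitions)
    topic = partitions[0].topic if partitions else '?'
    return f"{topic}{ids}"

def on_assign(consumer, partitions):
    print('assigned:', fmt_assignment(partitions))

def on_revoke(consumer, partitions):
    # A good place to commit before this consumer loses the partitions
    print('revoked:', fmt_assignment(partitions))

# With a live broker (consumer configured as in consumer.py):
# consumer.subscribe(['clickstream'], on_assign=on_assign, on_revoke=on_revoke)
```

When Consumer B joins, each consumer's on_revoke then on_assign fires, and the printed assignments should match what the watch command in Terminal 3 shows.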


Task 4: Crash Test

Goal: See at-least-once redelivery in action.

Run the dedicated crash simulator — no modification needed:

# Run 1: processes CRASH_AFTER messages, then raises RuntimeError
# The offset is NOT committed — Kafka still thinks these messages are unread
python3 consumer_crash.py

Read the output: how many messages were processed before the crash? What was the committed offset?

# Inspect the lag — should show unread messages despite having processed them
docker exec -it kafka-broker kafka-consumer-groups \
  --bootstrap-server localhost:9092 \
  --group crash-demo-group --describe

Task 4: Run Steps

Run sequence:

  1. Run 1: python3 consumer_crash.py — crashes after 15 messages without committing
  2. Check lag: kafka-consumer-groups ... --group crash-demo-group --describe
  3. Run 2: python3 consumer_crash.py again — the same 15 messages are redelivered
  4. Count: how many total messages were processed across both runs?

Discuss: Why were those 15 messages processed twice? What would make processing idempotent?


Checkpoint

Before moving to the debrief, verify you have completed:

  • [ ] Task 1: producer.py ran successfully; partition offsets discussed
  • [ ] Task 2: consumer.py modified with running totals; restart behavior confirmed
  • [ ] Task 3: Partition rebalancing observed; discussed partition assignment before/after
  • [ ] Task 4: Crash simulation run; at-least-once redelivery observed

Debrief

What you built and what it means


Key Observations

What you built:

  • A producer that guarantees delivery via callbacks and flush()
  • A consumer that controls exactly when offsets advance
  • A consumer group that dynamically rebalances on failure
  • A crash scenario that proves at-least-once redelivery

The core pattern:

poll → process → commit → poll
         ↑
    only commit after
    successful processing

What it means in production:

  • Your processing logic must handle duplicate events gracefully — use an idempotency key (event_id) to detect replays
  • Consumer group rebalancing is automatic — no operator action needed when a pod crashes in Kubernetes
  • Consumer lag (log-end-offset - committed-offset) is your key health metric — high lag = consumer falling behind
  • Partition count is a deployment-time decision — plan for your maximum consumer count before you launch
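The lag metric is simple arithmetic over the numbers kafka-consumer-groups prints. A sketch of the computation (the per-partition values below are made up for illustration):

```python
def consumer_lag(log_end_offset: int, committed_offset: int) -> int:
    """The LAG column: messages produced but not yet committed by the group."""
    return log_end_offset - committed_offset

# Per-partition lag, summed into a topic-level health number:
partitions = [
    {'partition': 0, 'log_end': 40, 'committed': 38},
    {'partition': 1, 'log_end': 33, 'committed': 33},
    {'partition': 2, 'log_end': 27, 'committed': 20},
]
total = sum(consumer_lag(p['log_end'], p['committed']) for p in partitions)
print(total)   # 2 + 0 + 7 = 9
```

In production you would scrape these numbers from the broker and alert when the total grows instead of shrinking.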

What's Missing?

Kafka stores and delivers events at scale — but it wasn't built for everything


The Gaps (1/2)

  • No stateful computations — Kafka delivers raw events; computing a windowed revenue total, a session count, or a stream-stream join requires a separate processing layer (Flink or Spark Structured Streaming) reading from Kafka topics
  • No query interface — you can't ask Kafka "how many purchase events happened in the last 10 minutes for user_42"; you can only consume the stream sequentially and maintain that state yourself
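To answer a question like the one above, you consume the stream and maintain the state yourself. A minimal sketch of a 10-minute tumbling-window purchase count in plain Python (the event shape matches this lab's producer; with no real stream attached, fake timestamps stand in for consumed messages):

```python
from collections import defaultdict

WINDOW_SEC = 600   # 10-minute tumbling windows

# window_start -> user_id -> purchase count
windows = defaultdict(lambda: defaultdict(int))

def record(event):
    """Bucket a purchase event into its tumbling window."""
    if event['event_type'] != 'purchase':
        return
    window = int(event['timestamp'] // WINDOW_SEC) * WINDOW_SEC
    windows[window][event['user_id']] += 1

# Fake events standing in for the consumed stream:
for ts, user in [(1000, 'user_42'), (1100, 'user_42'), (7000, 'user_42')]:
    record({'event_type': 'purchase', 'timestamp': ts, 'user_id': user})

print(windows[600]['user_42'])   # 2 purchases in the window starting at t=600
```

Flink and Spark Structured Streaming exist precisely so you don't hand-roll this state management (plus fault tolerance and watermarks) yourself.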

The Gaps (2/2)

  • Operational surface area — partition count decisions at topic creation time are irreversible (you can add partitions but existing data doesn't redistribute); tuning broker configs for production throughput requires deep expertise
  • Exactly-once is non-trivial — the Task 4 simulation showed at-least-once; achieving true end-to-end exactly-once requires Kafka transactions + idempotent producers + careful downstream coordination

What Comes Next

Gap Solution When
Sub-100ms event processing and richer stateful operators Apache Flink (true streaming) Week 13
Spark-native streaming with DataFrame API, watermarks, and windows Spark Structured Streaming Week 14
Tested SQL transformations and pipeline orchestration dbt + Apache Airflow Week 15

Kafka is the right tool for durable, high-throughput event ingestion, fan-out, and replay — but the moment you need aggregations, joins, or alerts over that stream, you need a processing layer consuming from Kafka.


Speaker Notes

Give 2 minutes for environment check. Identify anyone with issues before starting the CLI demo. The most common issue is ZooKeeper not being ready — docker compose logs zookeeper will show the status.

5 minutes. Emphasize: user_123 appears twice — both records go to the same partition because the partition key is deterministic. Ask students: why would that matter for order processing?

5 minutes. The consumer groups describe output shows the LAG column — how far behind a consumer is. This is the production metric for monitoring consumer health.

linger.ms=5 enables micro-batching — the producer waits 5ms to collect more messages before sending a batch. This improves throughput at the cost of a tiny latency increase.

Walk through each argument to produce(). Students often forget flush() and wonder why some deliveries never get confirmed.

The critical line is enable.auto.commit=False. Without it, Kafka advances offsets on a timer regardless of processing success. With it, we own the commit decision.

This is the core pattern. Draw the timeline: poll → process → commit → poll. Ask: "What if the database write in the processing step fails? What do we want to happen?" They should say: reprocess the batch.

15 minutes. Students often expect 33-33-34 distribution across 3 partitions. In practice, the hash of each user_id is not perfectly balanced — one partition usually gets more. This is a real-world lesson about partition key selection.

20 minutes. The key moment is Run 2 — students see that the consumer picks up exactly where it left off. This is the "committed offset as a bookmark" concept made tangible.

15 minutes. Set session.timeout.ms=10000 in both consumer configs so rebalancing happens in 10 seconds instead of 45 — makes the demo visible.

10 minutes. Expected answer: the batch is redelivered on restart because commit never happened. The LAG column will confirm uncommitted messages. This is the core insight of at-least-once: the batch is the unit of commit, not the individual message.

2-minute checkpoint before debrief. If most students are still on Task 3, skip Task 4 — explain at-least-once redelivery in the debrief instead. If stuck, check: docker compose logs kafka-broker --tail=50

5 minutes. Ask: "If you were building the billing service, how would you make it safe against the Task 4 failure scenario?" They should say: upsert by event_id so duplicate processing produces the same result.