
Kafka Streams Example

This guide shows how to back up and restore data for Kafka Streams applications, including state stores and changelog topics.

Kafka Streams Architecture

Kafka Streams applications create internal topics:

┌────────────────────────────────────────────────────────────────────┐
│ Kafka Streams Application │
├────────────────────────────────────────────────────────────────────┤
│ │
│ Input Topics State Stores Output Topics │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ orders │ ───▶ │ order-store │ ───▶ │ enriched- │ │
│ │ │ │ (RocksDB) │ │ orders │ │
│ └─────────────┘ └──────┬──────┘ └─────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ app-id-order-store- │ (Changelog topic) │
│ │ changelog │ │
│ └─────────────────────┘ │
│ │
│ ┌─────────────────────┐ │
│ │ app-id-KSTREAM- │ (Repartition topic) │
│ │ REPARTITION-0000 │ │
│ └─────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────┘

What to Back Up

For complete Kafka Streams recovery:

Topic Type          Backup     Why
Input topics        Yes        Source data
Output topics       Yes        Processed results
Changelog topics    Yes        State store data
Repartition topics  Optional   Can be recreated
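Because internal topic names follow the application.id prefix convention, this decision can be scripted. A minimal sketch (the `classify` helper, `APP_ID`, and topic names are illustrative, matching the examples used in this guide):

```shell
# Classify a topic by the Kafka Streams internal naming convention
APP_ID="order-processor"

classify() {
  case "$1" in
    "${APP_ID}-"*-changelog)   echo "changelog (backup: yes)";;
    "${APP_ID}-"*-repartition) echo "repartition (backup: optional)";;
    *)                         echo "user topic (backup: yes)";;
  esac
}

classify "orders"
classify "${APP_ID}-order-totals-changelog"
classify "${APP_ID}-order-totals-repartition"
```

In practice you would feed this the output of `kafka-topics --list` to build the include/exclude lists below.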

Backup Configuration

streams-backup.yaml
mode: backup
backup_id: "streams-backup-${TIMESTAMP}"

source:
  bootstrap_servers:
    - kafka:9092
  topics:
    include:
      # Input topics
      - orders
      - inventory
      - customers

      # Output topics
      - enriched-orders
      - order-totals
      - alerts

      # Internal topics (by application.id)
      - "order-processor-*"  # Captures changelog and repartition topics

    exclude:
      # Skip repartition topics if desired (they can be recreated)
      - "*-repartition"

storage:
  backend: s3
  bucket: kafka-backups
  prefix: streams/order-processor

backup:
  compression: zstd
  compression_level: 3
  include_offset_headers: true
  source_cluster_id: "production"

Minimal Backup

Back up only the input and output topics:

source:
  topics:
    include:
      - orders
      - inventory
      - customers
      - enriched-orders
      - order-totals

    # Skip internal topics (state will be rebuilt from input)
    exclude:
      - "order-processor-*"

Restore Strategies

Strategy 1: Full Restore with State

Restore everything including state stores:

streams-restore-full.yaml
mode: restore
backup_id: "streams-backup-20241201"

target:
  bootstrap_servers:
    - target-kafka:9092

storage:
  backend: s3
  bucket: kafka-backups
  prefix: streams/order-processor

restore:
  # Restore all topics, including changelogs
  topics:
    - orders
    - inventory
    - customers
    - enriched-orders
    - order-totals
    - "order-processor-*"

  include_original_offset_header: true

After restore:

# Restart Streams application
# It will load state from changelog topics
java -jar order-processor.jar

Strategy 2: Rebuild State from Input

Restore only input topics, let Streams rebuild state:

streams-restore-rebuild.yaml
mode: restore
backup_id: "streams-backup-20241201"

target:
  bootstrap_servers:
    - target-kafka:9092

restore:
  topics:
    - orders
    - inventory
    - customers

  # Reset to the beginning so all data is reprocessed
  consumer_group_strategy: earliest

After restore:

# Delete the local state store directory
rm -rf /var/kafka-streams/order-processor

# Reset the application to reprocess from the beginning
kafka-streams-application-reset \
  --bootstrap-servers target-kafka:9092 \
  --application-id order-processor \
  --input-topics orders,inventory,customers

# Restart application
java -jar order-processor.jar

Strategy 3: PITR with State Consistency

For point-in-time recovery, ensure the restored state is consistent with the restore point:

streams-restore-pitr.yaml
mode: restore
backup_id: "streams-backup-20241201"

restore:
  # Restore up to a specific point in time (epoch milliseconds)
  time_window_end: 1701450000000

  topics:
    # Input topics up to the PITR point
    - orders
    - inventory
    - customers

  # DON'T restore changelog topics - the state they contain won't match
  # the PITR point. State will be rebuilt from the input topics instead.
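`time_window_end` is an epoch-milliseconds timestamp. One way to derive it from a wall-clock restore target, assuming GNU `date` (the timestamp shown is illustrative):

```shell
# Convert a UTC wall-clock restore target to epoch milliseconds
# for use as time_window_end (requires GNU date)
date -u -d "2023-12-01T17:00:00Z" +%s%3N
```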

Consumer Offset Management

Kafka Streams Consumer Groups

Kafka Streams uses a consumer group whose name is the application.id:

# View the Streams consumer group
kafka-consumer-groups \
  --bootstrap-server kafka:9092 \
  --group order-processor \
  --describe

Reset for Reprocessing

Using Kafka Streams reset tool:

kafka-streams-application-reset \
  --bootstrap-servers target-kafka:9092 \
  --application-id order-processor \
  --input-topics orders,inventory,customers \
  --to-earliest

Using OSO Kafka Backup:

offset_reset:
  groups:
    - order-processor
  strategy: earliest

Resume from Position

If restoring with changelog:

offset_reset:
  groups:
    - order-processor
  strategy: header-based
  source_cluster: "production"

Example: Order Processing Application

Application Code

// Order processing Kafka Streams application
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

StreamsBuilder builder = new StreamsBuilder();

// Aggregate order amounts per key. Materialized.as("order-totals")
// creates the persistent state store and its changelog topic itself;
// registering the same store name separately via builder.addStateStore()
// would cause a topology conflict.
builder.stream("orders", Consumed.with(Serdes.String(), orderSerde))
    .groupByKey()
    .aggregate(
        () -> 0.0,
        (key, order, total) -> total + order.getAmount(),
        Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("order-totals")
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.Double())
    )
    .toStream()
    .to("customer-totals", Produced.with(Serdes.String(), Serdes.Double()));

Internal Topics Created

order-processor-order-totals-changelog
order-processor-order-totals-repartition
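These names follow the `{application.id}-{store name}-{suffix}` convention, so they can be derived in a script. A sketch using the names above:

```shell
# Derive internal topic names from application.id and store name
APP_ID="order-processor"
STORE="order-totals"

echo "${APP_ID}-${STORE}-changelog"
echo "${APP_ID}-${STORE}-repartition"
```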

Backup Script

#!/bin/bash
# backup-streams.sh

APP_ID="order-processor"
TIMESTAMP=$(date +%Y%m%d-%H%M%S)

cat > /tmp/streams-backup.yaml << EOF
mode: backup
backup_id: "${APP_ID}-${TIMESTAMP}"

source:
  bootstrap_servers:
    - kafka:9092
  topics:
    include:
      - orders
      - customer-totals
      - "${APP_ID}-*"

storage:
  backend: s3
  bucket: kafka-backups
  prefix: streams/${APP_ID}

backup:
  compression: zstd
  include_offset_headers: true
  source_cluster_id: "production"
EOF

kafka-backup backup --config /tmp/streams-backup.yaml

Restore Script

#!/bin/bash
# restore-streams.sh

APP_ID="order-processor"
BACKUP_ID="$1"

# Step 1: Stop the Streams application
kubectl scale deployment ${APP_ID} --replicas=0

# Step 2: Restore data
cat > /tmp/streams-restore.yaml << EOF
mode: restore
backup_id: "${BACKUP_ID}"

target:
  bootstrap_servers:
    - target-kafka:9092

storage:
  backend: s3
  bucket: kafka-backups
  prefix: streams/${APP_ID}

restore:
  topics:
    - orders
    - customer-totals
    - "${APP_ID}-*"
  include_original_offset_header: true
EOF

kafka-backup three-phase-restore --config /tmp/streams-restore.yaml

# Step 3: Restart Streams application
kubectl scale deployment ${APP_ID} --replicas=3

State Store Considerations

Local State Directory

Kafka Streams stores state locally:

/var/kafka-streams/{application.id}/{task.id}/
├── rocksdb/
│ └── order-totals/
│ ├── CURRENT
│ ├── MANIFEST-000001
│ ├── OPTIONS-000001
│ └── 000001.sst
└── .checkpoint
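The `.checkpoint` file records, per changelog partition, the offset through which the local state is valid; on restart, Streams only replays changelog records beyond it. It is a plain-text file (a version line, an entry count, then one `topic partition offset` line per store partition), so it is easy to inspect. A sketch with hypothetical contents:

```shell
# Write a demo checkpoint file (contents are hypothetical)
cat > /tmp/demo-checkpoint <<'EOF'
0
1
order-processor-order-totals-changelog 0 1523
EOF

# Skip the version and count lines, then print each entry
tail -n +3 /tmp/demo-checkpoint | while read -r topic part offset; do
  echo "restored $topic partition $part through offset $offset"
done
```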

State Restoration Options

Option             Method                     Time      Consistency
From changelog     Restore changelog topics   Fast      Exact state
Rebuild            Reprocess input topics     Slow      Eventually consistent
Standby replicas   Use standby task           Instant   Exact state

Standby Replicas

Configure standby replicas for faster recovery:

props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);

Testing the Restore

Verify Input Topics

# Check the topic exists and has data
kafka-console-consumer \
  --bootstrap-server target-kafka:9092 \
  --topic orders \
  --from-beginning \
  --max-messages 10

Verify Changelog Topics

# Check the changelog contains state
kafka-console-consumer \
  --bootstrap-server target-kafka:9092 \
  --topic order-processor-order-totals-changelog \
  --from-beginning \
  --max-messages 10 \
  --property print.key=true

Verify Application State

# Query state store via interactive query
curl http://streams-app:8080/state/order-totals/customer-123

Kubernetes Deployment

Backup CronJob

apiVersion: kafka.oso.sh/v1alpha1
kind: KafkaBackup
metadata:
  name: streams-backup
  namespace: kafka-backup
spec:
  schedule: "0 * * * *"  # Hourly
  kafkaCluster:
    bootstrapServers:
      - kafka:9092
  topics:
    - orders
    - customer-totals
    - "order-processor-*"
  storage:
    storageType: s3
    s3:
      bucket: kafka-backups
      region: us-west-2
      prefix: streams/order-processor
  compression: zstd

Restore Job

apiVersion: kafka.oso.sh/v1alpha1
kind: KafkaRestore
metadata:
  name: streams-restore
  namespace: kafka-backup
spec:
  backupId: "order-processor-20241201-100000"
  targetCluster:
    bootstrapServers:
      - target-kafka:9092
  storage:
    storageType: s3
    s3:
      bucket: kafka-backups
      region: us-west-2
      prefix: streams/order-processor
  offsetReset:
    strategy: headerBased
    consumerGroups:
      - order-processor

Best Practices

  1. Include changelog topics for fast state recovery
  2. Use application.id prefix to capture all internal topics
  3. Test state consistency after restore
  4. Consider standby replicas for production deployments
  5. Document internal topic naming for your applications

Next Steps