KafkaBackup CRD

The KafkaBackup custom resource defines a backup configuration for Kafka topics.

Overview

apiVersion: kafka.oso.sh/v1alpha1
kind: KafkaBackup
metadata:
  name: my-backup
  namespace: kafka-backup
spec:
  # Backup configuration

Full Specification

apiVersion: kafka.oso.sh/v1alpha1
kind: KafkaBackup
metadata:
  name: production-backup
  namespace: kafka-backup
spec:
  # Schedule (cron format) - omit for a one-time backup
  schedule: "0 * * * *"

  # Kafka cluster connection
  kafkaCluster:
    bootstrapServers:
      - kafka-0.kafka.svc:9092
      - kafka-1.kafka.svc:9092
      - kafka-2.kafka.svc:9092

    # Security protocol: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
    securityProtocol: SASL_SSL

    # TLS configuration
    tlsSecret:
      name: kafka-tls
      caKey: ca.crt
      certKey: tls.crt
      keyKey: tls.key

    # SASL configuration
    saslSecret:
      name: kafka-credentials
      mechanism: SCRAM-SHA-256  # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
      usernameKey: username
      passwordKey: password

  # Topics to back up
  topics:
    - orders
    - payments
    - "events-*"  # Wildcard patterns are supported

  # Topics to exclude
  excludeTopics:
    - "__consumer_offsets"
    - "_schemas"

  # Storage configuration
  storage:
    storageType: s3  # s3, azure, gcs, pvc

    # S3 configuration
    s3:
      bucket: kafka-backups
      region: us-west-2
      prefix: production/hourly
      endpoint: ""  # Custom endpoint (MinIO, etc.)
      credentialsSecret:
        name: s3-credentials
        accessKeyKey: accessKey
        secretKeyKey: secretKey

    # Azure Blob configuration
    azure:
      container: kafka-backups
      prefix: production/hourly
      connectionStringSecret:
        name: azure-credentials
        key: connectionString

    # GCS configuration
    gcs:
      bucket: kafka-backups
      prefix: production/hourly
      credentialsSecret:
        name: gcs-credentials
        key: credentials.json

    # PVC configuration
    pvc:
      claimName: backup-storage
      subPath: kafka/production

  # Compression settings
  compression: zstd  # zstd, lz4, none
  compressionLevel: 3  # 1-22 for zstd, 1-12 for lz4

  # Include original offsets in message headers
  includeOffsetHeaders: true

  # Source cluster identifier (for offset mapping)
  sourceClusterId: "production-us-west-2"

  # Checkpoint interval
  checkpointIntervalSecs: 30

  # Backup retention
  retention:
    backups: 168  # Number of backups to keep

  # Rate limiting
  rateLimit:
    bytesPerSecond: 104857600  # 100 MB/s
    recordsPerSecond: 100000

  # Circuit breaker
  circuitBreaker:
    enabled: true
    failureThreshold: 5
    resetTimeoutSecs: 60

  # Resource requirements for the backup job
  resources:
    requests:
      cpu: 500m
      memory: 512Mi
    limits:
      cpu: 2
      memory: 2Gi

  # Job configuration
  job:
    backoffLimit: 3
    activeDeadlineSeconds: 3600
    ttlSecondsAfterFinished: 86400

  # Pod configuration
  podTemplate:
    annotations:
      prometheus.io/scrape: "true"
    labels:
      app: kafka-backup
    nodeSelector:
      node-type: worker
    tolerations:
      - key: "dedicated"
        operator: "Equal"
        value: "kafka-backup"
        effect: "NoSchedule"

Spec Fields

kafkaCluster

| Field            | Type     | Required | Description                             |
|------------------|----------|----------|-----------------------------------------|
| bootstrapServers | []string | Yes      | Kafka broker addresses                  |
| securityProtocol | string   | No       | Security protocol (default: PLAINTEXT)  |
| tlsSecret        | object   | No       | TLS certificate secret reference        |
| saslSecret       | object   | No       | SASL credentials secret reference       |

tlsSecret

| Field   | Type   | Required | Description                                    |
|---------|--------|----------|------------------------------------------------|
| name    | string | Yes      | Secret name                                    |
| caKey   | string | No       | Key for CA certificate (default: ca.crt)       |
| certKey | string | No       | Key for client certificate (default: tls.crt)  |
| keyKey  | string | No       | Key for client key (default: tls.key)          |
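
The referenced Secret is an ordinary Kubernetes secret holding the PEM files under the configured keys. A minimal sketch of creating one with the default key names (the local file paths are placeholders):

# Create the TLS secret; adjust file paths to where your PEM files live
kubectl create secret generic kafka-tls \
  --namespace kafka-backup \
  --from-file=ca.crt=./ca.crt \
  --from-file=tls.crt=./client.crt \
  --from-file=tls.key=./client.key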

saslSecret

| Field       | Type   | Required | Description                           |
|-------------|--------|----------|---------------------------------------|
| name        | string | Yes      | Secret name                           |
| mechanism   | string | Yes      | SASL mechanism                        |
| usernameKey | string | No       | Key for username (default: username)  |
| passwordKey | string | No       | Key for password (default: password)  |
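
The SASL secret stores the username and password under the configured keys. A minimal sketch using the default key names (the literal values are placeholders):

# Create the SASL credentials secret; replace the placeholder values
kubectl create secret generic kafka-credentials \
  --namespace kafka-backup \
  --from-literal=username=backup-user \
  --from-literal=password='changeme'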

storage

| Field       | Type   | Required | Description                            |
|-------------|--------|----------|----------------------------------------|
| storageType | string | Yes      | Storage backend (s3, azure, gcs, pvc)  |
| s3          | object | No       | S3 configuration                       |
| azure       | object | No       | Azure Blob configuration               |
| gcs         | object | No       | GCS configuration                      |
| pvc         | object | No       | PVC configuration                      |

s3

| Field             | Type   | Required | Description                                |
|-------------------|--------|----------|--------------------------------------------|
| bucket            | string | Yes      | S3 bucket name                             |
| region            | string | Yes      | AWS region                                 |
| prefix            | string | No       | Object key prefix                          |
| endpoint          | string | No       | Custom S3 endpoint                         |
| credentialsSecret | object | No       | Credentials secret (uses IRSA if omitted)  |
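
When credentialsSecret is set, static keys are read from it. A minimal sketch of creating that secret with the default key names (the values are placeholders):

# Create the S3 credentials secret; replace the placeholder values
kubectl create secret generic s3-credentials \
  --namespace kafka-backup \
  --from-literal=accessKey=AKIAEXAMPLE \
  --from-literal=secretKey=wJalrEXAMPLEKEY

On EKS you can omit credentialsSecret entirely and rely on IRSA, as noted in the table above.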

retention

| Field   | Type | Required | Description                  |
|---------|------|----------|------------------------------|
| backups | int  | No       | Number of backups to retain  |
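
Retention is counted in backups, not time, so its wall-clock meaning depends on the schedule: with the hourly schedule from the full specification, backups: 168 keeps 24 × 7 = 168 backups, i.e. one week.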

rateLimit

| Field            | Type | Required | Description                 |
|------------------|------|----------|-----------------------------|
| bytesPerSecond   | int  | No       | Maximum bytes per second    |
| recordsPerSecond | int  | No       | Maximum records per second  |
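
Both limits are plain integers, and bytesPerSecond is expressed in raw bytes, so the 100 MB/s in the full specification is 100 × 1024 × 1024 = 104857600.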

Status

status:
  phase: Completed  # Pending, Running, Completed, Failed
  lastBackupId: "production-backup-20241201-120000"
  lastBackupTime: "2024-12-01T12:00:00Z"
  lastBackupSize: 1073741824  # bytes
  lastBackupRecords: 1000000
  nextScheduledTime: "2024-12-01T13:00:00Z"
  backupHistory:
    - backupId: "production-backup-20241201-120000"
      startTime: "2024-12-01T12:00:00Z"
      completionTime: "2024-12-01T12:05:00Z"
      size: 1073741824
      records: 1000000
      outcome: success
  conditions:
    - type: Ready
      status: "True"
      reason: BackupScheduled
      message: "Next backup scheduled for 2024-12-01T13:00:00Z"
      lastTransitionTime: "2024-12-01T12:05:00Z"
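
For a human-readable rendering of these fields and conditions, plain kubectl describe works:

kubectl describe kafkabackup my-backup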

Examples

Simple Backup (Plaintext)

apiVersion: kafka.oso.sh/v1alpha1
kind: KafkaBackup
metadata:
  name: simple-backup
spec:
  kafkaCluster:
    bootstrapServers:
      - kafka:9092
  topics:
    - my-topic
  storage:
    storageType: s3
    s3:
      bucket: my-backups
      region: us-west-2

Scheduled Backup with SASL

apiVersion: kafka.oso.sh/v1alpha1
kind: KafkaBackup
metadata:
  name: hourly-backup
spec:
  schedule: "0 * * * *"

  kafkaCluster:
    bootstrapServers:
      - kafka:9092
    securityProtocol: SASL_SSL
    saslSecret:
      name: kafka-credentials
      mechanism: SCRAM-SHA-256
    tlsSecret:
      name: kafka-tls

  topics:
    - orders
    - payments

  storage:
    storageType: s3
    s3:
      bucket: kafka-backups
      region: us-west-2
      prefix: production/hourly

  compression: zstd
  compressionLevel: 3
  includeOffsetHeaders: true
  sourceClusterId: "production"

  retention:
    backups: 168

Backup to Azure

apiVersion: kafka.oso.sh/v1alpha1
kind: KafkaBackup
metadata:
  name: azure-backup
spec:
  kafkaCluster:
    bootstrapServers:
      - kafka:9092

  topics:
    - "*"

  storage:
    storageType: azure
    azure:
      container: kafka-backups
      prefix: production
      connectionStringSecret:
        name: azure-storage
        key: connectionString

Backup to PVC

apiVersion: kafka.oso.sh/v1alpha1
kind: KafkaBackup
metadata:
  name: local-backup
spec:
  kafkaCluster:
    bootstrapServers:
      - kafka:9092

  topics:
    - "*"

  storage:
    storageType: pvc
    pvc:
      claimName: backup-storage
      subPath: kafka

Operations

Trigger Manual Backup

# Add annotation to trigger immediate backup
kubectl annotate kafkabackup my-backup kafka.oso.sh/trigger-backup=$(date +%s) --overwrite
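
The annotation value just needs to change on each trigger, hence the timestamp. Assuming the operator runs the backup as a Job in the resource's namespace (kafka-backup in the examples above), you can watch for it with:

# Watch for the backup Job created by the trigger
kubectl get jobs -n kafka-backup -w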

Check Backup Status

kubectl get kafkabackup my-backup -o jsonpath='{.status.phase}'
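
To block until a run finishes, a small polling loop over the phase field is enough (a sketch; adjust the interval as needed):

# Poll until the backup reaches a terminal phase
phase=""
until [ "$phase" = "Completed" ] || [ "$phase" = "Failed" ]; do
  phase=$(kubectl get kafkabackup my-backup -o jsonpath='{.status.phase}')
  sleep 5
done
echo "Backup finished: $phase"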

View Backup History

kubectl get kafkabackup my-backup -o jsonpath='{.status.backupHistory}' | jq
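
Since each history entry carries an outcome field (see the status example above), jq can filter for failed runs:

# List only history entries whose outcome is not "success"
kubectl get kafkabackup my-backup -o jsonpath='{.status.backupHistory}' \
  | jq '.[] | select(.outcome != "success")'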

Delete Old Backups

# Old backups are pruned automatically according to retention.backups.
# To prune sooner, lower the retention count:
kubectl patch kafkabackup my-backup --type merge -p '{"spec":{"retention":{"backups":10}}}'

Next Steps