AWS Lambda Example
This guide shows how to back up and restore Kafka data consumed by AWS Lambda functions using Amazon MSK as an event source.
Lambda with MSK Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ AWS Account │
├─────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Amazon │ │ Lambda │ │ Other │ │
│ │ MSK │────▶│ Function │────▶│ Services │ │
│ │ │ │ │ │ (DynamoDB, │ │
│ │ ┌────────┐ │ │ │ │ S3, etc) │ │
│ │ │ orders │ │ │ │ │ │ │
│ │ └────────┘ │ └──────────────┘ └──────────────┘ │
│ └──────────────┘ │
│ │ │
│ │ Backup │
│ ▼ │
│ ┌──────────────┐ │
│ │ S3 Bucket │ │
│ │ (Backups) │ │
│ └──────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Lambda MSK Event Source
AWS Lambda can consume from MSK (or self-managed Kafka) through an event source mapping:
// Lambda function receiving Kafka events
exports.handler = async (event) => {
  // event.records is an object keyed by "topic-partition",
  // each entry holding an array of records
  for (const [topicPartition, records] of Object.entries(event.records)) {
    for (const record of records) {
      // Keys and values arrive base64-encoded
      const key = record.key ? Buffer.from(record.key, 'base64').toString() : null;
      const value = Buffer.from(record.value, 'base64').toString();
      console.log(`Processing ${topicPartition}: key=${key}, offset=${record.offset}`);
      // Process the message
      await processOrder(JSON.parse(value));
    }
  }
  return { statusCode: 200 };
};
Event Source Mapping Configuration
{
"EventSourceArn": "arn:aws:kafka:us-west-2:123456789:cluster/my-cluster/abc123",
"FunctionName": "order-processor",
"Topics": ["orders"],
"StartingPosition": "LATEST",
"BatchSize": 100,
"MaximumBatchingWindowInSeconds": 5,
"DestinationConfig": {
"OnFailure": {
"Destination": "arn:aws:sqs:us-west-2:123456789:orders-dlq"
}
}
}
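Assuming the JSON above is saved as esm.json (an assumed filename), the mapping can be registered in one call:
# Register the event source mapping from the JSON file above
aws lambda create-event-source-mapping --cli-input-json file://esm.json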
Backup Configuration
Backup from Amazon MSK
msk-backup.yaml
mode: backup
backup_id: "msk-orders-${TIMESTAMP}"
source:
bootstrap_servers:
- b-1.my-cluster.abc123.kafka.us-west-2.amazonaws.com:9092
- b-2.my-cluster.abc123.kafka.us-west-2.amazonaws.com:9092
security:
security_protocol: SASL_SSL
sasl_mechanism: AWS_MSK_IAM
topics:
include:
- orders
- order-events
- notifications
storage:
backend: s3
bucket: kafka-backups
region: us-west-2
prefix: msk/orders
backup:
compression: zstd
include_offset_headers: true
source_cluster_id: "msk-production"
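With the config saved as msk-backup.yaml, an ad-hoc run looks like the sketch below. It assumes the kafka-backup binary is on PATH and that the tool resolves ${TIMESTAMP} from the environment; the same backup --config invocation appears in the Lambda example later in this guide.
# Run a one-off MSK backup; TIMESTAMP feeds the backup_id placeholder
export TIMESTAMP=$(date +%Y%m%d-%H%M%S)
kafka-backup backup --config msk-backup.yaml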
IAM Authentication
For MSK with IAM authentication:
source:
security:
security_protocol: SASL_SSL
sasl_mechanism: AWS_MSK_IAM
# Uses default credential chain (IAM role, env vars, etc.)
IAM Policy for backup:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:DescribeTopic",
"kafka-cluster:ReadData",
"kafka-cluster:DescribeGroup"
],
"Resource": [
"arn:aws:kafka:us-west-2:123456789:cluster/my-cluster/*",
"arn:aws:kafka:us-west-2:123456789:topic/my-cluster/*",
"arn:aws:kafka:us-west-2:123456789:group/my-cluster/*"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::kafka-backups",
"arn:aws:s3:::kafka-backups/*"
]
}
]
}
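One way to attach the policy, assuming it is saved as backup-policy.json and the backup runs under a role named kafka-backup-role (both names are placeholders):
# Attach the backup policy inline to the backup role
aws iam put-role-policy \
  --role-name kafka-backup-role \
  --policy-name kafka-backup-access \
  --policy-document file://backup-policy.json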
Lambda Consumer Group Handling
Understanding Lambda Consumer Groups
Lambda uses auto-generated consumer group IDs:
Consumer Group ID Pattern:
amazon.lambda.{event-source-uuid}
Example:
amazon.lambda.12345678-1234-1234-1234-123456789012
Finding Lambda Consumer Group
# MSK does not expose consumer groups through the AWS CLI;
# list them with the standard Kafka tools instead
kafka-consumer-groups \
--bootstrap-server b-1.my-cluster.abc123.kafka.us-west-2.amazonaws.com:9092 \
--list \
--command-config msk.properties
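The UUID in the group ID is the event source mapping's UUID, which you can look up from Lambda itself:
# Find the event source mapping UUID for the function
aws lambda list-event-source-mappings \
  --function-name order-processor \
  --query 'EventSourceMappings[].UUID'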
Backup with Lambda Consumer Group
backup:
include_offset_headers: true
source_cluster_id: "msk-production"
# Track Lambda consumer position
consumer_groups:
- "amazon.lambda.12345678-1234-1234-1234-123456789012"
Restore Strategies
Strategy 1: Restore and Reset Lambda
msk-restore.yaml
mode: restore
backup_id: "msk-orders-20241201"
target:
bootstrap_servers:
- b-1.my-cluster.abc123.kafka.us-west-2.amazonaws.com:9092
security:
security_protocol: SASL_SSL
sasl_mechanism: AWS_MSK_IAM
storage:
backend: s3
bucket: kafka-backups
prefix: msk/orders
restore:
include_original_offset_header: true
After the restore completes, reset the Lambda event source mapping:
# Delete and recreate event source mapping
aws lambda delete-event-source-mapping \
--uuid 12345678-1234-1234-1234-123456789012
aws lambda create-event-source-mapping \
--function-name order-processor \
--event-source-arn arn:aws:kafka:us-west-2:123456789:cluster/my-cluster/abc123 \
--topics orders \
--starting-position TRIM_HORIZON # Start from beginning
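Before resuming traffic, confirm the recreated mapping is active (the new UUID is returned by the create call):
# Check mapping status; State should transition to "Enabled"
aws lambda get-event-source-mapping --uuid <new-mapping-uuid>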
Strategy 2: PITR with Lambda
restore:
time_window_end: 1701450000000
# Lambda will need to start from TRIM_HORIZON
# to process restored messages
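time_window_end is epoch milliseconds; the value above corresponds to 2023-12-01T17:00:00Z. With GNU date you can compute a cutoff like this:
# Epoch milliseconds for a UTC cutoff time (GNU date)
date -d '2023-12-01 17:00:00 UTC' +%s%3N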
Strategy 3: Resume from Position
For resuming exactly where Lambda left off:
# Get current Lambda offset position
OFFSET=$(kafka-consumer-groups \
--bootstrap-server b-1.my-cluster.abc123.kafka.us-west-2.amazonaws.com:9092 \
--group amazon.lambda.12345678-1234-1234-1234-123456789012 \
--describe \
--command-config msk.properties | grep orders | awk '{print $4}')
# Note: the Lambda event source mapping cannot seek to an arbitrary
# offset; StartingPosition must be TRIM_HORIZON or LATEST
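If your event source mapping supports a custom consumer group ID (a newer MSK event source feature), Lambda will resume from that group's committed offsets instead. A sketch, assuming the position captured above has been committed under restored-orders-group (a hypothetical group):
# Attach Lambda to an existing consumer group; Lambda resumes from
# its committed offsets (restored-orders-group is hypothetical)
aws lambda create-event-source-mapping \
  --function-name order-processor \
  --event-source-arn arn:aws:kafka:us-west-2:123456789:cluster/my-cluster/abc123 \
  --topics orders \
  --starting-position LATEST \
  --amazon-managed-kafka-event-source-config '{"ConsumerGroupId":"restored-orders-group"}'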
AWS Lambda Function for Backup
Run backups using Lambda:
// backup-lambda.js
const { execSync } = require('child_process');
const fs = require('fs');
const yaml = require('yaml'); // bundle the 'yaml' package with the function
exports.handler = async (event) => {
const backupId = `msk-backup-${Date.now()}`;
const config = {
mode: 'backup',
backup_id: backupId,
source: {
bootstrap_servers: [process.env.MSK_BOOTSTRAP_SERVERS],
security: {
security_protocol: 'SASL_SSL',
sasl_mechanism: 'AWS_MSK_IAM'
},
topics: {
include: event.topics || ['orders']
}
},
storage: {
backend: 's3',
bucket: process.env.BACKUP_BUCKET,
prefix: 'msk-backups'
},
backup: {
compression: 'zstd',
include_offset_headers: true
}
};
  // Write the config to Lambda's writable /tmp scratch space
  const configPath = '/tmp/backup-config.yaml';
  fs.writeFileSync(configPath, yaml.stringify(config));
// Execute backup
try {
execSync(`/opt/kafka-backup backup --config ${configPath}`, {
stdio: 'inherit'
});
return {
statusCode: 200,
body: JSON.stringify({ backup_id: backupId })
};
} catch (error) {
console.error('Backup failed:', error);
throw error;
}
};
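For an ad-hoc run outside the schedule, invoke the function directly (AWS CLI v2 shown; the function name matches the EventBridge target below):
# One-off backup invocation with an explicit topic list
aws lambda invoke \
  --function-name msk-backup \
  --cli-binary-format raw-in-base64-out \
  --payload '{"topics": ["orders", "order-events"]}' \
  response.json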
Lambda Layer for kafka-backup
# Dockerfile for Lambda layer
FROM amazonlinux:2
RUN yum install -y curl tar gzip zip
# Download kafka-backup binary
RUN curl -L https://github.com/osodevops/kafka-backup/releases/latest/download/kafka-backup-linux-amd64.tar.gz | \
tar xz -C /opt
# Create layer structure
RUN mkdir -p /opt/layer && \
cp /opt/kafka-backup /opt/layer/
# Package layer
WORKDIR /opt/layer
RUN zip -r /opt/layer.zip .
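A sketch of building the image, extracting the zip, and publishing it as a layer; the layer name kafka-backup is an arbitrary choice:
# Build the image and copy the packaged layer out of a container
docker build -t kafka-backup-layer .
docker create --name layer-build kafka-backup-layer
docker cp layer-build:/opt/layer.zip ./layer.zip
docker rm layer-build
# Publish the zip as a Lambda layer
aws lambda publish-layer-version \
  --layer-name kafka-backup \
  --zip-file fileb://layer.zip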
EventBridge Scheduled Backups
EventBridge Rule
{
"Name": "msk-backup-schedule",
"ScheduleExpression": "rate(1 hour)",
"State": "ENABLED",
"Targets": [
{
"Id": "backup-lambda",
"Arn": "arn:aws:lambda:us-west-2:123456789:function:msk-backup",
"Input": "{\"topics\": [\"orders\", \"order-events\"]}"
}
]
}
Terraform Configuration
resource "aws_cloudwatch_event_rule" "backup_schedule" {
name = "msk-backup-schedule"
description = "Trigger MSK backup every hour"
schedule_expression = "rate(1 hour)"
}
resource "aws_cloudwatch_event_target" "backup_lambda" {
rule = aws_cloudwatch_event_rule.backup_schedule.name
target_id = "backup-lambda"
arn = aws_lambda_function.backup.arn
input = jsonencode({
topics = ["orders", "order-events"]
})
}
resource "aws_lambda_permission" "allow_eventbridge" {
statement_id = "AllowExecutionFromEventBridge"
action = "lambda:InvokeFunction"
function_name = aws_lambda_function.backup.function_name
principal = "events.amazonaws.com"
source_arn = aws_cloudwatch_event_rule.backup_schedule.arn
}
DLQ Integration
Backup DLQ Messages
When Lambda fails to process a batch, failure records are sent to the SQS DLQ. Back up both:
source:
topics:
include:
- orders
# Also backup SQS DLQ separately (different tool needed)
Restore Failed Messages
// Lambda to move DLQ messages back to Kafka
const AWS = require('aws-sdk');
const { Kafka } = require('kafkajs');
exports.handler = async (event) => {
const sqs = new AWS.SQS();
const kafka = new Kafka({
clientId: 'dlq-replayer',
    brokers: process.env.MSK_BOOTSTRAP_SERVERS.split(','),
    ssl: true,
    sasl: {
      mechanism: 'aws',
      authorizationIdentity: 'msk-admin', // the IAM user/role identity
      accessKeyId: process.env.AWS_ACCESS_KEY_ID,
      secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
      sessionToken: process.env.AWS_SESSION_TOKEN // needed with role credentials
    }
});
const producer = kafka.producer();
await producer.connect();
// Receive messages from DLQ
const response = await sqs.receiveMessage({
QueueUrl: process.env.DLQ_URL,
MaxNumberOfMessages: 10
}).promise();
for (const message of response.Messages || []) {
const kafkaMessage = JSON.parse(message.Body);
// Resend to Kafka
await producer.send({
topic: 'orders',
messages: [{ key: kafkaMessage.key, value: kafkaMessage.value }]
});
// Delete from DLQ
await sqs.deleteMessage({
QueueUrl: process.env.DLQ_URL,
ReceiptHandle: message.ReceiptHandle
}).promise();
}
await producer.disconnect();
};
Cross-Region DR
Backup in Primary Region
backup-us-west-2.yaml
source:
bootstrap_servers:
- b-1.primary-cluster.kafka.us-west-2.amazonaws.com:9092
storage:
backend: s3
bucket: kafka-backups-us-west-2
region: us-west-2
prefix: primary
# Enable cross-region replication in S3
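Cross-region replication is configured on the S3 bucket itself, not in kafka-backup. A minimal sketch, assuming versioning is enabled on both buckets and a replication role already exists (the role name and rule ID here are hypothetical):
# Replicate the backup prefix to the DR bucket
cat > replication.json <<'EOF'
{
  "Role": "arn:aws:iam::123456789:role/s3-replication-role",
  "Rules": [
    {
      "ID": "kafka-backups-to-dr",
      "Status": "Enabled",
      "Priority": 1,
      "Filter": { "Prefix": "primary/" },
      "DeleteMarkerReplication": { "Status": "Disabled" },
      "Destination": { "Bucket": "arn:aws:s3:::kafka-backups-us-east-1" }
    }
  ]
}
EOF
aws s3api put-bucket-replication \
  --bucket kafka-backups-us-west-2 \
  --replication-configuration file://replication.json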
Restore in DR Region
restore-us-east-1.yaml
target:
bootstrap_servers:
- b-1.dr-cluster.kafka.us-east-1.amazonaws.com:9092
storage:
backend: s3
bucket: kafka-backups-us-east-1 # Replicated bucket
region: us-east-1
prefix: primary
Monitoring
CloudWatch Metrics
// Lambda backup function with metrics
const AWS = require('aws-sdk');
const cloudwatch = new AWS.CloudWatch();
async function reportMetrics(backupStats) {
await cloudwatch.putMetricData({
Namespace: 'KafkaBackup',
MetricData: [
{
MetricName: 'RecordsBackedUp',
Value: backupStats.records,
Unit: 'Count'
},
{
MetricName: 'BackupDurationSeconds',
Value: backupStats.duration,
Unit: 'Seconds'
},
{
MetricName: 'BackupSizeBytes',
Value: backupStats.bytes,
Unit: 'Bytes'
}
]
}).promise();
}
CloudWatch Alarms
{
"AlarmName": "KafkaBackupFailed",
"MetricName": "Errors",
"Namespace": "AWS/Lambda",
"Dimensions": [
{
"Name": "FunctionName",
"Value": "msk-backup"
}
],
"Statistic": "Sum",
"Period": 3600,
"EvaluationPeriods": 1,
"Threshold": 1,
"ComparisonOperator": "GreaterThanOrEqualToThreshold",
"AlarmActions": ["arn:aws:sns:us-west-2:123456789:alerts"]
}
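The JSON above matches the put-metric-alarm input shape, so it can be created directly (assuming it is saved as alarm.json):
# Create the failure alarm from the JSON above
aws cloudwatch put-metric-alarm --cli-input-json file://alarm.json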
Best Practices
- Use IAM authentication for MSK when possible
- Schedule backups via EventBridge
- Enable S3 cross-region replication for DR
- Monitor Lambda errors with CloudWatch
- Handle DLQ messages separately
- Test restore in DR region regularly
Next Steps
- Multi-Cluster DR - Complex DR scenarios
- AWS S3 Setup - S3 configuration
- Disaster Recovery - DR planning