LambdaのイベンドソースとしてAmazon MSKを使用する
はじめに
今回の記事では、Lambdaのイベンドソース(Trigger)としてAmazon MSKを使用する方法を紹介します。これは、言い換えれば、LambdaをAmazon MSKのConsumerとして使用する方法です。イベントソースとしてMSKを利用すると、一般的なConsumerの実装でみられるような煩雑なコーディングが不要です。MSKからのデータは、Lambdaのイベントデータとして受け取るからです。
事前準備
Lambdaのためのロールを用意してください。
[MskLambdaRole]
{
Version: "2012-10-17",
Statement: [
{
Effect: "Allow",
Action: [
kafka-cluster:Connect,
kafka-cluster:AlterCluster,
kafka-cluster:DescribeCluster,
kafka-cluster:*Topic*,
kafka-cluster:WriteData,
kafka-cluster:ReadData,
kafka-cluster:AlterGroup,
kafka-cluster:DescribeGroup,
kafka-cluster:DescribeClusterDynamicConfiguration,
kafka:DescribeClusterV2,
kafka:GetBootstrapBrokers,
ec2:CreateNetworkInterface,
ec2:DescribeNetworkInterfaces,
ec2:DescribeVpcs,
ec2:DeleteNetworkInterface,
ec2:DescribeSubnets,
ec2:DescribeSecurityGroups,
logs:CreateLogGroup,
logs:CreateLogStream,
logs:PutLogEvents,
secretsmanager:GetSecretValue,
secretsmanager:DescribeSecret,
glue:GetSchemaVersion,
glue:GetSchemaVersionsDiff,
glue:GetSchema,
glue:GetRegistry
],
Resource: [ * ]
}
]
}
[Trusted Policy]
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sts:AssumeRole"
}
]
}
ネットワーク環境やMSKについては、姉妹記事:サーバレスのAmazon MSKクラスターをご参照ください。
Lambda作成
Lambdaを作成してください。
1.タイプの選択:Auther from scrach
2.Function Name: EdaMskTestLambda
3.Runtime: Python3.10
4.Architecture: x86_64
5.Execution role: User an existing role
6.Existing role: MskLambdaRole
7.Advanced setting :Enable VPC
8.VPC: eda-vpc
9.Subnets: eda-subnet-private-northeast-{1a,1c,1d}
10.Security groups: eda-security-g-private
11.Create Functionをクリック
Lambdaを作成したら、lambda_function.pyのソースコードを編集してください。初期状態では、次のようになっているはずです。
import json
def lambda_handler(event, context):
# TODO implement
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
次のようにイベントソースから受け取ったメッセージのバリューを表示してみます。
import json
def lambda_handler(event, context):
# 全イベントデータを表示
print("Received event:", json.dumps(event, indent=2))
# 各レコードを処理
for record in event['records']:
# キーと値をデコード
key = record['key']
value = record['value']
# レコードの詳細を表示
print(f"Key: {key}, Value: {value}")
# 処理結果を返す
return {
'statusCode': 200,
'body': json.dumps('Event processed successfully!')
}
イベントソース設定(Add Trigger)
+Add triggerをクリックし、イベントソースを設定してください。
1.Select a source: MSK
2.MSK cluster: test-msk-cluster-1
3.Authentication: -
4.Secret Manager key: -
5.Active trigger: checked
6.Batch size: 100
7.String postion: Trim horizon(earlest相当)
8.Batch windows: -
9.Topic Name: testTopic
10.Consumer group ID(Option):testTopic-g1
11.Addボターンをクリック
イベントデータの受信を確認
姉妹記事:Amazon MSKを使用したデータ送受信プログラムの開発を参照し、メッセージを送信してみてください。
$ python procedure.py
Produced: {'orderId': 04b15cb6-afb0-4288-a197-7244c75884f7', 'time': '2024-01-03T10:50:06.282572', 'bookName': 'Book F', 'price': 48.72}
結果は、Cloud WatchのLog Groupから確認できます。
/aws/lambda/EdaMskTestLambdログを確認してください。次のようなメッセージが表示されているはずです。
Partition: testTopic-1, Offset: 2948, Value: {
"orderId": "04b15cb6-afb0-4288-a197-7244c75884f7",
"time": "2024-01-03T10:50:06.282572",
"bookName": "Book F",
"price": 48.72
まとめ
このようにLambdaでMSKをイベントソースとして使用する場合、一般的なConsumerの開発に比べ、コーディングがとても簡単です。開発者は、特にConsumerであることを意識する必要がありません。また、サーバーレスプラットフォームで運用できるメリットも非常に大きいです。Consumerを開発し、EC2やコンテナーで運用する場合と比較してみてください。