Amazon MSKを使用したデータ送受信プログラムの開発
はじめに
今回の記事では、Amazon MSKクラスターを使用するデータ送受信のプログラムの開発について紹介します。MSKのTopicにデータを送信するプログラムをプロデューサー(Producer)、データを受信するプログラムをコンシューマー(Consumer)と呼びます。この記事では、特にデータ送受信のセキュリティとして、IAM Roleベースの認証方式に焦点を当てて説明します。IAM Roleベースの認証を使用することで、セキュリティを強化し、信頼性のあるデータの送受信を実現できます。
MSKクラスターやTopicに関する情報は、姉妹記事:サーバレスのAmazon MSKクラスターをご参照ください。
事前準備
サンプルコードを実行するには、Python 3.8 - 3.11 のバージョンが必要です。想定のバージョンのPythonをインストールし、サンプルコードを実行する準備をしてください。
Kafkaクライアントプログラムを開発するためのパッケージは、次の通りです。
$ pip install kafka-python $ pip install confluent_kafka |
なお、IAM Roleベースのクラスターに接続するためには、aws-msk-iam-sasl-signer-python.jarが必要です。
次のURLを参照してインストールしてください。
$ pip install aws-msk-iam-sasl-signer-python |
参照:https://github.com/aws/aws-msk-iam-sasl-signer-python/blob/main/docs/installation.rst
また、.aws/credentialsを適切に更新しておいてください。プログラムを実行した時に参照されます。
プロデューサーの作成
Pythonプログラムを作成してください。このプログラムでは、JSON形式の注文データ(注文番号、時刻、タイトル、価格)を生成し、IAM Roleベースの認証を使用してAmazon MSKクラスターに接続し、testTopicに送信しています。
producer.py
# producer.py
import uuid
from datetime import datetime
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
import socket
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
# MSKトークンプロバイダークラス
class MSKTokenProvider():
def token(self):
token, _ = MSKAuthTokenProvider.generate_auth_token('ap-northeast-1')
return token
# データを生成する関数
def create_message():
return {
"orderId": str(uuid.uuid4()),
"time": datetime.now().isoformat(),
"bookName": "Sample Book",
"price": 20.99
}
# MSKトークンプロバイダーインスタンス
tp = MSKTokenProvider()
# Kafkaプロデューサーの設定
producer = KafkaProducer(
bootstrap_servers='b-3.edamskcluster1.xxxx.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-2.edamskcluster1.xxxxx.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.edamskcluster1.xxxxxx.c4.kafka.ap-northeast-1.amazonaws.com:9098',
security_protocol='SASL_SSL',
sasl_mechanism='OAUTHBEARER',
sasl_oauth_token_provider=tp,
client_id=socket.gethostname(),
alue_serializer=lambda m: json.dumps(m).encode('utf-8') # メッセージをJSON形式にシリアライズ
)
#トピック名の設定
topic = "testTopic"
# メッセージ送信ループ
try:
message = create_message()
producer.send(topic, message)
producer.flush()
print(f"Produced: {message}")
except KafkaError as e:
print("Failed to send message:", e)
# プロデューサーのクローズ
producer.close()
では、プロデューサーを実行してみましょう。データの受信が確認できるようにCLIのコンスーマーを実行しておいてください。
kafka-console-consumer.sh --bootstrap-server $bootstrapBrokerString \ --topic testTopic --from-beginning \ --consumer.config ./client.properties |
producer.pyを実行してください。
$ python producer.py Produced: {'orderId': '60b88fb1-2f8c-4d9a-9a05-158a55a4d18e', 'time': '2024-01-03T07:42:17.764526', 'bookName': 'Book L', 'price': 21.42} |
コンシューマーからデータの受信が確認できるはずです。
$ kafka-console-consumer.sh --bootstrap-server $bootstrapBrokerString \ --topic testTopic --from-beginning \ --consumer.config ./client.properties Produced: {'orderId': '621064bb-56ef-472d-ba29-4ffa06e65c37', 'time': '2024-01-02T10:35:42.133741', 'bookName': 'Sample Book', 'price': 20.99} |
コンシューマーの作成
次は、コンシューマーを作成してください。このプログラムは、Amazon MSKクラスターにIAM認証を経て接続し、継続的にデータを受け取って標準出力しています。
consumer.py
# consumer.py
from confluent_kafka import Consumer
import socket
import time
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
def oauth_cb(oauth_config):
auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("ap-northeast-1")
# Note that this library expects oauth_cb to return expiry time in seconds since epoch, while the token generator returns expiry in ms
return auth_token, expiry_ms/1000
c = Consumer({
'bootstrap.servers': "b-3.edamskcluster.xxxxxx.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-2.edamskcluster.xxxxxx.ap-northeast-1.amazonaws.com:9098,b-1.testmskcluster2.xxxx.c4.kafka.ap-northeast-1.amazonaws.com:9098",
'client.id': socket.gethostname(),
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER',
'oauth_cb': oauth_cb,
'group.id': 'testTopic-g1',
'auto.offset.reset': 'earliest'
})
c.subscribe(['testTopic'])
print("Starting consumer!")
while True:
msg = c.poll(500) # ms
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()
参照:https://github.com/aws/aws-msk-iam-sasl-signer-python#get-started
'group.id': 'testTopic-g1' は、トピックからデータを受け取るコンシューマーの論理的なグループを指します。Consumer Groupは、データを処理する一連のコンシューマをまとめて管理し、データの並行処理とスケーラビリティを実現するために使用します。同じTopicからデータを取得する場合、異なるConsumer Groupを使用すると、それぞれ、同じデータセットを受け取ることになります。
メッセージの送受信を確認
では、コンシューマーを実行してみましょう。
$ python consumer.py Starting consumer! |
この状態からプロデューサーからデータを送信してください。
$ python producer.py Produced: {'orderId': 'eeb7169a-e737-4868-81b8-b60be5c4d375', 'time': '2024-01-03T08:04:22.872183', 'bookName': 'Book F', 'price': 19.8} |
コンシューマーがメッセージを受信し、表示するはずです。
$ python consumer.py Starting consumer! {'orderId': 'eeb7169a-e737-4868-81b8-b60be5c4d375', 'time': '2024-01-03T08:04:22.872183', 'bookName': 'Book F', 'price': 19.8} |
まとめ
PythonでAmazon MSKクラスターを使用するプロデューサーとコンシューマー(データ送受信のプログラム)の開発について紹介しました。Amazon MSKを利用すると、このように簡単なコーディングで、ネットワークを介してデータを安全かつ効率的に送受信することができます。この特性からマイクロサービスやイベント駆動型アーキテクチャ(EDA)のプラットフォームとして広く活用されています。