fbpx

Amazon MSKを使用したデータ送受信プログラムの開発

はじめに

今回の記事では、Amazon MSKクラスターを使用するデータ送受信のプログラムの開発について紹介します。MSKのTopicにデータを送信するプログラムをプロデューサー(Producer)、データを受信するプログラムをコンシューマー(Consumer)と呼びます。この記事では、特にデータ送受信のセキュリティとして、IAM Roleベースの認証方式に焦点を当てて説明します。IAM Roleベースの認証を使用することで、セキュリティを強化し、信頼性のあるデータの送受信を実現できます。

MSKクラスターやTopicに関する情報は、姉妹記事:サーバレスのAmazon MSKクラスターをご参照ください。

事前準備

サンプルコードを実行するには、Python 3.8 - 3.11 のバージョンが必要です。想定のバージョンのPythonをインストールし、サンプルコードを実行する準備をしてください。

Kafkaクライアントプログラムを開発するためのパッケージは、次の通りです。

$ pip install kafka-python
$ pip install confluent_kafka

なお、IAM Roleベースのクラスターに接続するためには、aws-msk-iam-sasl-signer-python.jarが必要です。

次のURLを参照してインストールしてください。

$ pip install aws-msk-iam-sasl-signer-python

参照:https://github.com/aws/aws-msk-iam-sasl-signer-python/blob/main/docs/installation.rst

また、.aws/credentialsを適切に更新しておいてください。プログラムを実行した時に参照されます。

プロデューサーの作成

Pythonプログラムを作成してください。このプログラムでは、JSON形式の注文データ(注文番号、時刻、タイトル、価格)を生成し、IAM Roleベースの認証を使用してAmazon MSKクラスターに接続し、testTopicに送信しています。

producer.py

 # producer.py
import uuid
from datetime import datetime
import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
import socket
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# MSKトークンプロバイダークラス
class MSKTokenProvider():
    def token(self): 
        token, _ = MSKAuthTokenProvider.generate_auth_token('ap-northeast-1') 
        return token

# データを生成する関数
def create_message():
    return {
        "orderId": str(uuid.uuid4()),
        "time": datetime.now().isoformat(),
        "bookName": "Sample Book",
        "price": 20.99
    }

# MSKトークンプロバイダーインスタンス
tp = MSKTokenProvider()

# Kafkaプロデューサーの設定
producer = KafkaProducer(
    bootstrap_servers='b-3.edamskcluster1.xxxx.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-2.edamskcluster1.xxxxx.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-1.edamskcluster1.xxxxxx.c4.kafka.ap-northeast-1.amazonaws.com:9098',
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id=socket.gethostname(),
    alue_serializer=lambda m: json.dumps(m).encode('utf-8') # メッセージをJSON形式にシリアライズ
    )

#トピック名の設定
topic = "testTopic" 

# メッセージ送信ループ
try:
    message = create_message()
    producer.send(topic, message)
    producer.flush()
    print(f"Produced: {message}")
except KafkaError as e:
    print("Failed to send message:", e)

# プロデューサーのクローズ
producer.close()

では、プロデューサーを実行してみましょう。データの受信が確認できるようにCLIのコンスーマーを実行しておいてください。

kafka-console-consumer.sh --bootstrap-server $bootstrapBrokerString \  
--topic testTopic  --from-beginning \  
--consumer.config ./client.properties

producer.pyを実行してください。

$ python producer.py
Produced: {'orderId': '60b88fb1-2f8c-4d9a-9a05-158a55a4d18e', 'time': '2024-01-03T07:42:17.764526', 'bookName': 'Book L', 'price': 21.42}

コンシューマーからデータの受信が確認できるはずです。

$ kafka-console-consumer.sh --bootstrap-server $bootstrapBrokerString \  
--topic testTopic  --from-beginning \  
--consumer.config ./client.properties
Produced: {'orderId': '621064bb-56ef-472d-ba29-4ffa06e65c37', 'time': '2024-01-02T10:35:42.133741', 'bookName': 'Sample Book', 'price': 20.99}

コンシューマーの作成

次は、コンシューマーを作成してください。このプログラムは、Amazon MSKクラスターにIAM認証を経て接続し、継続的にデータを受け取って標準出力しています。

consumer.py

# consumer.py
from confluent_kafka import Consumer
import socket
import time
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

def oauth_cb(oauth_config):
    auth_token, expiry_ms = MSKAuthTokenProvider.generate_auth_token("ap-northeast-1")
    # Note that this library expects oauth_cb to return expiry time in seconds since epoch, while the token generator returns expiry in ms
    return auth_token, expiry_ms/1000

c = Consumer({
    'bootstrap.servers': "b-3.edamskcluster.xxxxxx.c4.kafka.ap-northeast-1.amazonaws.com:9098,b-2.edamskcluster.xxxxxx.ap-northeast-1.amazonaws.com:9098,b-1.testmskcluster2.xxxx.c4.kafka.ap-northeast-1.amazonaws.com:9098",
    'client.id': socket.gethostname(),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'OAUTHBEARER',
    'oauth_cb': oauth_cb,
    'group.id': 'testTopic-g1',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['testTopic'])

print("Starting consumer!")

while True:
    msg = c.poll(500) # ms

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue
    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

参照:https://github.com/aws/aws-msk-iam-sasl-signer-python#get-started

'group.id': 'testTopic-g1' は、トピックからデータを受け取るコンシューマーの論理的なグループを指します。Consumer Groupは、データを処理する一連のコンシューマをまとめて管理し、データの並行処理とスケーラビリティを実現するために使用します。同じTopicからデータを取得する場合、異なるConsumer Groupを使用すると、それぞれ、同じデータセットを受け取ることになります。

メッセージの送受信を確認

では、コンシューマーを実行してみましょう。

$ python consumer.py
Starting consumer!

この状態からプロデューサーからデータを送信してください。

$ python producer.py
Produced: {'orderId': 'eeb7169a-e737-4868-81b8-b60be5c4d375', 'time': '2024-01-03T08:04:22.872183', 'bookName': 'Book F', 'price': 19.8}

コンシューマーがメッセージを受信し、表示するはずです。

$ python consumer.py
Starting consumer!
{'orderId': 'eeb7169a-e737-4868-81b8-b60be5c4d375', 'time': '2024-01-03T08:04:22.872183', 'bookName': 'Book F', 'price': 19.8}

まとめ

PythonでAmazon MSKクラスターを使用するプロデューサーとコンシューマー(データ送受信のプログラム)の開発について紹介しました。Amazon MSKを利用すると、このように簡単なコーディングで、ネットワークを介してデータを安全かつ効率的に送受信することができます。この特性からマイクロサービスやイベント駆動型アーキテクチャ(EDA)のプラットフォームとして広く活用されています。

Author

モダンアーキテクチャー基盤のソリューションアーキテクトとして活動しています。

[著書]
・Amazon Cloudテクニカルガイド―EC2/S3からVPCまで徹底解析
・Amazon Elastic MapReduceテクニカルガイド ―クラウド型Hadoopで実現する大規模分散処理
・Cypherクエリー言語の事例で学ぶグラフデータベースNeo4j
・Neo4jを使うグラフ型データベース入門(共著)
・RDB技術者のためのNoSQLガイド(共著)

leeの記事一覧

新規CTA