Amazon MSKクラスターとメッセージの送受信
はじめに
Amazon MSK(Amazon Managed Streaming for Apache Kafka)は、Apache Kafkaをベースにした完全マネージドのストリーミングサービスです。MSKでは、サーバーレスとプロビジョニングの2つのタイプのクラスターを提供しています。
サーバーレスタイプでは、CPU、メモリ、ストレージなどのキャパシティ管理が自動化されていますが、スループット、パーティション数に制限があります。一方、プロビジョニングタイプは、従来のApache Kafkaクラスターと同様の方式で運用し、特定のキャパシティ制限はありませんが、キャパシティ管理はユーザーが行う必要があります。
今回は、サーバーレスタイプのクラスターの構築とIAMロールを使用したアクセスコントロールの特徴について簡略に紹介します。
事前準備
VPCを用意してください。今回のMSK及び連載記事のためのVPC環境は、次のような構成になっています。
- Region : Tokyo
- VPC
- VPC Name: eda-vpc
- Range: 10.0.0.16
- Subnet:
eda-subnet-public-northeast-1a | 10.0.10.0/24 |
eda-subnet-lambda-northeast-1a | 10.0.100.0/24 |
eda-subnet-lambda-northeast-1c | 10.0.101.0/24 |
eda-subnet-lambda-northeast-1d | 10.0.102.0/24 |
eda-subnet-msk-northeast-1a | 10.0.110.0/24 |
eda-subnet-msk-northeast-1c | 10.0.111.0/24 |
eda-subnet-msk-northeast-1d | 10.0.112.0/24 |
eda-subnet-db-northeast-1a | 10.0.200.0/24 |
eda-subnet-db-northeast-1c | 10.0.201.0/24 |
eda-subnet-db-northeast-1d | 10.0.202.0/24 |
- SecurityGroup
- eda-security-g-lambda
- from cloud9(443)
- from msk(443)
- self(443)
- eda-security-g-msk
- from cloud9(9098)
- from lamba(9098)
- self(9098)
- eda-security-g-db
- from cloud9(3306)
- from lamba(3306)
- from msk(3306)
- eda-security-g-lambda
- VPC Endpoint
- Lambda(eda-subnet-lambda-northeast-1{a,b,c})
- Glue(eda-subnet-lambda-northeast-1{a,b,c})
- secretmanager(eda-subnet-lambda-northeast-1{a,b,c})
- RDS(eda-subnet-db-northeast-1{a,b,c})
- NAT Gateway
- MSKのためにパブリックサブネットにパブリックタイプのNAT gatewayを作成し、MSKが属するRoute tablesに設定します。NAT gatewayを通してインターネットに出ていくわけではありません。プライベートサブネットに配置しているサービス間の通信経路を確保するために使います。
- 同VPCのなかでもMSKがプライベートサブネットにあり、他のプライベートサブネットとの通信が必要である場合、NAT Gateway又はPrivatelinkが必要です。これは、MSKがVPC endpointと統合されていないためです。MSKがNAT gatewayを設定する方法は、Using Amazon MSK as an event source for AWS Lambda を参照してください。Privatelinkを利用する方式については、別の紙面を借りて紹介します。
今回のネットワーク設定は、今後掲載する一連の記事のために多少大目に設定しています。今回の記事に限って必要なのは、Amazon MSKと直接関わる設定のみとなります(Cloud9,MSK)。ご了承ください。
MSKクラスターの構築
次のようにMSKクラスターを構築します。
1.Amazon MSK→MSK Cluster→Clusters→Create Clusterをクリックしてください。
2.Creation Methodでは、Custom Createを選択してください。
3.Cluster Name:eda-msk-cluster-1と設定してください。
4.Cluster Typeは、Provisionedを選択してください。
Serverlessでは、キャパシティーの管理が自動ですが、スループットやパーティーション数の制限があります。Provisionedでは、ユーザがキャパシティー管理を行う必要がありますが、特にキャパシティーの制限は存在しません。どちらにすべきかは、要件や運用能力に合わせて選択すべきです。ここでは、Serverlessにしています。
料金は、次を参照してください。Serverlessタイプでも、1時間当たり、Large同等な固定料金が発生しますので注意してください。Provisionedタイプだと、Smallタイプのインスタンスも使えます。
https://aws.amazon.com/jp/msk/pricing/
5.Networkは、事前準備のVPCとプライベートサブネット、セキュリティグループを設定してください。
- eda-vpc
- eda-subnet-msk-northeast-1{a,c,d}
- eda-security-g-msk
6.Serverlessのアクセスコントロールは、IAM Role baseの認証のみとなっています。
Provisionedの場合、複数のアクセスコントロール方式から選択できます。
7.Preview and Createで、設定内容を確認し、クラスター作成を開始してください。
8.しばらく待ってからクラスターを確認してください。
MSKクラスターへの接続
AWS CLIインストール
必要に応じてAWS CLIをインストールしてください。
https://docs.aws.amazon.com/ja_jp/cli/latest/userguide/getting-started-install.html
$curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" $ unzip awscliv2.zip $ sudo ./aws/install |
.aws/credentialsの設定
最近は、SSOによる認証が一般的なのかも知れません。状況に応じてアクセスキーとトークンを設定してください。
[default] aws_access_key_id=ASIARNIZSLAGL5TTK44F aws_secret_access_key=Sb9l35kbNPcy0hhqRE7DFq+h9p/Snc18BXqRs72o aws_session_token=<your token> region=ap-northeast-1 |
Javaのインストール
必要に応じてJavaをインストールしてください。Java8も使えます。
$ sudo apt update $ sudo apt install openjdk-11-jdk $java -version openjdk version "11.0.20.1" 2023-08-24 $ javac -version javac 11.0.20.1 |
kafkaインストール
Kafkaをインストールしてください。Kafkaは、クライアントツールを使うために必要です。
$ wget https://archive.apache.org/dist/kafka/3.5.1/kafka_2.13-3.5.1.tgz $ tar -xvf kafka_2.13-3.5.1.tgz $ cd kafka_2.13-3.5.1 |
環境変数を設定してください。
export KAFKA_CLIENT_HOME="~/environment/kafka_2.13-3.5.1" export PATH="$PATH:$KAFKA_CLIENT_HOME:$KAFKA_CLIENT_HOME/bin:$KAFKA_CLIENT_HOME/lib" |
MSK Clusterのエンドポイントも環境変数にしておきます。
export bootstrapBrokerString="boot-xxxxxxxx.xx.kafka-serverless.ap-northeast-1.amazonaws.com:9098" |
aws-msk-iam-auth
IAMロールベースのMSKクラスターに接続するためには、aws-msk-iam-authが必要です。
$ wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar $ cp aws-msk-iam-auth-1.1.1-all.jar ./kafka_2.13-3.5.1/libs/ |
CLASSPATHを設定してください。
export CLASSPATH="~/environment/kafka_2.13-3.5.1/libs/aws-msk-iam-auth-1.1.1-all.jar" |
これらは、繰り返して使いますので、.bashrc又は.bash_profileに登録しておいてください。
client.propertie作成
client.propertiesファイルを作成してください。これは、Kafkaクライアント実行時にIAM Roleベースの認証のオプションとして必要です。ここでは、ホームディレクトリに作成しています。
cat << EOF > client.properties security.protocol=SASL_SSL sasl.mechanism=AWS_MSK_IAM sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler EOF |
Topic作成
KafkaのTopicを作成してください。
$ kafka-topics.sh --create --bootstrap-server $bootstrapBrokerString \ --replication-factor 3 --partitions 2 --topic testTopic \ --command-config ./client.properties Created topic testTopic. |
Topicのリストを表示してみてください。
$ kafka-topics.sh --list --bootstrap-server $bootstrapBrokerString \ --command-config ./client.properties testTopic |
Topicの設定内容を確認してみてください。
$ kafka-topics.sh --describe --bootstrap-server $bootstrapBrokerString \ --topic testTopic \ --command-config ./client.properties Topic: testTopic TopicId: JNtExshPRK69mJRulhsciw PartitionCount: 2 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=134217728,retention.ms=604800000,message.format.version=2.8-IV2,unclean.leader.election.enable=false,retention.bytes=268435456000 Topic: testTopic Partition: 0 Leader: 377 Replicas: 377,393,361 Isr: 377,393,361 Topic: testTopic Partition: 1 Leader: 393 Replicas: 393,369,385 Isr: 393,369,385 |
--command-config ./client.propertiesのように認証オプションを指定して実行することに注意してください。省略すると、MSKに接続できません。
$ kafka-topics.sh --describe --bootstrap-server $bootstrapBrokerString \ --topic testTopic Error while executing topic command : The AdminClient thread has exited. Call: listTopics [2024-01-02 08:25:32,094] ERROR org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: listTopics (kafka.admin.TopicCommand$) [2024-01-02 08:25:32,103] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread) java.lang.OutOfMemoryError: Java heap space at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61) at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348) at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402) at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576) at org.apache.kafka.common.network.Selector.poll(Selector.java:481) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:571) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413) at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344) at java.base/java.lang.Thread.run(Thread.java:829) |
メッセージの送受信
ここでは、CLIで簡単なメッセージの送受信を体験してみましょう。
MSK(Kafka)からメッセージを受信 - Consumer
ターミナルを開いて、Consumerを実行してみてください。次のようにメッセージの受信を待っている状態になります。
$ kafka-console-consumer.sh --bootstrap-server $bootstrapBrokerString \ --topic testTopic --from-beginning \ --consumer.config ./client.properties |
MSK(Kafka)にメッセージ送信 - Producer
別のターミナルを開いて、任意のメッセージ送信してみてください。
$ kafka-console-producer.sh --broker-list $bootstrapBrokerString \ --topic testTopic \ --producer.config ./client.properties message 1 message 2 |
Consumerを確認してみてください。
次のようにメッセージを受け取っているはずです。
$ kafka-console-consumer.sh --bootstrap-server $bootstrapBrokerString \ --topic testTopic --from-beginning \ --consumer.config ./client.properties message 1 message 2 |
まとめ
今回の記事では、Amazon MSKを使用して、Serverlessクラスターのセットアップ手順と、IAM Roleベースのアクセスコントロールの概要を紹介しました。MSKは、Apache Kafkaベースのストリーミングサービスで、サーバーレスタイプではキャパシティ管理が自動化されていますが、制限がある一方、プロビジョニングタイプはキャパシティ制限がなく、ユーザーが管理を行う必要があります。