fbpx

Amazon MSKクラスターとメッセージの送受信

はじめに

Amazon MSK(Amazon Managed Streaming for Apache Kafka)は、Apache Kafkaをベースにした完全マネージドのストリーミングサービスです。MSKでは、サーバーレスとプロビジョニングの2つのタイプのクラスターを提供しています。

サーバーレスタイプでは、CPU、メモリ、ストレージなどのキャパシティ管理が自動化されていますが、スループット、パーティション数に制限があります。一方、プロビジョニングタイプは、従来のApache Kafkaクラスターと同様の方式で運用し、特定のキャパシティ制限はありませんが、キャパシティ管理はユーザーが行う必要があります。

今回は、サーバーレスタイプのクラスターの構築とIAMロールを使用したアクセスコントロールの特徴について簡略に紹介します。

事前準備

VPCを用意してください。今回のMSK及び連載記事のためのVPC環境は、次のような構成になっています。

  • Region : Tokyo
  • VPC
    • VPC Name: eda-vpc
    • Range: 10.0.0.16
    • Subnet:
eda-subnet-public-northeast-1a10.0.10.0/24
eda-subnet-lambda-northeast-1a10.0.100.0/24
eda-subnet-lambda-northeast-1c10.0.101.0/24
eda-subnet-lambda-northeast-1d10.0.102.0/24
eda-subnet-msk-northeast-1a10.0.110.0/24
eda-subnet-msk-northeast-1c10.0.111.0/24
eda-subnet-msk-northeast-1d10.0.112.0/24
eda-subnet-db-northeast-1a10.0.200.0/24
eda-subnet-db-northeast-1c10.0.201.0/24
eda-subnet-db-northeast-1d10.0.202.0/24
  • SecurityGroup
    • eda-security-g-lambda
      • from cloud9(443)
      • from msk(443)
      • self(443)
    • eda-security-g-msk
      • from cloud9(9098)
      • from lamba(9098)
      • self(9098)
    • eda-security-g-db
      • from cloud9(3306)
      • from lamba(3306)
      • from msk(3306)
  • VPC Endpoint
    • Lambda(eda-subnet-lambda-northeast-1{a,b,c})
    • Glue(eda-subnet-lambda-northeast-1{a,b,c})
    • secretmanager(eda-subnet-lambda-northeast-1{a,b,c})
    • RDS(eda-subnet-db-northeast-1{a,b,c})
  • NAT Gateway
    • MSKのためにパブリックサブネットにパブリックタイプのNAT gatewayを作成し、MSKが属するRoute tablesに設定します。NAT gatewayを通してインターネットに出ていくわけではありません。プライベートサブネットに配置しているサービス間の通信経路を確保するために使います。
    • 同VPCのなかでもMSKがプライベートサブネットにあり、他のプライベートサブネットとの通信が必要である場合、NAT Gateway又はPrivatelinkが必要です。これは、MSKがVPC endpointと統合されていないためです。MSKがNAT gatewayを設定する方法は、Using Amazon MSK as an event source for AWS Lambda を参照してください。Privatelinkを利用する方式については、別の紙面を借りて紹介します。

今回のネットワーク設定は、今後掲載する一連の記事のために多少大目に設定しています。今回の記事に限って必要なのは、Amazon MSKと直接関わる設定のみとなります(Cloud9,MSK)。ご了承ください。

MSKクラスターの構築

次のようにMSKクラスターを構築します。

1.Amazon MSK→MSK Cluster→Clusters→Create Clusterをクリックしてください。

2.Creation Methodでは、Custom Createを選択してください。

3.Cluster Name:eda-msk-cluster-1と設定してください。

4.Cluster Typeは、Provisionedを選択してください。

Serverlessでは、キャパシティーの管理が自動ですが、スループットやパーティーション数の制限があります。Provisionedでは、ユーザがキャパシティー管理を行う必要がありますが、特にキャパシティーの制限は存在しません。どちらにすべきかは、要件や運用能力に合わせて選択すべきです。ここでは、Serverlessにしています。

料金は、次を参照してください。Serverlessタイプでも、1時間当たり、Large同等な固定料金が発生しますので注意してください。Provisionedタイプだと、Smallタイプのインスタンスも使えます。

https://aws.amazon.com/jp/msk/pricing/

5.Networkは、事前準備のVPCとプライベートサブネット、セキュリティグループを設定してください。

  • eda-vpc
  • eda-subnet-msk-northeast-1{a,c,d}
  • eda-security-g-msk

6.Serverlessのアクセスコントロールは、IAM Role baseの認証のみとなっています。

Provisionedの場合、複数のアクセスコントロール方式から選択できます。

7.Preview and Createで、設定内容を確認し、クラスター作成を開始してください。

8.しばらく待ってからクラスターを確認してください。

MSKクラスターへの接続

AWS CLIインストール

必要に応じてAWS CLIをインストールしてください。

https://docs.aws.amazon.com/ja_jp/cli/latest/userguide/getting-started-install.html

$curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
$ unzip awscliv2.zip
$ sudo ./aws/install

.aws/credentialsの設定

最近は、SSOによる認証が一般的なのかも知れません。状況に応じてアクセスキーとトークンを設定してください。

[default]
aws_access_key_id=ASIARNIZSLAGL5TTK44F
aws_secret_access_key=Sb9l35kbNPcy0hhqRE7DFq+h9p/Snc18BXqRs72o
aws_session_token=<your token>
region=ap-northeast-1

Javaのインストール

必要に応じてJavaをインストールしてください。Java8も使えます。

$ sudo apt update
$ sudo apt install openjdk-11-jdk
$java -version
openjdk version "11.0.20.1" 2023-08-24  
$ javac -version
javac 11.0.20.1

kafkaインストール

Kafkaをインストールしてください。Kafkaは、クライアントツールを使うために必要です。

$ wget https://archive.apache.org/dist/kafka/3.5.1/kafka_2.13-3.5.1.tgz  
$ tar -xvf kafka_2.13-3.5.1.tgz
$ cd kafka_2.13-3.5.1

環境変数を設定してください。

export KAFKA_CLIENT_HOME="~/environment/kafka_2.13-3.5.1"
export PATH="$PATH:$KAFKA_CLIENT_HOME:$KAFKA_CLIENT_HOME/bin:$KAFKA_CLIENT_HOME/lib"

MSK Clusterのエンドポイントも環境変数にしておきます。

export bootstrapBrokerString="boot-xxxxxxxx.xx.kafka-serverless.ap-northeast-1.amazonaws.com:9098"

aws-msk-iam-auth

IAMロールベースのMSKクラスターに接続するためには、aws-msk-iam-authが必要です。

$ wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.1/aws-msk-iam-auth-1.1.1-all.jar  
$ cp aws-msk-iam-auth-1.1.1-all.jar ./kafka_2.13-3.5.1/libs/

CLASSPATHを設定してください。

export CLASSPATH="~/environment/kafka_2.13-3.5.1/libs/aws-msk-iam-auth-1.1.1-all.jar"

これらは、繰り返して使いますので、.bashrc又は.bash_profileに登録しておいてください。

client.propertie作成

client.propertiesファイルを作成してください。これは、Kafkaクライアント実行時にIAM Roleベースの認証のオプションとして必要です。ここでは、ホームディレクトリに作成しています。

cat << EOF > client.properties
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required; sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF

Topic作成

KafkaのTopicを作成してください。

$ kafka-topics.sh --create --bootstrap-server $bootstrapBrokerString \  
--replication-factor 3 --partitions 2
--topic testTopic \  
--command-config ./client.properties
Created topic testTopic.  

Topicのリストを表示してみてください。

$ kafka-topics.sh --list --bootstrap-server $bootstrapBrokerString \  
--command-config ./client.properties testTopic  

Topicの設定内容を確認してみてください。

$ kafka-topics.sh --describe --bootstrap-server $bootstrapBrokerString \  
--topic testTopic \  
--command-config ./client.properties  
Topic: testTopic        TopicId: JNtExshPRK69mJRulhsciw PartitionCount: 2       ReplicationFactor: 3    Configs: min.insync.replicas=2,segment.bytes=134217728,retention.ms=604800000,message.format.version=2.8-IV2,unclean.leader.election.enable=false,retention.bytes=268435456000        
Topic: testTopic        Partition: 0    Leader: 377     Replicas: 377,393,361   Isr: 377,393,361        
Topic: testTopic        Partition: 1    Leader: 393     Replicas: 393,369,385   Isr: 393,369,385  

--command-config ./client.propertiesのように認証オプションを指定して実行することに注意してください。省略すると、MSKに接続できません。

$ kafka-topics.sh --describe --bootstrap-server $bootstrapBrokerString \  
--topic testTopic
Error while executing topic command : The AdminClient thread has exited. Call: listTopics [2024-01-02 08:25:32,094] ERROR org.apache.kafka.common.errors.TimeoutException: The AdminClient thread has exited. Call: listTopics  (kafka.admin.TopicCommand$) [2024-01-02 08:25:32,103]
ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread) java.lang.OutOfMemoryError: Java heap space        
at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:61)        
at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:348)        
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)        
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:102)        
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:452)        
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:402)        
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:674)        
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:576)        
at org.apache.kafka.common.network.Selector.poll(Selector.java:481)        
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:571)        
at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1413)         at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1344)         at java.base/java.lang.Thread.run(Thread.java:829)

メッセージの送受信

ここでは、CLIで簡単なメッセージの送受信を体験してみましょう。

MSK(Kafka)からメッセージを受信 - Consumer      

ターミナルを開いて、Consumerを実行してみてください。次のようにメッセージの受信を待っている状態になります。

$ kafka-console-consumer.sh --bootstrap-server $bootstrapBrokerString \  
--topic testTopic  --from-beginning \  
--consumer.config ./client.properties  

MSK(Kafka)にメッセージ送信 - Producer

別のターミナルを開いて、任意のメッセージ送信してみてください。

$ kafka-console-producer.sh --broker-list $bootstrapBrokerString \  
--topic testTopic \  
--producer.config ./client.properties
message 1
message 2  

Consumerを確認してみてください。

次のようにメッセージを受け取っているはずです。

$ kafka-console-consumer.sh --bootstrap-server $bootstrapBrokerString \  
--topic testTopic  --from-beginning \  
--consumer.config ./client.properties  
message 1
message 2  

まとめ

今回の記事では、Amazon MSKを使用して、Serverlessクラスターのセットアップ手順と、IAM Roleベースのアクセスコントロールの概要を紹介しました。MSKは、Apache Kafkaベースのストリーミングサービスで、サーバーレスタイプではキャパシティ管理が自動化されていますが、制限がある一方、プロビジョニングタイプはキャパシティ制限がなく、ユーザーが管理を行う必要があります。

Author

モダンアーキテクチャー基盤のソリューションアーキテクトとして活動しています。

[著書]
・Amazon Cloudテクニカルガイド―EC2/S3からVPCまで徹底解析
・Amazon Elastic MapReduceテクニカルガイド ―クラウド型Hadoopで実現する大規模分散処理
・Cypherクエリー言語の事例で学ぶグラフデータベースNeo4j
・Neo4jを使うグラフ型データベース入門(共著)
・RDB技術者のためのNoSQLガイド(共著)

leeの記事一覧

新規CTA