Kafka接続の設定
API Connection Manager で、Apache Kafkaの接続を設定できます。
これにより、1つまたは複数のパブリッシャーが送信し、1つまたは複数のコンシューマーが読み込むメッセージを格納するKafkaトピックに接続できます。Kafkaの詳細情報については、 Apache Kafkaのドキュメントを参照してください。
接続の設定
Kafka接続を設定するには、以下の手順に従ってください。
-
新しい接続を追加します。
-
Edit(編集)セクションに移動してください。
-
接続の名前を指定します。
-
Type ドロップダウンメニューから、 Kafka を選択します。
-
接続したい Topic の名前を入力してください。
-
Host を指定します。これはKafkaサーバホストの名前またはIPアドレスです。
-
リッスンする Port を入力してください。
-
コンシューマグループの GroupId を指定してください。これはこのトピックに登録したコンシューマのグループ名です。
-
レコード(メッセージ)を保存するパーティションの番号を入力してください。
Kafkaトピックは複数のパーティションに分割され、大量のデータを1台または複数のサーバーに分散させることができます。各パーティションには、 0 や 5 などの番号があります。
-
オプションで、特定のKafka接続の設定を含む ClientPropertiesFile へのパスを指定します。使用可能なプロパティについては、 Kafkaのドキュメントを参照してください。
-
コミットせずにパーティションから記録を取得するために、ピーキングを有効にしてください。
これにより、パーティションの位置に関係なく、また、オフセットに影響を与えることなく、任意のレコードを読むことができます。レコードの位置を定義するオフセットは、どのレコードがすでに消費されているか、 API Engine が次にどの未読レコードをプルする必要があるかを示します。
-
次のオプションのいずれかを使用して、メッセージの KeyType を指定してください。
-
Ignore を選択して、メッセージを null に設定し、キーではなく、値のみをシリアライズします。
-
None を選択し、値とキーの両方をシリアライズから除外します。
-
整数メッセージ。
-
文字列メッセージ。
-
-
Treat warning as error を選択して、実行中に発生する警告をエラーとして表示します。
-
接続に認証が必要な場合、ユーザー名とパスワードで認証するか、SSLを介して認証するかの2つのオプションがあります。SSLを選択した場合は、代わりに以下のフィールドを入力してください。
-
Certificate ファイルを .pem 形式で指定します。
-
Passphrase を入力します。これは秘密鍵のパスフレーズです。
-
Private Keyに、 .pem 形式で秘密鍵へのパスを指定します。
-
CA Certificate を .pem 形式で指定します。これはKafkaブローカーの証明書を検証するためのルートCA証明書です。
-
-
オプションとして、 XMLとウェブサービスのセキュリティを設定してください。
-
KafkaClientと Kafkaブローカー間の安全で認証された接続を確立するには、Clientプロパティ ファイルシークレットを定義します。これらのシークレットは、機密情報を保存します。
-
オプションとして、KafkaメッセージにAvroスキーマベースのシリアライズを追加して、レコードの値のデータスキーマを定義します。このスキーマには、値のフィールドとそのデータ型が記述されています。Avroスキーマの詳細情報については、 Apache Avroのドキュメントを参照してください。
APIエンジン3.0 は、Avroスキーマの配列タイプに対するデフォルト値をサポートしていません。詳細情報については、この例を参照してください。
Schema Type ドロップダウンメニューから Avro を選択し、JSON形式のデータ構造を定義した Schema Registry Url と Schema を追加します。
スキーマレジストリがSSLを必要とする場合は、以下のフィールドにご記入ください。
-
スキーマレジストリのCA を指定してください。これは CA証明書へのディレクトリパスです。
-
スキーマキーストアレジストリにおいて、認証に使用したキーストアのパスを指定してください。
-
スキーマキーストアレジストリパスワードに、キーストアのパスワードを入力してください。
-
-
値のシリアライズスキーマとキーのシリアライズスキーマを設定してください。
-
None と設定し、キーまたは値をシリアライズします。
-
Topic と設定し、トピックに登録スキーマを使用して、キーまたは値をシリアライズします。
-
Kafka 接続 の設定
これでKafka接続のテストを行う準備が整いました。Kafkaトピックを実行する方法を学びましょう。
Avroスキーマのシリアライズ形式のサポート
Kafkaの接続は、以下のスキーマをサポートしています。
-
配列
-
Boolean
-
Double
-
Enum
-
Float
-
Int
-
Logical: Date、Time(ミリ秒)、Time(マイクロ秒)、Timestamp(ミリ秒)、Timestamp(マイクロ秒)、Local timestamp(ミリ秒)、Local timestamp(マイクロ秒)、UUID
-
Long
-
Null
-
Record
-
String
-
Union
Avroスキーマにおいて、配列型 "type": "array" のデフォルト値 "default": [ ] はサポートされていません。
{
"default": [],
"type": "array",
"items": {
"type": "record",
"name": "Attribute",
"fields": [
...
]
}
}