Kafka接続の設定

API Connection Manager で、Apache Kafkaの接続を設定できます。

これにより、1つまたは複数のパブリッシャーが送信し、1つまたは複数のコンシューマーが読み込むメッセージを格納するKafkaトピックに接続できます。Kafkaの詳細情報については、 Apache Kafkaのドキュメントを参照してください。

接続の設定

Kafka接続を設定するには、以下の手順に従ってください。

  1. API Connection Manager を開きます。

  2. 新しい接続を追加します。

  3. Edit(編集)セクションに移動してください。

  4. 接続の名前を指定します。

  5. Type ドロップダウンメニューから、 Kafka を選択します。

  6. 接続したい Topic の名前を入力してください。

  7. Host を指定します。これはKafkaサーバホストの名前またはIPアドレスです。

  8. リッスンする Port を入力してください。

  9. コンシューマグループの GroupId を指定してください。これはこのトピックに登録したコンシューマのグループ名です。

  10. レコード(メッセージ)を保存するパーティションの番号を入力してください。

    Kafkaトピックは複数のパーティションに分割され、大量のデータを1台または複数のサーバーに分散させることができます。各パーティションには、 05 などの番号があります。

  11. オプションで、特定のKafka接続の設定を含む ClientPropertiesFile へのパスを指定します。使用可能なプロパティについては、 Kafkaのドキュメントを参照してください。

  12. コミットせずにパーティションから記録を取得するために、ピーキングを有効にしてください。

    これにより、パーティションの位置に関係なく、また、オフセットに影響を与えることなく、任意のレコードを読むことができます。レコードの位置を定義するオフセットは、どのレコードがすでに消費されているか、 API Engine が次にどの未読レコードをプルする必要があるかを示します。

  13. 次のオプションのいずれかを使用して、メッセージの KeyType を指定してください。

    • Ignore を選択して、メッセージを null に設定し、キーではなく、値のみをシリアライズします。

    • None を選択し、値とキーの両方をシリアライズから除外します。

    • 整数メッセージ。

    • 文字列メッセージ。

  14. Treat warning as error を選択して、実行中に発生する警告をエラーとして表示します。

  15. 接続に認証が必要な場合、ユーザー名パスワードで認証するか、SSLを介して認証するかの2つのオプションがあります。SSLを選択した場合は、代わりに以下のフィールドを入力してください。

    • Certificate ファイルを .pem 形式で指定します。

    • Passphrase を入力します。これは秘密鍵のパスフレーズです。

    • Private Keyに、 .pem 形式で秘密鍵へのパスを指定します。

    • CA Certificate.pem 形式で指定します。これはKafkaブローカーの証明書を検証するためのルートCA証明書です。

  16. オプションとして、 XMLとウェブサービスのセキュリティを設定してください。

  17. KafkaClientと Kafkaブローカー間の安全で認証された接続を確立するには、Clientプロパティ ファイルシークレットを定義します。これらのシークレットは、機密情報を保存します。

  18. オプションとして、KafkaメッセージにAvroスキーマベースのシリアライズを追加して、レコードの値のデータスキーマを定義します。このスキーマには、値のフィールドとそのデータ型が記述されています。Avroスキーマの詳細情報については、 Apache Avroのドキュメントを参照してください。

    APIエンジン3.0 は、Avroスキーマの配列タイプに対するデフォルト値をサポートしていません。詳細情報については、この例を参照してください。

    Schema Type ドロップダウンメニューから Avro を選択し、JSON形式のデータ構造を定義した Schema Registry UrlSchema を追加します。

    スキーマレジストリがSSLを必要とする場合は、以下のフィールドにご記入ください。

    • スキーマレジストリのCA を指定してください。これは CA証明書へのディレクトリパスです。

    • スキーマキーストアレジストリにおいて、認証に使用したキーストアのパスを指定してください。

    • スキーマキーストアレジストリパスワードに、キーストアのパスワードを入力してください。

  19. 値のシリアライズスキーマキーのシリアライズスキーマを設定してください。

    • None と設定し、キーまたは値をシリアライズします。

    • Topic と設定し、トピックに登録スキーマを使用して、キーまたは値をシリアライズします。

Kafka 接続 の設定

これでKafka接続のテストを行う準備が整いました。Kafkaトピックを実行する方法を学びましょう。

Avroスキーマのシリアライズ形式のサポート

Kafkaの接続は、以下のスキーマをサポートしています。

  • 配列

  • Boolean

  • Double

  • Enum

  • Float

  • Int

  • Logical: Date、Time(ミリ秒)、Time(マイクロ秒)、Timestamp(ミリ秒)、Timestamp(マイクロ秒)、Local timestamp(ミリ秒)、Local timestamp(マイクロ秒)、UUID

  • Long

  • Null

  • Record

  • String

  • Union

Avroスキーマにおける例外

Avroスキーマにおいて、配列型 "type": "array" のデフォルト値 "default": [ ] はサポートされていません。

コピー
{
"default": [],
"type": "array",
"items": {
    "type": "record",
    "name": "Attribute",
    "fields": [
        ...
    ]
    }
}