メイン コンテンツをスキップする 補完的コンテンツへスキップ

cKafkaの標準プロパティ

これらのプロパティは、標準ジョブのフレームワークで実行されているcKafkaを設定するために使われます。

標準cKafkaコンポーネントは、Connectivityファミリーに属しています。

基本設定

[Broker List] (ブローカーリスト)

Kafkaメッセージブローカーのリストをフォームhostname1:port1,hostname2:port2,hostname3:port3で指定します。

[Client Id] (クライアントID)

リクエストを行う時にサーバーに渡すクライアントのID文字列を指定します。

トピック

メッセージブローカーにメッセージトピックの名前を入力します。

GroupId

Kafka ConnectクラスターグループのIDを入力します。

次のオプションは、cKafkaがプロデューサーとして使われている場合のみ利用できます。

 

[Partitioner] (パーティショナー)

Kafkaクラスター全体でのデータの分散方法を決定するパーティショナーを入力します。

[Serializer Class] (シリアライザークラス)

使用するシリアライザーのクラス名を入力してください。

[Key Serializer Class] (キーシリアライザークラス)

使用するキーシリアライザーのクラス名を入力してください。

[Send Buffer (bytes)] (送信バッファー(バイト))

データを送信する時に使うTCP送信バッファーのサイズ。

[Request Required Acks] (必要な承認をリクエスト)

次のように入力して、プロデューサーがメッセージを受信したことをブローカーから確認するまで待つかどうかを指定します。

  • 0は、プロデューサーがブローカーからの確認を待機しないことを意味します。

  • 1は、リーダーレプリカがデータを受信した後にプロデューサーが確認を受け取ることを意味します。

  • -1または[all] (すべて)は、同期のレプリカすべてがデータを受け取った後にプロデューサーが確認を受け取ることを意味します。

[Request Timeout (ms)] (リクエストタイムアウト(ミリ秒))

クライアントがリクエストの応答を待機する最大時間をミリ秒単位で指定します。タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じてリクエストを再送信するか、再試行回数を使い果たした場合はリクエストを失敗させます。

[Compression Codec] (圧縮Codec)

NONEGZIPSNAPPY、およびLZ4から圧縮タイプを選択します。

[Buffer Memory Size] (バッファーメモリサイズ)

サーバーへの送信を待機しているレコードをバッファリングするためにプロデューサーが使えるメモリの総バイト数を指定します。レコードがサーバーに配信されるよりも速く送信される場合、プロデューサーは[Max Block (ms)] (最大ブロック(ミリ秒))の間ブロックし、その後例外を返します。

[Retries] (再試行)

一時的なエラーの可能性があるため送信に失敗したレコードをクライアントが再送信するには、0より大きい値を指定します。

[Retry Backoff (ms)] (再試行バックオフ(ミリ秒))

特定のトピックパーティションへの失敗したリクエストを再試行する前に待機する時間を指定します。これにより、一部の障害シナリオでタイトなループでリクエストが繰り返し送信されることが回避されます。

[Batch Size] (バッチサイズ)

プロデューサーは、複数のレコードが同じパーティションに送信されている時は常に、レコードをまとめてより少ないリクエストにまとめようとします。これにより、クライアントとサーバーの両方でパフォーマンスが向上します。このフィールドには、デフォルトのバッチサイズをバイト単位で指定します。

[Connection Idle Max (ms)] (接続アイドル最大(ミリ秒))

アイドル接続が閉じられるまでの時間をミリ秒単位で指定します。

[Linger (ms)] (リンガー(ミリ秒))

リクエストの数を減らすために、プロデューサーがミリ秒単位で、単一のバッチリクエストに到着するすべてのレコードをグルーピングするのを待つべき時間を指定します。プロデューサーがバスサイズで指定された量のレコードを受信した場合、この設定での指定に関係なく、プロデューサーはレコードをすぐに送信します。

[Max Block (ms)] (最大ブロック(ミリ秒))

バッファーがいっぱいであるかメタデータが利用できないためにプロデューサーが待機する最大時間をミリ秒単位で指定します。

[Max Request Size] (最大リクエストサイズ)

リクエストの最大サイズをバイト単位で指定します。この設定により、プロデューサーが1つのリクエストで送信するレコードバッチの数が制限され、大量のリクエストの送信が回避されます。

[Receive Buffer (bytes)] (受信バッファー(バイト))

データを読み込む時に使うTCP受信バッファーのサイズを指定します。値が-1の場合、OSのデフォルトが使われます。

[Max in Flight Request] (フライトリクエストの最大)

ブロックする前にクライアントが単一の接続で送信する未確認のリクエストの最大数を指定します。この設定が1より大きい値に設定されており、送信が失敗した場合、再試行が有効になっていると、再試行が原因でメッセージが再配列されるリスクがあります。

[Metadata Max Age (ms)] (メタデータ最大期間(ミリ秒))

新しいブローカーまたはパーティションをプロアクティブに検出するためのパーティションリーダーシップの変更がなくてもメタデータの更新が発生するまでの時間を、ミリ秒単位で指定します。

[Reconnect Backoff (ms)] (再接続バックオフ(ミリ秒))

特定のホストへの再接続を試行する前に待機する時間をミリ秒単位で指定します。これにより、タイトなループでホストに繰り返し接続することが回避されます。

[Use Schema Registry] (スキーマレジストリーを使用) Confluent Schema Registryを使用する場合はこのチェックボックスを選択し、Schema Registry URLフィールドにhost1:port1,host2:port2という形式でスキーマレジストリーインスタンスのURLを入力します。

スキーマレジストリーの詳細は、Confluentのドキュメンテーションをご覧ください。

このオプションは、Talendが提供する8.0.1-R2023-10以降のTalend Studioマンスリーアップデートをインストール済みである場合に利用できます。詳細は管理者にお問い合わせください。

次のオプションは、cKafkaがコンシューマーとして使われている場合のみ利用できます。

 

[Send Buffer (bytes)] (送信バッファー(バイト))

データを送信する時に使うTCP送信バッファーのサイズを指定します。値が-1の場合、OSのデフォルトが使われます。

[Retry Backoff (ms)] (再試行バックオフ(ミリ秒))

特定のトピックパーティションへの失敗したリクエストを再試行する前に待機する時間を指定します。これにより、一部の障害シナリオでタイトなループでリクエストが繰り返し送信されることが回避されます。

[Connection Idle Max (ms)] (接続アイドル最大(ミリ秒))

アイドル接続が閉じられるまでの時間をミリ秒単位で指定します。

[Receive Buffer (bytes)] (受信バッファー(バイト))

データを読み込む時に使うTCP受信バッファーのサイズを指定します。値が-1の場合、OSのデフォルトが使われます。

[Metadata Max Age (ms)] (メタデータ最大期間(ミリ秒))

新しいブローカーまたはパーティションをプロアクティブに検出するためのパーティションリーダーシップの変更がなくてもメタデータの更新が発生するまでの時間を、ミリ秒単位で指定します。

[Reconnect Backoff (ms)] (再接続バックオフ(ミリ秒))

特定のホストへの再接続を試行する前に待機する時間をミリ秒単位で指定します。これにより、タイトなループでホストに繰り返し接続することが回避されます。

[Auto Commit Enable] (自動コミット有効化)

このチェックボックスをオンにすると、メッセージのオフセットがバックグラウンドで定期的にコミットされます。コンシューマーオフセットがKafkaにコミットされる頻度を[Auto Commit Interval (ms)] (自動コミット間隔(ミリ秒))で指定します。

[Fetch Min (bytes)] (フェッチ最小(バイト))

サーバーがフェッチリクエストに対して返す必要のあるデータの最小量をバイト単位で指定します。利用できるデータが不十分な場合、リクエストはそのデータが蓄積するまで待機してから応答します。デフォルト設定の1バイトは、1バイトのデータが使用可能になるか、データの到着を待機してタイムアウトになるとすぐに、フェッチリクエストがフェッチリクエストに応答することを意味します。これを1より大きい値に設定すると、サーバーは大量のデータが蓄積するのを待機するため、サーバーのスループットが少し向上しますが、その反面、レイテンシーが多少増加します。

[Fetch Wait Max (ms)] (フェッチ待機最大(ミリ秒))

[Fetch Min (bytes)] (フェッチ最小(バイト))で指定された要件をすぐに満たすのに十分なデータがない場合に、フェッチリクエストに応答するまでにサーバーがブロックする最大時間を指定します。

[Auto Offset Reset] (自動オフセットリセット)

Kafkaに初期オフセットがない場合、または現在のオフセットがサーバー上に存在しなくなった場合の対処方法を次から選択します。

  • [EARLIEST] (最も早い): オフセットを最も早いオフセットに自動的にリセットします。

  • [LATEST] (最も遅い): オフセットを最も遅いオフセットに自動的にリセットします。

  • [NONE] (なし): コンシューマーのグループに以前のオフセットが見つからない場合は、コンシューマーに例外を返します。

[Heartbeat Interval (ms)] (ハートビート間隔(ミリ秒))

Kafkaのグループ管理機能を使う場合、コンシューマーコーディネーターへのハートビート間の予想時間をミリ秒単位で指定します。ハートビートは、コンシューマーのセッションがアクティブなままであることを確認し、新しいコンシューマーがグループに参加または脱退する時に再調整を容易にするために使われます。この値は[Session Timeout (ms)] (セッションタイムアウト(ミリ秒))よりも低く設定する必要がありますが、通常はその値の1/3以下にすべきです。通常の再調整の予想時間を制御するために、さらに低く調整しても構いません。

[Maximum Partition Fetch (bytes)] (最大パーティションフェッチ(バイト))

サーバーが返すパーティションごとのデータの最大量をバイト単位で指定します。フェッチの最初の空でないパーティションの最初のメッセージがこの制限よりも大きい場合でも、メッセージは返され、コンシューマーは処理を進めることができます。

[Session Timeout (ms)] (セッションタイムアウト(ミリ秒))

Kafkaのグループ管理機能を使う時に、コンシューマーの障害を検出するために使われるタイムアウトをミリ秒単位で指定します。コンシューマーは定期的にハートビートを送信して、ブローカーにその活性を示します。このセッションタイムアウトの期限が切れる前にブローカーがハートビートを受信しなかった場合、ブローカーはこのコンシューマーをグループから削除し、再調整を開始します。

[Partition Assignor] (パーティションアサイナー)

グループ管理が使われている場合に、クライアントがコンシューマーインスタンス間でパーティションの所有権を分散するために使うパーティション割り当て戦略のクラス名を指定します。

[Request Timeout (ms)] (リクエストタイムアウト(ミリ秒))

クライアントがリクエストの応答を待機する最大時間をミリ秒単位で指定します。タイムアウトが経過する前に応答が受信されない場合、クライアントは必要に応じてリクエストを再送信するか、再試行回数を使い果たした場合はリクエストを失敗させます。

[Use Schema Registry] (スキーマレジストリーを使用) Confluent Schema Registryを使用する場合はこのチェックボックスを選択し、Schema Registry URLフィールドにhost1:port1,host2:port2という形式でスキーマレジストリーインスタンスのURLを入力します。

スキーマレジストリーの詳細は、Confluentのドキュメンテーションをご覧ください。

このオプションは、Talendが提供する8.0.1-R2023-10以降のTalend Studioマンスリーアップデートをインストール済みである場合に利用できます。詳細は管理者にお問い合わせください。

詳細設定

[Kafka Properties] (Kafkaのプロパティ)

オプションの引数を対応するテーブルに設定します。[+]を必要な回数だけクリックして、テーブルに引数を追加します。次に、対応する値フィールドをクリックして、値を入力します。利用可能なオプションは、http://kafka.apache.org/documentation.htmlを参照してください。

[SSL Key Password] (SSLキーパスワード)

[...]をクリックし、キーストアファイルにプライベートキーのパスワードを二重引用符で囲んで入力します。

[SSL Keystore Location] (SSLキーストアロケーション)

キーストアファイルのロケーションを入力します。

[SSL Keystore Password] (SSLキーストアパスワード)

[...]をクリックし、キーストアファイルのパスワードを二重引用符で囲んで入力します。これは[SSL Keystore Location] (SSLキーストアロケーション)が設定されている場合にのみ必要です。

[SSL Truststore Location] (SSLトラストストアロケーション)

トラストストアファイルのロケーションを入力します。

[SSL Truststore Password] (SSLトラストストアパスワード)

[...]をクリックし、トラストストアファイルのパスワードを二重引用符で囲んで入力します。

[SSL Cipher Suits] (SSL暗号スイート)

暗号スイートのリストを入力します。これは、TLSまたはSSLネットワークプロトコルを使うネットワーク接続のセキュリティ設定をネゴシエートするために使われる、認証、暗号化、MAC、およびキー交換アルゴリズムの名前付きの組み合わせです。デフォルトでは、利用可能なすべての暗号スイートがサポートされています。

[SSL Endpoint Algorithm] (SSLエンドポイントアルゴリズム)

エンドポイント識別アルゴリズムを入力し、サーバー証明書を使ってサーバーホスト名を検証します。

[Kerberos Service Name] (Kerberosサービス名)

Kafkaが実行するKerberosプリンシパル名を入力します。

[Security Protocol] (セキュリティプロトコル)

ブローカーとの通信に使うプロトコルを、PlaintextSSLSASL over Plaintext、およびSASL over SSLから選択します。

使用方法

使用ルール

cKafkaは、ルート内の開始、中間、または終了コンポーネントとして使います。

制限事項

該当なし

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。