メイン コンテンツをスキップする 補完的コンテンツへスキップ

tKafkaInputの標準プロパティ

これらのプロパティは、標準ジョブのフレームワークで実行されているtKafkaInputを設定するために使われます。

標準tKafkaInputコンポーネントは、インターネットファミリーに属しています。

このフレームワーク内のコンポーネントは、ビッグデータ対応のTalend 製品すべて、およびTalend Data Fabricで利用できます。

基本設定

[Schema] (スキーマ)[Edit schema] (スキーマを編集)

スキーマとは行の説明のことです。処理して次のコンポーネントに渡すフィールド(カラム)数を定義します。Sparkジョブを作成する場合、フィールドの命名時は予約語のlineを避けます。

このコンポーネントのスキーマは読み取り専用です。メッセージプロデューサーから送信されたメッセージを保管します。

[Output type] (出力タイプ)

次のコンポーネントに送信するデータのデータ型を選択します。

tKafkaInputはKafka byte[]メッセージをジョブが処理できる文字列に自動変換できるので、通常はStringの使用が推奨されます。ただし、Protobufなど、tKafkaInputで認識できないKafkaメッセージ形式の場合は、byteを選択し、次にtJavaRowなどのカスタムコードコンポーネントを使って、同じジョブの他のコンポーネントがこれらのメッセージを処理できるようにメッセージを文字列にデシリアライズできます。

[Use an existing connection] (既存の接続を使用)

定義済みの接続の詳細を再利用する場合は、このチェックボックスをオンにして、[Component List] (コンポーネントリスト)ドロップダウンリストから、目的の接続コンポーネントを選択します。

[Version] (バージョン)

使うKafkaクラスターのバージョンを選択します。

[Zookeeper quorum list] (Zookeeperクォーラムリスト)

使うKafkaクラスターのZookeeperサービスのアドレスを入力します。

このアドレスの形式はhostname:portです。この情報は、このKafkaクラスター内のホスティングノードの名前とポートです。

複数のアドレスを指定する必要がある場合は、コンマ(,)で区切ります。

このフィールドはKafka 0.8.2.0でのみ使えます。

[Broker list] (ブローカーリスト)

使用するKafkaクラスターのブローカーノードのアドレスを入力します。

このアドレスの形式はhostname:portです。この情報は、このKafkaクラスター内のホスティングノードの名前とポートです。

複数のアドレスを指定する必要がある場合は、コンマ(,)で区切ります。

このフィールドはKafka 0.9.0.1以降で使えます。

コンシューマーグループのオフセットをリセット

あるコンシューマーグループを、メッセージをまったく消費していない新しいグループとして処理できるように、このコンシューマーグループ用に保存されているオフセットをクリアする場合は、このチェックボックスをオンにします。

新規コンシューマーグループの開始点

消費されるトピックメッセージの開始ポイントを選択します。

Kafkaでは、メッセージの増加するID番号はoffsetと呼ばれます。新しいコンシューマーグループが開始すると、このリストから[beginning] (最初から)を選択してトピック全体のうちで最も古いメッセージから消費を開始するか、[latest] (最後から)を選択して新しいメッセージを待機できます。

コンシューマグループは、オフセットコミットされた開始のメッセージのみを考慮します。

各コンシューマーグループには、消費したメッセージの位置を記憶するための独自のカウンターがあります。このため、コンシューマーグループが特定のトピックのメッセージの消費を開始すると、コンシューマグループはトピック全体ではなく、このグループが消費を停止する位置に関してのみ最新のメッセージを認識します。この原則に基づいて、次の動作が予想されます。

  • 既存のコンシューマグループを再開する場合、このオプションは、コミットされた開始点がまだない場合のみ、このコンシューマグループの開始点を決定します。それ以外の場合、このコンシューマグループは、このコミットされた開始点から開始します。たとえば、トピックに100のメッセージがあるとします。既存のコンシューマグループが50のメッセージを正しく処理し、それらのオフセットをコミットした場合、同じコンシューマグループはオフセット51から再開します。

  • 新しいコンシューマーグループを作成するか、既存のコンシューマーグループをリセットした場合(どちらの場合も、このグループがこのトピックのメッセージを消費していないことを意味します)、最新のグループから開始すると、この新しいグループが開始し、オフセット101を待機します。

オフセットストレージ

消費済みメッセージのオフセットをコミットする先のシステムを選択します。

デュアルコミットの有効化

オフセットストレージシステムとしてKafkaを選択すると、[Enable dual commit] (デュアルコミットを有効にする)チェックボックスが表示されます。デフォルトでは、ジョブがZookeeperとKafkaの両方にメッセージをコミットできるように選択されています。ジョブをKafkaにのみコミットする場合は、このチェックボックスをオフにします。

自動コミットのオフセット

このチェックボックスを選択すると、tKafkaInputは、指定された各時間間隔の終了時に消費状態を自動的に保存します。表示される[Interval] (間隔)フィールドでこの間隔を定義する必要があります。

オフセットは、各間隔の最後にコミットされます。ジョブがこの時間間隔の途中で停止した場合、この間隔内におけるメッセージの消費状態はコミットされません。

トピック名

tKafkaInputがメッセージフィードを受け取るトピック名を入力します。

コンシューマーグループID

現在のコンシューマー(tKafkaInputコンポーネント)が属するようにしたいコンシューマーグループの名前を入力します。

この時点でこのコンシューマーグループが存在していない場合、実行時に作成されます。

最長処理時間(ミリ秒)に達したら停止

このチェックボックスをオンにして、tKafkaInputの実行が終了するまでの期間(ミリ秒)をポップアップフィールドに入力します。

[Stop after receiving a maximum number of messages] (最大数のメッセージを受信したら停止)

このチェックボックスをオンにして、tKafkaInputが自動的に実行を停止するまでに受信するメッセージの最大数をポップアップフィールドに入力します。

[Stop after maximum time waiting between messages (ms)] (メッセージ間の最大待機時間(ミリ秒)が経過した後に停止)

このチェックボックスをオンにして、tKafkaInputによる新しいメッセージの待機時間(ミリ秒)をポップアップフィールドに入力します。tKafkaInputは、この待機時間が終了するまでに新しいメッセージを受信しなかった場合、自動的に実行を停止します。

[Use SSL/TLS] (SSL/TLSを使用)

SSLまたはTLS暗号化接続を有効にする場合は、このチェックボックスを選択します。

次に、同じジョブ内のtSetKeyStoreコンポーネントを使用して暗号化情報を指定する必要があります。

このチェックボックスはKafka 0.9.0.1以降で使えます。

[Use Kerberos authentication] (Kerberos認証を使用)

使用するKafkaクラスターをKerberosで保護する場合は、このチェックボックスをオンにして、定義する関連パラメーターを表示します。

  • JAAS configuration path (JAAS設定パス): パスを入力するか、Kafkaへのクライアントの認証を実行するジョブによって使われるJAAS設定ファイルを参照します。

    このJAASファイルには、kinitモードまたはkeytabモードのいずれかを使用して、クライアント(TalendによるKafka関連ジョブ)をKafkaブローカーノードに接続する方法が指定されています。JAASファイルは、これらのジョブが実行されるマシンに保管されている必要があります。

    Talend、Kerberos、またはKafkaはこのJAASファイルを提供していません。ユーザーの組織のセキュリティ戦略に応じて、Configuring Kafka client (英語のみ)の説明に従ってJAASファイルを作成する必要があります。

  • Kafka brokers principal name (Kafkaブローカーのプリンシパル名): ブローカークラスターの作成時にブローカーのために定義したKerberosのプリンシパルの主要部分を入力します。たとえばkafka/kafka1.hostname.com@EXAMPLE.COMというプリンシパルの場合、このフィールドへの入力に使用する主要部分はkafkaになります。

  • Set kinit command path (kinitコマンドパス): Kerberosは、そのkinit実行可能ファイルへのデフォルトのパスを使用します。このパスを変更した場合は、このチェックボックスをオンにしてカスタムアクセスパスを入力します。

    このチェックボックスをオフにする場合は、デフォルトのパスが使用されます。

  • Set Kerberos configuration path (Kerberos設定パス): Kerberosは、たとえば、Kerberos 5の設定ファイルであるkrb5.confファイル(または、Windowsの場合はkrb5.ini)へのデフォルトのパスを使用します。このパスを変更した場合は、このチェックボックスをオンにして、Kerberos設定ファイルへのカスタムアクセスパスを入力します。

    このチェックボックスをオフにすると、必要な設定情報を見つけるために、Kerberosによって所定の戦略が適用されます。この戦略の詳細は、Kerberos requirementsLocating the krb5.conf Configuration Fileをご覧ください。

KafkaクラスターをKerberosで保護する方法は、SASLを使用した認証 (英語のみ)をご覧ください。

このチェックボックスはKafka 0.9.0.1以降で使えます。

詳細設定

Kafkaのプロパティ

カスタマイズする必要があるKafkaコンシューマープロパティをこのテーブルに追加します。たとえば、ZkTimeoutExceptionを避けるために特定のzookeeper.connection.timeout.ms値を設定できます。

このテーブルで定義できるコンシューマプロパティの詳細は、次のWebサイトにあるKafkaのドキュメンテーションでコンシューマ設定について説明しているセクション(http://kafka.apache.org/documentation.html#consumerconfigs (英語のみ))をご覧ください。

タイムアウト精度(ミリ秒)

消費できるメッセージがない場合に、その最後にタイムアウト例外が返される期間をミリ秒で入力します。

-1はタイムアウトの設定がないことを示します。

メッセージでオフセットをロード

このチェックボックスを選択すると、次のコンポーネントに、消費済みメッセージのオフセットが出力されます。このパラメーターを選択すると、offsetという名前の読み取り専用カラムがスキーマに追加されます。

[Custom encoding] (カスタムエンコーディング)

保管データを処理する際、エンコーディングの問題が発生することがあります。このような場合は、チェックボックスをオンにして[Encoding] (エンコーディング)リストを表示します。

リストからエンコーディングを選択するか、[CUSTOM] (カスタム)を選択して、手動で定義します。

[tStatCatcher Statistics] (tStatCatcher統計)

このチェックボックスを選択すると、ジョブレベルおよび各コンポーネントレベルで処理メタデータが収集されます。

使用方法

使用ルール

このコンポーネントは、開始コンポーネントとして使用され、出力リンクを必要とします。使う必要のあるKafkaトピックが存在しない場合は、tKafkaCreateTopicコンポーネントと共に使って、後者のコンポーネントによって作成されたトピックを読み取ることができます。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。