tKafkaInputの標準プロパティ
これらのプロパティは、標準ジョブのフレームワークで実行されているtKafkaInputを設定するために使われます。
標準のtKafkaInputコンポーネントは、インターネットファミリーに属しています。
このフレームワークのコンポーネントは、すべてのビッグデータ対応のTalend製品およびTalend Data Fabricで使用できます。
基本設定
[Schema] (スキーマ)と[Edit schema] (スキーマを編集) |
スキーマとは行の説明のことです。処理して次のコンポーネントに渡すフィールド(カラム)数を定義します。Sparkジョブを作成する場合、フィールドの命名時は予約語のlineを避けます。 このコンポーネントのスキーマは読み取り専用です。メッセージプロデューサーから送信されたメッセージを保管します。 |
出力タイプ |
ドロップダウンリストから次のコンポーネントに送信するデータの型を選択します。
tKafkaInputはKafka byte[]メッセージをジョブが処理できる文字列に自動変換できるので、通常はStringの使用が推奨されます。ただし、Protobufなど、tKafkaInputで認識できないKafkaメッセージ形式の場合は、byteを選択し、次にtJavaRowなどのカスタムコードのコンポーネントを使って、同じジョブの他のコンポーネントがこれらのメッセージを処理できるようにメッセージを文字列にデシリアライズできます。 |
[Use an existing connection] (既存の接続を使用) |
定義済みの接続の詳細を再利用する場合は、このチェックボックスを選択して、[Component List] (コンポーネントリスト)ドロップダウンリストから、目的の接続コンポーネントを選択します。 |
[Version] (バージョン) |
使うKafkaクラスターのバージョンを選択します。 Talendが提供する8.0.1-R2024-02以降のTalend Studioマンスリーアップデートをインストール済みである場合、Kafka 2.4.x以前のバージョンは非推奨となります。 |
[Zookeeper quorum list] (Zookeeperクォーラムリスト) |
使うKafkaクラスターのZooKeeperサービスのアドレスを入力します。 このアドレスの形式はhostname:portです。この情報は、このKafkaクラスター内のホスティングノードの名前とポートです。 複数のアドレスを指定する必要がある場合は、コンマ(,)で区切ります。 このフィールドはKafka 0.8.2.0でのみ使えます。 |
[Broker list] (ブローカーリスト) |
使用するKafkaクラスターのブローカーノードのアドレスを入力します。 このアドレスの形式はhostname:portです。この情報は、このKafkaクラスター内のホスティングノードの名前とポートです。 複数のアドレスを指定する必要がある場合は、コンマ(,)で区切ります。 このフィールドはKafka 0.9.0.1以降で使えます。 |
[Topic name] (トピック名) |
tKafkaInputがメッセージフィードを受け取るトピック名を入力します。 |
[Consumer group ID] (コンシューマーグループID) |
現在のコンシューマー(tKafkaInputコンポーネント)が属するようにしたいコンシューマーグループの名前を入力します。 この時点でこのコンシューマーグループが存在していない場合、実行時に作成されます。 |
[Reset offsets on consumer group] (コンシューマーグループのオフセットをリセット) |
あるコンシューマーグループを、メッセージをまったく消費していない新しいグループとして処理できるように、このコンシューマーグループ用に保存されているオフセットをクリアする場合は、このチェックボックスを選択します。 |
[New consumer group starts from] (新規コンシューマーグループの開始点) |
消費されるトピックメッセージの開始ポイントを選択します。 Kafkaでは、メッセージの増加するID番号はoffsetと呼ばれます。新しいコンシューマーグループが開始すると、このリストから[beginning] (最初から)を選択してトピック全体のうちで最も古いメッセージから消費を開始するか、[latest] (最後から)を選択して新しいメッセージを待機できます。 コンシューマグループは、オフセットコミットされた開始のメッセージのみを考慮します。 各コンシューマーグループには、消費したメッセージの位置を記憶するための独自のカウンターがあります。このため、コンシューマーグループが特定のトピックのメッセージの消費を開始すると、コンシューマグループはトピック全体ではなく、このグループが消費を停止する位置に関してのみ最新のメッセージを認識します。この原則に基づいて、次の動作が予想されます。
|
[Auto-commit offsets] (自動コミットのオフセット) |
このチェックボックスを選択すると、tKafkaInputは、指定された各時間間隔の終了時に消費状態を自動的に保存します。表示される[Interval] (間隔)フィールドでこの間隔を定義する必要があります。 オフセットは、各間隔の最後にコミットされます。ジョブがこの時間間隔の途中で停止した場合、この間隔内におけるメッセージの消費状態はコミットされません。 |
[Stop after a maximum total duration (ms)] (最長処理時間(ミリ秒)に達したら停止) |
このチェックボックスを選択して、tKafkaInputの実行が終了するまでの期間(ミリ秒)をポップアップフィールドに入力します。 |
[Stop after receiving a maximum number of messages] (最大数のメッセージを受信したら停止) |
このチェックボックスを選択して、tKafkaInputが自動的に実行を停止するまでに受信するメッセージの最大数をポップアップフィールドに入力します。 |
[Stop after maximum time waiting between messages (ms)] (メッセージ間の最大待機時間(ミリ秒)が経過した後に停止) |
このチェックボックスを選択して、tKafkaInputによる新しいメッセージの待機時間(ミリ秒)をポップアップフィールドに入力します。tKafkaInputは、この待機時間が終了するまでに新しいメッセージを受信しなかった場合、自動的に実行を停止します。 |
[Use SSL/TLS] (SSL/TLSを使用) |
SSLまたはTLS暗号化接続を有効にする場合は、このチェックボックスを選択します。 このチェックボックスはKafka 0.9.0.1以降で使えます。 |
[Set keystore] (キーストアを設定) |
このチェックボックスを選択すると、tSetKeystoreコンポーネント経由のSSLまたはTLS暗号化接続が有効になります。 次に、同じジョブ内のtSetKeyStoreコンポーネントを使用して暗号化情報を指定する必要があります。 このチェックボックスは、[Use SSL/TLS] (SSL/TLSを使用)チェックボックスをオンにすると使用できます。 情報メモ注: このオプションは、Talendが提供する8.0.1-R2022-05以降のTalend Studioマンスリーアップデートをインストール済みである場合に利用できます。詳細は管理者にお問い合わせください。
|
[Use Kerberos authentication] (Kerberos認証を使用) |
使用するKafkaクラスターをKerberosで保護する場合は、このチェックボックスを選択して、定義する関連パラメーターを表示します。
KafkaクラスターをKerberosで保護する方法は、SASLを使用した認証をご覧ください。 このチェックボックスはKafka 0.9.0.1以降で使えます。 |
詳細設定
[Kafka properties] (Kafkaのプロパティ) |
カスタマイズする必要があるKafkaコンシューマープロパティをこのテーブルに追加します。たとえば、ZkTimeoutExceptionを避けるために特定のzookeeper.connection.timeout.ms値を設定できます。 また、SSL暗号化などのセキュリティプロパティはssl.truststore.locationやssl.keystore.locationで設定することもできます。 このテーブルで定義できるコンシューマプロパティの詳細は、次のWebサイトにあるKafkaのドキュメンテーションでコンシューマ設定について説明しているセクション(http://kafka.apache.org/documentation.html#consumerconfigs)をご覧ください。 |
[Apply security properties after advanced Kafka properties] (Kafkaの詳細プロパティの後にセキュリティのプロパティを適用) |
このチェックボックスを選択すると、[Advanced settings] (基本設定)ビューで[Use SSL/TLS] (SSL/TLSを使用)チェックボックスが選択されている場合に、tSetKeyStoreコンポーネントで設定されているセキュリティプロパティではなく、[Advanced settings] (基本設定)ビューの[Kafka properties] (Kafkaのプロパティ)テーブルで設定されているセキュリティプロパティが最初に考慮されます。 |
[Timeout precision(ms)] (タイムアウト精度(ミリ秒)) |
消費できるメッセージがない場合に、その最後にタイムアウト例外が返される期間をミリ秒で入力します。 値-1はタイムアウトの設定がないことを示します。 |
[Use schema registry] (スキーマレジストリーを使用) |
このチェックボックスを選択すると、Confluent Schema Registryが使用され、定義する次の関連パラメーターが表示されます。
スキーマレジストリーの詳細は、Confluentのドキュメンテーションをご覧ください。 このオプションは、[Basic settings] (基本設定)ビューで[Output type] (出力タイプ)ドロップダウンリストからConsumerRecordが選択されている場合のみ利用できます。 情報メモ注: このオプションは、Talendが提供する8.0.1-R2022-01以降のTalend Studioマンスリーアップデートをインストール済みである場合に利用できます。詳細は管理者にお問い合わせください。
|
[Load the offset with the message] (メッセージでオフセットをロード) |
このチェックボックスを選択すると、次のコンポーネントに、消費済みメッセージのオフセットが出力されます。このパラメーターを選択すると、offsetという名前の読み取り専用カラムがスキーマに追加されます。 このプロパティは、[Basic settings] (基本設定)ビューの[Output type] (出力タイプ)ドロップダウンリストでStringかByte[]が選択されている場合のみ利用できます。 |
[Custom encoding] (カスタムエンコーディング) |
保管データを処理する際、エンコーディングの問題が発生することがあります。このような場合は、チェックボックスをオンにして[Encoding] (エンコーディング)リストを表示します。 リストからエンコーディングを選択するか、[CUSTOM] (カスタム)を選択して、手動で定義します。 このプロパティは、[Basic settings] (基本設定)ビューの[Output type] (出力タイプ)ドロップダウンリストでStringかByte[]が選択されている場合のみ利用できます。 |
[tStatCatcher Statistics] (tStatCatcher統計) |
このチェックボックスを選択すると、ジョブレベルおよび各コンポーネントレベルで処理メタデータが収集されます。 |
グローバル変数
ERROR_MESSAGE |
エラー発生時にコンポーネントによって生成されるエラーメッセージ。これはAfter変数で、文字列を返します。 |
使用方法
使用ルール |
このコンポーネントは、開始コンポーネントとして使用され、出力リンクを必要とします。使う必要のあるKafkaトピックが存在しない場合は、tKafkaCreateTopicコンポーネントと共に使って、後者のコンポーネントによって作成されたトピックを読み取ることができます。 |