Apache Spark StreamingのtKafkaInputプロパティ
これらのプロパティは、Spark Streamingジョブのフレームワークで実行されているtKafkaInputを設定するために使われます。
Spark StreamingのtKafkaInputコンポーネントは、メッセージングファミリーに属しています。
このコンポーネントはTalend Real-Time Big Data PlatformとTalend Data Fabricで利用できます。
基本設定
[Schema] (スキーマ)と[Edit schema] (スキーマを編集) |
スキーマとは行の説明のことです。処理して次のコンポーネントに渡すフィールド(カラム)数を定義します。Sparkジョブを作成する場合、フィールドの命名時は予約語のlineを避けます。 このコンポーネントのスキーマは読み取り専用です。メッセージプロデューサーから送信されたメッセージ本文を保管します。 |
[Output type] (出力タイプ) |
次のコンポーネントに送信するデータのデータ型を選択します。 tKafkaInputはKafka byte[]メッセージをジョブが処理できる文字列に自動変換できるので、通常はStringの使用が推奨されます。ただし、Protobufなど、tKafkaInputで認識できないKafkaメッセージ形式の場合は、byteを選択し、次にtJavaRowなどのカスタムコードのコンポーネントを使って、同じジョブの他のコンポーネントがこれらのメッセージを処理できるようにメッセージを文字列にデシリアライズできます。 |
[Broker list] (ブローカーリスト) |
使用するKafkaクラスターのブローカーノードのアドレスを入力します。 このアドレスの形式はhostname:portです。この情報は、このKafkaクラスター内のホスティングノードの名前とポートです。 複数のアドレスを指定する必要がある場合は、コンマ(,)で区切ります。 |
[Starting offset] (開始オフセット) |
消費されるトピックメッセージの開始ポイントを選択します。 Kafkaでは、メッセージのID連番はoffsetと呼ばれます。このリストから、トピック全体の最も古いメッセージから消費を開始するには[From beginning] (最初から)を選択し、同じコンシューマーグループによって消費され、オフセットがSparkチェックポイント内でSparkによって追跡される最新のメッセージから開始するには[From latest] (最新から)を選択します。 コンポーネントが消費されたメッセージの位置を記憶できるようにするには、ジョブの[Run] (実行)ビューの[Spark configuration] (Spark設定)タブでSpark Streamingのチェックポインティングをアクティブにする必要があります。 各コンシューマーグループには、消費したメッセージの位置を記憶するための独自のカウンターがあります。このため、コンシューマーグループが特定のトピックのメッセージの消費を開始すると、コンシューマグループはトピック全体ではなく、このグループが消費を停止する位置に関してのみ最新のメッセージを認識します。この原則に基づいて、次の動作が予想されます。
|
[Topic name] (トピック名) |
tKafkaInputがメッセージフィードを受け取るトピック名を入力します。 |
[Group ID] (グループID) |
現在のコンシューマー(tKafkaInputコンポーネント)が属するようにしたいコンシューマーグループの名前を入力します。 この時点でこのコンシューマーグループが存在していない場合、実行時に作成されます。 このプロパティは、Spark 2.0を使っている場合、または使うHadoopディストリビューションがSpark 2.0を実行している場合のみ利用できます。使っているSparkのバージョンが不明な場合は、クラスターの管理者に詳細をお問い合わせください。 |
[Set number of records per second to read from each Kafka partition] (各Kafkaパーティションから読み取る秒当たりのレコード件数を設定) |
この数値を二重引用符で囲んで入力し、処理のために送信される各バッチのサイズを制限します。 たとえば、100を入力し、Spark設定タブで定義したバッチ値が2秒の場合、各バッチのパーティションからのサイズは200メッセージです。 このチェックボックスをオフにした場合、コンポーネントは、1秒間で単一のバッチに全メッセージを読み取ろうとしますが、大量のメッセージのためにジョブが応答しなくなる可能性があります。 |
[Use SSL/TLS] (SSL/TLSを使用) |
SSLまたはTLS暗号化接続を有効にする場合は、このチェックボックスを選択します。 次に、同じジョブ内のtSetKeyStoreコンポーネントを使用して暗号化情報を指定する必要があります。 このプロパティは、Spark 2.0を使っている場合、または使うHadoopディストリビューションがSpark 2.0を実行している場合のみ利用できます。使っているSparkのバージョンが不明な場合は、クラスターの管理者に詳細をお問い合わせください。 TrustStoreファイルと使用されているKeyStoreファイルは、Sparkエグゼキューターをホスティングしているすべての単一のSparkノードにローカルに保管されている必要があります。 |
[Use Kerberos authentication] (Kerberos認証を使用) |
使用するKafkaクラスターをKerberosで保護する場合は、このチェックボックスを選択して、定義する関連パラメーターを表示します。
KafkaクラスターをKerberosで保護する方法は、SASLを使用した認証をご覧ください。 このチェックボックスはKafka 0.9.0.1以降で使えます。 |
詳細設定
[Kafka properties] (Kafkaのプロパティ) |
カスタマイズする必要があるKafkaコンシューマープロパティをこのテーブルに追加します。たとえば、ZkTimeoutExceptionを避けるために特定のzookeeper.connection.timeout.ms値を設定できます。 このテーブルで定義できるコンシューマプロパティの詳細は、次のWebサイトにあるKafkaのドキュメンテーションでコンシューマ設定について説明しているセクション(http://kafka.apache.org/documentation.html#consumerconfigs)をご覧ください。 |
[Encoding] (エンコーディング) |
リストからエンコーディングを選択するか、[CUSTOM] (カスタム)を選択して、手動で定義します。 このエンコーディングは、tKafkaInputが入力メッセージをデコードするために使います。 |
使用方法
使用ルール |
このコンポーネントは、開始コンポーネントとして使用され、出力リンクを必要とします。 このコンポーネントは、所属するSpark Streamingのコンポーネントのパレットと共に、Spark Streamingジョブを作成している場合にだけ表示されます。 特に明記していない限り、このドキュメンテーションのシナリオでは、標準ジョブ、つまり従来の Talend Data Integrationジョブだけを扱います。 Sparkの現在のコンポーネントの実装では、KafkaオフセットはSpark自体によって自動的に管理されます。つまり、ZooKeeperまたはKafkaにコミットされる代わりに、オフセットはSparkチェックポイント内で追跡されます。この実装の詳細は、Sparkのドキュメンテーションでダイレクトアプローチのセクションをご覧ください。 |
[Spark Connection] (Spark接続) |
[Run] (実行)ビューの[Spark configuration] (Spark設定)タブで、ジョブ全体でのSparkクラスターへの接続を定義します。また、ジョブでは、依存jarファイルを実行することを想定しているため、Sparkがこれらのjarファイルにアクセスできるように、これらのファイルの転送先にするファイルシステム内のディレクトリーを指定する必要があります。
この接続は、ジョブごとに有効になります。 |