メイン コンテンツをスキップする 補完的コンテンツへスキップ

Apache Spark StreamingのtKafkaInputプロパティ

これらのプロパティは、Spark Streamingジョブのフレームワークで実行されているtKafkaInputを設定するために使われます。

Spark StreamingtKafkaInputコンポーネントは、メッセージングファミリーに属しています。

このコンポーネントは、Talend Real Time Big Data PlatformおよびTalend Data Fabricで利用できます。

基本設定

[Schema] (スキーマ)[Edit schema] (スキーマを編集)

スキーマとは行の説明のことです。処理して次のコンポーネントに渡すフィールド(カラム)数を定義します。Sparkジョブを作成する場合、フィールドの命名時は予約語のlineを避けます。

このコンポーネントのスキーマは読み取り専用です。メッセージプロデューサーから送信されたメッセージ本文を保管します。

[Output type] (出力タイプ)

次のコンポーネントに送信するデータのデータ型を選択します。

tKafkaInputはKafka byte[]メッセージをジョブが処理できる文字列に自動変換できるので、通常はStringの使用が推奨されます。ただし、Protobufなど、tKafkaInputで認識できないKafkaメッセージ形式の場合は、byteを選択し、次にtJavaRowなどのカスタムコードコンポーネントを使って、同じジョブの他のコンポーネントがこれらのメッセージを処理できるようにメッセージを文字列にデシリアライズできます。

[Broker list] (ブローカーリスト)

使用するKafkaクラスターのブローカーノードのアドレスを入力します。

このアドレスの形式はhostname:portです。この情報は、このKafkaクラスター内のホスティングノードの名前とポートです。

複数のアドレスを指定する必要がある場合は、コンマ(,)で区切ります。

[Starting offset] (開始オフセット)

消費されるトピックメッセージの開始ポイントを選択します。

Kafkaでは、メッセージのID連番はoffsetと呼ばれます。このリストから、トピック全体の最も古いメッセージから消費を開始するには[From beginning] (最初から)を選択し、同じコンシューマーグループによって消費され、オフセットがSparkチェックポイント内でSparkによって追跡される最新のメッセージから開始するには[From latest] (最新から)を選択します。

コンポーネントが消費されたメッセージの位置を記憶できるようにするには、ジョブの[Run] (実行)ビューの[Spark configuration] (Spark設定)タブでSpark Streamingのチェックポインティングをアクティブにする必要があります。

各コンシューマーグループには、消費したメッセージの位置を記憶するための独自のカウンターがあります。このため、コンシューマーグループが特定のトピックのメッセージの消費を開始すると、コンシューマグループはトピック全体ではなく、このグループが消費を停止する位置に関してのみ最新のメッセージを認識します。この原則に基づいて、次の動作が予想されます。

  • トピックにたとえば100のメッセージがあるとします。コンシューマグループがオフセット50のメッセージで消費を停止した場合、[From latest] (最新から)を選択すると、同じコンシューマグループはオフセット 51から再開します。

  • 新しいコンシューマーグループを作成するか、既存のコンシューマーグループをリセットした場合(どちらの場合も、このグループがこのトピックのメッセージを消費していないことを意味します)、最新のグループから開始すると、この新しいグループが開始し、オフセット101を待機します。

[Topic name] (トピック名)

tKafkaInputがメッセージフィードを受け取るトピック名を入力します。

[Group ID] (グループID)

現在のコンシューマー(tKafkaInputコンポーネント)が属するようにしたいコンシューマーグループの名前を入力します。

この時点でこのコンシューマーグループが存在していない場合、実行時に作成されます。

このプロパティは、Spark 2.0を使っている場合、または使うHadoopディストリビューションがSpark 2.0を実行している場合のみ利用できます。使っているSparkのバージョンが不明な場合は、クラスターの管理者に詳細をお問い合わせください。

[Set number of records per second to read from each Kafka partition] (各Kafkaパーティションから読み取る秒当たりのレコード件数を設定)

この数値を二重引用符で囲んで入力し、処理のために送信される各バッチのサイズを制限します。

たとえば、100を入力し、Spark設定タブで定義したバッチ値が2秒の場合、各バッチのパーティションからのサイズは200メッセージです。

このチェックボックスをオフにした場合、コンポーネントは、1秒間で単一のバッチに全メッセージを読み取ろうとしますが、大量のメッセージのためにジョブが応答しなくなる可能性があります。

[Use SSL/TLS] (SSL/TLSを使用)

SSLまたはTLS暗号化接続を有効にする場合は、このチェックボックスを選択します。

次に、同じジョブ内のtSetKeyStoreコンポーネントを使用して暗号化情報を指定する必要があります。

このプロパティは、Spark 2.0を使っている場合、または使うHadoopディストリビューションがSpark 2.0を実行している場合のみ利用できます。使っているSparkのバージョンが不明な場合は、クラスターの管理者に詳細をお問い合わせください。

TrustStoreファイルと使用されているKeyStoreファイルは、Sparkエグゼキューターをホスティングしているすべての単一のSparkノードにローカルに保管されている必要があります。

[Use Kerberos authentication] (Kerberos認証を使用)

使用するKafkaクラスターをKerberosで保護する場合は、このチェックボックスをオンにして、定義する関連パラメーターを表示します。

  • JAAS configuration path (JAAS設定パス): パスを入力するか、Kafkaへのクライアントの認証を実行するジョブによって使われるJAAS設定ファイルを参照します。

    このJAASファイルには、kinitモードまたはkeytabモードのいずれかを使用して、クライアント(TalendによるKafka関連ジョブ)をKafkaブローカーノードに接続する方法が指定されています。JAASファイルは、これらのジョブが実行されるマシンに保管されている必要があります。

    Talend、Kerberos、またはKafkaはこのJAASファイルを提供していません。ユーザーの組織のセキュリティ戦略に応じて、Configuring Kafka client (英語のみ)の説明に従ってJAASファイルを作成する必要があります。

  • Kafka brokers principal name (Kafkaブローカーのプリンシパル名): ブローカークラスターの作成時にブローカーのために定義したKerberosのプリンシパルの主要部分を入力します。たとえばkafka/kafka1.hostname.com@EXAMPLE.COMというプリンシパルの場合、このフィールドへの入力に使用する主要部分はkafkaになります。

  • Set kinit command path (kinitコマンドパス): Kerberosは、そのkinit実行可能ファイルへのデフォルトのパスを使用します。このパスを変更した場合は、このチェックボックスをオンにしてカスタムアクセスパスを入力します。

    このチェックボックスをオフにする場合は、デフォルトのパスが使用されます。

  • Set Kerberos configuration path (Kerberos設定パス): Kerberosは、たとえば、Kerberos 5の設定ファイルであるkrb5.confファイル(または、Windowsの場合はkrb5.ini)へのデフォルトのパスを使用します。このパスを変更した場合は、このチェックボックスをオンにして、Kerberos設定ファイルへのカスタムアクセスパスを入力します。

    このチェックボックスをオフにすると、必要な設定情報を見つけるために、Kerberosによって所定の戦略が適用されます。この戦略の詳細は、Kerberos requirementsLocating the krb5.conf Configuration Fileをご覧ください。

KafkaクラスターをKerberosで保護する方法は、SASLを使用した認証 (英語のみ)をご覧ください。

このチェックボックスはKafka 0.9.0.1以降で使えます。

詳細設定

[Kafka properties] (Kafkaのプロパティ)

カスタマイズする必要があるKafkaコンシューマープロパティをこのテーブルに追加します。たとえば、ZkTimeoutExceptionを避けるために特定のzookeeper.connection.timeout.ms値を設定できます。

このテーブルで定義できるコンシューマプロパティの詳細は、次のWebサイトにあるKafkaのドキュメンテーションでコンシューマ設定について説明しているセクション(http://kafka.apache.org/documentation.html#consumerconfigs (英語のみ))をご覧ください。

[Encoding] (エンコーディング)

リストからエンコーディングを選択するか、[CUSTOM] (カスタム)を選択して、手動で定義します。

このエンコーディングは、tKafkaInputが入力メッセージをデコードするために使います。

使用方法

使用ルール

このコンポーネントは、開始コンポーネントとして使用され、出力リンクを必要とします。

このコンポーネントは、所属するSpark Streamingのコンポーネントのパレットと共に、Spark Streamingジョブを作成している場合にだけ表示されます。

特に明記していない限り、このドキュメントのシナリオでは、標準ジョブ、つまり従来の Talend データ統合ジョブだけを扱います。

Sparkの現在のコンポーネントの実装では、KafkaオフセットはSpark自体によって自動的に管理されます。つまり、ZookeeperまたはKafkaにコミットされる代わりに、オフセットはSparkチェックポイント内で追跡されます。この実装については、Sparkのドキュメンテーションでダイレクトアプローチのセクション(http://spark.apache.org/docs/latest/streaming-kafka-integration.html (英語のみ))をご覧ください。

[Spark Connection] (Spark接続)

[Run] (実行)ビューの[Spark configuration] (Spark設定)タブで、ジョブ全体でのSparkクラスターへの接続を定義します。また、ジョブでは、依存jarファイルを実行することを想定しているため、Sparkがこれらのjarファイルにアクセスできるように、これらのファイルの転送先にするファイルシステム内のディレクトリーを指定する必要があります。
  • Yarnモード(YarnクライアントまたはYarnクラスター):
    • Google Dataprocを使用している場合、[Spark configuration] (Spark設定)タブの[Google Storage staging bucket] (Google Storageステージングバケット)フィールドにバケットを指定します。

    • HDInsightを使用している場合、[Spark configuration] (Spark設定)タブの[Windows Azure Storage configuration] (Windows Azure Storage設定)エリアでジョブのデプロイメントに使用するブロブを指定します。

    • Altusを使用する場合は、[Spark configuration] (Spark設定)タブでジョブのデプロイにS3バケットまたはAzure Data Lake Storageを指定します。
    • Quboleを使用する場合は、ジョブにtS3Configurationを追加し、QuboleでS3システム内に実際のビジネスデータを書き込みます。tS3Configurationを使用しないと、このビジネスデータはQubole HDFSシステムに書き込まれ、クラスターをシャットダウンすると破棄されます。
    • オンプレミスのディストリビューションを使用する場合は、クラスターで使われているファイルシステムに対応する設定コンポーネントを使用します。一般的に、このシステムはHDFSになるため、tHDFSConfigurationを使用します。

  • [Standalone mode] (スタンドアロンモード): クラスターで使われているファイルシステム(tHDFSConfiguration Apache Spark BatchtS3Configuration Apache Spark Batchなど)に対応する設定コンポーネントを使用します。

    ジョブ内に設定コンポーネントがない状態でDatabricksを使用している場合、ビジネスデータはDBFS (Databricks Filesystem)に直接書き込まれます。

この接続は、ジョブごとに有効になります。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。