メイン コンテンツをスキップする 補完的コンテンツへスキップ

Apache Spark StreamingのtPubSubInputAvroプロパティ

これらのプロパティは、Spark Streamingのジョブのフレームワークで実行されているtPubSubInputAvroを設定するために使われます。

Spark StreamingtPubSubInputAvroコンポーネントは、メッセージングファミリーに属しています。

Dataproc 1.4以降をSparkクラスターとして使用している場合は、PubServiceを実行できるよう、Google Cloud Platformでのクラスター作成時に[Allow API access to all Google Cloud services] (すべてのGoogle CloudサービスへのAPIアクセスを許可)を必ず選択します。

このコンポーネントは、Talend Real Time Big Data PlatformおよびTalend Data Fabricで利用できます。

基本設定

[Define a Google Cloud configuration component] (Google Cloudの設定コンポーネントを定義)

SparkクラスターにDataprocを使っている場合は、このチェックボックスをオフにします。

それ以外の場合は、このチェックボックスをオンにして、Pub/SubのコンポーネントがtGoogleCloudConfigurationコンポーネントの提供するGoogle Cloud設定情報を使えるようにします。

スキーマスキーマの編集

スキーマとは行の説明のことです。処理して次のコンポーネントに渡すフィールド(カラム)数を定義します。Sparkジョブを作成する場合、フィールドの命名時は予約語のlineを避けます。

ここで定義されたスキーマは、Pub/Subで受信したメッセージのバイナリAvroスキーマに正確に対応している必要があります。これを保証するには、tWriteAvroFieldsを使って特定のスキーマを定義する別のSpark Streamingジョブを作成し、tPubSubOutputを使ってこのスキーマでメッセージを書き込むという方法があります。

トピック名

メッセージを利用するトピックの名前を入力します。

サブスクリプション名

指定したトピックを利用する必要があるサブスクリプションの名前を入力します。

サブスクリプションが存在する場合は、指定されたトピックに接続する必要があります。サブスクリプションが存在しない場合は、ランタイムに作成され、指定されたトピックに接続されます。

詳細設定

[Storage level] (ストレージレベル)

[Storage level] (ストレージレベル) ドロップダウンリストが表示されたら、メモリのみ、またはメモリとディスクのように、キャッシュされたRDDの保存方法を選択します。

各ストレージレベルの詳細は、https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence (英語のみ)をご覧ください。

[Use hierarchical mode] (階層モードを使用)

バイナリ(階層を含む) Avroスキーマを、現在のコンポーネントのスキーマエディターに定義されているフラットスキーマにマップする場合は、このこのチェックボックスを選択します。処理するAvroメッセージto be processed is がフラットの場合は、このチェックボックスをオフのままにしておきます。

オンにする場合は、次のパラメーターを設定する必要があります。

  • [Local path to the avro schema] (avroスキーマへのローカルパス): 処理するAvroデータのスキーマを定義するファイルに移動します。

  • [Mapping]: 現在のコンポーネントのスキーマカラムと処理する階層Avroメッセージに保存されているデータの間にマップを作成します。[Node] (ノード)カラム内に、Avroメッセージから読み取るデータをポイントするJSONパスを入力する必要があります。

使用方法

使用ルール

このコンポーネントは、開始コンポーネントとして使用され、出力リンクを必要とします。

[Spark Connection] (Spark接続)

[Run] (実行)ビューの[Spark configuration] (Spark設定)タブで、ジョブ全体でのSparkクラスターへの接続を定義します。また、ジョブでは、依存jarファイルを実行することを想定しているため、Sparkがこれらのjarファイルにアクセスできるように、これらのファイルの転送先にするファイルシステム内のディレクトリーを指定する必要があります。
  • Yarnモード(YarnクライアントまたはYarnクラスター):
    • Google Dataprocを使用している場合、[Spark configuration] (Spark設定)タブの[Google Storage staging bucket] (Google Storageステージングバケット)フィールドにバケットを指定します。

    • HDInsightを使用している場合、[Spark configuration] (Spark設定)タブの[Windows Azure Storage configuration] (Windows Azure Storage設定)エリアでジョブのデプロイメントに使用するブロブを指定します。

    • Altusを使用する場合は、[Spark configuration] (Spark設定)タブでジョブのデプロイにS3バケットまたはAzure Data Lake Storageを指定します。
    • Quboleを使用する場合は、ジョブにtS3Configurationを追加し、QuboleでS3システム内に実際のビジネスデータを書き込みます。tS3Configurationを使用しないと、このビジネスデータはQubole HDFSシステムに書き込まれ、クラスターをシャットダウンすると破棄されます。
    • オンプレミスのディストリビューションを使用する場合は、クラスターで使われているファイルシステムに対応する設定コンポーネントを使用します。一般的に、このシステムはHDFSになるため、tHDFSConfigurationを使用します。

  • [Standalone mode] (スタンドアロンモード): クラスターで使われているファイルシステム(tHDFSConfiguration Apache Spark BatchtS3Configuration Apache Spark Batchなど)に対応する設定コンポーネントを使用します。

    ジョブ内に設定コンポーネントがない状態でDatabricksを使用している場合、ビジネスデータはDBFS (Databricks Filesystem)に直接書き込まれます。

この接続は、ジョブごとに有効になります。

PubSubアクセス許可

DataprocクラスターでPub/Subを使う時は、このクラスターにPub/Subサービスにアクセスするための適切な権限があることをご確認ください。

これを行うには、Google Cloud Platformの詳細オプションで同じプロジェクト内の[Allow API access to all Google Cloud services] (すべてのGoogle CloudサービスへのAPIアクセスを許可)をオンにするか、コマンドラインを使ってスコープを明示的に割り当てることで、Dataprocクラスターを作成します(次の例は低リソースのテストクラスター用です):
gcloud beta dataproc clusters create CLUSTER_ID \
    --zone europe-west1-b \
    --master-machine-type n1-standard-2 \
    --master-boot-disk-size 50 \
    --num-workers 2 \
    --worker-machine-type n1-standard-2 \
    --worker-boot-disk-size 50 \
    --scopes 'https://www.googleapis.com/auth/cloud-platform' \
    --project PROJECT_ID

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。