メイン コンテンツをスキップする 補完的コンテンツへスキップ

Apache Spark StreamingのtPubSubInputプロパティ

これらのプロパティは、Spark Streamingのジョブのフレームワークで実行されているtPubSubInputを設定するために使われます。

Spark StreamingtPubSubInputコンポーネントは、メッセージングファミリーに属しています。

Dataproc 1.4以降をSparkクラスターとして使用している場合は、PubServiceを実行できるよう、Google Cloud Platformでのクラスター作成時に[Allow API access to all Google Cloud services] (すべてのGoogle CloudサービスへのAPIアクセスを許可)を必ず選択します。

このコンポーネントは、Talend Real Time Big Data PlatformおよびTalend Data Fabricで利用できます。

基本設定

[Define a Google Cloud configuration component] (Google Cloudの設定コンポーネントを定義)

SparkクラスターにDataprocを使っている場合は、このチェックボックスをオフにします。

それ以外の場合は、このチェックボックスをオンにして、Pub/SubのコンポーネントがtGoogleCloudConfigurationコンポーネントの提供するGoogle Cloud設定情報を使えるようにします。

[Schema] (スキーマ)[Edit schema] (スキーマを編集)

スキーマとは行の説明のことです。処理して次のコンポーネントに渡すフィールド(カラム)数を定義します。Sparkジョブを作成する場合、フィールドの命名時は予約語のlineを避けます。

このコンポーネントのスキーマは読み取り専用です。メッセージプロデューサーから送信されたメッセージ本文を保管します。

[Output type] (出力タイプ)

次のコンポーネントに送信するデータのデータ型を選択します。

tPubSubInputはPubSub byte[]メッセージをジョブが処理できる文字列に自動変換できるので、通常はStringの使用が推奨されます。ただし、Protobufなど、tPubSubInputで認識できないメッセージ形式の場合は、byteを選択し、次にtJavaRowなどのカスタムコードのコンポーネントを使って、同じジョブの他のコンポーネントがこれらのメッセージを処理できるようにメッセージを文字列にデシリアライズできます。

トピック名

メッセージを利用するトピックの名前を入力します。

サブスクリプション名

指定したトピックを利用する必要があるサブスクリプションの名前を入力します。

サブスクリプションが存在する場合は、指定されたトピックに接続する必要があります。サブスクリプションが存在しない場合は、ランタイムに作成され、指定されたトピックに接続されます。

詳細設定

[Storage level] (ストレージレベル)

[Storage level] (ストレージレベル) ドロップダウンリストが表示されたら、メモリのみ、またはメモリとディスクのように、キャッシュされたRDDの保存方法を選択します。

各ストレージレベルの詳細は、https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence (英語のみ)をご覧ください。

使用方法

使用ルール

このコンポーネントは、開始コンポーネントとして使用され、出力リンクを必要とします。

[Spark Connection] (Spark接続)

[Run] (実行)ビューの[Spark configuration] (Spark設定)タブで、ジョブ全体でのSparkクラスターへの接続を定義します。また、ジョブでは、依存jarファイルを実行することを想定しているため、Sparkがこれらのjarファイルにアクセスできるように、これらのファイルの転送先にするファイルシステム内のディレクトリーを指定する必要があります。
  • Yarnモード(YarnクライアントまたはYarnクラスター):
    • Google Dataprocを使用している場合、[Spark configuration] (Spark設定)タブの[Google Storage staging bucket] (Google Storageステージングバケット)フィールドにバケットを指定します。

    • HDInsightを使用している場合、[Spark configuration] (Spark設定)タブの[Windows Azure Storage configuration] (Windows Azure Storage設定)エリアでジョブのデプロイメントに使用するブロブを指定します。

    • Altusを使用する場合は、[Spark configuration] (Spark設定)タブでジョブのデプロイにS3バケットまたはAzure Data Lake Storageを指定します。
    • Quboleを使用する場合は、ジョブにtS3Configurationを追加し、QuboleでS3システム内に実際のビジネスデータを書き込みます。tS3Configurationを使用しないと、このビジネスデータはQubole HDFSシステムに書き込まれ、クラスターをシャットダウンすると破棄されます。
    • オンプレミスのディストリビューションを使用する場合は、クラスターで使われているファイルシステムに対応する設定コンポーネントを使用します。一般的に、このシステムはHDFSになるため、tHDFSConfigurationを使用します。

  • [Standalone mode] (スタンドアロンモード): クラスターで使われているファイルシステム(tHDFSConfiguration Apache Spark BatchtS3Configuration Apache Spark Batchなど)に対応する設定コンポーネントを使用します。

    ジョブ内に設定コンポーネントがない状態でDatabricksを使用している場合、ビジネスデータはDBFS (Databricks Filesystem)に直接書き込まれます。

この接続は、ジョブごとに有効になります。

PubSubアクセス許可

DataprocクラスターでPub/Subを使う時は、このクラスターにPub/Subサービスにアクセスするための適切な権限があることをご確認ください。

これを行うには、Google Cloud Platformの詳細オプションで同じプロジェクト内の[Allow API access to all Google Cloud services] (すべてのGoogle CloudサービスへのAPIアクセスを許可)をオンにするか、コマンドラインを使ってスコープを明示的に割り当てることで、Dataprocクラスターを作成します(次の例は低リソースのテストクラスター用です):
gcloud beta dataproc clusters create CLUSTER_ID \
    --zone europe-west1-b \
    --master-machine-type n1-standard-2 \
    --master-boot-disk-size 50 \
    --num-workers 2 \
    --worker-machine-type n1-standard-2 \
    --worker-boot-disk-size 50 \
    --scopes 'https://www.googleapis.com/auth/cloud-platform' \
    --project PROJECT_ID

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。