メイン コンテンツをスキップする 補完的コンテンツへスキップ

Apache Spark StreamingのtJavaプロパティ

これらのプロパティは、Spark Streamingジョブのフレームワークで実行されているtJavaを設定するために使われます。

Spark StreamingtJavaコンポーネントは、カスタムコードファミリーに属しています。

このコンポーネントはTalend Real-Time Big Data PlatformTalend Data Fabricで利用できます。

基本設定

[Schema] (スキーマ)[Edit Schema] (スキーマを編集)

スキーマとは行の説明のことです。処理して次のコンポーネントに渡すフィールド(カラム)数を定義します。Sparkジョブを作成する場合、フィールドの命名時は予約語のlineを避けます。

スキーマを変更するには[Edit schema] (スキーマを編集)をクリックします。現在のスキーマがリポジトリータイプの場合は、3つのオプションを利用できます。

  • [View schema] (スキーマを表示): スキーマのみを表示する場合は、このオプションを選択します。

  • [Change to built-in property] (組み込みのプロパティに変更): ローカルで変更を行うためにスキーマを組み込みに変更する場合は、このオプションを選択します。

  • [Update repository connection] (リポジトリー接続をアップデート): リポジトリーに保存されているスキーマに変更を加え、変更後にそのコンテンツをすべてのジョブにプロパゲートするかどうかを決める場合は、このオプションを選択します。

    変更を現在のジョブにのみ反映する場合は、変更後、[No] (いいえ)を選択し、[Repository Content] (リポジトリーコンテンツ)ウィンドウで再びこのスキーマのメタデータを選択します。

null不可能なプリミティブフィールドの入力値がnullの場合、そのフィールドを含むデータ行は拒否されることにご注意ください。

 

[Built-in] (組み込み): そのコンポーネントに対してのみスキーマを作成し、ローカルに保管します。

 

[Repository] (リポジトリー): スキーマは作成済みで、リポジトリーに保管されています。さまざまなプロジェクトやジョブデザインで再利用できます。

[Code] (コード)

入力リンクからのRDDを処理するため、または新しいRDDをこの入力リンクから作成するためのJavaコードを入力します。

スキーマ、リンクとコンポーネント名を利用してカスタムコードを作成する必要があります。たとえば、このコンポーネントのラベルがtJava_1で、tJava_1への接続のラベルがrow1である場合、入力RDDのクラスはrow1Structで。入力RDD自体はrdd_tJava_1変数と共に利用できます。

詳細な手順は、このコンポーネントの[Code] (コード)フィールドに記載のデフォルトのコメントをご覧ください。

Spark Java APIの詳細は、ApacheのSparkのドキュメント(https://spark.apache.org/docs/latest/api/java/index.html)をご覧ください。

詳細設定

Classes (クラス)

[Basic settings] (基本設定)ビューの[Code] (コード)フィールドに書き込まれたコード内で使用する必要のあるクラスを定義します。

シリアライズで最終的な例外の発生を避けるために、[Code] (コード)フィールド内ではなく、このフィールド内で新しいクラスを定義することをお勧めします。

[Import] (インポート)

インポートするJavaコード、および必要に応じて[Basic settings] (基本設定)ビューの[Code] (コード)フィールドで使用されている外部ライブラリーを入力します。

使用方法

使用ルール

このコンポーネントは、終了コンポーネントとして使用され、入力リンクを必要とします。

コードサンプル [Basic settings] (基本設定)ビューの[Code] (コード)フィールドに次のコードを入力し、入力RDDにカスタム変換を使用して、出力RDDを作成します。mapInToOutは、[Advanced settings] (詳細設定)ビューの[Classes] (クラス)フィールドで定義するクラスです。
outputrdd_tJava_1 = rdd_tJava_1.map(new mapInToOut(job));
[Advanced settings] (詳細設定)ビューの[Classes] (クラス)フィールドに次のコードを入力して、mapInToOutクラスを定義します。
public static class mapInToOut implements
  org.apache.spark.api.java.function.Function<inputStruct,RecordOut_tJava_1>{

     private ContextProperties context = null;
     private java.util.List<org.apache.avro.Schema.Field> fieldsList;
		
     public mapInToOut(JobConf job) {
	   this.context = new ContextProperties(job);
     }
		
     @Override
     public RecordOut_tJava_1 call(inputStruct origStruct) {		
			
	 if (fieldsList == null) {
	     this.fieldsList = (new inputStruct()).getSchema()
			.getFields();
	 }

	 RecordOut_tJava_1 value = new RecordOut_tJava_1();

	 for (org.apache.avro.Schema.Field field : fieldsList) {
	      value.put(field.pos(), origStruct.get(field.pos()));
	 }

	 return value;		
			
      }
}

[Spark Connection] (Spark接続)

[Run] (実行)ビューの[Spark configuration] (Spark設定)タブで、ジョブ全体でのSparkクラスターへの接続を定義します。また、ジョブでは、依存jarファイルを実行することを想定しているため、Sparkがこれらのjarファイルにアクセスできるように、これらのファイルの転送先にするファイルシステム内のディレクトリーを指定する必要があります。
  • Yarnモード(YarnクライアントまたはYarnクラスター):
    • Google Dataprocを使用している場合、[Spark configuration] (Spark設定)タブの[Google Storage staging bucket] (Google Storageステージングバケット)フィールドにバケットを指定します。

    • HDInsightを使用している場合、[Spark configuration] (Spark設定)タブの[Windows Azure Storage configuration] (Windows Azure Storage設定)エリアでジョブのデプロイメントに使用するブロブを指定します。

    • Altusを使用する場合は、[Spark configuration] (Spark設定)タブでジョブのデプロイにS3バケットまたはAzure Data Lake Storageを指定します。
    • オンプレミスのディストリビューションを使用する場合は、クラスターで使われているファイルシステムに対応する設定コンポーネントを使用します。一般的に、このシステムはHDFSになるため、tHDFSConfigurationを使用します。

  • [Standalone mode] (スタンドアロンモード): クラスターで使われているファイルシステム(tHDFSConfiguration Apache Spark BatchtS3Configuration Apache Spark Batchなど)に対応する設定コンポーネントを使用します。

    ジョブ内に設定コンポーネントがない状態でDatabricksを使用している場合、ビジネスデータはDBFS (Databricks Filesystem)に直接書き込まれます。

この接続は、ジョブごとに有効になります。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。