メイン コンテンツをスキップする 補完的コンテンツへスキップ

Apache Spark StreamingのtAggregateRowプロパティ

これらのプロパティは、Spark Streamingジョブのフレームワークで実行されているtAggregateRowを設定するために使われます。

Spark StreamingtAggregateRowコンポーネントは、変換処理ファミリーに属しています。

このコンポーネントは、Talend Real Time Big Data PlatformおよびTalend Data Fabricで利用できます。

基本設定

[Schema] (スキーマ)[Edit Schema] (スキーマを編集)

スキーマとは行の説明のことです。処理して次のコンポーネントに渡すフィールド(カラム)数を定義します。Sparkジョブを作成する場合、フィールドの命名時は予約語のlineを避けます。

スキーマを変更するには[Edit schema] (スキーマを編集)をクリックします。現在のスキーマがリポジトリータイプの場合は、3つのオプションを利用できます。

  • [View schema] (スキーマを表示): スキーマのみを表示する場合は、このオプションを選択します。

  • [Change to built-in property] (組み込みのプロパティに変更): ローカルで変更を行うためにスキーマを組み込みに変更する場合は、このオプションを選択します。

  • [Update repository connection] (リポジトリー接続をアップデート): リポジトリーに保存されているスキーマに変更を加え、変更後にそのコンテンツをすべてのジョブにプロパゲートするかどうかを決める場合は、このオプションを選択します。変更を現在のジョブにのみ反映する場合は、変更後、[No] (いいえ)を選択し、[Repository Content] (リポジトリーコンテンツ)ウィンドウで再びこのスキーマのメタデータを選択します。

 

[Built-in] (組み込み): そのコンポーネントに対してのみスキーマを作成し、ローカルに保管します。

 

[Repository] (リポジトリー): スキーマは作成済みで、リポジトリーに保管されています。さまざまなプロジェクトやジョブデザインで再利用できます。

[Group by] (グループ基準)

集計セットを定義し、その値を計算に使用します。

 

[Output Column] (出力カラム): 定義したスキーマストラクチャーに基づいて提供されるリストのカラムラベルを選択します。必要に応じて複数の出力カラムを追加することで、より正確に集計を行うことができます。

例: リストの各国の平均値を算出する場合はCountryを選択し、国内の地域ごとに値を比較する場合はCountryとRegionを選択します。

 

[Input Column] (入力カラム): 集計セットの出力ラベルが異なる必要がある場合に備えて、入力カラムと出力カラムを照合します。

[Operations] (操作)

操作の種類、また計算および出力フィールドに使用する値を選択します。

 

[Output Column] (出力カラム): リストの宛先フィールドを選択します。

 

[Function] (ファンクション): 以下の中から演算子を選択します。

  • [count] (カウント): 行数を計算します

  • [count (distinct)] (カウント(重複を除く)): 重複を除いた行数をカウントします。

  • [min] (最小): 最小値を選択します

  • [max] (最大): 最大値を選択します

  • [avg] (平均): 平均を計算します

  • [sum] (合計): 合計を計算します

  • [standard deviation] (標準偏差): 値のセットの変動を計算します。

従来のETLジョブ内で利用できる一部のファンクション(firstlastなど)は分散環境では意味をなさないため、Sparkジョブでは利用できません。

 

[Input Column] (入力カラム): 集計対象の値を取得する入力カラムを選択します。

 

[Ignore null values] (NULL値を無視): NULL値を無視するカラムの名前に対応するチェックボックスをオンにします。

詳細設定

金融計算精度を使用します。これはSUMおよびAVG操作の最大精度です。オプションにチェックを入れるとチェックを入れない場合と比べてヒープメモリが増加し、処理が遅くなります。

財務精度を使うには、このチェックボックスをオンにします。これは最大精度ですが、より多くのメモリを消費し、処理が遅くなります。

情報メモ警告:

正確な結果を取得するために、出力にはBigDecimal型を使うことをお勧めします。

[Check type overflow (slower)] (タイプのオーバーフローをチェック :低速)

データのタイプをチェックして、ジョブがクラッシュしないことを確認します。

[Check ULP (Unit in the Last Place), ensure that a value will be incremented or decremented correctly, only float and double types. (slower) ] (ULP (Unit in the Last Place)をチェックし、Float型とDouble型のみ値が正しく増減することをご確認ください(低速))

Float型とDouble型で最も正確な結果を得られるようにするには、チェックボックスをオンにします。

使用方法

使用ルール

このコンポーネントは、中間ステップとして使用されます。

このコンポーネントは、所属するSpark Streamingのコンポーネントのパレットと共に、Spark Streamingジョブを作成している場合にだけ表示されます。

特に明記していない限り、このドキュメントのシナリオでは、標準ジョブ、つまり従来の Talend データ統合ジョブだけを扱います。

[Spark Connection] (Spark接続)

[Run] (実行)ビューの[Spark configuration] (Spark設定)タブで、ジョブ全体でのSparkクラスターへの接続を定義します。また、ジョブでは、依存jarファイルを実行することを想定しているため、Sparkがこれらのjarファイルにアクセスできるように、これらのファイルの転送先にするファイルシステム内のディレクトリーを指定する必要があります。
  • Yarnモード(YarnクライアントまたはYarnクラスター):
    • Google Dataprocを使用している場合、[Spark configuration] (Spark設定)タブの[Google Storage staging bucket] (Google Storageステージングバケット)フィールドにバケットを指定します。

    • HDInsightを使用している場合、[Spark configuration] (Spark設定)タブの[Windows Azure Storage configuration] (Windows Azure Storage設定)エリアでジョブのデプロイメントに使用するブロブを指定します。

    • Altusを使用する場合は、[Spark configuration] (Spark設定)タブでジョブのデプロイにS3バケットまたはAzure Data Lake Storageを指定します。
    • Quboleを使用する場合は、ジョブにtS3Configurationを追加し、QuboleでS3システム内に実際のビジネスデータを書き込みます。tS3Configurationを使用しないと、このビジネスデータはQubole HDFSシステムに書き込まれ、クラスターをシャットダウンすると破棄されます。
    • オンプレミスのディストリビューションを使用する場合は、クラスターで使われているファイルシステムに対応する設定コンポーネントを使用します。一般的に、このシステムはHDFSになるため、tHDFSConfigurationを使用します。

  • [Standalone mode] (スタンドアロンモード): クラスターで使われているファイルシステム(tHDFSConfiguration Apache Spark BatchtS3Configuration Apache Spark Batchなど)に対応する設定コンポーネントを使用します。

    ジョブ内に設定コンポーネントがない状態でDatabricksを使用している場合、ビジネスデータはDBFS (Databricks Filesystem)に直接書き込まれます。

この接続は、ジョブごとに有効になります。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。