Apache Spark BatchのtDeltaLakeOutputプロパティ
これらのプロパティは、Spark Batchジョブのフレームワークで実行されているtDeltaLakeOutputを設定するために使われます。
Spark BatchのtDeltaLakeOutputコンポーネントは、テクニカルファミリーに属しています。
このフレームワークのコンポーネントは、すべてのサブスクリプションベースのビッグデータ対応のTalend製品およびTalend Data Fabricで利用できます。
基本設定
| プロパティ | 説明 |
|---|---|
| [Define how to save the dataset] (データセットを保存する方法を定義) |
データセットストレージを選択します:
Deltaテーブルのデータをマージする方法は、Databricksのドキュメントのmergeを使用したテーブルへのアップサートをご覧ください。 |
| [Define a storage configuration component] (ストレージ設定コンポーネントを定義) |
ターゲットファイルシステムへの接続に関する設定情報の提供で使用する設定コンポーネントを選択します。 このチェックボックスをオフにすると、ターゲットファイルシステムはローカルシステムになります。 使用する接続設定は同じジョブ内にあることが必要です。たとえばジョブにtS3Configurationコンポーネントをドロップした場合は、それを選択し、指定したS3ストレージシステムに結果を書き込めるようになります。 このフィールドは、[Basic settings] (基本設定)ビューで[Define the source of the dataset] (データセットのソースを定義する)ドロップダウンリストから[Files] (ファイル)を選択した時のみ利用可能です。 |
| [Property type] (プロパティタイプ) |
|
| [Schema] (スキーマ)と[Edit schema] (スキーマを編集) |
|
| [Folder/File] (フォルダー/ファイル) |
ファイルシステムで使用するデータを参照するか、パスを入力します。 設定したパスがフォルダーを指す場合、このコンポーネントによりフォルダーに保管されているすべてのファイル(/user/talend/inなど)が読み取られます。サブフォルダーが存在する場合、[Spark configuration] (Spark設定)タブの[Advanced properties] (詳細プロパティ)テーブルでプロパティspark.hadoop.mapreduce.input.fileinputformat.input.dir.recursiveをtrueに設定しない限り、サブフォルダーは自動的に無視されます。
参照用のボタンはSpark Localモードでは機能しません。お使いのディストリビューションで、Talend Studioがサポートしているその他のSpark Yarnモードを使用している場合は、同じジョブ内の設定コンポーネントで接続を適切に設定したことを確認する必要があります。使用されるファイルシステムに応じて設定コンポーネントを使用します。 このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットを保存する方法を選択)ドロップダウンリストから[Files] (ファイル)を選択した時のみ利用可能です。 |
| [Action] (アクション) | ジョブの設定コンポーネントが接続情報を提供するファイルシステムにデータを書き込むための操作を選択します。
Delta Lakeは、ファイルのアップロード時間とこのファイルのメタデータタイムスタンプとの間にわずかな違いを体系的に作成します。データをフィルタリングする必要がある場合は、この違いに留意してください。 このオプションは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットを保存する方法を選択)ドロップダウンリストから[Files] (ファイル)または[Metastore] (メタストア)を選択した時のみ利用可能です。 |
| [Source type] (ソースタイプ) |
|
| [Database] (データベース) | 使うDelta Lakeデータベースの名前を二重引用符で囲んで入力します。 このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットを保存する方法を選択)ドロップダウンリストから[Metastore] (メタストア)および[Merge] (マージ)を選択した時のみ利用可能です。 |
| [Target table] (ターゲットテーブル) | 使うテーブルの名前を二重引用符で囲んで入力します。 このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットを保存する方法を選択)ドロップダウンリストから[Metastore] (メタストア)および[Merge] (マージ)を選択した時のみ利用可能です。 |
| [External paths] (外部パス) | データを格納するために、DBFSとは異なるファイルシステムを指すパスを二重引用符で囲んで入力します。ADLSGen2ファイルシステムまたはS3ファイルシステムのいずれかです。 このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットを保存する方法を選択)ドロップダウンリストから[Metastore] (メタストア)を選択した時のみ利用可能です。 |
| [Optimize] (最適化) | SQLステートメントを入力して、データのレイアウトを最適化します。 Delta Lakeデータの最適化の詳細は、DatabricksドキュメントのOptimize(Delta Lake on Databricks)をご覧ください。 このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットを保存する方法を選択)ドロップダウンリストから[Metastore] (メタストア)を選択した時のみ利用可能です。 |
| [Merge ON] (マージON) | マージ操作を適用する入力カラムと出力カラムを示します。入力カラムと出力カラムのペアごとに、ファンクションを指定する必要があります。結果の句は、実行するマージアクションを条件付けます。これらのマージアクションは、[When matched] (一致する場合)および[When not matched] (一致しない場合)オプションで定義されます。 たとえば、航空会社がフライトイベントに関連するデータを処理するとします。彼らは定期的にフライトの出発日の変更を処理し、現在のデータを新しい日付でアップデートする必要があります。フライトのIDに基づいてこの新しいデータを既存のイベントにマージするために、 flightIdを入力列と出力列として示し、==ファンクション。次に、この条件の結果がtrueを返したときに実行するマージアクションを定義できます。この場合、入力列と出力列のflightIdの値が等しいときに条件は真になります。 このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットを保存する方法を選択)ドロップダウンリストから[Merge] (マージ)を選択した時のみ利用可能です。 |
| [When matched] (一致した場合) | テーブルの[マージ]で 定義されたファンクションがtrueを返したときに実行するマージアクションを定義するには、このチェックボックスをオンにします。[When matched] (一致した場合)オプションには、それぞれ1つのマージアクションを定義できます。2つのWhen matchedオプションを定義した場合、最初の1つは節条件を持っていなければならず、2つのオプションは定義された順番で評価されます。 条件フィールドに条件ステートメントを入力して、マージアクションを適用するデータをさらに絞り込むことができます。条件を指定すると、その行の条件が真の場合にのみ、指定された行に対してマージアクションが実行されます。条件を使用すると、マージを高速化できます。条件は、targetTable.column = this.columnの形式に従う必要があります。ここで、targeTableはターゲットテーブルの名前です。[Dataset source type] (データセットソースタイプ)の場合、これは接続の名前です。SQLソースタイプでは、これを[Table alias](テーブルのエイリアス)フィールドに置き換える必要があります。 [Merge action] (マージアクション) ドロップダウンリストで、実行するアクションを選択します。
たとえば、航空会社のデータテーブルに、フライトの乗客数と、フライトがさらに混雑したかどうかを示すブール値が含まれているとします。次の条件は、マージアクションの範囲を、乗客の数に基づいて混雑しているフライトに制限します。flightEvents.nbOfPeople< this.nbOfPeople。マージアクションは、UPDATE SET *アクションを使用してisFlightMoreCrowdedの値をtrueにアップデートすることです。 このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットを保存する方法を選択)ドロップダウンリストから[Merge] (マージ)を選択した時のみ利用可能です。 |
| 一致しない場合 | [テーブルの マージ]で 定義されたファンクションがfalseを返したときに実行するマージアクションを定義するには、このチェックボックスをオンにします。 条件フィールドに条件ステートメントを入力して、マージアクションを適用するデータをさらに絞り込むことができます。条件を指定すると、その行の条件が真の場合にのみ、指定された行に対してマージアクションが実行されます。条件を使用すると、マージを高速化できます。条件は、targetTable.column = this.columnの形式に従う必要があります。ここで、targeTableはターゲットテーブルの名前です。[Dataset source type] (データセットソースタイプ)の場合、これは接続の名前です。SQLソースタイプでは、これを[Table alias](テーブルのエイリアス)フィールドに置き換える必要があります。 [Merge action] (マージアクション) ドロップダウンリストで、実行するアクションを選択します。
たとえば、航空会社のデータテーブルが日付で分割されていて、マージアクションが以下の条件を持っているとします: flightEvents.date = current_date()。マージアクションは、マージオンテーブルで定義されたファンクションがfalseを返す場合に適用され、現在の日付に発生したフライトに対応するデータにのみ適用されます。 このフィールドは、[Basic settings] (基本設定)ビューで[Select how to save the dataset] (データセットを保存する方法を選択)ドロップダウンリストから[Merge] (マージ)を選択した時のみ利用可能です。 |
Unity Catalogからデータセットを取得する場合は、以下のパラメーターでUnity Catalog関連情報を指定する必要があります:
|
詳細設定
| プロパティ | 説明 |
|---|---|
| [Define column partitions] (カラムパーティションを定義) | このチェックボックスをオンにして、入力データのスキーマからのカラムを使用して、表示されるテーブルに入力します。選択したカラムのレコードは、データのパーティションを行うためのキーとして使用されます。 |
| [Sort columns alphabetically] (カラムをアルファベット順にソート) | スキーマのカラムをアルファベット順にソートする場合は、このチェックボックスをオンにします。このチェックボックスをオフのままにすると、これらのカラムはスキーマエディターで定義された順序に従います。 |
| [Merge Schema] (スキーマをマージ) | データセットのスキーマは、時間の経過とともに変化することがよくあります。スキーマが異なる場合に受信データと既存データのスキーマをマージするには、このチェックボックスをオンにします。 このチェックボックスと[Overwrite Schema] (スキーマの上書き)チェックボックスをオフのままにすると、既存のデータのカラムのみが使用されます。 |
| [Overwrite Schema] (スキーマの上書き) | データセットのスキーマは、時間の経過とともに変化することがよくあります。受信データのスキーマを使用して既存のデータのスキーマを上書きするには、このチェックボックスをオンにします。 このチェックボックスと[Merge Schema] (スキーマをマージ)チェックボックスをオフのままにすると、既存のデータのカラムのみが使用されます。 |
使用方法
| 使用方法のガイダンス | 説明 |
|---|---|
| 使用ルール |
このコンポーネントは、終了コンポーネントとして使用され、入力リンクを必要とします。 Delta Lakeは、ファイルのアップロード時間とこのファイルのメタデータタイムスタンプとの間にわずかな違いを体系的に作成します。データをフィルタリングする必要がある場合は、この違いに留意してください。 このDelta Lakeレイヤーは、Data Lakeシステムの上に構築されているため、Data Lakeシステムに対応する設定コンポーネント(tAzureFSConfigurationなど)を使用して、Data Lakeシステムの一部として接続されます。 |
| [Spark Connection] (Spark接続) |
[Run] (実行)ビューの[Spark configuration] (Spark設定)タブで、ジョブ全体でのSparkクラスターへの接続を定義します。また、ジョブでは、依存jarファイルを実行することを想定しているため、Sparkがこれらのjarファイルにアクセスできるように、これらのファイルの転送先にするファイルシステム内のディレクトリーを指定する必要があります。
この接続は、ジョブごとに有効になります。 |