変換フローを追加
変換タスクにフローを含めることができます。フロー デザイナーを使用すれば、ソース、プロセッサー、ターゲットを使って変換フローを作成し、複雑または単純な変換を定義できるようになります。
変換フローとプロセッサーは論理表現です。つまり、ELT タスクではターゲットごとにテーブルが 1 つのみ作成され、どのプロセッサーもターゲットごとに 1 つの SQL 文にコンパイルされます。
サポートされるターゲット プラットフォーム
変換フローでは、次のプラットフォームがサポートされています。
-
Snowflake
-
Databricks
-
Google BigQuery
-
Azure Synapse Analytics
-
Microsoft Fabric
-
Microsoft SQL Server
前提条件
変換フローを作成する前に、次の操作が必要です:
- 変換フローで使用したいオンボード データをストレージ タスクに入力するか、既存のデータを登録します。データのオンボードと登録方法の詳細については、「 データのオンボーディング および データ プラットフォームに既存のデータを登録する」を参照してください。
- 変換フローでソースとして使用するストレージまたは変換タスクを準備し、実行します。ストレージ タスクの実行は必須ではありませんが、フローの各ステップでデータ プレビューを表示できるようにすることをお勧めします。
変換フローを作成
有効な変換フローを作成するには、少なくともソース データセットが 1 つ、および定義されたキーを持つ名前付きターゲットが 1 つ必要です。
- データ パイプラインで [データを変換] タスクを開きます。
- [変換] で、変換フローに含めるソース データセットを選択して [変換フローを追加] をクリックします。
[変換フローを追加] が表示されます。ここで変換を設定できます。
-
[名前] にターゲット データセットの名前を入力します。
[説明] に長い説明を追加することもできます。
-
[実体化] で、変換された出力を実体化するかどうかを選択します。データ タスク設定から設定を継承することを選択できます。
-
[オン] にするとテーブルが作成され、関連する ELT 処理が実行されます。
-
[オフ] にすると、オンザフライで変換を実行するビューが作成されます。
-
-
増分ロードを使用すると、マクロを使用して処理されるデータのセットを減らすためにフィルターまたはその他の条件を適用することで、増分データ ロードのクエリを調整できます。増分ロードは、データがテーブルとして実体化されている場合にのみ使用できます。
-
増分ロードが [オン] の場合
タスクの最初の実行では初期ロードが実行され、クエリのすべての結果がターゲット テーブルに挿入されます。後続の実行では、増分処理用に定義したフィルターまたは特定の条件を利用して、増分ロードが実行されます。増分ロード中、タスクはデータを更新または挿入としてのみ処理し、削除は管理されません。
-
増分ロードが [オフ] の場合
タスクの最初の実行では初期ロードが実行され、クエリのすべての結果がターゲット テーブルに挿入されます。後続の実行では、クエリのすべての結果をターゲット テーブルと比較し、新規、変更、または削除されたレコードを処理します。
情報メモクエリがターゲットに存在する必要があるすべてのレコードを選択する場合は、増分ロードをオフに設定します。選択されていないレコードはターゲットで削除されます。 -
-
変換フローを作成する準備ができたら、 [追加] をクリックします。
フロー デザイナーが開き、ターゲットが作成されてターゲット コンポーネントとしてフローに表示されます。
情報メモフロー ターゲットには一意の名前を付ける必要があります。後でフロー ターゲットの名前を変更する場合は、そのターゲットを選択し、 [データセット名] に新しい名前を入力します。 - ターゲットにキーが定義されていない場合は、 [キーと Null 許容文字] 項目の横にある [編集] をクリックします。設定ウィンドウが開きます。 情報メモキーはソース データセットから継承されないため、手動で定義する必要があります。Null 許容文字はソース データセットから継承され、変更も可能です。
- [キーと Null 許容文字を設定] で、プライマリ キーとして定義する列で [キー] を、Null 許容文字として定義する列で [Null 許容文字] を選択します。
- [確認] をクリックして変更を保存し、設定ウィンドウを閉じます。
変換フローのステータスが有効であれば、フローを閉じてデータを準備できます。
マテリアライゼーションと増分ロードの設定は、後でターゲット設定で変更できます。
-
ターゲットを選択し、ターゲット構成の [設定] の横にある [編集] をクリックします。
プロセッサーを追加
フローにプロセッサーを追加できます。
プロセッサーとはパイプラインに追加できるフローのことで、受信されるデータを変換し、変換されたそのデータがフローの次のステップに返されるようにします。
- フロー デザイナーで、プロセッサーを追加するフロー コンポーネントを選択します。
- フロー コンポーネントで をクリックし、次に [プロセッサーを追加] をクリックして、追加するプロセッサーを選択します。左のパネルからキャンバスにプロセッサーをドラッグすることもできます。
- 必要に応じてプロセッサーを設定し、 [保存] をクリックして変更を保存してデータ プレビューを更新します。
利用可能なプロセッサー
ユース ケース: Snowflake でデータを結合、集計、フィルタリング
このユース ケースでは、Snowflake の顧客データをプロセッサーで変換する必要があります。顧客情報は 2 つのデータセットから来るので、まず Join (結合) プロセッサーを追加してレコードを結合します。また、注文の平均価格を計算するために Aggregate (集計) プロセッサーを使い、出力データセットに残したい顧客レコードのタイプをフィルタリングするために Filter (フィルター) プロセッサーを使います。
最初のデータセットは CUSTOMER_ACCOUNT という Snowflake テーブルに基づいており、そのスキーマは次のようになります:
2 番目のデータセットは CUSTOMER_ORDER という Snowflake テーブルに基づいており、そのスキーマは次のようになります:
- 左パネルの [プロセッサー] から、Join (結合) プロセッサーをキャンバスにドラッグします。
- 2 番目のソースを Join (結合) プロセッサーにリンクさせ、両方のデータセットからのデータを結合できるようにします。
- 顧客 ID キー (CUSTOMER_ID) で 2 つのソース データセットを結合するよう、Join (結合) プロセッサーを設定します。
- Join (結合) プロセッサーの後ろに Aggregate (集計) プロセッサーをドラッグします。
- 顧客セグメント タイプ (LEFT_CUSTOMER_SEGMENT) 別にレコードをグループ化しながら、顧客の平均購入金額 (ORDER_TOTAL_PRICE) を計算して新しい列に追加し、その列に avg_order_price という名前を付けられるよう、Aggregate (集計) プロセッサーを設定します。
- 左パネルの [プロセッサー] から、Filter (フィルター) プロセッサーをキャンバスにドラッグ アンド ドロップします。
- ビジネス顧客タイプ (Business) でフィルタリングするよう、Filter (フィルター) プロセッサーを設定します。
- Filter (フィルター) プロセッサーで を選択してメニューを開き、 [一致しないターゲットを追加] をクリックして、2 番目のターゲットをフローに追加します。
このターゲットには、フィルター条件に一致しなかったレコード、個々の顧客タイプ (Individual) が含まれます。
- 新しいターゲット データセットの名前 (individual_cust など) を入力します。
- 両方のターゲットで出力プレビューを確認します:
business_cust ターゲットは、ビジネス顧客タイプの平均注文価格 (ここでは 157.463687151) を示します。
individual_cust ターゲットは、個人顧客タイプの平均注文価格 (ここでは 153.576530612) を示します。
- 変換フローのステータスが有効であることを確認したら、それを閉じます。
- [変換] ウィンドウで、 [準備] をクリックしてデータを準備します。
ベスト プラクティス
ソース データセットやターゲット データセットをフローに追加する場合は、ターゲット データセットの設定パネルでキーと Null 許容文字を設定する必要があります。