ユースケース: 財務データを処理するパイプラインを作成
- Cloud API Services Platform
- Cloud Big Data
- Cloud Big Data Platform
- Cloud Data Fabric
- Cloud Data Integration
- Cloud Data Management Platform
- Cloud Pipeline Designer Standard Edition
- Data Fabric
階層化された財務データ(IBAN、口座、取引情報など)をエンリッチ化してフィルタリングし、実行済み取引の総額を集計してカウントするパイプラインの作成方法を説明します。
手順
- [Pipelines] (パイプライン)ページで[Add pipeline] (パイプラインを追加)をクリックします。新しいパイプラインが開きます。
-
上部にあるツールバーでパイプラインのデフォルト名の横にある鉛筆アイコンをクリックし、パイプラインにわかりやすい名前を付けます。
例
Process financial data - [ADD SOURCE] (ソースを追加)をクリックし、ソースデータを選択できるパネルを開きます。ここでは、前に作成したfinancial dataデータセットを使用します。
-
データセットを選択し、[Select] (選択)をクリックしてパイプラインに追加できるようにします。
データセットがソースとして追加され、JSONデータを既にプレビューできるようになります。
- をクリックし、パイプラインにPython 3プロセッサーを追加します。このプロセッサーは、入力データを処理しエンリッチ化するPythonコードをコピーするために使用されます。
-
プロセッサーに意味のある名前を付けます。
例
enrich with IBAN validation -
[Python code] (Pythonコード)エリアに、以下のコードを入力します:
import string; ## IBAN Validation function; ALPHA = {c: str(ord(c) % 55) for c in string.ascii_uppercase}; def reverse_iban(iban): return iban[4:] + iban[:4]; def check_iban(iban): return int(''.join(ALPHA.get(c, c) for c in reverse_iban(iban))) % 97 == 1; output = input; transaction = input['transaction']; this_account = transaction["this_account"]; account_routing = this_account["account_routing"]; account_iban = account_routing["address"].replace(" ", ""); output['iban_valid'] = check_iban(account_iban)
このコードによって以下の処理が可能になります。- IBAN構文の有効性を確認する
- 既存のレコードにiban_validという名前の新しいフィールドを追加し、IBANチェックの結果に応じて、値をtrueまたはfalseにする
-
[Save] (保存)をクリックして設定を保存します。
入力データが処理されます。変更はプレビューできます。iban_validという新しいフィールドがどのレコードにも追加されています。
- をクリックし、パイプラインにFilterプロセッサーを追加します。このプロセッサーは、承認されたトランザクション(ACでタグ付け)を拒否されたトランザクション(DCでタグ付け)から分離するために使用されます。
-
プロセッサーに意味のある名前を付けます。
例
filter on accepted transactions -
[Filters] (フィルター)エリアで次の操作を行います。
- [Input] (入力)リストで.transaction.details.typeを選択します。この値に基づいて顧客をフィルタリングします。
- レコードのフィルタリング時にファンクションを適用したくないので、[Optionally select a function to apply] (適用するファンクションをオプションとして選択)リストで[None] (なし)を選択します。
-
[Operator] (オペレーター)リストから[= =]を選択し、承認されたトランザクションをフィルタリングするため、[Value] (値)リストにACと入力します。
このエリアではavpath構文を使用できます。詳細は、avpathとは、そして使用する理由とはを参照してください。
- [Save] (保存)をクリックして設定を保存します。
入力データが処理されます。変更はプレビューできます。出力に保持されるのは許可されたトランザクション(AC)が含まれているレコードのみです。 - をクリックし、パイプラインにAggregateプロセッサーを追加します。このプロセッサーは、トランザクションをグルーピングし、トランザクションの合計金額を計算するために使用します。
-
プロセッサーに意味のある名前を付けます。
例
count transaction amounts with valid IBAN -
[Group by] (グループ基準)エリアで、集計セットに使用するフィールドを指定します。
- [Field path] (フィールドパス)リストで.transaction.details.descriptionを選択します。
- 新しいエレメントを追加し、このリストで.iban_validを選択します。
-
[Operations] (操作)エリアで集計操作を追加します:
- [Field path] (フィールドパス)リストで.transaction.details.value.amountを、[Operation] (操作)リストで[Sum] (合計)を選択します。
- 生成されたフィールドに名前(total_amountなど)を付けます。
- [Save] (保存)をクリックして設定を保存します。
入力データは適宜処理され、フィルタリング操作やグルーピング操作の後に計算データをプレビューできるようになります。IBANが有効な取引が252件、IBANが無効な取引が81件あります。 - パイプラインの[ADD DESTINATION] (デスティネーションを追加)アイテムをクリックしてパネルを開き、出力データ用に以前作成した財務データデータセットを選択します。テストデータセットはソースとデスティネーションで異なる動作をするため、データセットは入力と出力で同じものを使用できます。デスティネーションで使用する場合データは無視されます。
-
デスティネーションに意味のある名前を付けます。
例
processed data - [Save] (保存)をクリックして設定を保存します。