メイン コンテンツをスキップする 補完的コンテンツへスキップ

ストリーミング データ

オンボーディング プロセスでは、ソースからデータを転送し、Iceberg テーブルに保存します。ストリーミング データ ソースからの変更は、ほぼリアルタイムで継続的にストレージ テーブルに適用されます。

データをオンボード

データはパイプライン プロジェクト内でオンボーディングされ、データセットはプロジェクト設定で定義された S3 の場所に保存されます。

  1. プロジェクトで、 [作成] をクリックし、 [データのオンボード] をクリックします。

  2. オンボーディングの [タスク名] と、必要に応じて [説明] を追加します。

    [次へ] をクリックします。

  3. ソース接続を選択します。

    既存のストリーミング ソース接続を選択するか、ソースへの新しい接続を作成できます。

    詳細については、「データ ストリームへの接続」を参照してください。

    [次へ] をクリックし、データ ソースに合わせて次の手順に従います。

データを選択する

Apache Kafka

リストには、ソース接続で定義されたクラスター内で利用可能な Kafka トピックが表示されます。

トピックを選択する際、特定のデータセットを選択できます。また、選択ルールを使用して、データセットのグループを含めたり除外したりできます。

  • % をワイルドカードとして使用して、データセットの選択基準を定義します。

選択ルールを使用してトピックを選択した場合、すべてのデータセットを単一のターゲット テーブルにロードするか、ソース トピックごとに個別のターゲット テーブルを作成するかを選択できます。

  • 既定では、ターゲットとなる Iceberg テーブル名はソースのトピック名から導出され、命名規則に準拠するよう整形されます (例: 小文字化、スペースの削除、ハイフンをアンダースコアに置換など)。[ターゲットデータセット名の定義] で、ターゲット テーブルの名前を編集できます

  • 選択ルールを使用して複数のトピックを単一のテーブルにロードする場合、ターゲット名を指定する必要があります。

  • 選択ルールが使用され、データが個別のテーブル (トピックごとに 1 つのデータセット) にロードされる場合、既定のターゲット名はトピック名になります。この段階では、ウィザードで名前を編集することはできませんが、後ほどランディング タスクで編集できます。

  • 取り込みのためにトピックを選択するようにルールが構成されている場合、ランディング タスク設定のスキーマ進化の下にある [新しいトピック] > [ターゲットに追加] オプションがチェックされていると、ルールの条件を満たすすべての新しいトピックもランディングされます。

1 つ以上のデータセットを選択するか、選択ルールを指定して、 [追加] をクリックします。[次へ] をクリックします。

Amazon Kinesis

リストには、ソース接続で定義された利用可能な Kinesis ストリームが表示されます。

1 つ以上のデータセットを選択し、 [追加] をクリックします。[選択したストリーム] に、追加されたデータセットが表示されます。[次へ] をクリックします。

Amazon S3

ディレクトリ ブラウザーには、ソース接続の S3 バケットにあるすべてのディレクトリのリストが表示されます。 

情報メモデータパターンを使用してディレクトリを選択すると、パフォーマンスが向上します。
  • データをランディングするときに含めるディレクトリを選択します。

    • 各ディレクトリについて、 [パスを追加] で、パスとファイル名のパターンを入力します。

      • 任意の文字に一致させるワイルドカードとして、*を使用します。

      • 日付パターンを入力するには、4 桁の年を表すプレースホルダーとして <yyyy>、2 桁の月として <MM>、2 桁の日として <dd>、2 桁の時として <HH> を使用します。例:

        • MyDir3/<yyyy>_<MM>_<dd>_<HH>_orders.csv

        • MyDir3/<yyyy>/<MM>/<dd>/<HH>_orders.csv

  • [プレビュー] をクリックして、 [データのプレビュー] ダイアログを開きます。含まれるファイルと除外されるファイルのリストが表示されます。

  • [検証] をクリックして、パスとファイル名のパターンが正しく機能していることを確認します。

  • [ターゲットデータセット名の定義] で、トピックをターゲットの Iceberg テーブルにマッピングする名前を入力します。[次へ] をクリックします。

コンテンツ タイプの選択

ソース イベントのコンテンツ タイプを選択します。

  • [データ イベントのタイプを選択] で、取り込むイベントのタイプを選択します。

  • 詳細については、「データ ストリームへの接続」を参照してください。

    選択されたコンテンツ タイプは、すべてのトピック、データセット、またはデータ イベントに適用されます。取り込むコンテンツ タイプごとに、新しいタスクを作成する必要があります。

  • データが解析できることを確認するには、 [イベントが正しくロードされていることを確認] を展開します。この段階でデータが正しいことを確認しておくのが賢明です。不備がある場合は、パイプラインを再作成してデータを再度ロードする必要があります。[データセットを選択] を使用して特定のデータセットを調査し、データのロードに影響を与える可能性のある警告を確認します。データを表示するには、任意の構造体列の横にある目のアイコンをクリックします。

  • [次へ] をクリックします。

情報メモデータセットでイベントが検出されなかった場合、ランディングにメッセージが表示されます。読み取るイベントがある場合にタスクを実行する必要があり、列は自動的に追加されます。

取り込みプロパティの設定

パイプラインの設定を構成します。

  • データの読み取り元

    • もっとも古いイベントから開始: すべての履歴データを取り込みます。

    • 現在から開始: パイプラインの開始時点以降に到着する新しいデータを取り込みます。

  • 列のアンネスト

    • ネストされた列を保持: 変換は適用されません。

    • 個別の列にアンネスト: データは個別の列に分割されます。

  • 新しいデータ セットのロード設定

    • 追加のみ: イベントデータは通常、ライフサイクルが短く更新されないため、このオプションが最適です。例: 注文

    • 変更の適用: 時間の経過とともにデータが更新されるケースにもっとも適しています。例: 顧客。キー項目に基づいて既存のレコードを更新し、新しいレコードを挿入します。後でタスクを定義するときに、キー項目を指定する必要があります。

  • ターゲット テーブルのパーティション

    ターゲット テーブルのパーティション オプションは、パイプライン内のすべてのテーブルに適用されます。後でテーブル レベルで設定を上書きし、カスタムパーティションを定義できます。

    情報メモこのオプションは、 [ロード設定] で [追加のみ] が選択されている場合にのみ使用できます。
    • パーティションなし: テーブルはパーティション化されずに作成されます。

    • イベント取り込み日でパーティション化: テーブルは、イベントが取り込まれた日付でパーティション化されます。

  • データ変更処理

    情報メモこのオプションは、 [ロード設定] で [変更の適用] が選択されている場合にのみ使用できます。
    • ソフト削除を含める: 削除対象としてマークするレコードを定義する式を入力します。

    • 履歴データ ストア (タイプ 2) を作成する: これにより、変更されたレコードの以前のバージョンが保持されます。

  • [次へ] をクリックします。

概要

概要画面には、パイプラインの構成が視覚的に表示されます。

  • オプションとして、「ストリーミングランディング」および「ストリーミング変換」タスクについて、 [名前と説明を編集] をクリックして新しい値を入力できます。

  • [パイプラインの作成後] に実行するオプションを選択します。

  • すべての設定を完了したら、 [作成] をクリックしてストリーミング パイプラインを作成します。

  • プロジェクトが表示されたら、各タスクを準備して実行し、データの取り込みを開始できます。

データ型のマッピング

初期のソース スキーマは、パイプライン プロジェクトの作成時、PREPARE フェーズより前に取得されたデータサンプルに基づいています。スキーマの進化は、読み込み時に処理されます。STRUCT および ARRAY をサポートしないミラー タスクやその他のダウンストリーム タスクでは、JSON タイプが使用されます。データは SQL を使用して解析できます。

次のデータ型マッピングは、サポートされているすべてのデータ ソースに適用されますが、ソース ファイルのタイプに応じて異なり、次の点に注意してください。

  • データ型は、オンボーディングされるデータのサンプルから推論されます。たとえば、項目がサンプル内に整数値のみを含む場合、ストリーミング ランディングおよび変換タスクで INT8 として作成されます。後続のデータに倍精度小数値が含まれる場合、ランディング ファイルにはそれらの値が含まれます。ただし、ストリーミング変換タスクで、 [項目のデータ タイプを変更] 設定が [無視] に設定されている場合、列は INT8 のまま維持され、小数値は切り捨てられます。意図しない切り捨てを防ぐには、オンボーディングする前に、サンプル データに想定される値の全範囲が含まれていることを確認するか、初期段階で [項目のデータ タイプを変更] を [タスクを停止] に設定し、必要に応じてデータ タイプを調整します。

  • ソース内の構造体に項目が追加された場合、ランディング ターゲット側にも必ずその項目が追加されます。ストリーミング変換の場合、動作は [ストリーミング変換タスク設定] > [スキーマ進化] > [構造体に項目を追加] (ターゲットに適用無視タスクを停止) で選択されたオプションに従って適用されます。

  • 特定のレコードに項目がない場合、または配列が空の場合、それらは null として扱われます。

  • データセットが配列によってフラット化され、その配列が空または null であるレコードが到着した場合、システムは 1 行を作成し、フラット化された項目は null になります。このような行が自動的に除外されることはありません。これらの行を除外する場合は、手動でフィルターを追加します。例: array_element IS NOT NULL

  • UI に表示されるデータ型は、選択したデータセットの粒度を反映します。フラット化された配列については、配列構造そのものではなく、各要素のデータ型が表示されます。

  • ネストされた JSON 項目内の構造体内には新しい属性を追加できず、追加できるのはルート レベルのみです。

  • ストリーミング変換タスクでは、配列の単一レベルのみのフラット化がサポートされています。多レベル配列にフラット化が適用されると、たとえば ARRAY<ARRAY<STRUCT>> の場合、外側の配列のみがフラット化され、完全にフラット化された STRUCT ではなく ARRAY<STRUCT> になります。さらに、現在の UI では、フラット化は列レベルでのみ構成できます。結果として、多レベル配列を選択すると、最初の配列レベルにのみフラット化が暗黙的に適用されます。

  • プリミティブの配列を参照する場合、粒度が配列であれば要素のデータ タイプが使用されます。それ以外の場合は、配列のデータ タイプが使用されます。

    この例では、OrderDetails にはデータ タイプが INT の CustomerID の配列があります。粒度が OrderDetails.CustomerID の場合 OrderDetails.CustomerID は INT を意味し、粒度が OrderDetails の場合は ARRAY<INT> を意味します。

JSON

JSON ファイルでは、ソースの数値によってターゲットのデータ型が決まります。

  • INT8 は、サポートされている整数範囲内に収まり、小数部を含まない整数値に使用されます。

  • REAL8 (DOUBLE) は、値に小数部 (浮動小数点数) が含まれる場合に使用されます。

  • STRING は、数値がサポートされている最大整数範囲を超える場合に使用されます。

データ型は次のようにマッピングされます。

ソース データ型 Qlik Talend Data Integration データ型
STRING STRING
NUMBER INT8
NUMBER REAL8
NUMBER STRING
BOOLEAN BOOLEAN
ARRAY ARRAY
OBJECT STRUCT

CSV、TSV、REGEX、SPLIT

既定では、すべてのソース データ型は文字列として取り込まれます。[タイプを自動的に推論] のオプションを使用して、ソース型とターゲット型を次のようにマッピングします。

ソース データ型 Qlik データ型
NUMERIC INT8/REAL8
True/TRUE/true/False/FALSE/false BOOLEAN
TIMESTAMP yyyy-MM-dd HH:mm:ss または yyyy-MM-ddTHH:mm:ssz 形式のタイムスタンプは、datetime 型として解析されます。タイムゾーンが含まれている場合、値は文字列として解析されます。

Parquet

Parquet ファイルは、物理データ型と論理データ型をサポートしています。物理データ型は、INT32、DOUBLE、または BYTE_ARRAY など、値がディスクにどのように保存されるかを定義します。論理データ型は、物理表現の上にセマンティックな意味を提供します。たとえば、整数値が日付を表すかどうかを識別します。Parquet 列に論理型が割り当てられており、Qlik Open Lakehouse でサポートされている場合 (以下に記載)、ストリーミング ランディング タスクは、基になる物理型ではなく、その論理データ型を使用してターゲット スキーマを定義します。これにより、データが正しく解釈されることが保証され、精度、スケール、時間的な意味合いといった本来のセマンティックが保持されます。その結果、データをダウンストリームの形式に書き出す際に、より正確なスキーマが生成されます。

Parquet ファイルから取得されたデータは、次のようにマッピングされます。

ソース データ型 論理型 Qlik Talend Data Integration データ型
BOOLEAN   BOOLEAN
INT32   INT8
INT64   INT8
INT96   DATETIME
FLOAT   REAL8
DOUBLE   REAL8
BYTE_ARRAY   STRING (Base64 としてエンコード)
FIXED_LEN_BYTE_ARRAY   STRING (Base64 としてエンコード)
BYTE_ARRAY STRING STRING
BYTE_ARRAY ENUM STRING
INT32 DECIMAL INT8
INT64 DECIMAL INT8
FIXED_LEN_BYTE_ARRAY DECIMAL INT8/REAL8 (Base64 としてエンコード)
BYTE_ARRAY DECIMAL INT8/REAL8 (Base64 としてエンコード)
INT32 DATE DATE
INT32 TIME(MILLIS,true) INT8
INT64 TIME(MICROS,true) TIME
INT64 TIMESTAMP(MICROS,true) DATETIME
INT64 TIMESTAMP(MILLIS,true) DATETIME
NESTED TYPES   STRUCT
LIST   ARRAY
MAP   ARRAY<STRUCT>. キーと値のペアを表す構造体の配列。

Avro

次のマッピングは、スキーマ レジストリを使用する Avro ファイルに適用されます。

ソース データ型 論理型 Qlik Talend Data Integration データ型
BOOLEAN   BOOLEAN
INT   INT8
LONG   INT8
FLOAT   REAL8
DOUBLE   REAL8
BYTES   STRING
STRING   STRING
RECORD   STRUCT
ENUM   STRING
ARRAY   ARRAY
MAP   ARRAY<STRUCT>
UNION    
FIXED   STRING
BYTES DECIMAL DECIMAL
FIXED DECIMAL DECIMAL
INT DATE DATE
INT TIME-MILLIS INT8
INT TIME-MICROS TIME
LONG TIMESTAMP-MILLIS DATETIME
LONG TIMESTAMP-MICROS DATETIME

ORC

次のマッピングは、ORC ファイルに適用されます。

ソース データ型 Qlik Talend Data Integration データ型
BOOLEAN BOOLEAN
BYTE INT8
SHORT INT8
INT INT8
LONG INT8
DATE DATE
FLOAT REAL8
DOUBLE REAL8
TIMESTAMP DATETIME
BINARY STRING
DECIMAL REAL8
STRING STRING
VARCHAR STRING
CHAR STRING
LIST ARRAY
MAP ARRAY<STRUCT>. キーと値のペアを表す構造体の配列。
STRUCT STRUCT
UNION  

制限と考慮事項

  • ランディングで自動スキーマ進化によって構造または配列が変更された場合、Qlik Talend Cloud ストリーミング タスクによって作成されなかったダウンストリーム ビューは、古くならないように更新が必要になる場合があります。

  • タスクに解析エラーがある場合、エラー状態にはならず、注意が必要としてマークされません。解析エラーは常に増加するメトリクスであるため、エラー状態の終了基準はありません。

  • クラスター機能の削除は、その機能を使用しているタスクがない場合にのみ許可されます。

  • 同じプライマリ キーを持つレコードに対する更新と削除は、パーティション境界を越えてはなりません。つまり、同じパーティションにマッピングされる必要があります。

  • ソースに多数の列が含まれている場合、タスクおよびカタログには、出現頻度の高い上位 500 列のみが表示されます。すべての列は S3 ランディングの Avro ファイルに保存されますが、上位 500 列のみが Iceberg テーブルに保存されます。スキーマ進化で新しい列が追加された場合、その列の出現頻度が高かったとしても上位の列には追加されません。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。