データ ストリームへの接続
Qlik Open Lakehouse プロジェクトでは、次のストリーミング サービスがサポートされています。イベントデータは継続的に取り込まれ、ダウンストリームのデータ統合、分析、AI 向けにほぼリアルタイムの可用性を確保し、最新の運用アクティビティを反映する低遅延のパイプラインが実現されます。
Apache Kafka や Amazon Kinesis などのストリーミング サービスは、運用イベントが発生したときにそれをキャプチャするための、耐久性があり高スループットのパイプラインを提供します。バッチ取り込みに依存するファイル ベースのソースとは異なり、ストリーミング ソースはイベントの生成に合わせてデータを継続的に配信するため、ファイルの生成やスケジュールの実行を待つことなく、ほぼリアルタイムでの処理が可能となります。プロデューサーは、スキーマを保持し、パーティショニングをサポートする構造化または半構造化メッセージを公開します。同じレコードに対するすべての更新と削除には、同じパーティション キーを使用する必要があります。Kafka と Kinesis は、トピックまたはストリーム全体ではなく、単一のパーティションまたはシャード内でのみ順序を保証します。一貫したパーティション キーを使用することで、特定のレコードに対する変更が正しい順序で処理されることが保証されます。Qlik は、イベント データを継続的に取り込むためのストリーミング ソースとして、Amazon S3 もサポートしています。
ストリーミング取り込みとバッチ取り込み
ストリーミング データ ソースとバッチ データ ソースの違いは、次のとおりです。
-
両方のソースで、イベントは毎分効率的に取り込まれ、低レイテンシー処理とほぼリアルタイムでの分析をサポートします。
-
ストリーミング以外のソースでは、まず既存データのフル ロードが行われ、その後変更が取り込まれます。ソースからフル ロード データをリロードすることもできます。
-
ストリーミング ソースでは、初期ロードと後続のイベントとの間に明確な区別はありません。Qlik は保持を管理でき、パーティションもサポートします。
Qlik Open Lakehouse プロジェクトでは、ストリーミング ソースはストリーミング ランディング タスクおよびストリーミング変換タスクでのみ使用できます。
-
ストリーミング データはストリーミング ランディング タスクを使用して取り込まれます。このタスクは、個別のファイルを処理するのではなく、イベントが到着するたびにそれを読み込み、データを Amazon S3 ランディングさせ、Avroファイルとして永続化します。このアプローチにより、スキーマの進化が維持され、構造体などの複雑なデータ型に対応できるとともに、継続的な取り込みモデルを維持しながら、最適化されたクエリ パフォーマンスによる効率的なストレージを実現します。
-
ストリーミングソースからデータを取り込むと、Iceberg 形式で保存される各データセットに対してストリーミング変換タスクが自動的に追加されます。必要に応じて、ストリーミング変換タスクを使用することで、構造の標準化やイベント ペイロードの拡張、ダウンストリームの利用モデルに合わせたデータの整合を実行できます。
-
ミラー データ タスクを使用すると、ストリーミング ソースからのデータセットをクラウド データウェアハウスにミラーリングでき、データを重複させることなく、ダウンストリームのシステムがストリーミング イベントを利用できるようになります。詳細については、「クラウド データ ウェアハウスへのデータのミラーリング」を参照してください。
データ型のマッピング
初期のソース スキーマは、パイプライン プロジェクトの作成時、PREPARE フェーズより前に取得されたデータサンプルに基づいています。スキーマの進化は、読み込み時に処理されます。STRUCT および ARRAY をサポートしないミラー タスクやその他のダウンストリーム タスクでは、JSON タイプが使用されます。データは SQL を使用して解析できます。
次のデータ型マッピングは、サポートされているすべてのデータ ソースに適用されますが、ソース ファイルのタイプに応じて異なり、次の点に注意してください。
-
データ型は、オンボーディングされるデータのサンプルから推論されます。たとえば、項目がサンプル内に整数値のみを含む場合、ストリーミング ランディングおよび変換タスクで INT8 として作成されます。後続のデータに倍精度小数値が含まれる場合、ランディング ファイルにはそれらの値が含まれます。ただし、ストリーミング変換タスクで、 [項目のデータ タイプを変更] 設定が [無視] に設定されている場合、列は INT8 のまま維持され、小数値は切り捨てられます。意図しない切り捨てを防ぐには、オンボーディングする前に、サンプル データに想定される値の全範囲が含まれていることを確認するか、初期段階で [項目のデータ タイプを変更] を [タスクを停止] に設定し、必要に応じてデータ タイプを調整します。
-
ソース内の構造体に項目が追加された場合、ランディング ターゲット側にも必ずその項目が追加されます。ストリーミング変換の場合、動作は [ストリーミング変換タスク設定] > [スキーマ進化] > [構造体に項目を追加] (ターゲットに適用、無視、タスクを停止) で選択されたオプションに従って適用されます。
-
特定のレコードに項目がない場合、または配列が空の場合、それらは null として扱われます。
-
データセットが配列によってフラット化され、その配列が空または null であるレコードが到着した場合、システムは 1 行を作成し、フラット化された項目は null になります。このような行が自動的に除外されることはありません。これらの行を除外する場合は、手動でフィルターを追加します。例: array_element IS NOT NULL。
-
UI に表示されるデータ型は、選択したデータセットの粒度を反映します。フラット化された配列については、配列構造そのものではなく、各要素のデータ型が表示されます。
-
ネストされた JSON 項目内の構造体内には新しい属性を追加できず、追加できるのはルート レベルのみです。
-
ストリーミング変換タスクでは、配列の単一レベルのみのフラット化がサポートされています。多レベル配列にフラット化が適用されると、たとえば ARRAY<ARRAY<STRUCT>> の場合、外側の配列のみがフラット化され、完全にフラット化された STRUCT ではなく ARRAY<STRUCT> になります。さらに、現在の UI では、フラット化は列レベルでのみ構成できます。結果として、多レベル配列を選択すると、最初の配列レベルにのみフラット化が暗黙的に適用されます。
JSON
JSON ファイルでは、ソースの数値によってターゲットのデータ型が決まります。
-
INT8 は、サポートされている整数範囲内に収まり、小数部を含まない整数値に使用されます。
-
REAL8 (DOUBLE) は、値に小数部 (浮動小数点数) が含まれる場合に使用されます。
-
STRING は、数値がサポートされている最大整数範囲を超える場合に使用されます。
データ型は次のようにマッピングされます。
| ソース データ型 | Qlik Talend Data Integration データ型 |
|---|---|
| STRING | STRING |
| NUMBER | INT8 |
| NUMBER | REAL8 |
| NUMBER | STRING |
| BOOLEAN | BOOLEAN |
| ARRAY | ARRAY |
| OBJECT | STRUCT |
CSV、TSV、REGEX、SPLIT
既定では、すべてのソース データ型は文字列として取り込まれます。[タイプを自動的に推論] のオプションを使用して、ソース型とターゲット型を次のようにマッピングします。
| ソース データ型 | Qlik データ型 |
|---|---|
| NUMERIC | INT8/REAL8 |
| True/TRUE/true/False/FALSE/false | BOOLEAN |
| TIMESTAMP | yyyy-MM-dd HH:mm:ss または yyyy-MM-ddTHH:mm:ssz 形式のタイムスタンプは、datetime 型として解析されます。タイムゾーンが含まれている場合、値は文字列として解析されます。 |
Parquet
Parquet ファイルは、物理データ型と論理データ型をサポートしています。物理データ型は、INT32、DOUBLE、または BYTE_ARRAY など、値がディスクにどのように保存されるかを定義します。論理データ型は、物理表現の上にセマンティックな意味を提供します。たとえば、整数値が日付を表すかどうかを識別します。Parquet 列に論理型が割り当てられており、Qlik Open Lakehouse でサポートされている場合 (以下に記載)、ストリーミング ランディング タスクは、基になる物理型ではなく、その論理データ型を使用してターゲット スキーマを定義します。これにより、データが正しく解釈されることが保証され、精度、スケール、時間的な意味合いといった本来のセマンティックが保持されます。その結果、データをダウンストリームの形式に書き出す際に、より正確なスキーマが生成されます。
Parquet ファイルから取得されたデータは、次のようにマッピングされます。
| ソース データ型 | 論理型 | Qlik Talend Data Integration データ型 |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT32 | INT8 | |
| INT64 | INT8 | |
| INT96 | DATETIME | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| BYTE_ARRAY | STRING (Base64 としてエンコード) | |
| FIXED_LEN_BYTE_ARRAY | STRING (Base64 としてエンコード) | |
| BYTE_ARRAY | STRING | STRING |
| BYTE_ARRAY | ENUM | STRING |
| INT32 | DECIMAL | INT8 |
| INT64 | DECIMAL | INT8 |
| FIXED_LEN_BYTE_ARRAY | DECIMAL | INT8/REAL8 (Base64 としてエンコード) |
| BYTE_ARRAY | DECIMAL | INT8/REAL8 (Base64 としてエンコード) |
| INT32 | DATE | DATE |
| INT32 | TIME(MILLIS,true) | INT8 |
| INT64 | TIME(MICROS,true) | TIME |
| INT64 | TIMESTAMP(MICROS,true) | DATETIME |
| INT64 | TIMESTAMP(MILLIS,true) | DATETIME |
| NESTED TYPES | STRUCT | |
| LIST | ARRAY | |
| MAP | ARRAY<STRUCT>. キーと値のペアを表す構造体の配列。 |
Avro
次のマッピングは、スキーマ レジストリを使用する Avro ファイルに適用されます。
| ソース データ型 | 論理型 | Qlik Talend Data Integration データ型 |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT | INT8 | |
| LONG | INT8 | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| BYTES | STRING | |
| STRING | STRING | |
| RECORD | STRUCT | |
| ENUM | STRING | |
| ARRAY | ARRAY | |
| MAP | ARRAY<STRUCT> | |
| UNION | ||
| FIXED | STRING | |
| BYTES | DECIMAL | DECIMAL |
| FIXED | DECIMAL | DECIMAL |
| INT | DATE | DATE |
| INT | TIME-MILLIS | INT8 |
| INT | TIME-MICROS | TIME |
| LONG | TIMESTAMP-MILLIS | DATETIME |
| LONG | TIMESTAMP-MICROS | DATETIME |
ORC
次のマッピングは、ORC ファイルに適用されます。
| ソース データ型 | Qlik Talend Data Integration データ型 |
|---|---|
| BOOLEAN | BOOLEAN |
| BYTE | INT8 |
| SHORT | INT8 |
| INT | INT8 |
| LONG | INT8 |
| DATE | DATE |
| FLOAT | REAL8 |
| DOUBLE | REAL8 |
| TIMESTAMP | DATETIME |
| BINARY | STRING |
| DECIMAL | REAL8 |
| STRING | STRING |
| VARCHAR | STRING |
| CHAR | STRING |
| LIST | ARRAY |
| MAP | ARRAY<STRUCT>. キーと値のペアを表す構造体の配列。 |
| STRUCT | STRUCT |
| UNION |
制限事項
次の制限事項は、すべてのデータソースに適用されます。
-
ファイルが異なる種類である場合 (複数のソースやバージョンに由来する場合などに発生し得ます)、単一のサンプルファイルを使用して作成された変換タスク (例: オンボーディング時に作成されたタスク) では、それらの差異が自動的に考慮されることはありません。
-
たとえば、データをハッシュする必要があるなどの理由でランディング タスクでデータ型を変更する場合は、変換データ型が新しいデータ型と一致していることを確認してください。