ストリーミング データ
オンボーディング プロセスでは、ソースからデータを転送し、Iceberg テーブルに保存します。ストリーミング データ ソースからの変更は、ほぼリアルタイムで継続的にストレージ テーブルに適用されます。
データをオンボード
データはパイプライン プロジェクト内でオンボーディングされ、データセットはプロジェクト設定で定義された S3 の場所に保存されます。
-
プロジェクトで、 [作成] をクリックし、 [データのオンボード] をクリックします。
-
オンボーディングの [タスク名] と、必要に応じて [説明] を追加します。
[次へ] をクリックします。
-
ソース接続を選択します。
既存のストリーミング ソース接続を選択するか、ソースへの新しい接続を作成できます。
詳細については、「データ ストリームへの接続」を参照してください。
[次へ] をクリックし、データ ソースに合わせて次の手順に従います。
データを選択する
Apache Kafka および Amazon Kinesis
リストには、ソース接続で定義されたホストからの利用可能な Kafka トピックまたは Kinesis ストリームが表示されます。
トピックやストリームを選択する際、特定のデータセットを選択し、選択ルールを使用してデータセットのグループを含めたり除外したりできます。
-
% をワイルドカードとして使用して、データセットの選択基準を定義します。
-
%.% と指定すると、すべてのストリームに含まれるすべてのデータセットが対象となります。
選択ルールを使用してトピックを選択した場合、すべてのデータセットを単一のターゲット テーブルにロードするか、ソース トピックごとに個別のターゲット テーブルを作成するかを選択できます。
-
既定では、ターゲットとなる Iceberg テーブル名はソースのトピック名から導出され、命名規則に準拠するよう整形されます (例: 小文字化、スペースの削除、ハイフンをアンダースコアに置換など)。[ターゲットデータセット名の定義] で、ターゲット テーブルの名前を編集できます
-
選択ルールを使用して複数のトピックを単一のテーブルにロードする場合、ターゲット名を指定する必要があります。
-
選択ルールが使用され、データが個別のテーブル (トピックごとに 1 つのデータセット) にロードされる場合、既定のターゲット名はトピック名になります。この段階では、ウィザードで名前を編集することはできませんが、後ほどランディング タスクで編集できます。
-
取り込みのためにトピックを選択するようにルールが構成されている場合、ランディング タスク設定のスキーマ進化の下にある [新しいトピック] > [ターゲットに追加] オプションがチェックされていると、ルールの条件を満たすすべての新しいトピックもランディングされます。
1 つ以上のデータセットを選択し、 [選択したストリームを追加] をクリックします。[明示的に選択したストリーム] の下に、追加されたデータセットが表示されます。[Next] (次へ)をクリックします。
Amazon S3
ディレクトリ ブラウザーには、ソース接続の S3 バケットにあるすべてのディレクトリのリストが表示されます。
-
データをランディングするときに含めるディレクトリを選択します。
-
各ディレクトリについて、 [パスを追加] で、パスとファイル名のパターンを入力します。
-
任意の文字に一致させるワイルドカードとして、*を使用します。
-
日付パターンを入力するには、4 桁の年を表すプレースホルダーとして <yyyy>、2 桁の月として <MM>、2 桁の日として <dd>、2 桁の時として <HH> を使用します。例:
-
MyDir3/<yyyy>_<MM>_<dd>_<HH>_orders.csv
-
MyDir3/<yyyy>/<MM>/<dd>/<HH>_orders.csv
-
-
-
-
[プレビュー] をクリックして、 [データのプレビュー] ダイアログを開きます。含まれるファイルと除外されるファイルのリストが表示されます。
-
[検証] をクリックして、データを確認します。
-
[ターゲットデータセット名の定義] で、トピックをターゲットの Iceberg テーブルにマッピングする名前を入力します。[次へ] をクリックします。
コンテンツ タイプの選択
ソース イベントのコンテンツ タイプを選択します。
-
[データ イベントのタイプを選択] で、取り込むイベントのタイプを選択します。
-
詳細については、「データ ストリームへの接続」を参照してください。
選択したコンテンツ タイプは、すべてのトピックに適用されます。取り込むコンテンツ タイプごとに、新しいタスクを作成する必要があります。
-
データが解析できることを確認するには、 [イベントが正しくロードされていることを確認] を展開します。この段階でデータが正しいことを確認する必要があります。不備がある場合は、パイプラインを再作成してデータを再度ロードする必要があります。[データセットを選択] を使用して特定のデータセットを調査し、データのロードに影響を与える可能性のある警告を確認します。データを表示するには、任意の構造体列の横にある目のアイコンをクリックします。
-
[次へ] をクリックします。
取り込みプロパティの設定
パイプラインの設定を構成します。
-
データの読み取り元
-
もっとも古いイベントから開始: すべての履歴データを取り込みます。
-
現在から開始: パイプラインの開始時点以降に到着する新しいデータを取り込みます。
-
-
列のアンネスト
-
ネストされた列を保持: 変換は適用されません。
-
個別の列にアンネスト: データは個別の列に分割されます。
-
-
ロード設定
-
追加のみ: イベントデータは通常、ライフサイクルが短く更新されないため、このオプションが最適です。例: 注文。
-
マージ: 時間の経過とともにデータが更新されるケースにもっとも適しています。例: 顧客。
-
-
ターゲット テーブルのパーティション
ターゲット テーブルのパーティション オプションは、パイプライン内のすべてのテーブルに適用されます。後でテーブルレベルで設定を上書きし、個別のパーティション設定を行うこともできます。
-
パーティションなし: テーブルはパーティション化されずに作成されます。
-
イベント取り込み日でパーティション化: テーブルは、イベントが取り込まれた日付でパーティション化されます。
-
-
[Next] (次へ)をクリックします。
概要
概要画面には、パイプラインの構成が視覚的に表示されます。
-
オプションとして、「ストリーミングランディング」および「ストリーミング変換」タスクについて、 [名前と説明を編集] をクリックして新しい値を入力できます。
-
[パイプラインの作成後] に実行するオプションを選択します。
-
すべての設定を完了したら、 [作成] をクリックしてパイプライン プロジェクトを作成します。
-
プロジェクトが表示されたら、各タスクを準備して実行し、データの取り込みを開始できます。
-
ストリーミング ランディング タスクを準備して実行します。
詳細については、「Qlik Open Lakehouse へのストリーミング データのランディング」を参照してください。
-
ストリーミング変換タスクを準備して実行します。
詳細については、「ストリーミング データセットの保存」を参照してください。
-