Apache Kafka データ ストリーム
Apache Kafka クラスターに接続し、Qlik Open Lakehouse プロジェクトにおけるストリーミング データソースとして利用できます。Kafka 接続は、ストリーミング ランディング タスクとストリーミング変換タスクでのみ使用できます。
Qlik Open Lakehouse を使用すると、組織はオープンでスケーラブルなアーキテクチャ上に、リアルタイムかつ分析準備の整ったデータパイプラインを構築できます。Apache Kafka をストリーミング ソースとして統合することで、Qlik は大容量のイベント データを Apache Iceberg テーブルに継続的に取り込むプロセスをサポートします。この組み合わせにより、低レイテンシーのデータ可用性と堅牢なスキーマ進化が実現され、チームはリアルタイムで得られたインサイトを実務に活用し、後続のデータ変換プロセスを加速させることができます。
ストリーミング ランディング タスクとストリーミング変換タスクにより、Kafka トピックを Qlik Open Lakehouse プロジェクトの中心的なコンポーネントとして位置づけることができます。データが Iceberg にストリームされると、分析、AI、機械学習ワークロードですぐにアクセスできるようになり、時間制約のある意思決定とスケーラブルなデータ エンジニアリングの実践をサポートします。その結果、クエリ実行に最適化された統合的なデータレイヤーが構築され、ストリーミング アーキテクチャ全体の信頼性とパフォーマンスが向上します。クラウド データ ウェアハウス クエリ エンジンを使用して Kafka からデータを分析するには、データを Qlik Open Lakehouse にランディングして保存し、 [Mirror data task (ミラー データ タスク)] を使用してデータをウェアハウスにミラーリングします。
前提条件
Kafka ストリーミング ソースを作成および使用する際には、次の要件が適用されます。
-
ブローカー サーバーへのネットワーク接続を持つネットワーク統合が必要です。
-
接続する Kafka クラスターが、ランディングタスクを実行する Lakehouse クラスターが配置されている VPC からアクセス可能であることを確認してください。
-
Kafka ストリーミング ソース接続には、Qlik Open Lakehouse のターゲット プラットフォームが必要です。
Kafka 接続プロパティの設定
Kafka 接続を構成するには、次の手順を実行します。
-
[接続] で、[接続を作成] をクリックします。
-
接続を作成する [スペース] を選択するか、 [新しいデータ スペースを作成] を選択します。
-
[コネクタ名] リストから [Kafka] を選択するか、検索ボックスを使用します。タイプがソースで、カテゴリがストリーミングであることを確認します。
-
次のプロパティを設定します。
データ ソース
データソースの接続プロパティを次のように設定します。
-
リストから [ネットワーク統合] を選択します。
-
[ブローカーサーバー] に、hostname:port という形式で単一のホストを入力します。例: host1:9092。
ホストのリストを入力するには、hostname:port, hostname:port の形式を使用します。例: host1:9092,host2:9092。
認証の詳細
-
リストから [認証方法] を選択します。
-
SASL/SCRAM-SHA-512: このオプションは、SCRAM-SHA-512 メカニズムを使用してユーザー名とパスワードによる認証を行います。これは SCRAM のバリエーションの中でもっとも安全性が高いものであり、Kafka クラスター側にも対応する SCRAM-SHA-512 認証情報が設定されている必要があります。
-
:
SASL/SCRAM-SHA-256
接続の [ユーザー名] と [パスワード] を入力します。
TLS
オプションとして、認証局 (CA) を追加できます。
CAを追加するには、 [Use custom trust CA] (カスタム信頼 CA を使用) を選択します。
[CA パス] に、Qlik Cloud にアップロードする CA ファイルのパスを入力します。CA ファイルは、タスクを実行するクラスターで使用できます。
追加の Kafka プロパティ
追加の Kafka プロパティはオプションです。
リソースの識別、整理、管理に役立つ、任意のタグのキーと値を追加します。
スキーマ レジストリへの接続
スキーマ レジストリ サーバーはオプションです。
スキーマ レジストリに接続するには、 [スキーマ レジストリ サーバーを設定] をクリックして次の設定を構成します。
スキーマ レジストリ URI: URI を http://schema-registry1.example.com:8081;http://schema-registry2.example.com:8081 の形式で入力します。
ユーザー名: サーバー接続用のユーザー名を入力します。
パスワード: サーバー接続用のパスワードを入力します。
スキーマ レジストリへの接続の TLS
スキーマ レジストリ サーバーを構成する場合、認証局 (CA) を追加するオプションを選択できます。
CA を追加するには、 [Use custom trust CA] (カスタム信頼 CA を使用) を選択します。
[CA パス] に、Qlik Cloud にアップロードする CA ファイルのパスを入力します。CA ファイルは、タスクを実行するクラスターで使用できます。
接続の作成
セキュリティ方法の設定が完了したら、次の手順に従って接続を作成します。
[名前] に、接続の表示名を入力します。例: My Kafka Streaming Source connection
[接続をテスト] をクリックして、資格情報を検証します。
[作成] をクリックします。
トピックをデータセットにマッピングする
Kafka ソースから取り込む場合、次のユース ケースがサポートされています。
| トピック | ターゲット データセット | ユース ケース | マッピング |
|---|---|---|---|
| 1 | 1 | 各トピックはターゲット データセットにロードされます。 | ストリーミングランディング タスクのデータセット マッピングでサポートされています。 |
| 1 つ | 複数 | トピックを複数のデータセットに複製します。 | [ターゲットに追加] を複数回使用することでサポートされます。 |
| 1 | 複数 | イベントを複数のターゲットに分割します。例: 1 つのイベントに含まれる orders と order lines を別々のデータセットに分割。 | ストリーミング 変換タスクでサポートされています。データセットを複製し、各データセットで異なる項目を選択します。または、Fork プロセッサーとSelect columns プロセッサーを変換フロー内で使用します。 |
| 1 | 複数 | 特定の列値に基づいて、トピックを複数のデータセットに分割します。 | ストリーミング 変換タスクでサポートされています。トピックを異なるデータセットに分割するために使用される各列値に対して、Filter プロセッサーを構成します。一致しないレコードを処理するには、一致しないデータを別のデータセットに出力する追加のFilter プロセッサーを構成します。 |
| 複数 | 1 | 特定の条件を満たすすべてのトピックを同じターゲット データセットに取り込むか、特定のトピックを同じデータセットに取り込みます。 | ストリーミング ランディング タスクのデータセット マッピングでサポートされています。複数のトピックが単一のデータセットにロードされ、トピックのロードタスクのいずれかが失敗した場合、データセットはエラーになり、他のトピックのロードは中断されます。 |