Kafka ターゲット
このトピックでは、Kafka ターゲット コネクタを使用して Kafka ターゲットへの接続を構成する方法について説明します。Kafka は、レプリケーション タスクのターゲットとしてのみ使用できます。
Kafka をターゲットとして設定するには、次が必要です。
- 前提条件を満たす
- Kafka への接続を構成する
接続プロパティの設定
コネクタを構成するには、次の手順を実行します。
-
[接続] で、 [接続を作成] をクリックします。
-
Kafka ターゲット コネクタを選択し、次の設定を行います。
データ ターゲット
データ ゲートウェイ
Amazon MSK にデータを移動する場合、Data Movement gateway が必要になるのは、Amazon MSK が Qlik Cloud からアクセスできず、プライベート リンクを使用してのみアクセスできる場合 (仮想プライベートクラウド内にある場合など) のみです。このような場合は、ターゲット データベースにアクセスする Data Movement gateway を選択します。ユースケースに応じて、これはデータ ソースからデータを移動するために展開された、同じまたは別の Data Movement gateway になります。
Qlik Cloud から直接アクセスできる Amazon MSK インスタンスにデータを移動する場合は、 [なし] を選択します。
-
オンプレミスの Kafka にデータを移動する場合 (以下の クラウド プロバイダー を参照)、Kafka がQlik Cloud から直接アクセスできるかどうかにかかわらず、Data Movement gateway を選択する必要があります。
-
Data Movement gateway を使用する必要がある場合は、バージョン 2025.5.40 以降が必要です。
Data Movement gateway のユース ケースに関する詳細については、「どのようなときに Data Movement gateway が必要ですか?」および「一般的な使用例」を参照してください。
[Cloud provider] (クラウドプロバイダー)
オンプレミスの Kafka を使用する場合は [なし] を、Amazon MSK を使用する場合は [Amazon MSK] を選択します。
ブローカー サーバー
1 つ以上のブローカー サーバーを次の形式で指定します (高可用性のため)。
server1[:port1][,server2[:port2]]
例:
192.168.1.100:9092,192.168.1.101:9093
データタスクは、最初に利用可能なホストに接続します。ポートを指定せずにホストが指定された場合、ポート 9092 が既定として使用されます。
SSL または Kerberos 認証を使用する場合、ブローカーの FQDN (つまり、IP アドレスではない) を指定する必要があります。
クラスター内のすべてのブローカー サーバーは、Data Movement gateway マシンからアクセスできる必要があります。ただし、すべてのサーバーを指定する必要はありません ブローカー サーバー 項目にペーストします。これは、Data Movement gateway がクラスター内の他のサーバーの接続詳細を取得するために、1 つのサーバーに接続するだけでよいためです。したがって、タスク実行時にもっとも利用可能である可能性が高いサーバーを指定することがベスト プラクティスです。データ タスクがメッセージを生成するサーバーは、トピック、パーティショニング トピック、パーティショニング設定によって決定されます。
アカウントのプロパティ
認証方法
次のいずれかを選択します。
- なし: 認証なし。
-
証明書に設定して有効にします。このオプションを選択した場合は、次の情報も提供する必要があります。
メモ パブリック キー ファイルとプライベートキー ファイルは PEM 形式である必要があります。
- パブリック キー ファイル: PEM 形式のパブリック キー ファイルを参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。
- プライベートキーファイルに設定して有効にします。PEM 形式のプライベート キー ファイルを参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。
- プライベート キー パスワードに設定して有効にします。プライベート キー ファイルのパスワード。
-
Kerberos (SASL/GSSAPI): Kerberos を使用して Kafka クラスターに対して認証をする場合に選択します。
情報メモ選択された [クラウド プロバイダー] が [Amazon MSK] の場合、この認証方法は使用できません。
- プリンシパル: ブローカー サーバーに対して認証するために使用される Kerberos プリンシパル。
- keytab ファイル: Keytab ファイルを参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。
情報メモLinux で Kerberos 認証を使用するには、Kerberos クライアント (ワークステーション) パッケージがインストールされている必要があります。
-
ユーザー名とパスワード (SASL/PLAIN): このオプションを選択すると、ユーザー名とパスワード (SASL/PLAIN) を使用して認証できます。パスワードが平文で送信されるのを防ぐため、 [TLS を有効にする] オプションも有効にすることを強くお勧めします。
情報メモ選択した [クラウド プロバイダー] が [Amazon MSK] の場合、この認証方法は使用できません。
-
ユーザー名とパスワード (SASL/SCRAM-SHA-256): このオプションを選択すると、ユーザー名とパスワード (SASL/SCRAM-SHA-256) を使用して認証できます。
このオプションを選択する場合は、各ブローカーの server.properties ファイルにも対応する SASL/SCRAM メカニズムを設定する必要があります。
情報メモ選択した [クラウド プロバイダー] が [Amazon MSK] の場合、この認証方法は使用できません。
-
ユーザー名とパスワード (SASL/SCRAM-SHA-512): このオプションを選択すると、ユーザー名とパスワード (SASL/SCRAM-SHA-512) を使用して認証できます。
このオプションを選択する場合は、各ブローカーの server.properties ファイルにも対応する SASL/SCRAM メカニズムを設定する必要があります。
SSL オプション
TLS を有効にする (TLS 1.0、1.1、1.2 をサポート)
このオプションは、Qlik Talend Cloud とブローカー サーバー間の通信を暗号化する場合に選択します。ブローカーが SSL を要求するように構成されている場合、このオプションを必ず選択してください。
-
CA ファイル: PEM 形式の CA 証明書を参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。
情報メモ選択した [クラウド プロバイダー] が [Amazon MSK] の場合は必要ありません。
メタデータ メッセージの公開
公開にスキーマ レジストリに使用できるメタデータ メッセージ
ドロップダウン リストから、次のオプションのいずれかを選択します。
-
いいえ
このオプションを選択すると、データ メッセージのみが公開されます。
-
Confluent Schema Registry
このオプションを選択する場合は、以下に説明するスキーマ レジストリ接続プロパティも構成する必要があります。
-
Confluent Schema Registry のオプションは、Avro メッセージ形式のみをサポートしています。
-
データメッセージと同じトピックにスキーマ メッセージを公開しないことを強くお勧めします。
-
トピックが存在しない場合は、データタスクが実行時にトピックを作成できるように、auto.create.topics.enable=true を使用してブローカーを設定します。存在しない場合、タスクは失敗します。
メッセージ形式
メッセージ形式として JSON または Avro を選択します。Confluent Schema Registry を選択した場合、Avro のみが利用可能なオプションになります。
スキーマ レジストリ接続プロパティ
スキーマ レジストリ サーバー
1 つ以上のスキーマ レジストリ サーバーを次の形式で指定します (高可用性のため)。
Confluent Schema Registry にデータ スキーマを公開する場合:
server1:port1[,server2[:port2]]
192.168.1.100:8081,192.168.1.101:8081
データタスクは、最初に利用可能なホストに接続します。
Hortonworks Schema Registry にデータ スキーマを公開する場合:
server1:port1[,server2[:port2]]
192.168.1.100:7788,192.168.1.101:7788
データタスクは、最初に利用可能なホストに接続します。
認証方法
次のスキーマ レジストリ認証オプションのいずれかを選択します。
- なし: 認証なし。
-
Kerberos: Kerberos を使用して認証する場合に選択します。
情報メモLinux で Kerberos 認証を使用するには、Kerberos クライアント (ワークステーション) パッケージがインストールされている必要があります。
- プリンシパル: スキーマ レジストリに対して認証するために使用される Kerberos プリンシパル。
-
Keytab ファイル: Keytab ファイルを参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。
-
証明書: 証明書を使用して認証する場合に選択します。
情報メモこのオプションは、Confluent Schema Registryに公開する場合にのみサポートされます。
このオプションを選択した場合は、次の情報も提供する必要があります。
- パブリック キー ファイル: PEM 形式のパブリック キー ファイルを参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。
- プライベート キー ファイル: PEM 形式のプライベート キー ファイルを参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。
- プライベート キー パスワード: プライベート キー ファイルのパスワード。
-
ユーザー名とパスワード: ユーザー名とパスワードで認証する場合に選択します。次に、 [ユーザー名] と [パスワード] 項目にログイン資格情報を入力します。
情報メモこのオプションは、Confluent Schema Registry に公開する場合にのみサポートされます。
-
証明書 + ユーザー名とパスワード: 証明書とユーザー名とパスワードの両方を使用して認証する場合に選択します。
このオプションを選択した場合、上記の [パブリック キー ファイル]、 [プライベート キー ファイル]、 [プライベート キー パスワード]、 [ユーザー名]、 [パスワード] 項目に必須情報を入力します。
情報メモこのオプションは、Confluent Schema Registry に公開する場合にのみサポートされます。
- TLS を有効にする (TLS 1.0、1.1、1.2 をサポート): Data Movement gateway マシンとスキーマ レジストリ サーバー間のデータを暗号化するには、このオプションを選択します。サーバーが SSL を要求するように構成されている場合、このオプションを必ず選択してください。
CAファイル: PEM 形式の CA 証明書を参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。
内部プロパティ
内部プロパティは、特殊なユース ケース向けであるため、ダイアログで公開されません。Qlik サポートによって指示があった場合にのみ使用する必要があります。
項目右側にある と
ボタンを使用して、必要に応じてプロパティを追加したり削除したりします。
名前
接続の表示名です。
前提条件
Kafka をターゲットとして使用するには、以下の前提条件を満たす必要があります。
-
データ タスクがターゲット トピックに書き込めるように、権限を設定します。これは、Kafka ACLs スクリプト (kafka-acls) を使用して実行することもできます。
-
データ タスクを開始する前に、attrep_apply_exceptions という名前のトピックを作成するか、auto.create.topics.enable=true でブローカーを設定します。
このトピックが存在しない場合、エラー処理ポリシーに関係なく、データ エラーが発生するとタスクは常に失敗します。
attrep_apply_exceptions コントロール テーブルの説明については、「適用の例外」を参照してください。
制限と考慮事項
Kafka をターゲット コネクタとしてタスクを定義する場合、次の制限が適用されます。
-
サポートされていないタスク設定:
-
Kafka ターゲット コネクタは、無制限の LOB サイズをサポートしていません。したがって、LOB 列を持つソース テーブルからデータを移動する場合、 [無制限の LOB サイズを許可] オプションを選択しないでください。
-
変更の保存モードはサポートされていません。
-
バッチ最適化適用モードはサポートされていません。Kafka は常にトランザクションの適用モードで動作します。
- [ALTER を無視] 適用変更設定は、ソース データ タイプの変更およびテーブル名の変更にはサポートされていません。
- [ターゲット テーブルの準備] 項目のドロップしてテーブルを作成オプションはサポートされていません。
- [ターゲット テーブルの準備] 項目のロード前に切り捨てオプションはサポートされていません。
- 変更データ パーティショニングコントロール テーブルはサポートされていません。
-
- 一般的な制限事項:
Kafka トピック名にはスペースを含めることはできず、255 文字 (Kafka 0.10 以降は 249 文字) を超えることはできません。また、次の文字のみを使用できます。
a-z|A-Z|0-9|. (ドット)|_(アンダースコア)|-(マイナス)
ソーステーブル名が許可されている最大長を超えている場合、またはサポートされていない文字が含まれている場合は、タスクを開始する前に名前を変更するか、グローバル変換を定義する必要があります。
列名は [A-Za-z_] (文字またはアンダースコア) で始まり、その後に [A-Za-z0-9_] (文字、数字、またはアンダースコア) が続く必要があります。たとえば、_Test_ は有効な列名ですが、&Test は有効ではありません。
ソース列名がこのルールに従わない場合、変換を使用して列名を変更する必要があります。
- ソース テーブルの削除または名前の変更はサポートされていません。
データ タイプ
次の表は、Qlik Cloud 使用時にサポートされる Kafka データ タイプと、Qlik Cloud データ タイプからの既定のマッピングを示しています。
JSON メッセージ形式を使用する場合、バイナリ値は 16 進数で表現されます。
| Qlik Cloud データ タイプ | スキーマ メッセージの Kafka ターゲット データ タイプ |
|---|---|
|
DATE |
DATE |
|
TIME |
TIME |
|
DATETIME |
DATETIME |
|
BYTES |
BYTES (長さ) |
|
BLOB |
BLOB |
|
REAL4 |
REAL4 (7) |
|
REAL8 |
REAL8 (14) |
|
INT1 |
INT1 (3) |
|
INT2 |
INT2 (5) |
|
INT4 |
INT4 (10) |
|
INT8 |
INT8 (19) |
|
UINT1 |
UINT1 (3) |
|
UINT2 |
UINT2 (5) |
|
UINT4 情報メモ
2^31-1 を超える値はサポートされていません。 |
UINT4 (10) |
|
UINT8 情報メモ
2^63-1 を超える値はサポートされていません。 |
UINT8 (20) |
|
NUMERIC |
NUMERIC (p,s) |
|
STRING |
STRING (長さ) |
|
WSTRING |
STRING (長さ) |
|
CLOB |
CLOB |
|
NCLOB |
NCLOB |
|
BOOLEAN |
BOOLEAN (1) |
JSON および Avro へのマッピング
Avro メッセージ形式では、データ タイプをより正確に表現するために論理型が使用されます。
Qlik Cloud データ タイプは、 [特定のデータ タイプに論理データ タイプを使用] チェックボックスが選択されている場合にのみ、サポートされている Avro 論理データ タイプにマッピングされます。
| Qlik Cloud データ型 | JSON | Avro 論理データ タイプ |
|---|---|---|
|
DATE |
STRING |
DATE Avro INT に注釈を付けます。 |
|
TIME |
STRING |
TIME-MILLIS Avro INT に注釈を付けます。 |
|
TIMESTAMP |
STRING |
TIMESTAMP-MICROS Avro LONG に注釈を付けます。 |
|
STRING |
STRING |
- |
|
WSTRING |
STRING |
- |
|
CLOB |
STRING |
- |
|
NCLOB |
STRING |
- |
|
NUMERIC |
STRING |
DECIMAL (p,s) Avro BYTES に注釈を付けます。 |
|
BYTES |
BYTES |
- |
|
BLOB |
BYTES |
- |
|
REAL4 |
FLOAT |
- |
|
REAL8 |
DOUBLE |
- |
|
INT1 |
INT |
- |
|
INT2 |
INT |
- |
|
INT4 |
INT |
- |
|
UINT1 |
INT |
- |
|
UINT2 |
INT |
- |
|
UINT4 |
LONG |
- |
|
INT8 |
LONG |
- |
|
UINT8 |
STRING |
DECIMAL (20,0) Avro BYTES に注釈を付けます。 |
|
BOOLEAN |
BOOLEAN |
- |
Confluent Cloud での作業
次のセクションでは、Kafka コネクタを Confluent Cloud で動作するように構成する方法を説明します。
- Confluent Cloud でクラスターを作成します。
- クラスター設定から [Bootstrap サーバー] の値を、コネクタ設定の [基本設定] タブにある [ブローカー サーバー] 項目にコピーします。
- コネクタ設定で、 [認証方法] ドロップダウン リストから [ユーザー名とパスワード (SASL/PLAIN)] を選択します。
- Confluent Cluster の [API アクセス] 画面で、 [API キー ペア] を作成します。
- キーとシークレットをコピーし、それぞれ Kafka コネクタの [ユーザー名 ] と [パスワード] 項目に貼り付けます。
- Kafka コネクタ設定で、 [TLS を有効にする] オプションを選択し、 [CA パス] 項目に CA ファイルのフル パスを指定します。CA ファイルは Confluent のガイドラインに従って作成する必要があり、すべての Confluent Cloud 証明書に署名する CA とそのルート CA 署名者を含める必要があります。
これで Confluent Cloud を使用する準備ができました。