メイン コンテンツをスキップする 補完的コンテンツへスキップ

Kafka ターゲット

このトピックでは、Kafka ターゲット コネクタを使用して Kafka ターゲットへの接続を構成する方法について説明します。Kafka は、レプリケーション タスクのターゲットとしてのみ使用できます。

Kafka をターゲットとして設定するには、次が必要です。

  • 前提条件を満たす
  • Kafka への接続を構成する

接続プロパティの設定

コネクタを構成するには、次の手順を実行します。

  1. [接続] で、 [接続を作成] をクリックします。

  2. Kafka ターゲット コネクタを選択し、次の設定を行います。

データ ターゲット

データ ゲートウェイ

情報メモこのサブスクリプション ティアでは Data Movement gateway がサポートされていないため、Qlik Talend Cloud スターター サブスクリプションではこの項目を使用できません。

Amazon MSK にデータを移動する場合、Data Movement gateway が必要になるのは、Amazon MSK が Qlik Cloud からアクセスできず、プライベート リンクを使用してのみアクセスできる場合 (仮想プライベートクラウド内にある場合など) のみです。このような場合は、ターゲット データベースにアクセスする Data Movement gateway を選択します。ユースケースに応じて、これはデータ ソースからデータを移動するために展開された、同じまたは別の Data Movement gateway になります。

Qlik Cloud から直接アクセスできる Amazon MSK インスタンスにデータを移動する場合は、 [なし] を選択します。

情報メモ
  • オンプレミスの Kafka にデータを移動する場合 (以下の クラウド プロバイダー を参照)、Kafka がQlik Cloud から直接アクセスできるかどうかにかかわらず、Data Movement gateway を選択する必要があります。

  • Data Movement gateway を使用する必要がある場合は、バージョン 2025.5.40 以降が必要です。

Data Movement gateway のユース ケースに関する詳細については、「どのようなときに Data Movement gateway が必要ですか?」および「一般的な使用例」を参照してください。

[Cloud provider] (クラウドプロバイダー)

オンプレミスの Kafka を使用する場合は [なし] を、Amazon MSK を使用する場合は [Amazon MSK] を選択します。

ブローカー サーバー

1 つ以上のブローカー サーバーを次の形式で指定します (高可用性のため)。

server1[:port1][,server2[:port2]]

例:

192.168.1.100:9092,192.168.1.101:9093

データタスクは、最初に利用可能なホストに接続します。ポートを指定せずにホストが指定された場合、ポート 9092 が既定として使用されます。

情報メモ

SSL または Kerberos 認証を使用する場合、ブローカーの FQDN (つまり、IP アドレスではない) を指定する必要があります。

情報メモ

クラスター内のすべてのブローカー サーバーは、Data Movement gateway マシンからアクセスできる必要があります。ただし、すべてのサーバーを指定する必要はありません ブローカー サーバー 項目にペーストします。これは、Data Movement gateway がクラスター内の他のサーバーの接続詳細を取得するために、1 つのサーバーに接続するだけでよいためです。したがって、タスク実行時にもっとも利用可能である可能性が高いサーバーを指定することがベスト プラクティスです。データ タスクがメッセージを生成するサーバーは、トピック、パーティショニング トピック、パーティショニング設定によって決定されます。

アカウントのプロパティ

認証方法

次のいずれかを選択します。

  • なし: 認証なし。
  • 証明書に設定して有効にします。このオプションを選択した場合は、次の情報も提供する必要があります。

    メモ  パブリック キー ファイルとプライベートキー ファイルは PEM 形式である必要があります。

    • パブリック キー ファイル: PEM 形式のパブリック キー ファイルを参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。
    • プライベートキーファイルに設定して有効にします。PEM 形式のプライベート キー ファイルを参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。
    • プライベート キー パスワードに設定して有効にします。プライベート キー ファイルのパスワード。
  • Kerberos (SASL/GSSAPI): Kerberos を使用して Kafka クラスターに対して認証をする場合に選択します。

    情報メモ

    選択された [クラウド プロバイダー] が [Amazon MSK] の場合、この認証方法は使用できません。

    • プリンシパル: ブローカー サーバーに対して認証するために使用される Kerberos プリンシパル。
    • keytab ファイル: Keytab ファイルを参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。
    情報メモ

    Linux で Kerberos 認証を使用するには、Kerberos クライアント (ワークステーション) パッケージがインストールされている必要があります。

  • ユーザー名とパスワード (SASL/PLAIN): このオプションを選択すると、ユーザー名とパスワード (SASL/PLAIN) を使用して認証できます。パスワードが平文で送信されるのを防ぐため、 [TLS を有効にする] オプションも有効にすることを強くお勧めします。

    情報メモ

    選択した [クラウド プロバイダー] が [Amazon MSK] の場合、この認証方法は使用できません。

  • ユーザー名とパスワード (SASL/SCRAM-SHA-256): このオプションを選択すると、ユーザー名とパスワード (SASL/SCRAM-SHA-256) を使用して認証できます。

    このオプションを選択する場合は、各ブローカーの server.properties ファイルにも対応する SASL/SCRAM メカニズムを設定する必要があります。

    情報メモ

    選択した [クラウド プロバイダー] が [Amazon MSK] の場合、この認証方法は使用できません。

  • ユーザー名とパスワード (SASL/SCRAM-SHA-512): このオプションを選択すると、ユーザー名とパスワード (SASL/SCRAM-SHA-512) を使用して認証できます。

    このオプションを選択する場合は、各ブローカーの server.properties ファイルにも対応する SASL/SCRAM メカニズムを設定する必要があります。

SSL オプション

このオプションは、Qlik Talend Cloud とブローカー サーバー間の通信を暗号化する場合に選択します。ブローカーが SSL を要求するように構成されている場合、このオプションを必ず選択してください。

  • CA ファイル: PEM 形式の CA 証明書を参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。

    情報メモ

    選択した [クラウド プロバイダー] が [Amazon MSK] の場合は必要ありません。

メタデータ メッセージの公開

公開にスキーマ レジストリに使用できるメタデータ メッセージ

ドロップダウン リストから、次のオプションのいずれかを選択します。

  • いいえ

    このオプションを選択すると、データ メッセージのみが公開されます。

  • Confluent Schema Registry

    このオプションを選択する場合は、以下に説明するスキーマ レジストリ接続プロパティも構成する必要があります。

情報メモ
  • Confluent Schema Registry のオプションは、Avro メッセージ形式のみをサポートしています。

  • データメッセージと同じトピックにスキーマ メッセージを公開しないことを強くお勧めします。

  • トピックが存在しない場合は、データタスクが実行時にトピックを作成できるように、auto.create.topics.enable=true を使用してブローカーを設定します。存在しない場合、タスクは失敗します。

メッセージ形式

メッセージ形式として JSON または Avro を選択します。Confluent Schema Registry を選択した場合、Avro のみが利用可能なオプションになります。

スキーマ レジストリ接続プロパティ

スキーマ レジストリ サーバー

1 つ以上のスキーマ レジストリ サーバーを次の形式で指定します (高可用性のため)。

Confluent Schema Registry にデータ スキーマを公開する場合:

server1:port1[,server2[:port2]]

 

192.168.1.100:8081,192.168.1.101:8081

データタスクは、最初に利用可能なホストに接続します。

Hortonworks Schema Registry にデータ スキーマを公開する場合:

server1:port1[,server2[:port2]]

 

192.168.1.100:7788,192.168.1.101:7788

データタスクは、最初に利用可能なホストに接続します。

認証方法

次のスキーマ レジストリ認証オプションのいずれかを選択します。

  • なし: 認証なし。
  • Kerberos: Kerberos を使用して認証する場合に選択します。

    情報メモ

    Linux で Kerberos 認証を使用するには、Kerberos クライアント (ワークステーション) パッケージがインストールされている必要があります。

    • プリンシパル: スキーマ レジストリに対して認証するために使用される Kerberos プリンシパル。
    • Keytab ファイル: Keytab ファイルを参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。

  • 証明書: 証明書を使用して認証する場合に選択します。

    情報メモ

    このオプションは、Confluent Schema Registryに公開する場合にのみサポートされます。

    このオプションを選択した場合は、次の情報も提供する必要があります。

    • パブリック キー ファイル: PEM 形式のパブリック キー ファイルを参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。
    • プライベート キー ファイル: PEM 形式のプライベート キー ファイルを参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。
    • プライベート キー パスワード: プライベート キー ファイルのパスワード。
  • ユーザー名とパスワード: ユーザー名とパスワードで認証する場合に選択します。次に、 [ユーザー名] と [パスワード] 項目にログイン資格情報を入力します。

    情報メモ

    このオプションは、Confluent Schema Registry に公開する場合にのみサポートされます。

  • 証明書 + ユーザー名とパスワード: 証明書とユーザー名とパスワードの両方を使用して認証する場合に選択します。

    このオプションを選択した場合、上記の [パブリック キー ファイル]、 [プライベート キー ファイル]、 [プライベート キー パスワード]、 [ユーザー名]、 [パスワード] 項目に必須情報を入力します。

    情報メモ

    このオプションは、Confluent Schema Registry に公開する場合にのみサポートされます。

  • TLS を有効にする (TLS 1.0、1.1、1.2 をサポート): Data Movement gateway マシンとスキーマ レジストリ サーバー間のデータを暗号化するには、このオプションを選択します。サーバーが SSL を要求するように構成されている場合、このオプションを必ず選択してください。
    • CAファイル: PEM 形式の CA 証明書を参照します。[保存] をクリックすると、ファイルは Qlik Talend Cloud にアップロードされ、コネクタ設定で選択された Data Movement gateway に展開されます。

内部プロパティ

内部プロパティは、特殊なユース ケース向けであるため、ダイアログで公開されません。Qlik サポートによって指示があった場合にのみ使用する必要があります。

項目右側にある 新規作成キャンセル ボタンを使用して、必要に応じてプロパティを追加したり削除したりします。

名前

接続の表示名です。

前提条件

Kafka をターゲットとして使用するには、以下の前提条件を満たす必要があります。

  • Data Movement gateway マシンからすべてのブローカーへの TCP ポートを開きます。

  • データ タスクがターゲット トピックに書き込めるように、権限を設定します。これは、Kafka ACLs スクリプト (kafka-acls) を使用して実行することもできます。

  • データ タスクを開始する前に、attrep_apply_exceptions という名前のトピックを作成するか、auto.create.topics.enable=true でブローカーを設定します。

    このトピックが存在しない場合、エラー処理ポリシーに関係なく、データ エラーが発生するとタスクは常に失敗します。

    attrep_apply_exceptions コントロール テーブルの説明については、「適用の例外」を参照してください。

制限と考慮事項

Kafka をターゲット コネクタとしてタスクを定義する場合、次の制限が適用されます。

  • サポートされていないタスク設定:

    • Kafka ターゲット コネクタは、無制限の LOB サイズをサポートしていません。したがって、LOB 列を持つソース テーブルからデータを移動する場合、 [無制限の LOB サイズを許可] オプションを選択しないでください

    • 変更の保存モードはサポートされていません。

    • バッチ最適化適用モードはサポートされていません。Kafka は常にトランザクションの適用モードで動作します。

    • [ALTER を無視] 適用変更設定は、ソース データ タイプの変更およびテーブル名の変更にはサポートされていません。
    • [ターゲット テーブルの準備] 項目のドロップしてテーブルを作成オプションはサポートされていません。
    • [ターゲット テーブルの準備] 項目のロード前に切り捨てオプションはサポートされていません。
    • 変更データ パーティショニングコントロール テーブルはサポートされていません。
  • 一般的な制限事項:
    • Kafka トピック名にはスペースを含めることはできず、255 文字 (Kafka 0.10 以降は 249 文字) を超えることはできません。また、次の文字のみを使用できます。

      a-z|A-Z|0-9|. (ドット)|_(アンダースコア)|-(マイナス)

      ソーステーブル名が許可されている最大長を超えている場合、またはサポートされていない文字が含まれている場合は、タスクを開始する前に名前を変更するか、グローバル変換を定義する必要があります。

    • 列名は [A-Za-z_] (文字またはアンダースコア) で始まり、その後に [A-Za-z0-9_] (文字、数字、またはアンダースコア) が続く必要があります。たとえば、_Test_ は有効な列名ですが、&Test は有効ではありません。

      ソース列名がこのルールに従わない場合、変換を使用して列名を変更する必要があります。

    • ソース テーブルの削除または名前の変更はサポートされていません。

データ タイプ

次の表は、Qlik Cloud 使用時にサポートされる Kafka データ タイプと、Qlik Cloud データ タイプからの既定のマッピングを示しています。

情報メモ

JSON メッセージ形式を使用する場合、バイナリ値は 16 進数で表現されます。

Kafka データ マッピング
Qlik Cloud データ タイプ スキーマ メッセージの Kafka ターゲット データ タイプ

DATE

DATE

TIME

TIME

DATETIME

DATETIME

BYTES

BYTES (長さ)

BLOB

BLOB

REAL4

REAL4 (7)

REAL8

REAL8 (14)

INT1

INT1 (3)

INT2

INT2 (5)

INT4

INT4 (10)

INT8

INT8 (19)

UINT1

UINT1 (3)

UINT2

UINT2 (5)

UINT4

情報メモ

2^31-1 を超える値はサポートされていません。

UINT4 (10)

UINT8

情報メモ

2^63-1 を超える値はサポートされていません。

UINT8 (20)

NUMERIC

NUMERIC (p,s)

STRING

STRING (長さ)

WSTRING

STRING (長さ)

CLOB

CLOB

NCLOB

NCLOB

BOOLEAN

BOOLEAN (1)

JSON および Avro へのマッピング

Avro メッセージ形式では、データ タイプをより正確に表現するために論理型が使用されます。

Qlik Cloud データ タイプは、 [特定のデータ タイプに論理データ タイプを使用] チェックボックスが選択されている場合にのみ、サポートされている Avro 論理データ タイプにマッピングされます。

データ タイプ マッピング
Qlik Cloud データ型 JSON Avro 論理データ タイプ

DATE

STRING

DATE

Avro INT に注釈を付けます。

TIME

STRING

TIME-MILLIS

Avro INT に注釈を付けます。

TIMESTAMP

STRING

TIMESTAMP-MICROS

Avro LONG に注釈を付けます。

STRING

STRING

-

WSTRING

STRING

-

CLOB

STRING

-

NCLOB

STRING

-

NUMERIC

STRING

DECIMAL (p,s)

Avro BYTES に注釈を付けます。

BYTES

BYTES

-

BLOB

BYTES

-

REAL4

FLOAT

-

REAL8

DOUBLE

-

INT1

INT

-

INT2

INT

-

INT4

INT

-

UINT1

INT

-

UINT2

INT

-

UINT4

LONG

-

INT8

LONG

-

UINT8

STRING

DECIMAL (20,0)

Avro BYTES に注釈を付けます。

BOOLEAN

BOOLEAN

-

Confluent Cloud での作業

次のセクションでは、Kafka コネクタを Confluent Cloud で動作するように構成する方法を説明します。

  1. Confluent Cloud でクラスターを作成します。
  2. クラスター設定から [Bootstrap サーバー] の値を、コネクタ設定の [基本設定] タブにある [ブローカー サーバー] 項目にコピーします。
  3. コネクタ設定で、 [認証方法] ドロップダウン リストから [ユーザー名とパスワード (SASL/PLAIN)] を選択します。
  4. Confluent Cluster の [API アクセス] 画面で、 [API キー ペア] を作成します。
  5. キーとシークレットをコピーし、それぞれ Kafka コネクタの [ユーザー名 ] と [パスワード] 項目に貼り付けます。
  6. Kafka コネクタ設定で、 [TLS を有効にする] オプションを選択し、 [CA パス] 項目に CA ファイルのフル パスを指定します。CA ファイルは Confluent のガイドラインに従って作成する必要があり、すべての Confluent Cloud 証明書に署名する CA とそのルート CA 署名者を含める必要があります。

これで Confluent Cloud を使用する準備ができました。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。