跳到主要内容 跳到补充内容

Kafka 目标

本主题介绍如何使用 Kafka 目标连接器配置与 Kafka 目标的连接。Kafka 只能在复制任务中用作目标。

Kafka 设定为目标涉及以下方面:

设置连接属性

要配置连接器,请执行以下操作:

  1. 连接中单击创建连接

  2. 选择 Kafka 目标连接器,然后提供以下设置:

数据目标

数据网关

信息注释此字段不适用于 Qlik Talend Cloud Starter 订阅,因为此订阅层不支持 数据移动网关

将数据移动到 Amazon MSK 时,只有当 Amazon MSK 无法从 Qlik Cloud 访问,并且只能使用私有链接访问时(例如,如果它位于虚拟专用云中),才需要 数据移动网关。如果是这种情况,请选择要通过其访问目标数据库的 数据移动网关。根据您的使用情况,这将和从数据源移动数据时部署的 数据移动网关 相同,或者与之不同。

如果您要将数据移动到可从 Qlik Cloud 直接访问的 Amazon MSK 实例,请选择

信息注释
  • 如果您要将数据移动到本地 Kafka(请参阅下面的 云提供商),则必须选择一个 数据移动网关,无论 Kafka 是否可从 Qlik Cloud 直接访问。

  • 如果您需要使用 数据移动网关,则需要 2025.5.40 或更高版本。

有关 数据移动网关 用例的信息,请参阅 什么时候需要 数据移动网关?常见用例

云提供商

选择 以使用本地 Kafka,或选择 Amazon MSK 以使用 Amazon MSK。

代理服务器

使用以下格式指定一个或多个代理服务器(用于高可用性):

server1[:port1][,server2[:port2]]

示例:

192.168.1.100:9092,192.168.1.101:9093

数据任务将连接到第一个可用的主机。如果指定主机时未指定端口,则将使用端口 9092 作为默认值。

信息注释

当使用 SSL 或 Kerberos 身份验证时,您必须指定代理 FQDN(即,不是 IP 地址)。

信息注释

集群中的所有代理服务器都需要从 数据移动网关 机器访问。但是,您不需要指定所有服务器 代理服务器 字段。这是因为 数据移动网关 只需要连接到其中一台服务器,即可检索集群中其他服务器的连接详细信息。因此,最佳实践是指定任务运行时最有可能可用的服务器。数据任务生成消息的服务器由主题和分区主题以及分区设置决定。

帐户属性

验证方式

选择以下其中一项:

  • 无身份验证。
  • 证书如果您选择此选项,您还需要提供以下信息:

    备注公钥和私钥文件必须采用 PEM 格式。

    • 公钥文件: 浏览到采用 PEM 格式的公钥文件。当您点击保存时,文件将上传到Qlik Talend Cloud并部署到在连接器设置中选择的数据移动网关
    • 私钥文件浏览到 PEM 格式的私钥文件。当您点击 保存 时,文件将上传到 Qlik Talend Cloud 并部署到连接器设置中选择的 数据移动网关
    • 私钥文件密码私钥文件的密码。
  • Kerberos (SASL/GSSAPI)选择使用 Kerberos 对 Kafka 集群进行身份验证。

    信息注释

    此身份验证方法在所选的 云提供商Amazon MSK 时不可用。

    • 主体用于对代理服务器进行身份验证的 Kerberos 主体。
    • Keytab 文件:浏览到 keytab 文件。当您点击保存时,文件将上传到Qlik Talend Cloud并部署到在连接器设置中选择的数据移动网关
    信息注释

    为了在 Linux 上使用 Kerberos 身份验证,应安装 Kerberos 客户端(工作站)软件包。

  • 用户名和密码 (SASL/PLAIN)您可以选择此选项,使用用户名和密码进行身份验证 (SASL/PLAIN)。为防止密码以明文形式发送,强烈建议同时启用启用 TLS选项。

    信息注释

    当选定的云提供商Amazon MSK时,此身份验证方法不可用。

  • 用户名和密码 (SASL/SCRAM-SHA-256)您可以选择此选项,使用用户名和密码进行身份验证 (SASL/SCRAM-SHA-256)。

    请注意,选择此选项还需要将每个代理的 server.properties 文件配置为相应的 SASL/SCRAM 机制。

    信息注释

    当选定的 云提供商Amazon MSK 时,此身份验证方法不可用。

  • 用户名和密码 (SASL/SCRAM-SHA-512)您可以选择此选项以使用用户名和密码 (SASL/SCRAM-SHA-512) 进行身份验证。

    请注意,选择此选项还需要将每个代理的 server.properties 文件配置为相应的 SASL/SCRAM 机制。

SSL 选项

选择此选项以加密 Qlik Talend Cloud 与代理服务器之间的通信。如果代理配置为需要 SSL,则您必须选择此选项。

  • CA 文件:浏览到 PEM 格式的 CA 证书。当您单击保存时,文件将上传到 Qlik Talend Cloud 并部署到连接器设置中选择的 数据移动网关

    信息注释

    当所选的云提供商Amazon MSK时,不需要。

元数据消息发布

模式注册表将可用于元数据消息发布

从下拉列表中选择以下选项之一:

  • 选择此选项后,只有数据消息将被发布。

  • Confluent 模式注册表

    如果您选择此选项,您还必须配置Schema registry 连接属性,如下所述。

信息注释
  • Confluent Schema Registry 选项仅支持 Avro 消息格式。

  • 强烈建议不要将 schema 消息发布到与数据消息相同的主题。

  • 如果主题不存在,请使用auto.create.topics.enable=true配置代理,以使数据任务能够在运行时创建主题。否则,任务将失败。

消息格式

选择 JSONAvro 作为消息格式。如果您选择了 Confluent schema registry,则 Avro 将是唯一可用的选项。

Schema registry 连接属性

Schema registry 服务器

使用以下格式指定一个或多个 Schema Registry 服务器(用于高可用性):

将数据架构发布到 Confluent Schema Registry 时:

server1:port1[,server2[:port2]]

示例:  

192.168.1.100:8081,192.168.1.101:8081

数据任务将连接到第一个可用的主机。

将数据架构发布到 Hortonworks Schema Registry 时:

server1:port1[,server2[:port2]]

示例:  

192.168.1.100:7788,192.168.1.101:7788

数据任务将连接到第一个可用的主机。

验证方式

选择以下其中一个 Schema Registry 身份验证选项:

  • :无身份验证。
  • Kerberos: 选择使用 Kerberos 进行身份验证。

    信息注释

    要在 Linux 上使用 Kerberos 身份验证,应安装 Kerberos 客户端(工作站)软件包。

    • 主体 用于针对 Schema Registry 进行身份验证的 Kerberos 主体。
    • Keytab 文件:浏览到 keytab 文件。当您点击 保存 时,文件将上传到 Qlik Talend Cloud 并部署到连接器设置中选择的任何 数据移动网关

  • 证书选择使用证书进行身份验证。

    信息注释

    此选项仅在发布到 Confluent Schema Registry 时受支持。

    如果您选择此选项,还需要提供以下信息:

    • 公钥文件:浏览到 PEM 格式的公钥文件。当您点击保存时,文件将上传到Qlik Talend Cloud并部署到连接器设置中选择的数据移动网关
    • 私钥文件:浏览到 PEM 格式的私钥文件。当您点击保存时,文件将上传到Qlik Talend Cloud并部署到连接器设置中选择的数据移动网关
    • 私钥密码私钥文件的密码。
  • 用户名和密码选择使用用户名和密码进行身份验证。然后,在用户名密码字段中输入您的登录凭据。

    信息注释

    仅在发布到 Confluent Schema Registry 时支持此选项。

  • 证书 + 用户名和密码选择使用证书、用户名和密码进行身份验证。

    选择此选项后,请在上述公钥文件私钥文件私钥密码用户名密码字段中输入所需信息。

    信息注释

    此选项仅在发布到 Confluent Schema Registry 时受支持。

  • 启用 TLS(支持 TLS 1.0、1.1 和 1.2):选择此选项可加密数据移动网关计算机与 Schema Registry 服务器之间的数据。如果服务器配置为需要 SSL,则您必须选择此选项。
    • CA 文件: 浏览 PEM 格式的 CA 证书。当您点击保存时,文件将上传到Qlik Talend Cloud并部署到在连接器设置中选择的数据移动网关

内部属性

内部属性用于特殊用例,因此不会在对话框中公开。只有在 Qlik 支持部门的指示下,您才应该使用它们。

使用字段右侧的 新建取消 按钮可以根据需要添加或删除特性。

名称

连接的显示名称。

先决条件

在将 Kafka 用作目标之前,必须满足以下先决条件:

  • 数据移动网关机器向所有代理开放 TCP 端口

  • 设置权限以允许数据任务写入目标主题。一种方法是使用 Kafka ACLs 脚本 (kafka-acls)。

  • 要么在数据任务启动前创建名为attrep_apply_exceptions的主题,要么使用auto.create.topics.enable=true配置代理。

    请注意,如果此主题不存在,则无论错误处理策略如何,任务在遇到数据错误时都将始终失败。

    有关attrep_apply_exceptions控制表的说明,请参阅应用例外

限制和考虑事项

定义任务时,如果将 Kafka 作为目标连接器,则适用以下限制:

  • 不支持的任务设置:

    • Kafka 目标连接器不支持无限 LOB 大小。因此,从包含 LOB 列的源表移动数据时,不要选择允许无限 LOB 大小选项。

    • 不支持存储更改模式。

    • 批量优化应用模式不受支持。Kafka 始终在事务性应用模式下工作。

    • 对于源数据类型更改和表重命名,不支持忽略 ALTER 应用更改设置。
    • 目标表准备字段的“删除并创建表”选项不受支持。
    • 目标表准备字段的“加载前截断”选项不受支持。
    • 更改数据分区控制表不受支持。
  • 一般限制:
    • Kafka 主题名称不能包含空格或超过 255 个字符(Kafka 0.10 为 249 个字符),并且只能包含以下字符:

      a-z|A-Z|0-9|.(点)|_(下划线)|-(减号)

      如果源表名称超出允许的最大长度或包含不支持的字符,您需要在启动任务之前修改名称或定义全局转换。

    • 列名必须以 [A-Za-z_](字母或下划线)开头,后跟 [A-Za-z0-9_](字母、数字或下划线)。例如,_Test_ 是有效的列名,而 &Test 则不是。

      如果源列名不符合此规则,则应使用转换来重命名该列。

    • 不支持删除或重命名源表。

数据类型

下表显示了使用 Qlik Cloud 时支持的 Kafka 数据类型以及自 Qlik Cloud 数据类型的默认映射。

信息注释

当使用 JSON 消息格式时,二进制值表示为十六进制数字。

Kafka 数据映射
Qlik Cloud 数据类型 Kafka 模式消息中的目标数据类型

DATE

DATE

TIME

TIME

DATETIME

DATETIME

BYTES

BYTES(长度)

BLOB

BLOB

REAL4

REAL4 (7)

REAL8

REAL8 (14)

INT1

INT1 (3)

INT2

INT2 (5)

INT4

INT4 (10)

INT8

INT8 (19)

UINT1

UINT1 (3)

UINT2

UINT2 (5)

UINT4

信息注释

不支持大于 2^31-1 的值。

UINT4 (10)

UINT8

信息注释

不支持大于 2^63-1 的值。

UINT8 (20)

NUMERIC

NUMERIC (p,s)

STRING

STRING(长度)

WSTRING

STRING(长度)

CLOB

CLOB

NCLOB

NCLOB

BOOLEAN

BOOLEAN (1)

映射到 JSON 和 Avro

Avro 消息格式使用逻辑类型以更精确地表示数据类型。

Qlik Cloud 数据类型将仅映射到支持的 Avro 逻辑数据类型,如果选中了 对特定数据类型使用逻辑数据类型 复选框。

数据类型映射
Qlik Cloud 数据类型 JSON Avro 逻辑数据类型

DATE

STRING

DATE

注解一个 Avro INT。

TIME

STRING

TIME-MILLIS

注解一个 Avro INT。

TIMESTAMP

STRING

TIMESTAMP-MICROS

注解一个 Avro LONG。

STRING

STRING

-

WSTRING

STRING

-

CLOB

STRING

-

NCLOB

STRING

-

NUMERIC

STRING

DECIMAL (p,s)

标注一个 Avro BYTES。

BYTES

BYTES

-

BLOB

BYTES

-

REAL4

FLOAT

-

REAL8

DOUBLE

-

INT1

INT

-

INT2

INT

-

INT4

INT

-

UINT1

INT

-

UINT2

INT

-

UINT4

LONG

-

INT8

LONG

-

UINT8

STRING

DECIMAL (20,0)

标注一个 Avro BYTES。

BOOLEAN

BOOLEAN

-

使用 Confluent Cloud

以下部分解释了如何配置 Kafka 连接器以使用 Confluent Cloud。

  1. 在 Confluent Cloud 中创建一个集群。
  2. 将集群设置中的引导服务器值复制到连接器设置的常规选项卡中的代理服务器字段。
  3. 在连接器设置中,从身份验证方法下拉列表中选择用户名和密码 (SASL/PLAIN)
  4. 在您的 Confluent 集群的API 访问屏幕中,创建API 密钥对
  5. 将密钥和机密分别复制到 Kafka 连接器的用户名密码字段中。
  6. 在 Kafka 连接器设置中,选择启用 TLS选项,并在CA 路径字段中指定 CA 文件的完整路径。CA文件应根据Confluent的指南创建,并应包含签署所有Confluent Cloud证书的CA及其根CA签名者。

您现在应该已准备就绪,可以使用Confluent Cloud。

本页面有帮助吗?

如果您发现此页面或其内容有任何问题 – 打字错误、遗漏步骤或技术错误 – 请告诉我们!