Kafka 目标
本主题介绍如何使用 Kafka 目标连接器配置与 Kafka 目标的连接。Kafka 只能在复制任务中用作目标。
将 Kafka 设定为目标涉及以下方面:
- 满足 先决条件
- 配置连接,指向 Kafka
设置连接属性
要配置连接器,请执行以下操作:
-
在连接中单击创建连接。
-
选择 Kafka 目标连接器,然后提供以下设置:
数据目标
数据网关
将数据移动到 Amazon MSK 时,只有当 Amazon MSK 无法从 Qlik Cloud 访问,并且只能使用私有链接访问时(例如,如果它位于虚拟专用云中),才需要 数据移动网关。如果是这种情况,请选择要通过其访问目标数据库的 数据移动网关。根据您的使用情况,这将和从数据源移动数据时部署的 数据移动网关 相同,或者与之不同。
如果您要将数据移动到可从 Qlik Cloud 直接访问的 Amazon MSK 实例,请选择 无。
-
如果您要将数据移动到本地 Kafka(请参阅下面的 云提供商),则必须选择一个 数据移动网关,无论 Kafka 是否可从 Qlik Cloud 直接访问。
-
如果您需要使用 数据移动网关,则需要 2025.5.40 或更高版本。
有关 数据移动网关 用例的信息,请参阅 什么时候需要 数据移动网关? 和 常见用例。
云提供商
选择 无 以使用本地 Kafka,或选择 Amazon MSK 以使用 Amazon MSK。
代理服务器
使用以下格式指定一个或多个代理服务器(用于高可用性):
server1[:port1][,server2[:port2]]
示例:
192.168.1.100:9092,192.168.1.101:9093
数据任务将连接到第一个可用的主机。如果指定主机时未指定端口,则将使用端口 9092 作为默认值。
当使用 SSL 或 Kerberos 身份验证时,您必须指定代理 FQDN(即,不是 IP 地址)。
集群中的所有代理服务器都需要从 数据移动网关 机器访问。但是,您不需要指定所有服务器 代理服务器 字段。这是因为 数据移动网关 只需要连接到其中一台服务器,即可检索集群中其他服务器的连接详细信息。因此,最佳实践是指定任务运行时最有可能可用的服务器。数据任务生成消息的服务器由主题和分区主题以及分区设置决定。
帐户属性
验证方式
选择以下其中一项:
- 无: 无身份验证。
-
证书:如果您选择此选项,您还需要提供以下信息:
备注公钥和私钥文件必须采用 PEM 格式。
- 公钥文件: 浏览到采用 PEM 格式的公钥文件。当您点击保存时,文件将上传到Qlik Talend Cloud并部署到在连接器设置中选择的数据移动网关。
- 私钥文件:浏览到 PEM 格式的私钥文件。当您点击 保存 时,文件将上传到 Qlik Talend Cloud 并部署到连接器设置中选择的 数据移动网关。
- 私钥文件密码:私钥文件的密码。
-
Kerberos (SASL/GSSAPI):选择使用 Kerberos 对 Kafka 集群进行身份验证。
信息注释此身份验证方法在所选的 云提供商 为 Amazon MSK 时不可用。
- 主体:用于对代理服务器进行身份验证的 Kerberos 主体。
- Keytab 文件:浏览到 keytab 文件。当您点击保存时,文件将上传到Qlik Talend Cloud并部署到在连接器设置中选择的数据移动网关。
信息注释为了在 Linux 上使用 Kerberos 身份验证,应安装 Kerberos 客户端(工作站)软件包。
-
用户名和密码 (SASL/PLAIN):您可以选择此选项,使用用户名和密码进行身份验证 (SASL/PLAIN)。为防止密码以明文形式发送,强烈建议同时启用启用 TLS选项。
信息注释当选定的云提供商为Amazon MSK时,此身份验证方法不可用。
-
用户名和密码 (SASL/SCRAM-SHA-256):您可以选择此选项,使用用户名和密码进行身份验证 (SASL/SCRAM-SHA-256)。
请注意,选择此选项还需要将每个代理的 server.properties 文件配置为相应的 SASL/SCRAM 机制。
信息注释当选定的 云提供商 为 Amazon MSK 时,此身份验证方法不可用。
-
用户名和密码 (SASL/SCRAM-SHA-512):您可以选择此选项以使用用户名和密码 (SASL/SCRAM-SHA-512) 进行身份验证。
请注意,选择此选项还需要将每个代理的 server.properties 文件配置为相应的 SASL/SCRAM 机制。
SSL 选项
选择此选项以加密 Qlik Talend Cloud 与代理服务器之间的通信。如果代理配置为需要 SSL,则您必须选择此选项。
-
CA 文件:浏览到 PEM 格式的 CA 证书。当您单击保存时,文件将上传到 Qlik Talend Cloud 并部署到连接器设置中选择的 数据移动网关。
信息注释当所选的云提供商是Amazon MSK时,不需要。
元数据消息发布
模式注册表将可用于元数据消息发布
从下拉列表中选择以下选项之一:
-
否
选择此选项后,只有数据消息将被发布。
-
Confluent 模式注册表
如果您选择此选项,您还必须配置Schema registry 连接属性,如下所述。
-
Confluent Schema Registry 选项仅支持 Avro 消息格式。
-
强烈建议不要将 schema 消息发布到与数据消息相同的主题。
-
如果主题不存在,请使用auto.create.topics.enable=true配置代理,以使数据任务能够在运行时创建主题。否则,任务将失败。
消息格式
选择 JSON 或 Avro 作为消息格式。如果您选择了 Confluent schema registry,则 Avro 将是唯一可用的选项。
Schema registry 连接属性
Schema registry 服务器
使用以下格式指定一个或多个 Schema Registry 服务器(用于高可用性):
将数据架构发布到 Confluent Schema Registry 时:
server1:port1[,server2[:port2]]
示例:
192.168.1.100:8081,192.168.1.101:8081
数据任务将连接到第一个可用的主机。
将数据架构发布到 Hortonworks Schema Registry 时:
server1:port1[,server2[:port2]]
示例:
192.168.1.100:7788,192.168.1.101:7788
数据任务将连接到第一个可用的主机。
验证方式
选择以下其中一个 Schema Registry 身份验证选项:
- 无:无身份验证。
-
Kerberos: 选择使用 Kerberos 进行身份验证。
信息注释要在 Linux 上使用 Kerberos 身份验证,应安装 Kerberos 客户端(工作站)软件包。
- 主体: 用于针对 Schema Registry 进行身份验证的 Kerberos 主体。
-
Keytab 文件:浏览到 keytab 文件。当您点击 保存 时,文件将上传到 Qlik Talend Cloud 并部署到连接器设置中选择的任何 数据移动网关。
-
证书:选择使用证书进行身份验证。
信息注释此选项仅在发布到 Confluent Schema Registry 时受支持。
如果您选择此选项,还需要提供以下信息:
- 公钥文件:浏览到 PEM 格式的公钥文件。当您点击保存时,文件将上传到Qlik Talend Cloud并部署到连接器设置中选择的数据移动网关。
- 私钥文件:浏览到 PEM 格式的私钥文件。当您点击保存时,文件将上传到Qlik Talend Cloud并部署到连接器设置中选择的数据移动网关。
- 私钥密码:私钥文件的密码。
-
用户名和密码:选择使用用户名和密码进行身份验证。然后,在用户名和密码字段中输入您的登录凭据。
信息注释仅在发布到 Confluent Schema Registry 时支持此选项。
-
证书 + 用户名和密码:选择使用证书、用户名和密码进行身份验证。
选择此选项后,请在上述公钥文件、私钥文件、私钥密码、用户名和密码字段中输入所需信息。
信息注释此选项仅在发布到 Confluent Schema Registry 时受支持。
- 启用 TLS(支持 TLS 1.0、1.1 和 1.2):选择此选项可加密数据移动网关计算机与 Schema Registry 服务器之间的数据。如果服务器配置为需要 SSL,则您必须选择此选项。
CA 文件: 浏览 PEM 格式的 CA 证书。当您点击保存时,文件将上传到Qlik Talend Cloud并部署到在连接器设置中选择的数据移动网关。
内部属性
内部属性用于特殊用例,因此不会在对话框中公开。只有在 Qlik 支持部门的指示下,您才应该使用它们。
使用字段右侧的 和
按钮可以根据需要添加或删除特性。
名称
连接的显示名称。
先决条件
在将 Kafka 用作目标之前,必须满足以下先决条件:
-
设置权限以允许数据任务写入目标主题。一种方法是使用 Kafka ACLs 脚本 (kafka-acls)。
-
要么在数据任务启动前创建名为attrep_apply_exceptions的主题,要么使用auto.create.topics.enable=true配置代理。
请注意,如果此主题不存在,则无论错误处理策略如何,任务在遇到数据错误时都将始终失败。
有关attrep_apply_exceptions控制表的说明,请参阅应用例外。
限制和考虑事项
定义任务时,如果将 Kafka 作为目标连接器,则适用以下限制:
-
不支持的任务设置:
-
Kafka 目标连接器不支持无限 LOB 大小。因此,从包含 LOB 列的源表移动数据时,不要选择允许无限 LOB 大小选项。
-
不支持存储更改模式。
-
批量优化应用模式不受支持。Kafka 始终在事务性应用模式下工作。
- 对于源数据类型更改和表重命名,不支持忽略 ALTER 应用更改设置。
- 目标表准备字段的“删除并创建表”选项不受支持。
- 目标表准备字段的“加载前截断”选项不受支持。
- 更改数据分区控制表不受支持。
-
- 一般限制:
Kafka 主题名称不能包含空格或超过 255 个字符(Kafka 0.10 为 249 个字符),并且只能包含以下字符:
a-z|A-Z|0-9|.(点)|_(下划线)|-(减号)
如果源表名称超出允许的最大长度或包含不支持的字符,您需要在启动任务之前修改名称或定义全局转换。
列名必须以 [A-Za-z_](字母或下划线)开头,后跟 [A-Za-z0-9_](字母、数字或下划线)。例如,_Test_ 是有效的列名,而 &Test 则不是。
如果源列名不符合此规则,则应使用转换来重命名该列。
- 不支持删除或重命名源表。
数据类型
下表显示了使用 Qlik Cloud 时支持的 Kafka 数据类型以及自 Qlik Cloud 数据类型的默认映射。
当使用 JSON 消息格式时,二进制值表示为十六进制数字。
| Qlik Cloud 数据类型 | Kafka 模式消息中的目标数据类型 |
|---|---|
|
DATE |
DATE |
|
TIME |
TIME |
|
DATETIME |
DATETIME |
|
BYTES |
BYTES(长度) |
|
BLOB |
BLOB |
|
REAL4 |
REAL4 (7) |
|
REAL8 |
REAL8 (14) |
|
INT1 |
INT1 (3) |
|
INT2 |
INT2 (5) |
|
INT4 |
INT4 (10) |
|
INT8 |
INT8 (19) |
|
UINT1 |
UINT1 (3) |
|
UINT2 |
UINT2 (5) |
|
UINT4 信息注释
不支持大于 2^31-1 的值。 |
UINT4 (10) |
|
UINT8 信息注释
不支持大于 2^63-1 的值。 |
UINT8 (20) |
|
NUMERIC |
NUMERIC (p,s) |
|
STRING |
STRING(长度) |
|
WSTRING |
STRING(长度) |
|
CLOB |
CLOB |
|
NCLOB |
NCLOB |
|
BOOLEAN |
BOOLEAN (1) |
映射到 JSON 和 Avro
Avro 消息格式使用逻辑类型以更精确地表示数据类型。
Qlik Cloud 数据类型将仅映射到支持的 Avro 逻辑数据类型,如果选中了 对特定数据类型使用逻辑数据类型 复选框。
| Qlik Cloud 数据类型 | JSON | Avro 逻辑数据类型 |
|---|---|---|
|
DATE |
STRING |
DATE 注解一个 Avro INT。 |
|
TIME |
STRING |
TIME-MILLIS 注解一个 Avro INT。 |
|
TIMESTAMP |
STRING |
TIMESTAMP-MICROS 注解一个 Avro LONG。 |
|
STRING |
STRING |
- |
|
WSTRING |
STRING |
- |
|
CLOB |
STRING |
- |
|
NCLOB |
STRING |
- |
|
NUMERIC |
STRING |
DECIMAL (p,s) 标注一个 Avro BYTES。 |
|
BYTES |
BYTES |
- |
|
BLOB |
BYTES |
- |
|
REAL4 |
FLOAT |
- |
|
REAL8 |
DOUBLE |
- |
|
INT1 |
INT |
- |
|
INT2 |
INT |
- |
|
INT4 |
INT |
- |
|
UINT1 |
INT |
- |
|
UINT2 |
INT |
- |
|
UINT4 |
LONG |
- |
|
INT8 |
LONG |
- |
|
UINT8 |
STRING |
DECIMAL (20,0) 标注一个 Avro BYTES。 |
|
BOOLEAN |
BOOLEAN |
- |
使用 Confluent Cloud
以下部分解释了如何配置 Kafka 连接器以使用 Confluent Cloud。
- 在 Confluent Cloud 中创建一个集群。
- 将集群设置中的引导服务器值复制到连接器设置的常规选项卡中的代理服务器字段。
- 在连接器设置中,从身份验证方法下拉列表中选择用户名和密码 (SASL/PLAIN)。
- 在您的 Confluent 集群的API 访问屏幕中,创建API 密钥对。
- 将密钥和机密分别复制到 Kafka 连接器的用户名和密码字段中。
- 在 Kafka 连接器设置中,选择启用 TLS选项,并在CA 路径字段中指定 CA 文件的完整路径。CA文件应根据Confluent的指南创建,并应包含签署所有Confluent Cloud证书的CA及其根CA签名者。
您现在应该已准备就绪,可以使用Confluent Cloud。