Kafka цель
В этом разделе объясняется настройка подключения к цели Kafka с использованием коннектора цели Kafka . Kafka можно использовать в качестве цели только в задаче репликации.
Настройка Kafka в качестве цели подразумевает следующее:
- Выполнение предварительных условий
- Настройка подключения к Kafka
Настройка свойств подключения
Чтобы настроить коннектор, выполните следующие действия:
-
В виде Подключения нажмите Создать подключение.
-
Выберите коннектор цели Kafka, а затем задайте следующие настройки.
Цель данных
Шлюз данных
При перемещении данных в Amazon MSK, Шлюз движения данных требуется, только если Amazon MSK недоступен из Qlik Cloud и доступен только по ссылке PrivateLink (например, если он расположен в виртуальном частном облаке). В таком случае выберите Шлюз движения данных, через который требуется обращаться к целевой базе данных. В зависимости от конкретной ситуации это будет либо тот же Шлюз движения данных, развернутый для перемещения данных, либо другой шлюз.
Если вы перемещаете данные в экземпляр Amazon MSK, который напрямую доступен из Qlik Cloud, выберите Нет.
-
Если вы перемещаете данные в локальную среду Kafka (см. Поставщик облака ниже), вы должны выбрать Шлюз движения данных, независимо от того, доступен ли Kafka напрямую из Qlik Cloud.
-
Если вам нужно использовать Шлюз движения данных, требуется версия 2025.5.40 или более поздняя.
Для получения информации о сценариях применения Шлюз движения данных см. разделы Когда требуется Шлюз движения данных? и Общие случаи использования.
Поставщик облака
Выберите Нет для использования Kafka локально или Amazon MSK для использования Amazon MSK.
Серверы брокеров
Укажите один или несколько серверов брокеров, используя следующий формат (для высокой доступности):
server1[:port1][,server2[:port2]]
Пример:
192.168.1.100:9092,192.168.1.101:9093
Задача данных подключится к первому доступному хосту. Если хост указан без порта, то порт 9092 будет использоваться по умолчанию.
При использовании аутентификации SSL или Kerberos необходимо указать полное доменное имя брокера (FQDN) (т.е. не IP-адрес).
Все серверы брокера в вашем кластере должны быть доступны с машины Шлюз движения данных. Однако вам не нужно указывать все серверы в Серверы брокера . Это потому, что Шлюз движения данных нужно подключиться только к одному из серверов, чтобы получить данные подключения для других серверов в кластере. Поэтому рекомендуется указывать серверы, которые с наибольшей вероятностью будут доступны при выполнении задачи. Серверы, на которые задача обработки данных отправляет сообщения, определяются темой и темой секционирования, а также параметрами секционирования.
Свойства учетной записи (Account properties)
Проверка подлинности метод
выберите одно из значений, указанных ниже.
- Отсутствует: Без аутентификации.
-
Сертификат: Если вы выберете этот параметр, вам также необходимо предоставить следующую информацию:
ПримечаниеФайлы открытого и закрытого ключей должны быть в формате PEM.
- Файл открытого ключа: Укажите путь к файлу открытого ключа в формате PEM. Когда вы нажмете Сохранить, файл будет загружен в Qlik Talend Cloud и развернут в Шлюз движения данных, выбранном в настройках коннектора.
- Файл закрытого ключа: Укажите путь к файлу закрытого ключа в формате PEM. При нажатии Сохранить файл будет загружен в Qlik Talend Cloud и развернут на Шлюз движения данных, выбранном в настройках коннектора.
- Пароль файла закрытого ключа: Пароль для файла закрытого ключа.
-
Kerberos (SASL/GSSAPI): Выберите для аутентификации в кластере Kafka с использованием Kerberos.
Примечание к информацииЭтот метод аутентификации недоступен, если выбранный поставщик облачных услуг — Amazon MSK.
- Субъект: Субъект Kerberos, используемый для аутентификации на сервере(ах) брокера.
- Файл keytab: Перейдите к файлу keytab. При нажатии Сохранить файл будет загружен в Qlik Talend Cloud и развернут на Шлюз движения данных, выбранном в настройках коннектора.
Примечание к информацииДля использования аутентификации Kerberos в Linux необходимо установить пакет клиента Kerberos (рабочей станции).
-
Имя пользователя и пароль (SASL/PLAIN): Вы можете выбрать этот параметр для аутентификации с помощью имени пользователя и пароля (SASL/PLAIN). Чтобы предотвратить отправку пароля в открытом виде, настоятельно рекомендуется также включить параметр Включить TLS.
Примечание к информацииЭтот метод аутентификации недоступен, если выбранный Поставщик облачных услуг — Amazon MSK.
-
Имя пользователя и пароль (SASL/SCRAM-SHA-256): Вы можете выбрать этот параметр для аутентификации с помощью имени пользователя и пароля (SASL/SCRAM-SHA-256).
Обратите внимание, что выбор этой опции также требует, чтобы файл каждого брокера server.properties был настроен с соответствующим механизмом SASL/SCRAM.
Примечание к информацииЭтот метод аутентификации недоступен, когда выбранный поставщик облачных услуг является Amazon MSK.
-
Имя пользователя и пароль (SASL/SCRAM-SHA-512): Вы можете выбрать эту опцию для аутентификации с помощью имени пользователя и пароля (SASL/SCRAM-SHA-512).
Обратите внимание, что выбор этой опции также требует, чтобы файл каждого брокера server.properties был настроен с соответствующим механизмом SASL/SCRAM.
Параметры SSL
Включить TLS (поддерживает TLS 1.0, 1.1 и 1.2)
Выберите этот параметр, чтобы зашифровать обмен данными между Qlik Talend Cloud и сервером(ами) брокера. Если брокеры настроены на требование SSL, то вы должны выбрать этот параметр.
-
Файл ЦС: Перейдите к сертификату ЦС в формате PEM. При нажатии Сохранить файл будет загружен в Qlik Talend Cloud и развернут в Шлюз движения данных, выбранном в настройках коннектора.
Примечание к информацииНе требуется, когда выбранный облачный провайдер — Amazon MSK.
Публикация сообщений метаданных
Реестр схем будет доступен для публикации сообщений метаданных
Из раскрывающегося списка выберите один из следующих вариантов:
-
Нет
При выборе этого варианта будут опубликованы только сообщения с данными.
-
Реестр схем Confluent
Если вы выберете этот параметр, вы также должны настроить свойства подключения к реестру схем, описанные ниже.
-
Параметры Confluent Schema Registry поддерживают только формат сообщений Avro.
-
Настоятельно рекомендуется не публиковать сообщения схемы в тот же раздел, что и сообщения данных.
-
Если разделы не существуют, настройте брокеры с помощью auto.create.topics.enable=true, чтобы разрешить задаче данных создавать разделы во время выполнения. В противном случае задача завершится ошибкой.
Формат сообщений
Выберите JSON или Avro в качестве формата сообщения. Avro будет единственным доступным вариантом, если вы выбрали Confluent schema registry.
Свойства подключения реестра схем
Серверы реестра схем
Укажите один или несколько серверов реестра схем, используя следующий формат (для обеспечения высокой доступности):
При публикации схем данных в Confluent Schema Registry:
server1:port1[,server2[:port2]]
Пример:
192.168.1.100:8081,192.168.1.101:8081
Задача данных подключится к первому доступному хосту.
При публикации схем данных в Hortonworks Schema Registry:
server1:port1[,server2[:port2]]
Пример:
192.168.1.100:7788,192.168.1.101:7788
Задача данных подключится к первому доступному хосту.
Проверка подлинности метод
Выберите один из следующих вариантов аутентификации Schema Registry:
- Отсутствует: Аутентификация отсутствует.
-
Kerberos: Выберите для аутентификации с использованием Kerberos.
Примечание к информацииДля использования аутентификации Kerberos в Linux должен быть установлен пакет клиента Kerberos (рабочей станции).
- Принципал: Принципал Kerberos, используемый для аутентификации в Schema Registry.
-
Файл keytab: Перейдите к файлу keytab. При нажатии кнопки Сохранить файл будет загружен в Qlik Talend Cloud и развернут на любом Шлюз движения данных, выбранном в настройках коннектора.
-
Сертификат: Выберите для аутентификации с использованием сертификата.
Примечание к информацииЭта опция поддерживается только при публикации в Confluent Schema Registry.
Если вы выберете эту опцию, вам также потребуется предоставить следующую информацию:
- Файл открытого ключа: Перейдите к файлу открытого ключа в формате PEM. При нажатии Сохранить файл будет загружен в Qlik Talend Cloud и развернут в Шлюз движения данных, выбранном в настройках коннектора.
- Файл закрытого ключа: Перейдите к файлу закрытого ключа в формате PEM. При нажатии Сохранить файл будет загружен в Qlik Talend Cloud и развернут в Шлюз движения данных, выбранном в настройках коннектора.
- Пароль закрытого ключа: Пароль для файла закрытого ключа.
-
Имя пользователя и пароль: Выберите для аутентификации с именем пользователя и паролем. Затем введите свои учетные данные в полях Имя пользователя и пароль .
Примечание к информацииЭта опция поддерживается только при публикации в реестр схем Confluent.
-
Сертификат + Имя пользователя и пароль: Выберите для аутентификации с использованием как сертификата, так и имени пользователя и пароля.
Если этот параметр выбран, введите необходимую информацию в поля Файл открытого ключа, Файл закрытого ключа, Пароль закрытого ключа, Имя пользователя и Пароль , описанные выше.
Примечание к информацииЭта опция поддерживается только при публикации в Confluent Schema Registry.
- Включить TLS (поддерживает TLS 1.0, 1.1 и 1.2): Выберите эту опцию для шифрования данных между машиной Шлюз движения данных и сервером(ами) Schema Registry. Если серверы настроены на требование SSL, то вы должны выбрать эту опцию.
Файл ЦС: Укажите путь к сертификату ЦС в формате PEM. При нажатии Сохранить файл будет загружен в Qlik Talend Cloud и развернут на Шлюз движения данных, выбранном в настройках коннектора.
Внутренние свойства
Внутренние свойства предназначены для особых вариантов использования и поэтому не отображаются в диалоговом окне. Их можно использовать только по указанию службы поддержки Qlik.
Используйте кнопки и
справа от полей, чтобы при необходимости добавить или удалить свойства.
Имя
Отображаемое имя подключения.
Предварительные требования
Прежде чем использовать Kafka в качестве цели, должны быть выполнены следующие предварительные условия:
-
Открытые порты TCP для всех брокеров с Шлюз движения данных машины
-
Настройте разрешения, которые позволят задаче данных записывать в целевые топики. Один из способов сделать это — использовать скрипт Kafka ACLs (kafka-acls).
-
Либо создайте топик с именем attrep_apply_exceptions перед запуском задачи данных, либо настройте брокеры с помощью auto.create.topics.enable=true.
Обратите внимание, что если этот топик не существует, задача всегда будет завершаться сбоем при обнаружении ошибки данных, независимо от политики обработки ошибок.
Описание управляющей таблицы attrep_apply_exceptions см. в Применять исключения.
Ограничения и замечания
При определении задачи с Kafka в качестве целевого коннектора применяются следующие ограничения:
-
Неподдерживаемые параметры задачи:
-
Целевой коннектор Kafka не поддерживает неограниченный размер LOB. Поэтому при перемещении данных из исходных таблиц со столбцами LOB не выбирайте параметр Разрешить неограниченный размер LOB.
-
Режим Сохранить изменения не поддерживается.
-
Режим применения изменений «Пакетная оптимизация» не поддерживается. Kafka всегда работает в режиме применения изменений «Транзакционная».
- Параметр применения изменений «Игнорировать ALTER» не поддерживается для изменений типов исходных данных и переименования таблиц.
- Опция «Удалить и создать таблицу» поля «Подготовка целевой таблицы» не поддерживается.
- Опция «Усечь перед загрузкой» поля «Подготовка целевой таблицы» не поддерживается.
- Таблица управления изменением секционирования данных не поддерживается.
-
- Общие ограничения:
Имена разделов Kafka не могут содержать пробелы или превышать 255 символов (249 начиная с Kafka 0.10) и могут содержать только следующие символы:
a-z|A-Z|0-9|. (точка)|_ (нижнее подчеркивание)|- (минус)
Если имена исходных таблиц превышают максимально допустимую длину или содержат неподдерживаемые символы, вам необходимо либо изменить имена перед запуском задачи, либо определить глобальное преобразование.
Имена столбцов должны начинаться с [A-Za-z_] (буквы или символ подчеркивания), за которым следует [A-Za-z0-9_] (буквы, цифры или символ подчеркивания). Например, _Test_ является допустимым именем столбца, тогда как &Test — нет.
Если имя исходного столбца не соответствует этому правилу, то для переименования столбца следует использовать преобразование.
- Удаление или переименование исходной таблицы не поддерживается.
Типы данных
В следующей таблице показаны типы данных Kafka, которые поддерживаются при использовании Qlik Cloud, и сопоставление по умолчанию из типов данных Qlik Cloud.
При использовании формата сообщений JSON двоичные значения представляются в виде шестнадцатеричных цифр.
| Типы данных Qlik Cloud | Целевые типы данных Kafka в сообщениях схемы |
|---|---|
|
DATE |
DATE |
|
TIME |
TIME |
|
DATETIME |
DATETIME |
|
BYTES |
BYTES (длина) |
|
BLOB |
BLOB |
|
REAL4 |
REAL4 (7) |
|
REAL8 |
REAL8 (14) |
|
INT1 |
INT1 (3) |
|
INT2 |
INT2 (5) |
|
INT4 |
INT4 (10) |
|
INT8 |
INT8 (19) |
|
UINT1 |
UINT1 (3) |
|
UINT2 |
UINT2 (5) |
|
UINT4 Примечание к информации
Значения более 2^31-1 не поддерживаются. |
UINT4 (10) |
|
UINT8 Примечание к информации
Значения более 2^63-1 не поддерживаются. |
UINT8 (20) |
|
NUMERIC |
NUMERIC (p,s) |
|
STRING |
STRING (длина) |
|
WSTRING |
STRING (длина) |
|
CLOB |
CLOB |
|
NCLOB |
NCLOB |
|
BOOLEAN |
BOOLEAN (1) |
Сопоставление с JSON и Avro
Формат сообщений Avro использует логические типы для более точного представления типа данных.
Qlik Cloud Типы данных будут сопоставлены только с поддерживаемыми логическими типами данных Avro, если установлен флажок Использовать логические типы данных для определенных типов данных.
| Типы данных Qlik Cloud | JSON | Логические типы данных Avro |
|---|---|---|
|
DATE |
STRING |
DATE Аннотирует Avro INT. |
|
TIME |
STRING |
TIME-MILLIS Аннотирует Avro INT. |
|
TIMESTAMP |
STRING |
TIMESTAMP-MICROS Аннотирует Avro LONG. |
|
STRING |
STRING |
- |
|
WSTRING |
STRING |
- |
|
CLOB |
STRING |
- |
|
NCLOB |
STRING |
- |
|
NUMERIC |
STRING |
DECIMAL (p,s) Аннотирует Avro BYTES. |
|
BYTES |
BYTES |
- |
|
BLOB |
BYTES |
- |
|
REAL4 |
FLOAT |
- |
|
REAL8 |
DOUBLE |
- |
|
INT1 |
INT |
- |
|
INT2 |
INT |
- |
|
INT4 |
INT |
- |
|
UINT1 |
INT |
- |
|
UINT2 |
INT |
- |
|
UINT4 |
Long |
- |
|
INT8 |
Long |
- |
|
UINT8 |
STRING |
DECIMAL (20,0) Аннотирует Avro BYTES. |
|
BOOLEAN |
BOOLEAN |
- |
Работа с Confluent Cloud
В следующем разделе объясняется, как настроить коннектор Kafka для работы с Confluent Cloud.
- Создайте кластер в Confluent Cloud.
- Скопируйте значение сервера начальной загрузки из настроек кластера в поле серверов брокера на вкладке Общие в настройках коннектора.
- В настройках коннектора выберите Имя пользователя и пароль (SASL/PLAIN) из раскрывающегося списка Метод аутентификации.
- На экране доступа к API вашего кластера Confluent создайте пару ключей API.
- Скопируйте ключ и секрет в поля Имя пользователя и Пароль коннектора Kafka соответственно.
- В настройках коннектора Kafka выберите параметр Enable TLS и укажите полный путь к файлу ЦС в поле CA path. Файл ЦС должен быть создан в соответствии с рекомендациями Confluent и должен содержать ЦС, который подписывает все cертификаты Confluent Cloud, а также его корневой подписант ЦС.
Теперь вы готовы к работе с Confluent Cloud.