Потоковые данные
Процесс адаптации передает данные из источника и сохраняет их в таблицах Iceberg. Изменения из потоковых источников данных постоянно применяются к таблицам хранилища почти в реальном времени.
Подключенные данные
Данные подключаются в рамках проекта конвейера, а наборы данных хранятся в местоположении S3, определенном в настройках проекта.
-
В проекте щелкните Создать, затем Подключить данные.
-
Добавьте Имя задачи и необязательное Описание для подключения.
Нажмите Далее.
-
Выберите подключение источника.
Можно выбрать существующее подключение к источнику потоковой передачи или создать новое подключение к источнику.
Для получения дополнительной информации см. Подключение к потокам данных
Нажмите Далее и следуйте приведенным ниже инструкциям для вашего источника данных.
Выбор данных
Apache Kafka
В списке отображаются доступные разделы Kafka из кластера, определенного в исходном подключении.
При выборе разделов можно выбрать определенные наборы данных. Можно также использовать правила выбора для включения или исключения групп наборов данных:
-
Используйте % в качестве знака подстановки для определения критериев выбора для наборов данных.
Если разделы выбраны с помощью правил выбора, вы можете выбрать, загружать ли все наборы данных в одну и ту же целевую таблицу или создавать отдельную целевую таблицу для каждого исходного раздела:
-
По умолчанию имя целевой таблицы Iceberg формируется из имени раздела, отформатированное в соответствии с соглашениями об именовании, например, в нижнем регистре, с удаленными пробелами, с заменой дефисов на символы подчеркивания. В Определение имени целевого набора данных можно изменить имя целевой таблицы
-
Когда правила выбора используются для загрузки нескольких разделов в одну таблицу, необходимо указать целевое имя.
-
Когда используются правила выбора и данные загружаются в отдельные таблицы (один набор данных на раздел), целевыми именами по умолчанию являются имена разделов. На этом этапе вы не можете редактировать имена в мастере, но это можно сделать позже в задаче промежуточного хранения.
-
Если правило настроено для выбора разделов для приема, любые новые разделы, соответствующие критериям правила, также будут помещены в промежуточное хранение, если установлен флажок Новый раздел > Добавить в цель в параметрах эволюции схемы в настройках задачи промежуточного хранения.
Выберите один или несколько наборов данных или используйте правило выбора, а затем нажмите Добавить. Нажмите Далее.
Amazon Kinesis
В списке отображаются доступные потоки Kinesis, определенные в исходном подключении.
Выберите один или несколько наборов данных и нажмите Добавить. Добавленные наборы данных отображаются в разделе Выбранные потоки. Нажмите Далее.
Amazon S3
В браузере каталогов отображается список всех каталогов, расположенных в S3-блоке исходного подключения.
-
Выберите каталоги, которые нужно включить при промежуточном хранении данных:
-
Для каждого каталога в Добавить путь введите путь и шаблон имени файла:
-
Используйте * как знак подстановки для сопоставления любого символа.
-
Чтобы ввести шаблон даты, используйте <yyyy> как заполнитель для четырехзначного года, <MM> как заполнитель для двухзначного месяца, <dd> как заполнитель для двухзначного дня и <HH> как заполнитель для двухзначного часа. Например:
-
MyDir3/<yyyy>_<MM>_<dd>_<HH>_orders.csv
-
MyDir3/<yyyy>/<MM>/<dd>/<HH>_orders.csv
-
-
-
-
Нажмите Предварительный просмотр, чтобы открыть диалоговое окно Предварительный просмотр данных. Отображается список включенных и исключенных файлов.
-
Нажмите Проверить, чтобы убедиться, что пути и шаблоны имен файлов верны и функциональны.
-
В разделе Определить имя целевого набора данных укажите имя, чтобы сопоставить раздел с целевой таблицей Iceberg. Нажмите Далее.
Выбор типа содержимого
Выберите тип содержимого событий источника.
-
Выберите тип событий, которые вы загружаете в Выберите тип событий данных.
-
Для получения дополнительной информации см. раздел Подключение к потокам данных.
Выбранный тип содержимого применяется ко всем темам, наборам данных или событиям данных. Необходимо создать новую задачу для каждого типа содержимого, который вы хотите загрузить.
-
Разверните Проверьте правильность загрузки событий, чтобы убедиться, что данные могут быть проанализированы. Желательно убедиться, что данные на этом этапе верны, в противном случае вам придется воссоздать конвейер и загрузить данные снова. Используйте Выбрать набор данных, чтобы изучить конкретные наборы данных и проверить любые предупреждения, которые могут повлиять на загрузку данных. Нажмите значок глаза рядом с любыми столбцами структуры, чтобы просмотреть данные.
-
Нажмите Далее.
Настройка свойств приема данных
Настройте параметры для вашего конвейера:
-
Прочитать данные из
-
Начать с самого раннего события: загрузить все исторические данные.
-
Начать сейчас: загружать новые данные, поступающие с момента запуска конвейера.
-
-
Развертка по столбцам
-
Сохранить вложенные столбцы: преобразования не применяются.
-
Развернуть в отдельные столбцы: данные разделяются на отдельные столбцы.
-
-
Настройки загрузки для новых наборов данных
-
Только добавление: как правило, лучший вариант для данных о событиях, поскольку они обычно имеют короткий срок жизни и не обновляются, например, Заказы.
-
Применить изменения: это лучше всего подходит для данных, которые обновляются со временем, например, Клиенты. Обновляются существующие записи и вставляются новые записи на основе ключевых полей. Позже вам потребуется указать ключевые поля при определении задачи.
-
-
Разделение целевой таблицы
Параметр разделения целевой таблицы применяется ко всем таблицам в конвейере. Вы можете переопределить это позже на уровне таблицы, чтобы определить пользовательское разделение.
Примечание к информацииЭтот параметр доступен только при выборе Только добавление в Настройках загрузки.-
Без разделения: таблицы будут созданы без разделов.
-
Разделение по дате приема событий: таблицы разбиваются на разделы по дате появления событий в системе.
-
-
Изменение данных
Примечание к информацииЭтот параметр доступен только при выборе Применить изменения в Настройках загрузки.-
Включить мягкие удаления: Введите выражение, чтобы определить, какие записи пометить для удаления.
-
Создать хранилище исторических данных (Тип 2): Это позволит сохранить предыдущие версии измененных записей.
-
-
Нажмите Далее.
Сводка
Экран сводки предоставляет визуальное отображение вашего конвейера:
-
При желании для задачи потокового промежуточного хранения и Streaming Transform можно нажать Изменить имя и описание, чтобы ввести новые значения.
-
Выберите вариант того, что должно произойти После создания конвейера.
-
После настройки всех параметров нажмите Создать, чтобы создать потоковый конвейер.
-
Когда проект отобразится, вы сможете подготовить и запустить каждую задачу, чтобы начать прием данных.
-
Подготовьте и запустите задачу потокового промежуточного хранения.
Для получения дополнительной информации см. раздел Выгрузка потоковых данных в Открытое озеро данных Qlik.
-
Подготовьте и запустите задачу потокового преобразования.
Для получения дополнительной информации см. раздел Хранение потоковых наборов данных.
-
Сопоставления типов данных
Исходная схема основана на выборке данных, взятой до фазы PREPARE при создании проекта конвейера, а эволюция схемы обрабатывается во время чтения. Задачи зеркалирования и другие последующие задачи, которые не поддерживают STRUCT и ARRAY, используют тип JSON. Данные можно анализировать с помощью SQL.
Нижеследующие сопоставления типов данных применимы ко всем поддерживаемым источникам данных, но различаются в зависимости от типа исходного файла, и следует отметить следующее:
-
Типы данных выводятся из образца загружаемых данных. Например, если поле содержит только целочисленные значения в образце, оно создается как INT8 в задачах потокового промежуточного хранения и преобразования. Если последующие данные включают дробные значения двойной точности, файлы промежуточного хранения содержат эти значения; однако в задаче преобразования потока, если для параметра Изменить тип данных поля установлено значение Игнорировать, столбец остается INT8, а дробные значения усекаются. Чтобы избежать непреднамеренного усечения, убедитесь, что данные образца включают полный диапазон ожидаемых значений перед подключением, или настройте Изменить тип данных поля на Остановить задачу на ранних этапах и при необходимости скорректируйте типы данных.
-
Если поле добавляется в структуру в источнике, оно всегда добавляется в целевой объект промежуточного хранения. Для потокового преобразования поведение применяется в соответствии с опцией, выбранной в Настройки задачи потокового преобразования > Эволюция схемы > Добавить поля в структуру (Применить к целевому объекту, Игнорировать, Остановить задачу).
-
Если поле отсутствует в определенной записи или массив пуст, они рассматриваются как null.
-
Если набор данных сглаживается массивом, и поступает запись, где этот массив пуст или null, система создает одну строку, и сглаженное поле становится null. Это не исключается автоматически. Если вы хотите исключить эти строки, вручную добавьте фильтр, например, array_element IS NOT NULL.
-
Типы данных, отображаемые в пользовательском интерфейсе, отражают выбранную детализацию набора данных. Для выпрямленных массивов отображается тип данных отдельного элемента, а не сама структура массива.
-
Новый атрибут нельзя добавить внутри структуры вложенного поля JSON, только на корневом уровне.
-
В задачах преобразования потоковой передачи поддерживается сглаживание только для одного уровня массива. Когда сглаживание применяется к многоуровневому массиву, например, ARRAY<ARRAY<STRUCT>>, сглаживается только внешний массив, что приводит к ARRAY<STRUCT>, а не к полностью сглаженной структуре STRUCT. Кроме того, текущий пользовательский интерфейс позволяет настраивать сглаживание только на уровне столбца. В результате выбор многоуровневого массива неявно применяет сглаживание только к первому уровню массива.
-
При ссылке на массив примитивов используется тип данных элемента, если детализация является массивом. В противном случае используется тип данных массива.
В этом примере OrderDetails содержит массив CustomerID типа данных INT. OrderDetails.CustomerID означает INT, если детализация — OrderDetails.CustomerID, и ARRAY<INT>, если детализация — OrderDetails.
JSON
В файлах JSON числовое значение в источнике определяет целевой тип данных:
-
INT8 используется для целочисленных значений, которые находятся в пределах поддерживаемого диапазона целых чисел и не содержат дробной части.
-
REAL8 (DOUBLE) используется, когда значение содержит дробную часть (число с плавающей запятой).
-
STRING используется, когда числовое значение превышает максимально поддерживаемый диапазон целых чисел.
Типы данных сопоставляются следующим образом:
| Типы исходных данных | Qlik Talend Data Integration типов данных |
|---|---|
| STRING | STRING |
| NUMBER | INT8 |
| NUMBER | REAL8 |
| NUMBER | STRING |
| BOOLEAN | BOOLEAN |
| ARRAY | ARRAY |
| OBJECT | STRUCT |
CSV, TSV, REGEX и SPLIT
По умолчанию все исходные типы данных импортируются как строка. Используйте параметр Автоматически определять типы для сопоставления исходных и целевых типов следующим образом:
| Типы исходных данных | Qlik типов данных |
|---|---|
| NUMERIC | INT8/REAL8 |
| True/TRUE/true/False/FALSE/false | BOOLEAN |
| TIMESTAMP | Метки времени в формате yyyy-MM-dd HH:mm:ss или yyyy-MM-ddTHH:mm:ssz анализируются как тип datetime. Если включен часовой пояс, значение анализируется как строка. |
Parquet
Файлы Parquet поддерживают физические и логические типы данных. Физические типы данных определяют, как значения хранятся на диске, например INT32, DOUBLE или BYTE_ARRAY. Логические типы данных обеспечивают семантическое значение поверх физического представления, например, определяя, представляет ли целочисленное значение дату. Когда логический тип прикреплен к столбцу Parquet и поддерживается в Открытое озеро данных Qlik (как указано ниже), задача промежуточного хранения Streaming использует логический тип при определении целевой схемы, а не базовый физический тип. Это гарантирует правильную интерпретацию данных, сохраняет предполагаемую семантику, такую как Πрецизионность, масштаб и временное значение, и приводит к более точным схемам при записи данных в нижестоящие форматы.
Данные, полученные из файлов Parquet, сопоставляются следующим образом:
| Типы исходных данных | Логические типы | Qlik Talend Data Integration типов данных |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT32 | INT8 | |
| INT64 | INT8 | |
| INT96 | DATETIME | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| МАССИВ БАЙТОВ | STRING (Закодировано как Base64) | |
| массив байтов фиксированной длины | СТРОКА (Закодировано как Base64) | |
| МАССИВ БАЙТОВ | STRING | STRING |
| МАССИВ БАЙТОВ | ENUM | STRING |
| INT32 | DECIMAL | INT8 |
| INT64 | DECIMAL | INT8 |
| массив байтов фиксированной длины | DECIMAL | INT8/REAL8 (Закодировано как Base64) |
| МАССИВ БАЙТОВ | DECIMAL | INT8/REAL8 (Закодировано как Base64) |
| INT32 | DATE | DATE |
| INT32 | ВРЕМЯ(МИЛЛИС,true) | INT8 |
| INT64 | ВРЕМЯ(МИКРОС,true) | TIME |
| INT64 | TIMESTAMP(MICROS,true) | DATETIME |
| INT64 | TIMESTAMP(MILLIS,true) | DATETIME |
| ВЛОЖЕННЫЕ ТИПЫ | STRUCT | |
| СПИСОК | ARRAY | |
| MAP | МАССИВ<СТРУКТУРА>. Массив структур, представляющих пары «ключ-значение». |
Avro
Следующие сопоставления применяются к файлам Avro с реестром схем.
| Типы исходных данных | Логические типы | Qlik Talend Data Integration типов данных |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT | INT8 | |
| Long | INT8 | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| BYTES | STRING | |
| STRING | STRING | |
| ЗАПИСЬ | STRUCT | |
| ENUM | STRING | |
| ARRAY | ARRAY | |
| MAP | МАССИВ<СТРУКТУРА> | |
| ОБЪЕДИНЕНИЕ | ||
| ФИКСИРОВАННЫЙ | STRING | |
| BYTES | DECIMAL | DECIMAL |
| ФИКСИРОВАННЫЙ | DECIMAL | DECIMAL |
| INT | DATE | DATE |
| INT | TIME-MILLIS | INT8 |
| INT | TIME-MICROS | TIME |
| Long | TIMESTAMP-MILLIS | DATETIME |
| Long | TIMESTAMP-MICROS | DATETIME |
ORC
Следующие сопоставления применяются к файлам ORC.
| Типы исходных данных | Qlik Talend Data Integration типов данных |
|---|---|
| BOOLEAN | BOOLEAN |
| БАЙТ | INT8 |
| SHORT | INT8 |
| INT | INT8 |
| Long | INT8 |
| DATE | DATE |
| FLOAT | REAL8 |
| DOUBLE | REAL8 |
| TIMESTAMP | DATETIME |
| BINARY | STRING |
| DECIMAL | REAL8 |
| STRING | STRING |
| VARCHAR | STRING |
| CHAR | STRING |
| СПИСОК | ARRAY |
| MAP | МАССИВ<СТРУКТУРА>. Массив структур, представляющих пары ключ-значение. |
| STRUCT | STRUCT |
| ОБЪЕДИНЕНИЕ |
Ограничения и замечания
-
Если структура или массив изменяются автоматическим развитием схемы в промежуточном хранении, нижестоящие представления, которые не были созданы задачей Qlik Talend Cloud потоковой передачи, возможно, потребуется обновить, чтобы они не устарели.
-
Если в задаче есть ошибки синтаксического анализа, она не перейдет в состояние ошибки и не будет помечена как требующая внимания. Поскольку ошибки синтаксического анализа являются постоянно растущим показателем, критерий выхода из состояния ошибки отсутствует.
-
Удаление возможности кластера разрешено только в том случае, если нет задач, использующих эту возможность.
-
Обновления и удаления записи с одним и тем же первичным ключом не должны пересекать границу раздела, то есть они должны быть сопоставлены с одним и тем же разделом.
-
Если источник содержит большое количество столбцов, в задачах и в каталоге отображаются только 500 наиболее часто встречающихся столбцов. Все столбцы сохраняются в файлах Avro в S3 для промежуточного хранения, но только 500 наиболее часто встречающихся столбцов хранятся в таблицах Iceberg. При эволюции схемы, если добавлен новый столбец, он не будет добавлен в список основных столбцов, даже если он часто встречается.