Перейти к основному содержимому Перейти к дополнительному содержимому

Потоковые данные

Процесс адаптации передает данные из источника и сохраняет их в таблицах Iceberg. Изменения из потоковых источников данных постоянно применяются к таблицам хранилища почти в реальном времени.

Подключенные данные

Данные подключаются в рамках проекта конвейера, а наборы данных хранятся в местоположении S3, определенном в настройках проекта.

  1. В проекте щелкните Создать, затем Подключить данные.

  2. Добавьте Имя задачи и необязательное Описание для подключения.

    Нажмите Далее.

  3. Выберите подключение источника.

    Можно выбрать существующее подключение к источнику потоковой передачи или создать новое подключение к источнику.

    Для получения дополнительной информации см. Подключение к потокам данных

    Нажмите Далее и следуйте приведенным ниже инструкциям для вашего источника данных.

Выбор данных

Apache Kafka

В списке отображаются доступные разделы Kafka из кластера, определенного в исходном подключении.

При выборе разделов можно выбрать определенные наборы данных. Можно также использовать правила выбора для включения или исключения групп наборов данных:

  • Используйте % в качестве знака подстановки для определения критериев выбора для наборов данных.

Если разделы выбраны с помощью правил выбора, вы можете выбрать, загружать ли все наборы данных в одну и ту же целевую таблицу или создавать отдельную целевую таблицу для каждого исходного раздела:

  • По умолчанию имя целевой таблицы Iceberg формируется из имени раздела, отформатированное в соответствии с соглашениями об именовании, например, в нижнем регистре, с удаленными пробелами, с заменой дефисов на символы подчеркивания. В Определение имени целевого набора данных можно изменить имя целевой таблицы

  • Когда правила выбора используются для загрузки нескольких разделов в одну таблицу, необходимо указать целевое имя.

  • Когда используются правила выбора и данные загружаются в отдельные таблицы (один набор данных на раздел), целевыми именами по умолчанию являются имена разделов. На этом этапе вы не можете редактировать имена в мастере, но это можно сделать позже в задаче промежуточного хранения.

  • Если правило настроено для выбора разделов для приема, любые новые разделы, соответствующие критериям правила, также будут помещены в промежуточное хранение, если установлен флажок Новый раздел  > Добавить в цель в параметрах эволюции схемы в настройках задачи промежуточного хранения.

Выберите один или несколько наборов данных или используйте правило выбора, а затем нажмите Добавить. Нажмите Далее.

Amazon Kinesis

В списке отображаются доступные потоки Kinesis, определенные в исходном подключении.

Выберите один или несколько наборов данных и нажмите Добавить. Добавленные наборы данных отображаются в разделе Выбранные потоки. Нажмите Далее.

Amazon S3

В браузере каталогов отображается список всех каталогов, расположенных в S3-блоке исходного подключения. 

Примечание к информацииИспользование шаблона данных для выбора каталогов может повысить производительность.
  • Выберите каталоги, которые нужно включить при промежуточном хранении данных:

    • Для каждого каталога в Добавить путь введите путь и шаблон имени файла:

      • Используйте * как знак подстановки для сопоставления любого символа.

      • Чтобы ввести шаблон даты, используйте <yyyy> как заполнитель для четырехзначного года, <MM> как заполнитель для двухзначного месяца, <dd> как заполнитель для двухзначного дня и <HH> как заполнитель для двухзначного часа. Например:

        • MyDir3/<yyyy>_<MM>_<dd>_<HH>_orders.csv

        • MyDir3/<yyyy>/<MM>/<dd>/<HH>_orders.csv

  • Нажмите Предварительный просмотр, чтобы открыть диалоговое окно Предварительный просмотр данных. Отображается список включенных и исключенных файлов.

  • Нажмите Проверить, чтобы убедиться, что пути и шаблоны имен файлов верны и функциональны.

  • В разделе Определить имя целевого набора данных укажите имя, чтобы сопоставить раздел с целевой таблицей Iceberg. Нажмите Далее.

Выбор типа содержимого

Выберите тип содержимого событий источника.

  • Выберите тип событий, которые вы загружаете в Выберите тип событий данных.

  • Для получения дополнительной информации см. раздел Подключение к потокам данных.

    Выбранный тип содержимого применяется ко всем темам, наборам данных или событиям данных. Необходимо создать новую задачу для каждого типа содержимого, который вы хотите загрузить.

  • Разверните Проверьте правильность загрузки событий, чтобы убедиться, что данные могут быть проанализированы. Желательно убедиться, что данные на этом этапе верны, в противном случае вам придется воссоздать конвейер и загрузить данные снова. Используйте Выбрать набор данных, чтобы изучить конкретные наборы данных и проверить любые предупреждения, которые могут повлиять на загрузку данных. Нажмите значок глаза рядом с любыми столбцами структуры, чтобы просмотреть данные.

  • Нажмите Далее.

Примечание к информацииЕсли для набора данных не было обнаружено событий, сообщение отображается в промежуточном хранении. Вам потребуется запустить задачу, когда будут события для чтения, и столбцы будут добавлены автоматически.

Настройка свойств приема данных

Настройте параметры для вашего конвейера:

  • Прочитать данные из

    • Начать с самого раннего события: загрузить все исторические данные.

    • Начать сейчас: загружать новые данные, поступающие с момента запуска конвейера.

  • Развертка по столбцам

    • Сохранить вложенные столбцы: преобразования не применяются.

    • Развернуть в отдельные столбцы: данные разделяются на отдельные столбцы.

  • Настройки загрузки для новых наборов данных

    • Только добавление: как правило, лучший вариант для данных о событиях, поскольку они обычно имеют короткий срок жизни и не обновляются, например, Заказы.

    • Применить изменения: это лучше всего подходит для данных, которые обновляются со временем, например, Клиенты. Обновляются существующие записи и вставляются новые записи на основе ключевых полей. Позже вам потребуется указать ключевые поля при определении задачи.

  • Разделение целевой таблицы

    Параметр разделения целевой таблицы применяется ко всем таблицам в конвейере. Вы можете переопределить это позже на уровне таблицы, чтобы определить пользовательское разделение.

    Примечание к информацииЭтот параметр доступен только при выборе Только добавление в Настройках загрузки.
    • Без разделения: таблицы будут созданы без разделов.

    • Разделение по дате приема событий: таблицы разбиваются на разделы по дате появления событий в системе.

  • Изменение данных

    Примечание к информацииЭтот параметр доступен только при выборе Применить изменения в Настройках загрузки.
    • Включить мягкие удаления: Введите выражение, чтобы определить, какие записи пометить для удаления.

    • Создать хранилище исторических данных (Тип 2): Это позволит сохранить предыдущие версии измененных записей.

  • Нажмите Далее.

Сводка

Экран сводки предоставляет визуальное отображение вашего конвейера:

  • При желании для задачи потокового промежуточного хранения и Streaming Transform можно нажать Изменить имя и описание, чтобы ввести новые значения.

  • Выберите вариант того, что должно произойти После создания конвейера.

  • После настройки всех параметров нажмите Создать, чтобы создать потоковый конвейер.

  • Когда проект отобразится, вы сможете подготовить и запустить каждую задачу, чтобы начать прием данных.

Сопоставления типов данных

Исходная схема основана на выборке данных, взятой до фазы PREPARE при создании проекта конвейера, а эволюция схемы обрабатывается во время чтения. Задачи зеркалирования и другие последующие задачи, которые не поддерживают STRUCT и ARRAY, используют тип JSON. Данные можно анализировать с помощью SQL.

Нижеследующие сопоставления типов данных применимы ко всем поддерживаемым источникам данных, но различаются в зависимости от типа исходного файла, и следует отметить следующее:

  • Типы данных выводятся из образца загружаемых данных. Например, если поле содержит только целочисленные значения в образце, оно создается как INT8 в задачах потокового промежуточного хранения и преобразования. Если последующие данные включают дробные значения двойной точности, файлы промежуточного хранения содержат эти значения; однако в задаче преобразования потока, если для параметра Изменить тип данных поля установлено значение Игнорировать, столбец остается INT8, а дробные значения усекаются. Чтобы избежать непреднамеренного усечения, убедитесь, что данные образца включают полный диапазон ожидаемых значений перед подключением, или настройте Изменить тип данных поля на Остановить задачу на ранних этапах и при необходимости скорректируйте типы данных.

  • Если поле добавляется в структуру в источнике, оно всегда добавляется в целевой объект промежуточного хранения. Для потокового преобразования поведение применяется в соответствии с опцией, выбранной в Настройки задачи потокового преобразования > Эволюция схемы > Добавить поля в структуру (Применить к целевому объекту, Игнорировать, Остановить задачу).

  • Если поле отсутствует в определенной записи или массив пуст, они рассматриваются как null.

  • Если набор данных сглаживается массивом, и поступает запись, где этот массив пуст или null, система создает одну строку, и сглаженное поле становится null. Это не исключается автоматически. Если вы хотите исключить эти строки, вручную добавьте фильтр, например, array_element IS NOT NULL.

  • Типы данных, отображаемые в пользовательском интерфейсе, отражают выбранную детализацию набора данных. Для выпрямленных массивов отображается тип данных отдельного элемента, а не сама структура массива.

  • Новый атрибут нельзя добавить внутри структуры вложенного поля JSON, только на корневом уровне.

  • В задачах преобразования потоковой передачи поддерживается сглаживание только для одного уровня массива. Когда сглаживание применяется к многоуровневому массиву, например, ARRAY<ARRAY<STRUCT>>, сглаживается только внешний массив, что приводит к ARRAY<STRUCT>, а не к полностью сглаженной структуре STRUCT. Кроме того, текущий пользовательский интерфейс позволяет настраивать сглаживание только на уровне столбца. В результате выбор многоуровневого массива неявно применяет сглаживание только к первому уровню массива.

  • При ссылке на массив примитивов используется тип данных элемента, если детализация является массивом. В противном случае используется тип данных массива.

    В этом примере OrderDetails содержит массив CustomerID типа данных INT. OrderDetails.CustomerID означает INT, если детализация — OrderDetails.CustomerID, и ARRAY<INT>, если детализация — OrderDetails.

JSON

В файлах JSON числовое значение в источнике определяет целевой тип данных:

  • INT8 используется для целочисленных значений, которые находятся в пределах поддерживаемого диапазона целых чисел и не содержат дробной части.

  • REAL8 (DOUBLE) используется, когда значение содержит дробную часть (число с плавающей запятой).

  • STRING используется, когда числовое значение превышает максимально поддерживаемый диапазон целых чисел.

Типы данных сопоставляются следующим образом:

Типы исходных данных Qlik Talend Data Integration типов данных
STRING STRING
NUMBER INT8
NUMBER REAL8
NUMBER STRING
BOOLEAN BOOLEAN
ARRAY ARRAY
OBJECT STRUCT

CSV, TSV, REGEX и SPLIT

По умолчанию все исходные типы данных импортируются как строка. Используйте параметр Автоматически определять типы для сопоставления исходных и целевых типов следующим образом:

Типы исходных данных Qlik типов данных
NUMERIC INT8/REAL8
True/TRUE/true/False/FALSE/false BOOLEAN
TIMESTAMP Метки времени в формате yyyy-MM-dd HH:mm:ss или yyyy-MM-ddTHH:mm:ssz анализируются как тип datetime. Если включен часовой пояс, значение анализируется как строка.

Parquet

Файлы Parquet поддерживают физические и логические типы данных. Физические типы данных определяют, как значения хранятся на диске, например INT32, DOUBLE или BYTE_ARRAY. Логические типы данных обеспечивают семантическое значение поверх физического представления, например, определяя, представляет ли целочисленное значение дату. Когда логический тип прикреплен к столбцу Parquet и поддерживается в Открытое озеро данных Qlik (как указано ниже), задача промежуточного хранения Streaming использует логический тип при определении целевой схемы, а не базовый физический тип. Это гарантирует правильную интерпретацию данных, сохраняет предполагаемую семантику, такую как Πрецизионность, масштаб и временное значение, и приводит к более точным схемам при записи данных в нижестоящие форматы.

Данные, полученные из файлов Parquet, сопоставляются следующим образом:

Типы исходных данных Логические типы Qlik Talend Data Integration типов данных
BOOLEAN   BOOLEAN
INT32   INT8
INT64   INT8
INT96   DATETIME
FLOAT   REAL8
DOUBLE   REAL8
МАССИВ БАЙТОВ   STRING (Закодировано как Base64)
массив байтов фиксированной длины   СТРОКА (Закодировано как Base64)
МАССИВ БАЙТОВ STRING STRING
МАССИВ БАЙТОВ ENUM STRING
INT32 DECIMAL INT8
INT64 DECIMAL INT8
массив байтов фиксированной длины DECIMAL INT8/REAL8 (Закодировано как Base64)
МАССИВ БАЙТОВ DECIMAL INT8/REAL8 (Закодировано как Base64)
INT32 DATE DATE
INT32 ВРЕМЯ(МИЛЛИС,true) INT8
INT64 ВРЕМЯ(МИКРОС,true) TIME
INT64 TIMESTAMP(MICROS,true) DATETIME
INT64 TIMESTAMP(MILLIS,true) DATETIME
ВЛОЖЕННЫЕ ТИПЫ   STRUCT
СПИСОК   ARRAY
MAP   МАССИВ<СТРУКТУРА>. Массив структур, представляющих пары «ключ-значение».

Avro

Следующие сопоставления применяются к файлам Avro с реестром схем.

Типы исходных данных Логические типы Qlik Talend Data Integration типов данных
BOOLEAN   BOOLEAN
INT   INT8
Long   INT8
FLOAT   REAL8
DOUBLE   REAL8
BYTES   STRING
STRING   STRING
ЗАПИСЬ   STRUCT
ENUM   STRING
ARRAY   ARRAY
MAP   МАССИВ<СТРУКТУРА>
ОБЪЕДИНЕНИЕ    
ФИКСИРОВАННЫЙ   STRING
BYTES DECIMAL DECIMAL
ФИКСИРОВАННЫЙ DECIMAL DECIMAL
INT DATE DATE
INT TIME-MILLIS INT8
INT TIME-MICROS TIME
Long TIMESTAMP-MILLIS DATETIME
Long TIMESTAMP-MICROS DATETIME

ORC

Следующие сопоставления применяются к файлам ORC.

Типы исходных данных Qlik Talend Data Integration типов данных
BOOLEAN BOOLEAN
БАЙТ INT8
SHORT INT8
INT INT8
Long INT8
DATE DATE
FLOAT REAL8
DOUBLE REAL8
TIMESTAMP DATETIME
BINARY STRING
DECIMAL REAL8
STRING STRING
VARCHAR STRING
CHAR STRING
СПИСОК ARRAY
MAP МАССИВ<СТРУКТУРА>. Массив структур, представляющих пары ключ-значение.
STRUCT STRUCT
ОБЪЕДИНЕНИЕ  

Ограничения и замечания

  • Если структура или массив изменяются автоматическим развитием схемы в промежуточном хранении, нижестоящие представления, которые не были созданы задачей Qlik Talend Cloud потоковой передачи, возможно, потребуется обновить, чтобы они не устарели.

  • Если в задаче есть ошибки синтаксического анализа, она не перейдет в состояние ошибки и не будет помечена как требующая внимания. Поскольку ошибки синтаксического анализа являются постоянно растущим показателем, критерий выхода из состояния ошибки отсутствует.

  • Удаление возможности кластера разрешено только в том случае, если нет задач, использующих эту возможность.

  • Обновления и удаления записи с одним и тем же первичным ключом не должны пересекать границу раздела, то есть они должны быть сопоставлены с одним и тем же разделом.

  • Если источник содержит большое количество столбцов, в задачах и в каталоге отображаются только 500 наиболее часто встречающихся столбцов. Все столбцы сохраняются в файлах Avro в S3 для промежуточного хранения, но только 500 наиболее часто встречающихся столбцов хранятся в таблицах Iceberg. При эволюции схемы, если добавлен новый столбец, он не будет добавлен в список основных столбцов, даже если он часто встречается.

Помогла ли вам эта страница?

Если вы обнаружили какую-либо проблему на этой странице или с ее содержанием — будь то опечатка, пропущенный шаг или техническая ошибка, сообщите нам об этом!