流数据 | Qlik Cloud帮助
跳到主要内容 跳到补充内容

流数据

载入过程将数据从源传输并将其存储在 Iceberg 表中。来自流数据源的更改会以近乎实时的方式持续应用到存储表中。

载入数据

数据在管道项目中载入,数据集存储在项目设置中定义的 S3 位置。

  1. 在您的项目中,单击 创建,然后单击 载入数据

  2. 为载入添加 任务名称 和可选的 描述

    单击 下一步

  3. 选择源连接。

    您可以选择现有的流源连接,也可以创建到源的新连接。

    有关更多信息,请参阅 连接到数据流

    单击 下一步 并按照以下针对您的数据源的说明进行操作。

选择数据

Apache Kafka

该列表显示源连接中定义的集群中可用的 Kafka 主题。

选择主题时,您可以选择特定的数据集。您还可以使用选择规则来包含或排除数据集组:

  • 使用 % 作为通配符来定义数据集的选择条件。

如果使用选择规则选择主题,您可以选择是将所有数据集加载到同一个目标表中,还是为每个源主题创建一个单独的目标表:

  • 默认情况下,目标 Iceberg 表名称派生自主题名称,其格式符合命名约定,例如,小写、删除空格、将破折号替换为下划线。在 定义目标数据集名称 中,您可以编辑目标表的名称

  • 当使用选择规则将多个主题加载到单个表中时,您必须提供目标名称。

  • 当使用选择规则并将数据加载到单独的表中(每个主题一个数据集)时,默认目标名称是主题名称。在此阶段,您无法在向导中编辑名称,但这可以在稍后的登陆任务中完成。

  • 如果配置了规则以选择要摄取的主题,则如果选中了登陆任务设置中模式演变下的 新主题  > 添加到目标 选项,任何符合规则条件的新主题也将被登陆。

选择一个或多个数据集,或使用选择规则,然后单击 添加。单击 下一步

Amazon Kinesis

该列表显示源连接中定义的可用 Kinesis 数据流。

选择一个或多个数据集,然后单击 添加。您可以在 选定的数据流 下看到添加的数据集。单击 下一步

Amazon S3

目录浏览器显示位于源连接的 S3 存储段中的所有目录的列表。 

信息注释使用数据模式选择目录可以提高性能。
  • 选择登陆数据时要包含的目录:

    • 对于每个目录,在 添加路径 中,输入路径和文件名模式:

      • 使用 * 作为通配符以匹配任何字符。

      • 要输入日期模式,请使用 <yyyy> 作为四位数年份占位符,<MM> 作为两位数月份占位符,<dd> 作为两位数日期占位符,<HH> 作为两位数小时占位符。例如:

        • MyDir3/<yyyy>_<MM>_<dd>_<HH>_orders.csv

        • MyDir3/<yyyy>/<MM>/<dd>/<HH>_orders.csv

  • 单击 预览 以打开 预览数据 对话框。将显示包含和排除的文件的列表。

  • 单击 验证 以检查路径和文件名模式是否正确且有效。

  • 定义目标数据集名称 中,提供一个名称以将主题映射到目标 Iceberg 表。单击 下一步

选择内容类型

选择源事件内容类型。

  • 选择数据事件的类型 中选择您要摄取的事件类型。

  • 有关更多信息,请参阅 连接到数据流

    所选的内容类型适用于所有主题、数据集或数据事件。您必须为您想要摄取的每种内容类型创建一个新任务。

  • 展开 验证事件是否正确加载 以确认数据可以被解析。最好在此阶段确保数据正确,否则您需要重新创建管道并再次加载数据。使用 选择数据集 检查特定的数据集并查看任何可能影响数据加载的警告。单击任何结构列旁边的眼睛图标以查看数据。

  • 单击 下一步

信息注释如果未检测到数据集的事件,则会在登陆中显示一条消息。当有事件要读取时,您将需要运行该任务,并且列将自动添加。

设置摄取属性

配置管道的设置:

  • 读取数据自

    • 从最早的事件开始:摄取所有历史数据。

    • 从现在开始:摄取从管道启动时到达的新数据。

  • 列取消嵌套

    • 保留嵌套列:不应用任何转换。

    • 取消嵌套到单独的列中:数据被拆分到单独的列中。

  • 新数据集的加载设置

    • 仅追加:通常是事件数据的最佳选项,因为它通常生命周期短且不更新,例如,订单

    • 应用更改:这最适合随时间更新的数据,例如,客户。根据关键字段更新现有记录并插入新记录。稍后在定义任务时,您将需要指定关键字段。

  • 目标表分区

    目标表分区选项适用于管道中的所有表。您稍后可以在表级别覆盖此设置以定义自定义分区。

    信息注释仅当在 加载设置 中选择 仅追加 时,此选项才可用。
    • 无分区:创建表时不进行任何分区。

    • 按事件摄取日期分区:表按事件摄取的日期进行分区。

      信息注释当此选项与 hdr__from_timestamp 标题列选项一起选择时,hdr__from_timestamp 将用作默认分区列。有关将 hdr__from_timestamp 标题列添加到标准视图的信息,请参阅 表定义
  • 数据更改处理

    信息注释仅当在 加载设置 中选择 应用更改 时,此选项才可用。
    • 包含软删除:输入一个表达式以定义要标记为删除的记录。

    • 创建历史数据存储(类型 2):这将保留已更改记录的先前版本。

  • 单击 下一步

摘要

摘要屏幕提供管道的可视化显示:

  • (可选)对于流登陆和流转换任务,您可以单击 编辑名称和描述 以提供新值。

  • 选择 创建管道后 您希望发生的操作的选项。

  • 配置完所有设置后,单击 创建 以创建流管道。

  • 显示项目时,您可以准备并运行每个任务以开始摄取数据。

数据类型映射

初始源模式基于在创建管道项目时 PREPARE 阶段之前获取的数据样本,并且模式演变在读取时处理。不支持 STRUCT 和 ARRAY 的镜像任务和其他下游任务使用 JSON 类型。可以使用 SQL 解析数据。

以下数据类型映射适用于所有支持的数据源,但根据源文件类型而有所不同,应注意以下几点:

  • 数据类型是从正在载入的数据样本中推断出来的。例如,如果样本中的字段仅包含整数值,则在流登陆和转换任务中将其创建为 INT8。如果后续数据包含双精度小数值,则登陆文件将包含这些值;但是,在流转换任务中,如果 更改字段数据类型 设置为 忽略,则该列仍为 INT8,并且小数值将被截断。为避免意外截断,请确保样本数据在载入之前包含预期值的完整范围,或者在早期阶段将 更改字段数据类型 配置为 停止任务,并根据需要调整数据类型。

  • 如果将字段添加到源中的结构中,它将始终添加到登陆目标中。对于流转换,将根据在 流转换任务设置 > 模式演变 > 将字段添加到结构应用到目标忽略停止任务)中选择的选项应用该行为。

  • 如果特定记录中缺少字段,或者数组为空,则它们将被视为 null。

  • 如果数据集被数组展平,并且到达的记录中该数组为空或 null,则系统将创建一行,并且展平的字段为 null。它不会被自动排除。如果您想排除这些行,请手动添加过滤器,例如,array_element IS NOT NULL

  • UI 中显示的数据类型反映了所选的数据集粒度。对于展平的数组,显示的是单个元素的数据类型,而不是数组结构本身。

  • 不能在嵌套 JSON 字段内的结构中添加新属性,只能在根级别添加。

  • 在流转换任务中,仅支持对单层数组进行展平。当展平应用于多层数组时,例如,ARRAY<ARRAY<STRUCT>>,仅展平外层数组,结果为 ARRAY<STRUCT>,而不是完全展平的 STRUCT。此外,当前的 UI 仅允许在列级别配置展平。因此,选择多层数组会隐式地仅将展平应用于第一层数组。

  • 当您引用基元数组时,如果粒度是数组,则使用元素的数据类型。否则使用数组数据类型。

    在此示例中,OrderDetails 具有数据类型为 INT 的 CustomerID 数组。如果粒度为 OrderDetails.CustomerID,则 OrderDetails.CustomerID 表示 INT;如果粒度为 OrderDetails,则表示 ARRAY<INT>。

JSON

在 JSON 文件中,源中的数值决定了目标数据类型:

  • INT8 用于适合支持的整数范围且不包含小数部分的整数值。

  • 当值包含小数部分(浮点数)时,使用 REAL8 (DOUBLE)

  • 当数值超过支持的最大整数范围时,使用 STRING

数据类型映射如下:

源数据类型 Qlik Talend Data Integration 数据类型
STRING STRING
NUMBER INT8
NUMBER REAL8
NUMBER STRING
BOOLEAN BOOLEAN
ARRAY ARRAY
OBJECT STRUCT

CSV、TSV、REGEX 和 SPLIT

默认情况下,所有源数据类型都摄取为字符串。使用 自动推断类型 选项,将源类型和目标类型映射如下:

源数据类型 Qlik 数据类型
NUMERIC INT8/REAL8
True/TRUE/true/False/FALSE/false BOOLEAN
TIMESTAMP 格式为 yyyy-MM-dd HH:mm:ssyyyy-MM-ddTHH:mm:ssz 的时间戳被解析为 datetime 类型。如果包含时区,则该值被解析为字符串。

Parquet

Parquet 文件支持物理和逻辑数据类型。物理数据类型定义值在磁盘上的存储方式,例如 INT32、DOUBLE 或 BYTE_ARRAY。逻辑数据类型在物理表示之上提供语义含义,例如,标识整数值是否表示日期。当逻辑类型附加到 Parquet 列并在 Qlik Open Lakehouse 中受支持(如下所列)时,流登陆任务在定义目标模式时使用逻辑类型,而不是底层的物理类型。这可确保正确解释数据,保留预期的语义(例如精确度、小数位数和时间含义),并在将数据写入下游格式时产生更准确的模式。

源自 Parquet 文件的数据映射如下:

源数据类型 逻辑类型 Qlik Talend Data Integration 数据类型
BOOLEAN BOOLEAN
INT32 INT8
INT64 INT8
INT96 DATETIME
FLOAT REAL8
DOUBLE REAL8
BYTE_ARRAY STRING(编码为 Base64)
FIXED_LEN_BYTE_ARRAY STRING(编码为 Base64)
BYTE_ARRAY STRING STRING
BYTE_ARRAY ENUM STRING
INT32 DECIMAL INT8
INT64 DECIMAL INT8
FIXED_LEN_BYTE_ARRAY DECIMAL INT8/REAL8(编码为 Base64)
BYTE_ARRAY DECIMAL INT8/REAL8(编码为 Base64)
INT32 DATE DATE
INT32 TIME(MILLIS,true) INT8
INT64 TIME(MICROS,true) TIME
INT64 TIMESTAMP(MICROS,true) DATETIME
INT64 TIMESTAMP(MILLIS,true) DATETIME
NESTED TYPES STRUCT
LIST ARRAY
MAP ARRAY<STRUCT>。表示键值对的结构数组。

Avro

以下映射适用于具有模式注册表的 Avro 文件。

源数据类型 逻辑类型 Qlik Talend Data Integration 数据类型
BOOLEAN BOOLEAN
INT INT8
LONG INT8
FLOAT REAL8
DOUBLE REAL8
BYTES STRING
STRING STRING
RECORD STRUCT
ENUM STRING
ARRAY ARRAY
MAP ARRAY<STRUCT>
UNION
FIXED STRING
BYTES DECIMAL DECIMAL
FIXED DECIMAL DECIMAL
INT DATE DATE
INT TIME-MILLIS INT8
INT TIME-MICROS TIME
LONG TIMESTAMP-MILLIS DATETIME
LONG TIMESTAMP-MICROS DATETIME

ORC

以下映射适用于 ORC 文件。

源数据类型 Qlik Talend Data Integration 数据类型
BOOLEAN BOOLEAN
BYTE INT8
SHORT INT8
INT INT8
LONG INT8
DATE DATE
FLOAT REAL8
DOUBLE REAL8
TIMESTAMP DATETIME
BINARY STRING
DECIMAL REAL8
STRING STRING
VARCHAR STRING
CHAR STRING
LIST ARRAY
MAP ARRAY<STRUCT>。表示键值对的结构数组。
STRUCT STRUCT
UNION

限制和注意事项

  • 如果结构或数组在登陆中被自动模式演变修改,则可能需要更新非由 Qlik Talend Cloud 流任务创建的下游视图,以免过时。

  • 如果任务存在解析错误,它将不会进入错误状态,也不会被标记为需要注意。由于解析错误是一个不断增加的指标,因此没有错误状态的退出标准。

  • 仅当没有任务使用该集群功能时,才允许删除该功能。

  • 对具有相同主键的记录的更新和删除不得跨越分区边界,也就是说,它们需要映射到同一个分区。

  • 如果源包含大量列,则在任务和目录中仅显示按频率排名的前 500 列。所有列都保存到 S3 登陆中的 Avro 文件中,但只有前 500 列存储在 Iceberg 表中。在模式演变中,如果添加了新列,即使它很频繁,也不会添加到前列中。

本页面有帮助吗?

如果您发现此页面或其内容有任何问题 – 打字错误、遗漏步骤或技术错误 – 请告诉我们!