流数据
载入过程将数据从源传输并将其存储在 Iceberg 表中。来自流数据源的更改会以近乎实时的方式持续应用到存储表中。
载入数据
数据在管道项目中载入,数据集存储在项目设置中定义的 S3 位置。
-
在您的项目中,单击 创建,然后单击 载入数据。
-
为载入添加 任务名称 和可选的 描述。
单击 下一步。
-
选择源连接。
您可以选择现有的流源连接,也可以创建到源的新连接。
有关更多信息,请参阅 连接到数据流
单击 下一步 并按照以下针对您的数据源的说明进行操作。
选择数据
Apache Kafka
该列表显示源连接中定义的集群中可用的 Kafka 主题。
选择主题时,您可以选择特定的数据集。您还可以使用选择规则来包含或排除数据集组:
-
使用 % 作为通配符来定义数据集的选择条件。
如果使用选择规则选择主题,您可以选择是将所有数据集加载到同一个目标表中,还是为每个源主题创建一个单独的目标表:
-
默认情况下,目标 Iceberg 表名称派生自主题名称,其格式符合命名约定,例如,小写、删除空格、将破折号替换为下划线。在 定义目标数据集名称 中,您可以编辑目标表的名称
-
当使用选择规则将多个主题加载到单个表中时,您必须提供目标名称。
-
当使用选择规则并将数据加载到单独的表中(每个主题一个数据集)时,默认目标名称是主题名称。在此阶段,您无法在向导中编辑名称,但这可以在稍后的登陆任务中完成。
-
如果配置了规则以选择要摄取的主题,则如果选中了登陆任务设置中模式演变下的 新主题 > 添加到目标 选项,任何符合规则条件的新主题也将被登陆。
选择一个或多个数据集,或使用选择规则,然后单击 添加。单击 下一步。
Amazon Kinesis
该列表显示源连接中定义的可用 Kinesis 数据流。
选择一个或多个数据集,然后单击 添加。您可以在 选定的数据流 下看到添加的数据集。单击 下一步。
Amazon S3
目录浏览器显示位于源连接的 S3 存储段中的所有目录的列表。
-
选择登陆数据时要包含的目录:
-
对于每个目录,在 添加路径 中,输入路径和文件名模式:
-
使用 * 作为通配符以匹配任何字符。
-
要输入日期模式,请使用 <yyyy> 作为四位数年份占位符,<MM> 作为两位数月份占位符,<dd> 作为两位数日期占位符,<HH> 作为两位数小时占位符。例如:
-
MyDir3/<yyyy>_<MM>_<dd>_<HH>_orders.csv
-
MyDir3/<yyyy>/<MM>/<dd>/<HH>_orders.csv
-
-
-
-
单击 预览 以打开 预览数据 对话框。将显示包含和排除的文件的列表。
-
单击 验证 以检查路径和文件名模式是否正确且有效。
-
在 定义目标数据集名称 中,提供一个名称以将主题映射到目标 Iceberg 表。单击 下一步。
选择内容类型
选择源事件内容类型。
-
在 选择数据事件的类型 中选择您要摄取的事件类型。
-
有关更多信息,请参阅 连接到数据流。
所选的内容类型适用于所有主题、数据集或数据事件。您必须为您想要摄取的每种内容类型创建一个新任务。
-
展开 验证事件是否正确加载 以确认数据可以被解析。最好在此阶段确保数据正确,否则您需要重新创建管道并再次加载数据。使用 选择数据集 检查特定的数据集并查看任何可能影响数据加载的警告。单击任何结构列旁边的眼睛图标以查看数据。
-
单击 下一步。
设置摄取属性
配置管道的设置:
-
读取数据自
-
从最早的事件开始:摄取所有历史数据。
-
从现在开始:摄取从管道启动时到达的新数据。
-
-
列取消嵌套
-
保留嵌套列:不应用任何转换。
-
取消嵌套到单独的列中:数据被拆分到单独的列中。
-
-
新数据集的加载设置
-
仅追加:通常是事件数据的最佳选项,因为它通常生命周期短且不更新,例如,订单。
-
应用更改:这最适合随时间更新的数据,例如,客户。根据关键字段更新现有记录并插入新记录。稍后在定义任务时,您将需要指定关键字段。
-
-
目标表分区
目标表分区选项适用于管道中的所有表。您稍后可以在表级别覆盖此设置以定义自定义分区。
信息注释仅当在 加载设置 中选择 仅追加 时,此选项才可用。-
无分区:创建表时不进行任何分区。
-
按事件摄取日期分区:表按事件摄取的日期进行分区。
信息注释当此选项与 hdr__from_timestamp 标题列选项一起选择时,hdr__from_timestamp 将用作默认分区列。有关将 hdr__from_timestamp 标题列添加到标准视图的信息,请参阅 表定义。
-
-
数据更改处理
信息注释仅当在 加载设置 中选择 应用更改 时,此选项才可用。-
包含软删除:输入一个表达式以定义要标记为删除的记录。
-
创建历史数据存储(类型 2):这将保留已更改记录的先前版本。
-
-
单击 下一步。
摘要
摘要屏幕提供管道的可视化显示:
-
(可选)对于流登陆和流转换任务,您可以单击 编辑名称和描述 以提供新值。
-
选择 创建管道后 您希望发生的操作的选项。
-
配置完所有设置后,单击 创建 以创建流管道。
-
显示项目时,您可以准备并运行每个任务以开始摄取数据。
-
准备并运行流登陆任务。
有关更多信息,请参阅 将流数据登陆到 Qlik Open Lakehouse。
-
准备并运行流转换任务。
有关更多信息,请参阅 存储流式数据集。
-
数据类型映射
初始源模式基于在创建管道项目时 PREPARE 阶段之前获取的数据样本,并且模式演变在读取时处理。不支持 STRUCT 和 ARRAY 的镜像任务和其他下游任务使用 JSON 类型。可以使用 SQL 解析数据。
以下数据类型映射适用于所有支持的数据源,但根据源文件类型而有所不同,应注意以下几点:
-
数据类型是从正在载入的数据样本中推断出来的。例如,如果样本中的字段仅包含整数值,则在流登陆和转换任务中将其创建为 INT8。如果后续数据包含双精度小数值,则登陆文件将包含这些值;但是,在流转换任务中,如果 更改字段数据类型 设置为 忽略,则该列仍为 INT8,并且小数值将被截断。为避免意外截断,请确保样本数据在载入之前包含预期值的完整范围,或者在早期阶段将 更改字段数据类型 配置为 停止任务,并根据需要调整数据类型。
-
如果将字段添加到源中的结构中,它将始终添加到登陆目标中。对于流转换,将根据在 流转换任务设置 > 模式演变 > 将字段添加到结构(应用到目标、忽略、停止任务)中选择的选项应用该行为。
-
如果特定记录中缺少字段,或者数组为空,则它们将被视为 null。
-
如果数据集被数组展平,并且到达的记录中该数组为空或 null,则系统将创建一行,并且展平的字段为 null。它不会被自动排除。如果您想排除这些行,请手动添加过滤器,例如,array_element IS NOT NULL。
-
UI 中显示的数据类型反映了所选的数据集粒度。对于展平的数组,显示的是单个元素的数据类型,而不是数组结构本身。
-
不能在嵌套 JSON 字段内的结构中添加新属性,只能在根级别添加。
-
在流转换任务中,仅支持对单层数组进行展平。当展平应用于多层数组时,例如,ARRAY<ARRAY<STRUCT>>,仅展平外层数组,结果为 ARRAY<STRUCT>,而不是完全展平的 STRUCT。此外,当前的 UI 仅允许在列级别配置展平。因此,选择多层数组会隐式地仅将展平应用于第一层数组。
-
当您引用基元数组时,如果粒度是数组,则使用元素的数据类型。否则使用数组数据类型。
在此示例中,OrderDetails 具有数据类型为 INT 的 CustomerID 数组。如果粒度为 OrderDetails.CustomerID,则 OrderDetails.CustomerID 表示 INT;如果粒度为 OrderDetails,则表示 ARRAY<INT>。
JSON
在 JSON 文件中,源中的数值决定了目标数据类型:
-
INT8 用于适合支持的整数范围且不包含小数部分的整数值。
-
当值包含小数部分(浮点数)时,使用 REAL8 (DOUBLE)。
-
当数值超过支持的最大整数范围时,使用 STRING。
数据类型映射如下:
| 源数据类型 | Qlik Talend Data Integration 数据类型 |
|---|---|
| STRING | STRING |
| NUMBER | INT8 |
| NUMBER | REAL8 |
| NUMBER | STRING |
| BOOLEAN | BOOLEAN |
| ARRAY | ARRAY |
| OBJECT | STRUCT |
CSV、TSV、REGEX 和 SPLIT
默认情况下,所有源数据类型都摄取为字符串。使用 自动推断类型 选项,将源类型和目标类型映射如下:
| 源数据类型 | Qlik 数据类型 |
|---|---|
| NUMERIC | INT8/REAL8 |
| True/TRUE/true/False/FALSE/false | BOOLEAN |
| TIMESTAMP | 格式为 yyyy-MM-dd HH:mm:ss 或 yyyy-MM-ddTHH:mm:ssz 的时间戳被解析为 datetime 类型。如果包含时区,则该值被解析为字符串。 |
Parquet
Parquet 文件支持物理和逻辑数据类型。物理数据类型定义值在磁盘上的存储方式,例如 INT32、DOUBLE 或 BYTE_ARRAY。逻辑数据类型在物理表示之上提供语义含义,例如,标识整数值是否表示日期。当逻辑类型附加到 Parquet 列并在 Qlik Open Lakehouse 中受支持(如下所列)时,流登陆任务在定义目标模式时使用逻辑类型,而不是底层的物理类型。这可确保正确解释数据,保留预期的语义(例如精确度、小数位数和时间含义),并在将数据写入下游格式时产生更准确的模式。
源自 Parquet 文件的数据映射如下:
| 源数据类型 | 逻辑类型 | Qlik Talend Data Integration 数据类型 |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT32 | INT8 | |
| INT64 | INT8 | |
| INT96 | DATETIME | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| BYTE_ARRAY | STRING(编码为 Base64) | |
| FIXED_LEN_BYTE_ARRAY | STRING(编码为 Base64) | |
| BYTE_ARRAY | STRING | STRING |
| BYTE_ARRAY | ENUM | STRING |
| INT32 | DECIMAL | INT8 |
| INT64 | DECIMAL | INT8 |
| FIXED_LEN_BYTE_ARRAY | DECIMAL | INT8/REAL8(编码为 Base64) |
| BYTE_ARRAY | DECIMAL | INT8/REAL8(编码为 Base64) |
| INT32 | DATE | DATE |
| INT32 | TIME(MILLIS,true) | INT8 |
| INT64 | TIME(MICROS,true) | TIME |
| INT64 | TIMESTAMP(MICROS,true) | DATETIME |
| INT64 | TIMESTAMP(MILLIS,true) | DATETIME |
| NESTED TYPES | STRUCT | |
| LIST | ARRAY | |
| MAP | ARRAY<STRUCT>。表示键值对的结构数组。 |
Avro
以下映射适用于具有模式注册表的 Avro 文件。
| 源数据类型 | 逻辑类型 | Qlik Talend Data Integration 数据类型 |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT | INT8 | |
| LONG | INT8 | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| BYTES | STRING | |
| STRING | STRING | |
| RECORD | STRUCT | |
| ENUM | STRING | |
| ARRAY | ARRAY | |
| MAP | ARRAY<STRUCT> | |
| UNION | ||
| FIXED | STRING | |
| BYTES | DECIMAL | DECIMAL |
| FIXED | DECIMAL | DECIMAL |
| INT | DATE | DATE |
| INT | TIME-MILLIS | INT8 |
| INT | TIME-MICROS | TIME |
| LONG | TIMESTAMP-MILLIS | DATETIME |
| LONG | TIMESTAMP-MICROS | DATETIME |
ORC
以下映射适用于 ORC 文件。
| 源数据类型 | Qlik Talend Data Integration 数据类型 |
|---|---|
| BOOLEAN | BOOLEAN |
| BYTE | INT8 |
| SHORT | INT8 |
| INT | INT8 |
| LONG | INT8 |
| DATE | DATE |
| FLOAT | REAL8 |
| DOUBLE | REAL8 |
| TIMESTAMP | DATETIME |
| BINARY | STRING |
| DECIMAL | REAL8 |
| STRING | STRING |
| VARCHAR | STRING |
| CHAR | STRING |
| LIST | ARRAY |
| MAP | ARRAY<STRUCT>。表示键值对的结构数组。 |
| STRUCT | STRUCT |
| UNION |
限制和注意事项
-
如果结构或数组在登陆中被自动模式演变修改,则可能需要更新非由 Qlik Talend Cloud 流任务创建的下游视图,以免过时。
-
如果任务存在解析错误,它将不会进入错误状态,也不会被标记为需要注意。由于解析错误是一个不断增加的指标,因此没有错误状态的退出标准。
-
仅当没有任务使用该集群功能时,才允许删除该功能。
-
对具有相同主键的记录的更新和删除不得跨越分区边界,也就是说,它们需要映射到同一个分区。
-
如果源包含大量列,则在任务和目录中仅显示按频率排名的前 500 列。所有列都保存到 S3 登陆中的 Avro 文件中,但只有前 500 列存储在 Iceberg 表中。在模式演变中,如果添加了新列,即使它很频繁,也不会添加到前列中。