流式数据
引入过程将数据从源传输并存储到 Iceberg 表中。流式数据源的更改会近乎实时地持续应用到存储表中。
录入数据
数据在管道项目中引入,数据集存储在项目设置中定义的 S3 位置。
-
在您的项目中,单击创建,然后单击引入数据。
-
为引入添加任务名称和可选的描述。
单击下一步。
-
选择源连接。
您可以选择现有数据流源连接,或创建到源的新连接。
有关更多信息,请参阅连接到数据流
单击 下一步,并按照以下说明操作,针对您的数据源。
选择数据
Apache Kafka
此列表显示源连接中定义的集群提供的可用 Kafka 主题。
选择主题时,您可以选择特定数据集。您也可以使用选择规则来包含或排除数据集的群组:
-
使用 % 作为通配符来定义数据集的选择条件。
如果使用选择规则选择了主题,您可以选择是将所有数据集加载到同一个目标表,还是为每个源主题创建一个单独的目标表:
-
默认情况下,目标 Iceberg 表名派生自主题名称,并根据命名约定进行格式化,例如,小写、删除空格、将破折号替换为下划线。在 定义目标数据集名称 中,您可以编辑目标表的名称
-
当使用选择规则将多个主题加载到单个表中时,您必须提供目标名称。
-
当使用选择规则并将数据加载到单独的表(每个主题一个数据集)中时,默认目标名称是主题名称。在此阶段,您无法在向导中编辑名称,但可以在稍后的登陆任务中完成此操作。
-
如果配置了规则以选择要引入的主题,则如果登陆任务设置中架构演变下的新主题 > 添加到目标选项已选中,任何符合规则条件的新主题也将登陆。
选择一个或多个数据集,或使用选择规则,然后单击添加。单击下一步。
Amazon Kinesis
此列表显示源连接中定义的可用 Kinesis 数据流。
选择一个或多个数据集,然后单击添加。您可以在选定的数据流下查看添加的数据集。单击下一步。
Amazon S3
目录浏览器显示源连接的S3存储段中所有目录的列表。
-
选择登陆数据时要包含的目录:
-
对于每个目录,在添加路径中,输入路径和文件名模式:
-
使用 * 作为通配符来匹配任何字符。
-
要输入日期模式,请使用 <yyyy> 作为四位年份占位符,<MM> 作为两位月份占位符,<dd> 作为两位日期占位符,以及 <HH> 作为两位小时占位符。例如:
-
MyDir3/<yyyy>_<MM>_<dd>_<HH>_orders.csv
-
MyDir3/<yyyy>/<MM>/<dd>/<HH>_orders.csv
-
-
-
-
单击 预览 以打开 预览数据 对话框。显示包含和排除的文件列表。
-
单击 验证 以检查路径和文件名模式是否正确且功能正常。
-
在定义目标数据集名称中,提供一个名称以将主题映射到目标 Iceberg 表。单击下一步。
选择内容类型
选择源事件内容类型。
-
选择您在选择数据事件类型中摄取事件的类型。
-
有关更多信息,请参阅连接到数据流。
所选内容类型适用于所有主题、数据集或数据事件。您必须针对要摄取的每种内容类型创建一个新任务。
-
展开验证事件是否已正确加载以确认数据可以解析。建议在此阶段确保数据正确,否则您需要重新创建管道并再次加载数据。使用选择数据集检查特定数据集并查看可能影响数据加载的任何警告。单击任何结构列旁边的眼睛图标以查看数据。
-
单击下一步。
设置提取属性
配置管道的设置:
-
读取数据自
-
从最早的事件开始:摄取所有历史数据。
-
从现在开始:摄取从管道启动时开始到达的新数据。
-
-
列取消嵌套
-
保留嵌套列:不应用任何转换。
-
解嵌套到单独的列中:数据被拆分为单独的列。
-
-
新数据集的加载设置
-
仅追加:通常是事件数据的最佳选项,因为它通常生命周期短且不会更新,例如订单。
-
应用更改:这最适合随时间更新的数据,例如客户。根据关键字段更新现有记录并插入新记录。稍后定义任务时,您需要指定关键字段。
-
-
目标表分区
目标表分区选项适用于管道中的所有表。您可以在以后在表级别覆盖此设置,以定义自定义分区。
信息注释仅当在加载设置中选择仅追加时,此选项才可用。-
无分区:创建的表不带任何分区。
-
按事件摄入日期分区:表按事件摄入日期分区。
-
-
数据变更处理
信息注释仅当在加载设置中选择应用更改时,此选项才可用。-
包含软删除:输入表达式以定义要标记为删除的记录。
-
创建历史数据存储(类型 2):这将保留已更改记录的先前版本。
-
-
单击下一步。
汇总
摘要屏幕提供管道的可视化显示:
-
(可选)对于流式登陆和流式转换任务,您可以单击编辑名称和描述以提供新值。
-
选择管道创建后要执行的操作选项。
-
配置完所有设置后,单击创建以创建数据流管道。
-
项目显示后,您可以准备并运行每个任务以开始摄取数据。
-
准备并运行流式登陆任务。
有关更多信息,请参阅登陆流数据到 Qlik Open Lakehouse。
-
准备并运行流式转换任务。
有关更多信息,请参阅存储数据流数据集。
-
数据类型映射
初始源架构基于在创建管道项目时 PREPARE 阶段之前获取的数据样本,并且架构演变在读取时处理。不支持 STRUCT 和 ARRAY 的镜像任务及其他下游任务使用 JSON 类型。数据可以使用 SQL 进行解析。
以下数据类型映射适用于所有受支持的数据源,但会根据源文件类型而异,并且应注意以下事项:
-
数据类型是从正在载入的数据样本中推断出来的。例如,如果字段在样本中仅包含整数值,则它在数据流登陆和转换任务中创建为 INT8。如果后续数据包含双精度小数,则登陆文件会包含这些值;但是,在数据流转换任务中,如果更改字段数据类型设置为忽略,则列仍为 INT8,并且小数部分将被截断。为避免意外截断,请确保样本数据在载入前包含所有预期的值范围,或者在早期阶段将更改字段数据类型配置为停止任务,并根据需要调整数据类型。
-
如果源中的结构体添加了字段,则该字段始终会添加到登陆目标。对于数据流转换,将根据在数据流转换任务设置 > 架构演变 > 向结构体添加字段中选择的选项应用行为(应用于目标、忽略、停止任务)。
-
如果特定记录中缺少字段,或者数组为空,则它们被视为 null。
-
如果数据集通过数组进行平展,并且到达的记录中该数组为空或 null,则系统会创建一行,并且平展字段为 null。它不会自动排除。如果您想排除这些行,请手动添加筛选器,例如 array_element IS NOT NULL。
-
UI 中显示的数据类型反映了所选数据集的粒度。对于扁平数组,显示的是单个元素的数据类型,而不是数组结构本身。
-
不能在嵌套 JSON 字段中的结构体内部添加新属性,只能在根级别添加。
-
在数据流转换任务中,仅支持对数组的单个级别进行展平。当展平应用于多级数组时,例如 ARRAY<ARRAY<STRUCT>>,只有外部数组会被展平,从而得到 ARRAY<STRUCT>,而不是完全展平的 STRUCT。此外,当前 UI 仅允许在列级别配置展平。因此,选择多级数组会隐式地将展平仅应用于第一个数组级别。
-
当您引用基元数组时,如果粒度是数组,则使用元素的数据类型。否则,将使用数组数据类型。
在此示例中,OrderDetails 具有数据类型为 INT 的 CustomerID 数组。如果粒度是 OrderDetails.CustomerID,则 OrderDetails.CustomerID 表示 INT;如果粒度是 OrderDetails,则表示 ARRAY<INT>。
JSON
在 JSON 文件中,源中的数值决定了目标数据类型:
-
INT8 用于在支持的整数范围内且不包含小数部分的整数值。
-
REAL8 (DOUBLE) 用于值包含小数部分(浮点数)的情况。
-
STRING 用于数值超出支持的最大整数范围的情况。
数据类型映射如下:
| 源数据类型 | Qlik Talend Data Integration 个数据类型 |
|---|---|
| STRING | STRING |
| 数字 | INT8 |
| 数字 | REAL8 |
| 数字 | STRING |
| BOOLEAN | BOOLEAN |
| ARRAY | ARRAY |
| 'object' | STRUCT |
CSV、TSV、REGEX 和 SPLIT
默认情况下,所有源数据类型都将摄取为字符串。使用选项自动推断类型,将源类型和目标类型映射如下:
| 源数据类型 | Qlik 个数据类型 |
|---|---|
| NUMERIC | INT8/REAL8 |
| True/TRUE/true/False/FALSE/false | BOOLEAN |
| TIMESTAMP | 格式为yyyy-MM-dd HH:mm:ss或yyyy-MM-ddTHH:mm:ssz的时间戳将被解析为日期时间类型。如果包含时区,则该值将解析为字符串。 |
Parquet
Parquet 文件支持物理和逻辑数据类型。物理数据类型定义了值在磁盘上的存储方式,例如 INT32、DOUBLE 或 BYTE_ARRAY。逻辑数据类型在物理表示之上提供语义,例如,识别整数值是否表示日期。当逻辑类型附加到 Parquet 列并在 Qlik Open Lakehouse 中受支持(如下所列)时,Streaming 登陆任务在定义目标架构时使用逻辑类型,而不是底层物理类型。这确保数据得到正确解释,保留了预期的语义(如精确度、比例和时间含义),并在数据写入下游格式时生成更准确的架构。
从 Parquet 文件获取的数据映射如下:
| 源数据类型 | 逻辑类型 | Qlik Talend Data Integration 个数据类型 |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT32 | INT8 | |
| INT64 | INT8 | |
| INT96 | DATETIME | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| BYTE_ARRAY | STRING(编码为 Base64) | |
| FIXED_LEN_BYTE_ARRAY | 字符串(Base64 编码) | |
| BYTE_ARRAY | STRING | STRING |
| BYTE_ARRAY | ENUM | STRING |
| INT32 | DECIMAL | INT8 |
| INT64 | DECIMAL | INT8 |
| FIXED_LEN_BYTE_ARRAY | DECIMAL | INT8/REAL8(Base64 编码) |
| BYTE_ARRAY | DECIMAL | INT8/REAL8(Base64 编码) |
| INT32 | DATE | DATE |
| INT32 | TIME(MILLIS,true) | INT8 |
| INT64 | TIME(MICROS,true) | TIME |
| INT64 | TIMESTAMP(MICROS,true) | DATETIME |
| INT64 | TIMESTAMP(MILLIS,true) | DATETIME |
| 嵌套类型 | STRUCT | |
| 列表 | ARRAY | |
| MAP | ARRAY<STRUCT>。表示键值对的结构体数组。 |
Avro
以下映射适用于带有架构注册表的 Avro 文件。
| 源数据类型 | 逻辑类型 | Qlik Talend Data Integration 个数据类型 |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT | INT8 | |
| LONG | INT8 | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| BYTES | STRING | |
| STRING | STRING | |
| 记录 | STRUCT | |
| ENUM | STRING | |
| ARRAY | ARRAY | |
| MAP | 数组<结构体> | |
| 并集 | ||
| 固定 | STRING | |
| BYTES | DECIMAL | DECIMAL |
| 固定 | DECIMAL | DECIMAL |
| INT | DATE | DATE |
| INT | 时间-毫秒 | INT8 |
| INT | TIME-MICROS | TIME |
| LONG | TIMESTAMP-MILLIS | DATETIME |
| LONG | TIMESTAMP-MICROS | DATETIME |
ORC
以下映射适用于 ORC 文件。
| 源数据类型 | Qlik Talend Data Integration 个数据类型 |
|---|---|
| BOOLEAN | BOOLEAN |
| 字节 | INT8 |
| SHORT | INT8 |
| INT | INT8 |
| LONG | INT8 |
| DATE | DATE |
| FLOAT | REAL8 |
| DOUBLE | REAL8 |
| TIMESTAMP | DATETIME |
| BINARY | STRING |
| DECIMAL | REAL8 |
| STRING | STRING |
| VARCHAR | STRING |
| CHAR | STRING |
| 列表 | ARRAY |
| MAP | ARRAY<STRUCT>。表示键值对的结构体数组。 |
| STRUCT | STRUCT |
| 并集 |
限制和考虑事项
-
如果结构或数组在登陆中通过自动架构演进进行修改,未由 Qlik Talend Cloud 数据流任务创建的下游视图可能需要更新,以免过时。
-
如果任务存在解析错误,它将不会进入错误状态,也不会被标记为需要关注。由于解析错误是一个持续增长的指标,因此错误状态没有退出标准。
-
仅当没有任务使用该集群功能时,才允许删除该功能。
-
对具有相同主键的记录进行更新和删除不得跨越分区边界,即它们需要映射到同一分区。
-
如果源包含大量列,则在任务和目录中仅显示按频率排序的前 500 列。所有列都保存到 S3 登陆中的 Avro 文件,但只有前 500 列存储在 Iceberg 表中。在架构演变中,如果添加了新列,即使该列很常用,也不会将其添加到热门列中。