Saltar al contenido principal Saltar al contenido complementario

Transmisión constante o streaming de datos

El proceso de incorporación transfiere datos de forma constante desde el origen y los almacena en tablas Iceberg. Los cambios desde las fuentes de datos de streaming se aplican continuamente a las tablas de almacenamiento en tiempo cuasi real.

Incorporar datos

Los datos se incorporan dentro de un proyecto de canalización y los conjuntos de datos se almacenan en la ubicación de S3 definida en la configuración del proyecto.

  1. En su proyecto, haga clic en Crear y después en Aterrizar datos.

  2. Agregue un Nombre de tarea y una Descripción opcional para la incorporación.

    Haga clic en Siguiente.

  3. Seleccione la conexión de origen.

    Puede seleccionar una conexión de streaming de origen o crear una nueva conexión a la fuente.

    Para más información, vea Conexión con flujos de datos o data streams.

    Haga clic en Siguiente y siga las instrucciones a continuación para su fuente de datos.

Seleccionar datos

Apache Kafka

La lista muestra los temas de Kafka disponibles del clúster definido en la conexión de origen.

Al seleccionar sus temas, puede seleccionar conjuntos de datos específicos. También puede utilizar reglas de selección para incluir o excluir grupos de conjuntos de datos:

  • Utilice % como comodín para definir criterios de selección para los conjuntos de datos.

Si los temas se seleccionan mediante reglas de selección, puede elegir si cargar todos los conjuntos de datos en la misma tabla de destino o crear una tabla de destino independiente para cada tema de origen:

  • De manera predeterminada, el nombre de la tabla de destino en Iceberg se deriva del nombre del tema y recibe un formato para cumplir con las convenciones de nomenclatura; por ejemplo, se escribe en minúsculas, se eliminan los espacios y se sustituyen los guiones por guiones bajos. En Definir nombre del conjunto de datos de destino, puede editar el nombre de la tabla de destino

  • Cuando se utilizan reglas de selección para cargar varios temas en una sola tabla, debe proporcionar el nombre de destino.

  • Cuando se utilizan reglas de selección y los datos se cargan en tablas aparte (un conjunto de datos por tema), los nombres de destino predeterminados son los nombres de los temas. En esta etapa, no puede editar los nombres en el asistente, pero esto se puede hacer más adelante en la tarea de destino.

  • Si se configura una regla para seleccionar temas para la ingesta, cualquier tema nuevo que cumpla los criterios de la regla también se destinará si la opción Nuevo tema  > Agregar a destino en la evolución del esquema en la configuración de la tarea de destino está seleccionada.

Seleccione uno o más conjuntos de datos, o utilice una regla de selección, y haga clic en Agregar. Haga clic en Siguiente.

Amazon Kinesis

La lista muestra los streams (o flujos de datos) de Kinesis disponibles definidos en la conexión de origen.

Seleccione uno o más conjuntos de datos y haga clic en Agregar. Puede ver los conjuntos de datos agregados en Streams seleccionados. Haga clic en Siguiente.

Amazon S3

El explorador de directorios muestra una lista de todos los directorios ubicados en el depósito S3 de su conexión de origen. 

Nota informativaEl uso de un patrón de datos para seleccionar directorios puede mejorar el rendimiento.
  • Seleccione los directorios que desea incluir al insertar datos:

    • Para cada directorio, en Agregar ruta, introduzca la ruta y el patrón de nombre de archivo:

      • Use * como comodín para que coincida con cualquier carácter.

      • Para introducir un patrón de fecha, use <aaaa> como marcador de posición para el año de cuatro dígitos, <MM> como marcador de posición para el mes, de dos dígitos, <dd> como marcador de posición para el día, de dos dígitos y <HH> como marcador de posición para la hora de dos dígitos. Por ejemplo:

        • MyDir3/<aaaa>_<MM>_<dd>_<HH>_orders.csv

        • MyDir3/<aaaa>/<MM>/<dd>/<HH>_orders.csv

  • Haga clic en Vista previa para abrir el cuadro de diálogo Vista previa de datos. Se muestra una lista de archivos incluidos y excluidos.

  • Haga clic en Validar para comprobar que las rutas y los patrones de nombres de archivo son correctos y funcionales.

  • En Definir nombre del conjunto de datos de destino, proporcione un nombre para asociar el tema a la tabla de destino de Iceberg. Haga clic en Siguiente.

Seleccionar el tipo de contenido

Elija el tipo de contenido de los eventos de origen.

  • Seleccione el tipo de eventos que está ingiriendo en Elija el tipo de eventos de datos.

  • Para más información, vea Conexión con flujos de datos o data streams.

    El tipo de contenido seleccionado se aplica a todos los temas, conjuntos de datos o eventos de datos. Debe crear una nueva tarea para cada tipo de contenido que desee ingerir.

  • Expanda Verifique que los eventos se cargaron correctamente para confirmar que los datos se pueden analizar. Es una buena idea asegurarse de que los datos sean correctos en esta etapa; de lo contrario, deberá recrear el proceso y cargar los datos de nuevo. Utilice Seleccionar conjunto de datos para examinar conjuntos de datos específicos y comprobar si hay advertencias que puedan afectar a la carga de los datos. Haga clic en el icono del ojo junto a cualquier columna de estructura para ver los datos.

  • Haga clic en Siguiente.

Nota informativaSi no se detectaron eventos para el conjunto de datos, se muestra un mensaje en destino. Deberá ejecutar la tarea cuando haya eventos para leer, y las columnas se añadirán automáticamente.

Configuración de las propiedades de ingesta

Configure los ajustes de su proceso:

  • Leer datos desde

    • Empezar desde el evento más antiguo: ingiere todos los datos históricos.

    • Empezar desde ahora: ingiere los nuevos datos que lleguen desde el momento en que se inicia el proceso.

  • Desanidamiento de columnas

    • Conservar columnas anidadas: no se aplican transformaciones.

    • Desanidar en columnas separadas: los datos se dividen en columnas separadas.

  • Configuración de carga para nuevos conjuntos de datos

    • Solo añadir: generalmente la mejor opción para los datos de eventos, ya que suelen tener una vida útil corta y no se actualizan, por ejemplo, Pedidos.

    • Aplicar cambios: esto es más adecuado para datos que se actualizan con el tiempo, por ejemplo, Clientes. Actualiza los registros existentes e inserta nuevos registros basándose en campos clave. Necesitará especificar los campos clave más adelante cuando defina la tarea.

  • Partición de las tablas de destino

    La opción de partición de tabla de destino se aplica a todas las tablas en el proceso. Podrá anular esto más adelante a nivel de tabla para definir una partición personalizada.

    Nota informativaEsta opción solo está disponible cuando Solo anexar está seleccionado en Configuración de carga.
    • Sin partición: las tablas se crean sin ninguna partición.

    • Partición por fecha de ingesta de eventos: las tablas se particionan según la fecha en que se ingieren los eventos.

  • Manejo de cambios de datos

    Nota informativaEsta opción solo está disponible cuando Aplicar cambios está seleccionado en Configuración de carga.
    • Incluir eliminaciones lógicas: inserte una expresión para definir qué registros se marcarán para su eliminación.

    • Crear un almacén de datos históricos (Tipo 2): esto mantendrá las versiones anteriores de los registros modificados.

  • Haga clic en Siguiente.

Resumen

La pantalla de resumen muestra visualmente su canalización de datos:

  • Opcionalmente, para la tarea de destino de streaming y transformación de streaming, puede hacer clic en Editar nombre y descripción para proporcionar nuevos valores.

  • Seleccione la opción de lo que desea que suceda Después de crear la canalización.

  • Cuando haya configurado todos los ajustes, haga clic en Crear para crear la canalización de streaming.

  • Cuando se muestre el proyecto, puede preparar y ejecutar cada tarea para comenzar a ingerir los datos.

Correspondencias entre tipos de datos

El esquema de origen inicial se basa en una muestra de los datos tomada antes de la fase PREPARE al crear su proyecto de canalización, y la evolución del esquema se gestiona en el momento de la lectura. Las tareas de duplicación y otras tareas posteriores que no admiten STRUCT y ARRAY utilizan un tipo JSON. Los datos se pueden analizar mediante SQL.

Las siguientes asociaciones de tipos de datos se aplican a todas las fuentes de datos admitidas, pero varían según el tipo de archivo de origen, y se debe tener en cuenta lo siguiente:

  • Los tipos de datos se infieren de una muestra de los datos que se incorporan. Por ejemplo, si un campo contiene solo valores enteros en la muestra, se crea como INT8 en las tareas de stream de destino/aterrizaje y transformación. Si los datos posteriores incluyen valores fraccionarios de doble precisión, los archivos de destino/aterrizaje contienen esos valores; sin embargo, en la tarea de transformación de Streaming, si la configuración Cambiar tipo de datos de campo se establece en Ignorar, la columna permanece como INT8 y los valores fraccionarios se truncan. Para evitar un truncado no deseado, asegúrese de que los datos de muestra incluyan el rango completo de valores esperados antes de la incorporación, o configure Cambiar tipo de datos de campo en Detener tarea durante las primeras etapas y ajuste los tipos de datos según sea necesario.

  • Si se añade un campo a una estructura en el origen, siempre se añade al destino. Para la transformación del flujo o stream, el comportamiento se aplica según la opción elegida en Configuración de la tarea de transformación de stream > Evolución del esquema > Añadir campos a la estructura (Aplicar al destino, Ignorar, Detener tarea).

  • Si falta un campo en un registro específico, o una matriz está vacía, se tratan como nulos.

  • Si un conjunto de datos se aplana por una matriz, y llega un registro donde esa matriz está vacía o es nula, el sistema crea una fila y el campo aplanado es nulo. No se excluye automáticamente. Si desea excluir estas filas, añada un filtro manualmente, por ejemplo, array_element IS NOT NULL.

  • Los tipos de datos mostrados en la interfaz de usuario reflejan la granularidad del conjunto de datos seleccionado. Para matrices aplanadas, se muestra el tipo de datos del elemento individual en lugar de la propia estructura de la matriz.

  • No se puede añadir un nuevo atributo dentro de una estructura en un campo JSON anidado, solo en el nivel raíz.

  • En las tareas de transformación de streaming, el aplanamiento solo se admite para un único nivel de una matriz. Cuando el aplanamiento se aplica a una matriz multinivel, por ejemplo, ARRAY<ARRAY<STRUCT>>, solo se aplana la matriz externa, lo que da como resultado ARRAY<STRUCT> en lugar de una STRUCT completamente aplanada. Además, la interfaz de usuario actual permite configurar el aplanamiento solo a nivel de columna. Como resultado, la selección de una matriz multinivel aplica implícitamente el aplanamiento solo al primer nivel de la matriz.

  • Cuando se hace referencia a una matriz de primitivas, se utiliza el tipo de datos del elemento si la granularidad es la matriz. De lo contrario, se utiliza el tipo de datos de la matriz.

    En este ejemplo, OrderDetails tiene una matriz de CustomerID de tipo de datos INT. OrderDetails.CustomerID significa INT si la granularidad es OrderDetails.CustomerID y ARRAY<INT> si la granularidad es OrderDetails.

JSON

En los archivos JSON, el valor numérico del origen determina el tipo de datos de destino:

  • INT8 se utiliza para valores enteros que se ajustan al rango de enteros admitido y no incluyen un componente fraccionario.

  • REAL8 (DOUBLE) se utiliza cuando el valor contiene un componente fraccionario (número de coma flotante).

  • STRING se utiliza cuando el valor numérico excede el rango máximo de enteros admitido.

Los tipos de datos se asocian de la siguiente manera:

Tipos de datos de origen Qlik Talend Data Integration tipos de datos
STRING STRING
NUMBER INT8
NUMBER REAL8
NUMBER STRING
BOOLEAN BOOLEAN
ARRAY ARRAY
OBJECT STRUCT

CSV, TSV, REGEX y SPLIT

Por defecto, todos los tipos de datos de origen se ingieren como una cadena. Use la opción, Inferir tipos automáticamente, para asociar los tipos de origen y destino de la siguiente manera:

Tipos de datos de origen Qlik tipos de datos
NUMERIC INT8/REAL8
True/TRUE/true/False/FALSE/false BOOLEAN
TIMESTAMP Las marcas de tiempo en el formato yyyy-MM-dd HH:mm:ss o yyyy-MM-ddTHH:mm:ssz se analizan como un tipo de fecha y hora. Si se incluye una zona horaria, el valor se analiza como una cadena de caracteres.

Parquet

Los archivos Parquet admiten tipos de datos físicos y lógicos. Los tipos de datos físicos definen cómo se almacenan los valores en el disco, como INT32, DOUBLE o BYTE_ARRAY. Los tipos de datos lógicos proporcionan un significado semántico además de la representación física, por ejemplo, identificando si un valor entero representa una fecha. Cuando un tipo lógico se adjunta a una columna Parquet y es compatible con Qlik Open Lakehouse (como se indica a continuación), la tarea de destino de Streaming utiliza el tipo lógico al definir el esquema de destino, en lugar del tipo físico subyacente. Esto garantiza que los datos se interpreten correctamente, preserva la semántica prevista, como la precisión, la escala y el significado temporal, y da como resultado esquemas más precisos cuando los datos se escriben en formatos de downstream.

Los datos obtenidos de archivos Parquet se asocian de la siguiente manera:

Tipos de datos de origen Tipos lógicos Qlik Talend Data Integration tipos de datos
BOOLEAN   BOOLEAN
INT32   INT8
INT64   INT8
INT96   DATETIME
FLOAT   REAL8
DOUBLE   REAL8
BYTE_ARRAY   STRING (Codificado como Base64)
FIXED_LEN_BYTE_ARRAY   STRING (Codificado como Base64)
BYTE_ARRAY STRING STRING
BYTE_ARRAY ENUM STRING
INT32 DECIMAL INT8
INT64 DECIMAL INT8
FIXED_LEN_BYTE_ARRAY DECIMAL INT8/REAL8 (Codificado como Base64)
BYTE_ARRAY DECIMAL INT8/REAL8 (Codificado como Base64)
INT32 DATE DATE
INT32 TIME(MILLIS,true) INT8
INT64 TIME(MICROS,true) TIME
INT64 TIMESTAMP(MICROS,true) DATETIME
INT64 TIMESTAMP(MILLIS,true) DATETIME
NESTED TYPES   STRUCT
LIST   ARRAY
MAP   ARRAY<STRUCT>. Matriz de estructuras que representan pares clave-valor.

Avro

Las siguientes asociaciones se aplican a los archivos Avro con registro de esquema.

Tipos de datos de origen Tipos lógicos Qlik Talend Data Integration tipos de datos
BOOLEAN   BOOLEAN
INT   INT8
LONG   INT8
FLOAT   REAL8
DOUBLE   REAL8
BYTES   STRING
STRING   STRING
RECORD   STRUCT
ENUM   STRING
ARRAY   ARRAY
MAP   ARRAY<STRUCT>
UNION    
FIXED   STRING
BYTES DECIMAL DECIMAL
FIXED DECIMAL DECIMAL
INT DATE DATE
INT TIME-MILLIS INT8
INT TIME-MICROS TIME
LONG TIMESTAMP-MILLIS DATETIME
LONG TIMESTAMP-MICROS DATETIME

ORC

Las siguientes asignaciones se aplican a los archivos ORC.

Tipos de datos de origen Qlik Talend Data Integration tipos de datos
BOOLEAN BOOLEAN
BYTE INT8
SHORT INT8
INT INT8
LONG INT8
DATE DATE
FLOAT REAL8
DOUBLE REAL8
TIMESTAMP DATETIME
BINARY STRING
DECIMAL REAL8
STRING STRING
VARCHAR STRING
CHAR STRING
LIST ARRAY
MAP ARRAY<STRUCT>. Matriz de estructuras que representan pares clave-valor.
STRUCT STRUCT
UNION  

Limitaciones y consideraciones

  • Si una estructura o matriz se modifica mediante la evolución automática del esquema en el destino, las vistas posteriores que no fueron creadas por una tarea de streaming Qlik Talend Cloud puede que necesiten actualizarse para no quedar obsoletas.

  • Si una tarea tiene errores de análisis, no pasará a un estado de error y no se marcará como que requiere atención. Dado que los errores de análisis son una métrica siempre creciente, no hay un criterio de salida para un estado de error.

  • Eliminar una capacidad de clúster solo se permite si no hay tareas que utilicen esa capacidad.

  • Las actualizaciones y eliminaciones de un registro con la misma clave principal no deben cruzar el límite de la partición, es decir, deben asociarse a la misma partición.

  • Si un origen contiene un gran número de columnas, solo las 500 columnas principales por frecuencia se muestran en las tareas y en el catálogo. Todas las columnas se guardan en los archivos Avro en el destino de S3, pero solo las 500 columnas principales se almacenan en las tablas de Iceberg. En la evolución del esquema, si se añade una nueva columna, no se añadirá a las columnas principales aunque sea frecuente.

¿Esta página le ha sido útil?

Si encuentra algún problema con esta página o su contenido (errores tipográficos, pasos que faltan o errores técnicos), no dude en ponerse en contacto con nosotros.