Données de flux
Le processus d'intégration transfère les données de la source et les stocke dans des tables Iceberg. Les modifications apportées aux sources de données de flux sont appliquées en continu aux tables de stockage quasiment en temps réel.
Intégration des données
Les données sont intégrées dans un projet de pipeline et les jeux de données sont stockés à l'emplacement S3 défini dans les paramètres du projet.
-
Dans votre projet, cliquez sur Créer, puis sur Intégrer les données.
-
Ajoutez un Nom de tâche et une Description facultative pour l'intégration.
Cliquez sur Suivant.
-
Sélectionnez la connexion source.
Vous pouvez sélectionner une connexion source de flux existante ou créer une nouvelle connexion à la source.
Pour plus d'informations, consultez Connexion à des flux de données.
Cliquez sur Suivant et suivez les instructions ci-dessous pour votre source de données.
Sélection des données
Apache Kafka
La liste affiche les rubriques Kafka disponibles à partir du cluster défini dans la connexion source.
Lorsque vous sélectionnez vos rubriques, vous pouvez sélectionner des jeux de données spécifiques. Vous pouvez également utiliser des règles de sélection pour inclure ou exclure des groupes de jeux de données :
-
Utilisez % comme caractère générique pour définir des critères de sélection pour les jeux de données.
Si des rubriques sont sélectionnées à l'aide de règles de sélection, vous pouvez choisir de charger tous les jeux de données dans la même table cible ou de créer une table cible distincte pour chaque rubrique source :
-
Par défaut, le nom de la table Iceberg cible est dérivé du nom de la rubrique, formaté de sorte à être conforme aux conventions de nommage, par exemple, en minuscules, avec les espaces supprimés et les tirets remplacés par des traits de soulignement. Dans Définir le nom du jeu de données cible, vous pouvez modifier le nom de la table cible.
-
Lorsque des règles de sélection sont utilisées pour charger plusieurs rubriques dans une seule table, vous devez fournir le nom cible.
-
Lorsque des règles de sélection sont utilisées et que les données sont chargées dans des tables distinctes (un jeu de données par rubrique), les noms cibles par défaut sont les noms de rubrique. À ce stade, vous ne pouvez pas modifier les noms dans l'assistant, mais cela peut être fait ultérieurement dans la tâche de dépôt temporaire.
-
Si une règle est configurée de sorte à sélectionner des rubriques à ingérer, toutes les nouvelles rubriques qui répondent aux critères de la règle sont également déposées temporairement si l'option Nouvelle rubrique > Ajouter à la cible sous l'évolution du schéma dans les paramètres de la tâche de dépôt temporaire est cochée.
Sélectionnez un ou plusieurs jeux de données ou utilisez une règle de sélection, puis cliquez sur Ajouter. Cliquez sur Suivant.
Amazon Kinesis
La liste affiche les flux Kinesis disponibles définis dans la connexion source.
Sélectionnez un ou plusieurs jeux de données, puis cliquez sur Ajouter. Vous pouvez voir les jeux de données ajoutés sous Flux sélectionnés. Cliquez sur Suivant.
Amazon S3
Le navigateur de répertoires affiche une liste de tous les répertoires situés dans le compartiment S3 de votre connexion source.
-
Sélectionnez les répertoires à inclure lors du dépôt temporaire de données :
-
Pour chaque répertoire, dans Ajouter un chemin d'accès, saisissez le chemin d'accès et le modèle de nom de fichier :
-
Utilisez * comme caractère générique pour mettre en correspondance n'importe quel caractère.
-
Pour saisir un modèle de date, utilisez <yyyy> comme espace réservé pour l'année à quatre chiffres, <MM> comme espace réservé pour le mois à deux chiffres, <dd> comme espace réservé pour le jour à deux chiffres et <HH> comme espace réservé pour l'heure à deux chiffres. Par exemple :
-
MyDir3/<yyyy>_<MM>_<dd>_<HH>_orders.csv
-
MyDir3/<yyyy>/<MM>/<dd>/<HH>_orders.csv
-
-
-
-
Cliquez sur Aperçu pour ouvrir la boîte de dialogue Aperçu des données. Une liste des fichiers inclus et exclus s'affiche.
-
Cliquez sur Valider pour vérifier que les chemins d'accès et les modèles de nom de fichier sont corrects et fonctionnels.
-
Dans Définir le nom du jeu de données cible, indiquez un nom pour mapper la rubrique à la table Iceberg cible. Cliquez sur Suivant.
Sélection du type de contenu
Sélectionnez le type de contenu des événements sources.
-
Sélectionnez le type d'événements que vous ingérez dans Sélectionner le type d'événements de données.
-
Pour plus d'informations, consultez Connexion à des flux de données.
Le type de contenu sélectionné s'applique à l'ensemble des rubriques, jeux de données ou événements de données. Vous devez créer une nouvelle tâche pour chaque type de contenu que vous souhaitez ingérer.
-
Développez Vérifier que les événements sont correctement chargés pour confirmer que les données peuvent être analysées. Il est recommandé de s'assurer que les données sont correctes à ce stade, sinon vous devrez recréer le pipeline et charger de nouveau les données. Utilisez Sélectionner un jeu de données pour examiner des jeux de données spécifiques et vérifier les avertissements susceptibles d'affecter le chargement des données. Cliquez sur l'icône en forme d'œil à côté de n'importe quelle colonne de struct pour afficher les données.
-
Cliquez sur Suivant.
Définition des propriétés d'ingestion
Configurez les paramètres de votre pipeline :
-
Lire les données depuis
-
Démarrer à partir de l'événement le plus ancien : ingérez toutes les données historiques.
-
Démarrer à partir de maintenant : ingérez les nouvelles données qui arrivent à partir du démarrage du pipeline.
-
-
Désimbrication des colonnes
-
Préserver les colonnes imbriquées : aucune transformation n'est appliquée.
-
Désimbriquer en colonnes séparées : les données sont divisées en colonnes séparées.
-
-
Paramètres de chargement des nouveaux jeux de données
-
Écrire à la suite uniquement : généralement la meilleure option pour les données d'événement, car elles ont généralement une courte durée de vie et ne sont pas mises à jour, par exemple, Commandes.
-
Appliquer les modifications : il s'agit de la meilleure option pour les données qui sont mises à jour au fil du temps, par exemple, Clients. Met à jour les enregistrements existants et insère de nouveaux enregistrements en se basant sur les champs clés. Vous devrez définir les champs clés ultérieurement, lors de la définition de la tâche.
-
-
Partition des tables cibles
L'option Partition des tables cibles s'applique à toutes les tables du pipeline. Vous pouvez la remplacer ultérieurement au niveau de la table pour définir un partitionnement personnalisé.
Note InformationsCette option est uniquement disponible lorsque Écrire à la suite uniquement est sélectionné dans Paramètres de chargement.-
Aucune partition : les tables seront créées sans aucune partition.
-
Partition par date d'ingestion des événements : les tables seront partitionnées en fonction de la date d'ingestion des événements.
-
-
Manipulation des modifications de données
Note InformationsCette option est uniquement disponible lorsque Appliquer les modifications est sélectionné dans Paramètres de chargement.-
Inclure les suppressions réversibles : saisissez une expression pour définir les enregistrements à marquer à des fins de suppression.
-
Créer un data store historique (Type 2) : cette option conservera les versions précédentes des enregistrements modifiés.
-
-
Cliquez sur Suivant.
Résumé
L'écran Résumé fournit un affichage visuel de votre pipeline :
-
En option, pour les tâches de dépôt temporaire de flux et de transformation de flux, vous pouvez cliquer sur Modifier le nom et la description pour fournir de nouvelles valeurs.
-
Sélectionnez l'option correspondant à ce que vous souhaitez qu'il se passe Après la création du pipeline.
-
Lorsque vous avez configuré tous les paramètres, cliquez sur Créer pour créer le pipeline de flux.
-
Lorsque le projet s'affiche, vous pouvez préparer et exécuter chaque tâche pour commencer à ingérer les données.
-
Préparez et exécutez la tâche de dépôt temporaire de flux.
Pour plus d'informations, consultez Dépôt temporaire de données de flux dans Qlik Open Lakehouse.
-
Préparez et exécutez la tâche de transformation de flux.
Pour plus d'informations, consultez Stockage de jeux de données de flux.
-
Mappage de types de données
Le schéma source initial est basé sur un échantillon des données prélevées avant la phase PREPARE lors de la création de votre projet de pipeline, et l'évolution du schéma est traitée lors de la lecture. Les tâches de mise en miroir et les autres tâches en aval qui ne supportent pas STRUCT et ARRAY utilisent un type JSON. Les données peuvent être analysées via SQL.
Les mappages de types de données suivants s'appliquent à toutes les sources de données compatibles, mais varient en fonction du type de fichier source, et il convient de tenir compte des points suivants :
-
Les types de données sont déduits d'un échantillon des données en cours d'intégration. Par exemple, si un champ ne contient que des valeurs entières dans l'échantillon, il est créé en tant qu'INT8 dans les tâches de dépôt temporaire et de transformation de flux. Si les données ultérieures incluent des valeurs fractionnaires à double précision, les fichiers de dépôt temporaire contiennent ces valeurs ; cependant, dans la tâche de transformation de flux, si le paramètre Modifier le type de données du champ est défini sur Ignorer, la colonne reste INT8 et les valeurs fractionnaires sont tronquées. Pour éviter une troncature involontaire, assurez-vous que l'échantillon de données inclut la gamme complète des valeurs attendues avant l'intégration, ou configurez Modifier le type de données du champ sur Arrêter la tâche lors des premières étapes et ajustez les types de données, si nécessaire.
-
Si un champ est ajouté à une struct dans la source, il est toujours ajouté à la cible de dépôt temporaire. Pour la transformation de flux, le comportement est appliqué en fonction de l'option sélectionnée dans Paramètres de la tâche de transformation de flux > Évolution du schéma > Ajouter des champs à la struct (Appliquer à la cible, Ignorer, Arrêter la tâche).
-
S'il manque un champ dans un enregistrement spécifique, ou si une séquence est vide, ce champ ou cette séquence est traité(e) comme nul(le).
-
Si un jeu de données est aplati par une séquence et qu'un enregistrement arrive avec cette séquence vide ou nulle, le système crée une ligne et le champ aplati est nul. Il n'est pas automatiquement exclu. Si vous souhaitez exclure ces lignes, ajoutez manuellement un filtre, par exemple, array_element IS NOT NULL.
-
Les types de données affichés dans l'IU reflètent la granularité du jeu de données sélectionné. Pour les séquences aplaties, le type de données de l'élément individuel est affiché plutôt que la structure de séquence elle-même.
-
Un nouvel attribut ne peut pas être ajouté à l'intérieur d'une struct dans un champ JSON imbriqué, seulement au niveau racine.
-
Dans les tâches de transformation de flux, l'aplatissement n'est supporté que pour un seul niveau d'une séquence. Lorsque l'aplatissement est appliqué à une séquence à plusieurs niveaux, par exemple, ARRAY<ARRAY<STRUCT>>, seule la séquence externe est aplatie, ce qui donne ARRAY<STRUCT> plutôt qu'une STRUCT entièrement aplatie. De plus, l'IU actuelle permet de configurer l'aplatissement uniquement au niveau de la colonne. Par conséquent, la sélection d'une séquence à plusieurs niveaux applique implicitement l'aplatissement uniquement au premier niveau de la séquence.
-
Lorsque vous faites référence à une séquence de primitives, le type de données de l'élément est utilisé si la granularité est la séquence. Sinon, le type de données séquence est utilisé.
Dans cet exemple, OrderDetails contient une séquence de CustomerID de type de données INT. OrderDetails.CustomerID signifie INT si la granularité est OrderDetails.CustomerID et ARRAY<INT> si la granularité est OrderDetails.
JSON
Dans les fichiers JSON, la valeur numérique dans la source détermine le type de données cible :
-
INT8 est utilisé pour les valeurs entières qui se situent dans la plage d'entiers supportée et n'incluent pas de composante fractionnaire.
-
REAL8 (DOUBLE) est utilisé lorsque la valeur contient une composante fractionnaire (nombre à virgule flottante).
-
STRING est utilisé lorsque la valeur numérique dépasse la plage d'entiers maximale supportée.
Les types de données sont mappés comme suit :
| Type de données sources | Types de données Qlik Talend Data Integration |
|---|---|
| STRING | STRING |
| NUMBER | INT8 |
| NUMBER | REAL8 |
| NUMBER | STRING |
| BOOLEAN | BOOLEAN |
| ARRAY | ARRAY |
| OBJECT | STRUCT |
CSV, TSV, REGEX, et SPLIT
Par défaut, tous les types de données sources sont ingérés sous forme de chaîne. Utilisez l'option Déduire automatiquement les types pour mapper les types sources et cibles comme suit :
| Type de données sources | Types de données Qlik |
|---|---|
| NUMERIC | INT8/REAL8 |
| True/TRUE/true/False/FALSE/false | BOOLEAN |
| TIMESTAMP | Les horodatages au format yyyy-MM-dd HH:mm:ss ou yyyy-MM-ddTHH:mm:ssz sont analysés en type datetime. Si un fuseau horaire est inclus, la valeur est analysée comme une chaîne. |
Parquet
Les fichiers Parquet supportent les types de données physiques et logiques. Les types de données physiques définissent la manière dont les valeurs sont stockées sur le disque, comme INT32, DOUBLE ou BYTE_ARRAY. Les types de données logiques fournissent une signification sémantique en plus de la représentation physique, par exemple en identifiant si une valeur entière représente ou non une date. Lorsqu'un type logique est joint à une colonne Parquet et est supporté dans Qlik Open Lakehouse (comme indiqué ci-dessous), la tâche de dépôt temporaire de flux utilise le type logique lors de la définition du schéma cible plutôt que le type physique sous-jacent. Cela garantit l'interprétation correcte des données, préserve les sémantiques prévues telles que la précision, l'échelle et la signification temporelle et aboutit à des schémas plus précis lorsque les données sont écrites dans des formats en aval.
Les données provenant de fichiers Parquet sont mappées comme suit :
| Type de données sources | Types logiques | Types de données Qlik Talend Data Integration |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT32 | INT8 | |
| INT64 | INT8 | |
| INT96 | DATETIME | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| BYTE_ARRAY | STRING (Encoded as Base64) | |
| FIXED_LEN_BYTE_ARRAY | STRING (Encoded as Base64) | |
| BYTE_ARRAY | STRING | STRING |
| BYTE_ARRAY | ENUM | STRING |
| INT32 | DECIMAL | INT8 |
| INT64 | DECIMAL | INT8 |
| FIXED_LEN_BYTE_ARRAY | DECIMAL | INT8/REAL8 (Encoded as Base64) |
| BYTE_ARRAY | DECIMAL | INT8/REAL8 (Encoded as Base64) |
| INT32 | DATE | DATE |
| INT32 | TIME(MILLIS,true) | INT8 |
| INT64 | TIME(MICROS,true) | TIME |
| INT64 | TIMESTAMP(MICROS,true) | DATETIME |
| INT64 | TIMESTAMP(MILLIS,true) | DATETIME |
| NESTED TYPES | STRUCT | |
| LIST | ARRAY | |
| MAP | ARRAY<STRUCT>. Séquence de structs représentant des paires clé-valeur. |
Avro
Les mappages suivants s'appliquent aux fichiers Avro avec registre de schémas.
| Type de données sources | Types logiques | Types de données Qlik Talend Data Integration |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT | INT8 | |
| LONG | INT8 | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| BYTES | STRING | |
| STRING | STRING | |
| RECORD | STRUCT | |
| ENUM | STRING | |
| ARRAY | ARRAY | |
| MAP | ARRAY<STRUCT> | |
| UNION | ||
| FIXED | STRING | |
| BYTES | DECIMAL | DECIMAL |
| FIXED | DECIMAL | DECIMAL |
| INT | DATE | DATE |
| INT | TIME-MILLIS | INT8 |
| INT | TIME-MICROS | TIME |
| LONG | TIMESTAMP-MILLIS | DATETIME |
| LONG | TIMESTAMP-MICROS | DATETIME |
ORC
Les mappages suivants s'appliquent aux fichiers ORC.
| Type de données sources | Types de données Qlik Talend Data Integration |
|---|---|
| BOOLEAN | BOOLEAN |
| BYTE | INT8 |
| SHORT | INT8 |
| INT | INT8 |
| LONG | INT8 |
| DATE | DATE |
| FLOAT | REAL8 |
| DOUBLE | REAL8 |
| TIMESTAMP | DATETIME |
| BINARY | STRING |
| DECIMAL | REAL8 |
| STRING | STRING |
| VARCHAR | STRING |
| CHAR | STRING |
| LIST | ARRAY |
| MAP | ARRAY<STRUCT>. Séquence de structs représentant des paires clé-valeur. |
| STRUCT | STRUCT |
| UNION |
Limitations et considérations générales
-
Si une structure ou une séquence est modifiée par l'évolution de schéma automatique dans le dépôt temporaire, les vues en aval qui n'ont pas été créées par une tâche de flux Qlik Talend Cloud peuvent nécessiter une mise à jour afin de ne pas être obsolètes.
-
Si une tâche présente des erreurs d'analyse, elle n'obtiendra pas d'état d'erreur et ne sera pas marquée comme nécessitant une attention particulière. Comme les erreurs d'analyse sont une métrique toujours croissante, il n'existe pas de critère de sortie pour un état d'erreur.
-
La suppression d'une capacité de cluster n'est autorisée que si aucune tâche n'utilise cette capacité.
-
Les mises à jour et les suppressions d'un enregistrement portant la même clé primaire ne doivent pas franchir la limite de partition, c'est-à-dire qu'elles doivent être mappées vers la même partition.
-
Si une source contient un grand nombre de colonnes, seules les 500 premières colonnes par fréquence sont affichées dans les tâches et dans le catalogue. Toutes les colonnes sont enregistrées dans les fichiers Avro du dépôt temporaire S3, mais seules les 500 premières colonnes sont stockées dans les tables Iceberg. Dans l'évolution de schéma, si une nouvelle colonne est ajoutée, elle ne sera pas ajoutée aux colonnes supérieures, même si elle est fréquente.