Streaming data
Het onboardingproces draagt gegevens over van de bron en slaat deze op in Iceberg-tabellen. Wijzigingen van de streaming-gegevensbronnen worden continu en in vrijwel realtime toegepast op de opslagtabellen.
Gegevens onboarden
Gegevens worden ge-onboard binnen een pijplijnproject en gegevensverzamelingen worden opgeslagen op de S3-locatie die is gedefinieerd in de projectinstellingen.
-
Klik in uw project op Maken en vervolgens op Gegevens onboarden.
-
Voeg een Taaknaam en optionele Beschrijving toe voor de onboarding.
Klik op Volgende.
-
Selecteer de bronverbinding.
U kunt een bestaande streaming-bronverbinding selecteren of een nieuwe verbinding met de bron maken.
Zie Verbinding maken met datastreams voor meer informatie.
Klik op Volgende en volg de onderstaande instructies voor uw gegevensbron.
Gegevens selecteren
Apache Kafka
De lijst toont de beschikbare Kafka-onderwerpen van het cluster dat is gedefinieerd in de bronverbinding.
Bij het selecteren van uw onderwerpen kunt u specifieke gegevensverzamelingen selecteren. U kunt ook selectieregels gebruiken om groepen gegevensverzamelingen op te nemen of uit te sluiten:
-
Gebruik % als een jokerteken om selectiecriteria voor de gegevensverzamelingen te definiëren.
Als onderwerpen worden geselecteerd met behulp van selectieregels, kunt u kiezen of u alle gegevensverzamelingen in dezelfde doeltabel wilt laden of een afzonderlijke doeltabel voor elk brononderwerp wilt maken:
-
Standaard wordt de naam van de Iceberg-doeltabel afgeleid van de onderwerpnaam, opgemaakt om te voldoen aan naamgevingsconventies, bijvoorbeeld kleine letters, spaties verwijderd, streepjes vervangen door onderstrepingstekens. In Naam doelgegevensverzameling definiëren kunt u de naam van de doeltabel bewerken
-
Wanneer selectieregels worden gebruikt om meerdere onderwerpen in één tabel te laden, moet u de doelnaam opgeven.
-
Wanneer selectieregels worden gebruikt en de gegevens in afzonderlijke tabellen worden geladen (één gegevensverzameling per onderwerp), zijn de standaard doelnamen de onderwerpnamen. In dit stadium kunt u de namen niet bewerken in de wizard, maar dit kan later in de tussenopslag-taak worden gedaan.
-
Als een regel is geconfigureerd om onderwerpen voor opname te selecteren, worden alle nieuwe onderwerpen die aan de regelcriteria voldoen ook in de tussenopslag geplaatst als de optie Nieuw onderwerp > Toevoegen aan doel onder schema-evolutie in de instellingen van de tussenopslag-taak is aangevinkt.
Selecteer een of meer gegevensverzamelingen, of gebruik een selectieregel, en klik op Toevoegen. Klik op Volgende.
Amazon Kinesis
De lijst toont de beschikbare Kinesis-streams die zijn gedefinieerd in de bronverbinding.
Selecteer een of meer gegevensverzamelingen en klik op Toevoegen. U kunt de toegevoegde gegevensverzamelingen zien onder Geselecteerde streams. Klik op Volgende.
Amazon S3
De mapbrowser toont een lijst met alle mappen die zich in de S3-bucket van uw bronverbinding bevinden.
-
Selecteer de mappen die u wilt opnemen bij het in de tussenopslag plaatsen van gegevens:
-
Voer voor elke map in Pad toevoegen het pad en het bestandsnaampatroon in:
-
Gebruik * als een jokerteken om met elk willekeurig teken overeen te komen.
-
Om een datumpatroon in te voeren, gebruikt u <yyyy> als de tijdelijke aanduiding voor het viercijferige jaar, <MM> als de tijdelijke aanduiding voor de tweecijferige maand, <dd> als de tijdelijke aanduiding voor de tweecijferige dag en <HH> als de tijdelijke aanduiding voor het tweecijferige uur. Bijvoorbeeld:
-
MyDir3/<yyyy>_<MM>_<dd>_<HH>_orders.csv
-
MyDir3/<yyyy>/<MM>/<dd>/<HH>_orders.csv
-
-
-
-
Klik op Voorbeeld om het dialoogvenster Voorbeeld van gegevens te openen. Er wordt een lijst met opgenomen en uitgesloten bestanden weergegeven.
-
Klik op Valideren om te controleren of de paden en de bestandsnaampatronen correct en functioneel zijn.
-
Geef in Naam doelgegevensverzameling definiëren een naam op om het onderwerp toe te wijzen aan de Iceberg-doeltabel. Klik op Volgende.
Het inhoudstype selecteren
Kies het inhoudstype van de brongebeurtenissen.
-
Selecteer het type gebeurtenissen dat u opneemt in Kies het type gegevensgebeurtenissen.
-
Zie Verbinding maken met datastreams voor meer informatie.
Het geselecteerde inhoudstype is van toepassing op alle onderwerpen, gegevensverzamelingen of gegevensgebeurtenissen. U moet een nieuwe taak maken voor elk inhoudstype dat u wilt opnemen.
-
Vouw Controleren of de gebeurtenissen correct zijn geladen uit om te bevestigen dat de gegevens kunnen worden geparseerd. Het is een goed idee om in dit stadium te controleren of de gegevens correct zijn, anders moet u de pijplijn opnieuw maken en de gegevens opnieuw laden. Gebruik Gegevensverzameling selecteren om specifieke gegevensverzamelingen te onderzoeken en eventuele waarschuwingen te controleren die van invloed kunnen zijn op het laden van de gegevens. Klik op het oogpictogram naast eventuele struct-kolommen om de gegevens te bekijken.
-
Klik op Volgende.
Opname-eigenschappen instellen
Configureer de instellingen voor uw pijplijn:
-
Gegevens lezen van
-
Starten vanaf de vroegste gebeurtenis: alle historische gegevens opnemen.
-
Nu starten: nieuwe gegevens opnemen die binnenkomen vanaf het moment dat de pijplijn start.
-
-
Kolommen ontwarren
-
Geneste kolommen behouden: er worden geen transformaties toegepast.
-
Ontwarren naar afzonderlijke kolommen: gegevens worden gesplitst in afzonderlijke kolommen.
-
-
Laadinstellingen voor nieuwe gegevensverzamelingen
-
Alleen toevoegen: over het algemeen de beste optie voor gebeurtenisgegevens, omdat deze meestal een korte levensduur hebben en niet worden bijgewerkt, bijvoorbeeld Orders.
-
Wijzigingen toepassen: dit is het meest geschikt voor gegevens die in de loop van de tijd worden bijgewerkt, bijvoorbeeld Klanten. Werkt bestaande records bij en voegt nieuwe records in op basis van sleutelvelden. U moet de sleutelvelden later opgeven bij het definiëren van de taak.
-
-
Doeltabelpartitie
De optie voor doeltabelpartitie is van toepassing op alle tabellen in de pijplijn. U kunt dit later op tabelniveau overschrijven om aangepaste partitionering te definiëren.
InformatieDeze optie is alleen beschikbaar wanneer Alleen toevoegen is geselecteerd in Laadinstellingen.-
Geen partitie: tabellen worden gemaakt zonder enige partitionering.
-
Partitioneren op opnamedatum van gebeurtenis: tabellen worden gepartitioneerd op de datum waarop gebeurtenissen worden opgenomen.
InformatieWanneer deze optie is geselecteerd samen met de koptekstkolomoptie hdr__from_timestamp, wordt hdr__from_timestamp gebruikt als de standaard partitiekolom. Zie Tabeldefinities voor informatie over het toevoegen van de koptekstkolom hdr__from_timestamp aan standaardweergaven.
-
-
Afhandeling van gegevenswijzigingen
InformatieDeze optie is alleen beschikbaar wanneer Wijzigingen toepassen is geselecteerd in Laadinstellingen.-
Zachte verwijderingen opnemen: Voer een expressie in om te definiëren welke records moeten worden gemarkeerd voor verwijdering.
-
Een historische gegevensopslag maken (Type 2): Hiermee worden eerdere versies van gewijzigde records bewaard.
-
-
Klik op Volgende.
Samenvatting
Het samenvattingsscherm biedt een visuele weergave van uw pijplijn:
-
Optioneel kunt u voor de taak Streaming-tussenopslag en Streaming-transformatie op Naam en beschrijving bewerken klikken om nieuwe waarden op te geven.
-
Selecteer de optie voor wat u wilt dat er gebeurt Nadat de pijplijn is gemaakt.
-
Wanneer u alle instellingen hebt geconfigureerd, klikt u op Maken om de streaming-pijplijn te maken.
-
Wanneer het project wordt weergegeven, kunt u elke taak voorbereiden en uitvoeren om te beginnen met het opnemen van de gegevens.
-
Bereid de taak Streaming-tussenopslag voor en voer deze uit.
Zie Tussenopslag van streaminggegevens naar Qlik Open Lakehouse voor meer informatie.
-
Bereid de taak Streaming-transformatie voor en voer deze uit.
Zie Streaming datasets opslaan voor meer informatie.
-
Toewijzingen van gegevenstypen
Het initiële bronschema is gebaseerd op een steekproef van de gegevens die is genomen voorafgaand aan de PREPARE-fase bij het maken van uw pijplijnproject, en schema-evolutie wordt afgehandeld tijdens het lezen. Mirror-taken en andere downstream-taken die STRUCT en ARRAY niet ondersteunen, gebruiken een JSON-type. Gegevens kunnen worden geparseerd met behulp van SQL.
De volgende toewijzingen van gegevenstypen zijn van toepassing op alle ondersteunde gegevensbronnen, maar variëren afhankelijk van het bronbestandstype, en het volgende moet worden opgemerkt:
-
Gegevenstypen worden afgeleid uit een steekproef van de gegevens die worden ge-onboard. Als een veld bijvoorbeeld alleen gehele getallen bevat in de steekproef, wordt het gemaakt als INT8 in de taken voor streaming-tussenopslag en -transformatie. Als latere gegevens fractionele waarden met dubbele precisie bevatten, bevatten de tussenopslagbestanden die waarden; in de taak Streaming-transformatie blijft de kolom echter INT8 en worden de fractionele waarden afgekapt als de instelling Gegevenstype van veld wijzigen is ingesteld op Negeren. Om onbedoeld afkappen te voorkomen, moet u ervoor zorgen dat de voorbeeldgegevens het volledige bereik van verwachte waarden bevatten vóór de onboarding, of configureert u Gegevenstype van veld wijzigen op Taak stoppen in de beginfasen en past u de gegevenstypen indien nodig aan.
-
Als een veld wordt toegevoegd aan een struct in de bron, wordt het altijd toegevoegd aan het tussenopslagdoel. Voor streaming-transformatie wordt het gedrag toegepast volgens de optie die is gekozen in Instellingen voor taak Streaming-transformatie > Schema-evolutie > Velden toevoegen aan struct (Toepassen op doel, Negeren, Taak stoppen).
-
Als een veld ontbreekt in een specifieke record, of als een array leeg is, worden deze behandeld als null.
-
Als een gegevensverzameling wordt afgevlakt door een array en er een record binnenkomt waarbij die array leeg of null is, maakt het systeem één rij en is het afgevlakte veld null. Het wordt niet automatisch uitgesloten. Als u deze rijen wilt uitsluiten, voegt u handmatig een filter toe, bijvoorbeeld array_element IS NOT NULL.
-
De gegevenstypen die in de gebruikersinterface worden weergegeven, weerspiegelen de geselecteerde granulariteit van de gegevensverzameling. Voor afgevlakte arrays wordt het gegevenstype van het afzonderlijke element weergegeven in plaats van de arraystructuur zelf.
-
Een nieuw kenmerk kan niet worden toegevoegd binnen een struct in een genest JSON-veld, alleen op het hoofdniveau.
-
In streaming-transformatietaken wordt afvlakken slechts voor één niveau van een array ondersteund. Wanneer afvlakken wordt toegepast op een array met meerdere niveaus, bijvoorbeeld ARRAY<ARRAY<STRUCT>>, wordt alleen de buitenste array afgevlakt, wat resulteert in ARRAY<STRUCT> in plaats van een volledig afgevlakte STRUCT. Bovendien staat de huidige gebruikersinterface toe dat afvlakken alleen op kolomniveau wordt geconfigureerd. Als gevolg hiervan past het selecteren van een array met meerdere niveaus impliciet afvlakken alleen toe op het eerste arrayniveau.
-
Wanneer u verwijst naar een array van primitieven, wordt het gegevenstype van het element gebruikt als de granulariteit de array is. Anders wordt het gegevenstype van de array gebruikt.
In dit voorbeeld heeft OrderDetails een array van CustomerID van het gegevenstype INT. OrderDetails.CustomerID betekent INT als de granulariteit OrderDetails.CustomerID is en ARRAY<INT> als de granulariteit OrderDetails is.
JSON
In JSON-bestanden bepaalt de numerieke waarde in de bron het doelgegevenstype:
-
INT8 wordt gebruikt voor waarden van gehele getallen die binnen het ondersteunde bereik voor gehele getallen vallen en geen fractionele component bevatten.
-
REAL8 (DOUBLE) wordt gebruikt wanneer de waarde een fractionele component bevat (drijvendekommagetal).
-
STRING wordt gebruikt wanneer de numerieke waarde het maximaal ondersteunde bereik voor gehele getallen overschrijdt.
Gegevenstypen worden als volgt toegewezen:
| Brongegevenstypen | Qlik Talend Data Integration-gegevenstypen |
|---|---|
| STRING | STRING |
| NUMBER | INT8 |
| NUMBER | REAL8 |
| NUMBER | STRING |
| BOOLEAN | BOOLEAN |
| ARRAY | ARRAY |
| OBJECT | STRUCT |
CSV, TSV, REGEX en SPLIT
Standaard worden alle brongegevenstypen opgenomen in een string. Gebruik de optie Typen automatisch afleiden om bron- en doeltypen als volgt toe te wijzen:
| Brongegevenstypen | Qlik-gegevenstypen |
|---|---|
| NUMERIC | INT8/REAL8 |
| True/TRUE/true/False/FALSE/false | BOOLEAN |
| TIMESTAMP | Tijdstempels in de notatie yyyy-MM-dd HH:mm:ss of yyyy-MM-ddTHH:mm:ssz worden geparseerd naar een datetime-type. Als er een tijdzone is opgenomen, wordt de waarde geparseerd als een string. |
Parquet
Parquet-bestanden ondersteunen fysieke en logische gegevenstypen. Fysieke gegevenstypen definiëren hoe waarden op schijf worden opgeslagen, zoals INT32, DOUBLE of BYTE_ARRAY. Logische gegevenstypen bieden semantische betekenis bovenop de fysieke weergave, bijvoorbeeld door te identificeren of een waarde van een geheel getal een datum vertegenwoordigt. Wanneer een logisch type is gekoppeld aan een Parquet-kolom en wordt ondersteund in Qlik Open Lakehouse (zoals hieronder vermeld), gebruikt de taak Streaming-tussenopslag het logische type bij het definiëren van het doelschema, in plaats van het onderliggende fysieke type. Dit zorgt ervoor dat gegevens correct worden geïnterpreteerd, behoudt de beoogde semantiek zoals precisie, schaal en temporele betekenis, en resulteert in nauwkeurigere schema's wanneer gegevens naar downstream-indelingen worden geschreven.
Gegevens afkomstig uit Parquet-bestanden worden als volgt toegewezen:
| Brongegevenstypen | Logische typen | Qlik Talend Data Integration-gegevenstypen |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT32 | INT8 | |
| INT64 | INT8 | |
| INT96 | DATETIME | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| BYTE_ARRAY | STRING (gecodeerd als Base64) | |
| FIXED_LEN_BYTE_ARRAY | STRING (gecodeerd als Base64) | |
| BYTE_ARRAY | STRING | STRING |
| BYTE_ARRAY | ENUM | STRING |
| INT32 | DECIMAL | INT8 |
| INT64 | DECIMAL | INT8 |
| FIXED_LEN_BYTE_ARRAY | DECIMAL | INT8/REAL8 (gecodeerd als Base64) |
| BYTE_ARRAY | DECIMAL | INT8/REAL8 (gecodeerd als Base64) |
| INT32 | DATE | DATE |
| INT32 | TIME(MILLIS,true) | INT8 |
| INT64 | TIME(MICROS,true) | TIME |
| INT64 | TIMESTAMP(MICROS,true) | DATETIME |
| INT64 | TIMESTAMP(MILLIS,true) | DATETIME |
| NESTED TYPES | STRUCT | |
| LIST | ARRAY | |
| MAP | ARRAY<STRUCT>. Array van structs die sleutel-waardeparen vertegenwoordigen. |
Avro
De volgende toewijzingen zijn van toepassing op Avro-bestanden met schemaregister.
| Brongegevenstypen | Logische typen | Qlik Talend Data Integration-gegevenstypen |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT | INT8 | |
| LONG | INT8 | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| BYTES | STRING | |
| STRING | STRING | |
| RECORD | STRUCT | |
| ENUM | STRING | |
| ARRAY | ARRAY | |
| MAP | ARRAY<STRUCT> | |
| UNION | ||
| FIXED | STRING | |
| BYTES | DECIMAL | DECIMAL |
| FIXED | DECIMAL | DECIMAL |
| INT | DATE | DATE |
| INT | TIME-MILLIS | INT8 |
| INT | TIME-MICROS | TIME |
| LONG | TIMESTAMP-MILLIS | DATETIME |
| LONG | TIMESTAMP-MICROS | DATETIME |
ORC
De volgende toewijzingen zijn van toepassing op ORC-bestanden.
| Brongegevenstypen | Qlik Talend Data Integration-gegevenstypen |
|---|---|
| BOOLEAN | BOOLEAN |
| BYTE | INT8 |
| SHORT | INT8 |
| INT | INT8 |
| LONG | INT8 |
| DATE | DATE |
| FLOAT | REAL8 |
| DOUBLE | REAL8 |
| TIMESTAMP | DATETIME |
| BINARY | STRING |
| DECIMAL | REAL8 |
| STRING | STRING |
| VARCHAR | STRING |
| CHAR | STRING |
| LIST | ARRAY |
| MAP | ARRAY<STRUCT>. Array van structs die sleutel-waardeparen vertegenwoordigen. |
| STRUCT | STRUCT |
| UNION |
Beperkingen en overwegingen
-
Als een structuur of array wordt gewijzigd door automatische schema-evolutie in de tussenopslag, moeten downstream-weergaven die niet zijn gemaakt door een Qlik Talend Cloud-streamingtaak mogelijk worden bijgewerkt om niet verouderd te raken.
-
Als een taak parseerfouten heeft, komt deze niet in een foutstatus en wordt deze niet gemarkeerd als aandacht vereist. Aangezien parseerfouten een altijd toenemende statistiek zijn, is er geen afsluitcriterium voor een foutstatus.
-
Het verwijderen van een clustermogelijkheid is alleen toegestaan als er geen taken zijn die die mogelijkheid gebruiken.
-
Updates en verwijderingen van een record met dezelfde primaire sleutel mogen de partitiegrens niet overschrijden, dat wil zeggen dat ze aan dezelfde partitie moeten worden toegewezen.
-
Als een bron een groot aantal kolommen bevat, worden alleen de top 500 kolommen op frequentie weergegeven in taken en in de catalogus. Alle kolommen worden opgeslagen in de Avro-bestanden in de S3-tussenopslag, maar alleen de top 500 kolommen worden opgeslagen in Iceberg-tabellen. Als er bij schema-evolutie een nieuwe kolom wordt toegevoegd, wordt deze niet toegevoegd aan de topkolommen, zelfs niet als deze frequent is.