Dados de streaming
O processo de integração transfere dados da fonte e os armazena em tabelas do Iceberg. As alterações provenientes das fontes de dados de streaming são aplicadas continuamente às tabelas de armazenamento em tempo quase real.
Integrar dados
Os dados são integrados em um projeto de pipeline e os conjuntos de dados são armazenados na localização S3 definida nas configurações do projeto.
-
No seu projeto, clique em Criar e depois em Integrar dados.
-
Adicione um Nome de tarefa e uma Descrição opcional para a integração.
Clique em Próximo.
-
Selecione a conexão de origem.
Você pode selecionar uma conexão de origem de streaming existente ou criar uma nova conexão com a origem.
Para obter mais informações, consulte Conectando a fluxos de dados
Clique em Avançar e siga as instruções abaixo para sua fonte de dados.
Selecionando dados
Apache Kafka
A lista exibe os tópicos Kafka disponíveis no cluster definido na conexão de origem.
Ao selecionar seus tópicos, você pode selecionar conjuntos de dados específicos. Você também pode usar regras de seleção para incluir ou excluir grupos de conjuntos de dados:
-
Use % como curinga para definir um critério de seleção para os conjuntos de dados.
Se os tópicos forem selecionados usando regras de seleção, você pode escolher se deseja carregar todos os conjuntos de dados na mesma tabela de destino ou criar uma tabela de destino separada para cada tópico de origem:
-
Por padrão, o nome da tabela do Iceberg de destino é derivado do nome do tópico, formatado para estar em conformidade com as convenções de nomenclatura, por exemplo, minúsculas, espaços removidos, hífens substituídos por sublinhados. Em Definir nome do conjunto de dados de destino, você pode editar o nome da tabela de destino
-
Quando regras de seleção são usadas para carregar vários tópicos em uma única tabela, você deve fornecer o nome de destino.
-
Quando as regras de seleção são usadas e os dados são carregados em tabelas separadas (um conjunto de dados por tópico), os nomes de destino padrão são os nomes dos tópicos. Nesta etapa, você não pode editar os nomes no assistente, mas isso pode ser feito posteriormente na tarefa de aterrisagem.
-
Se uma regra for configurada para selecionar tópicos para ingestão, quaisquer novos tópicos que atendam aos critérios da regra também serão aterrisados se a opção Novo tópico > Adicionar ao destino em evolução de esquema nas configurações da tarefa de aterrisagem estiver marcada.
Selecione um ou mais conjuntos de dados, ou utilize uma regra de seleção, e clique em Adicionar. Clique em Próximo.
Amazon Kinesis
A lista exibe os fluxos Kinesis disponíveis definidos na conexão de origem.
Selecione um ou mais conjuntos de dados e clique em Adicionar. Você pode ver os conjuntos de dados adicionados em Fluxos selecionados. Clique em Próximo.
Amazon S3
O navegador de diretórios exibe uma lista de todos os diretórios localizados no compartimento S3 da sua conexão de origem.
-
Selecione os diretórios a serem incluídos ao aterrisar dados:
-
Para cada diretório, em Adicionar caminho, insira o caminho e o padrão de nome de arquivo:
-
Use * como um curinga para corresponder a qualquer caractere.
-
Para inserir um padrão de data, use <yyyy> como o espaço reservado para o ano de quatro dígitos, <MM> como o espaço reservado para o mês de dois dígitos, <dd> como o espaço reservado para o dia de dois dígitos e <HH> como o espaço reservado para a hora de dois dígitos. Por exemplo:
-
MyDir3/<yyyy>_<MM>_<dd>_<HH>_orders.csv
-
MyDir3/<yyyy>/<MM>/<dd>/<HH>_orders.csv
-
-
-
-
Clique em Visualizar para abrir o diálogo Visualizar dados. Uma lista de arquivos incluídos e excluídos é exibida.
-
Clique em Validar para verificar se os caminhos e os padrões de nome de arquivo estão corretos e funcionais.
-
Em Definir nome do conjunto de dados de destino, forneça um nome para mapear o tópico para a tabela do Iceberg de destino. Clique em Próximo.
Selecionando o tipo de conteúdo
Escolha o tipo de conteúdo dos eventos de origem.
-
Selecione o tipo de eventos que você está ingerindo em Escolha o tipo de eventos de dados.
-
Para obter mais informações, consulte Conectando a fluxos de dados.
O tipo de conteúdo selecionado aplica-se a todos os tópicos, conjuntos de dados ou eventos de dados. Você deve criar uma nova tarefa para cada tipo de conteúdo que deseja ingerir.
-
Expanda Verificar se os eventos foram carregados corretamente para confirmar que os dados podem ser analisados. É aconselhável garantir que os dados estejam corretos nesta etapa, caso contrário, você precisará recriar o pipeline e carregar os dados novamente. Use Selecionar conjunto de dados para examinar conjuntos de dados específicos e verificar quaisquer avisos que possam afetar o carregamento dos dados. Clique no ícone de olho ao lado de quaisquer colunas de struct para visualizar os dados.
-
Clique em Próximo.
Definindo propriedades de ingestão
Defina as configurações para o seu pipeline:
-
Ler dados de
-
Começar do evento mais antigo: consuma todos os dados históricos.
-
Começar agora: consuma novos dados que chegam a partir do momento em que o pipeline é iniciado.
-
-
Desaninhar colunas
-
Preservar colunas aninhadas: nenhuma transformação é aplicada.
-
Desaninhamento em colunas separadas: os dados são divididos em colunas separadas.
-
-
Configurações de carregamento para novos conjuntos de dados
-
Somente anexar: geralmente a melhor opção para dados de evento, pois geralmente têm um tempo de vida curto e não são atualizados, por exemplo, Ordens.
-
Aplicar alterações: isso é mais adequado para dados que são atualizados ao longo do tempo, por exemplo, Clientes. Atualiza os registros existentes e insere novos registros com base em campos chave. Você precisará especificar os campos chave posteriormente, ao definir a tarefa.
-
-
Partição da tabela de destino
A opção de partição da tabela de destino aplica-se a todas as tabelas no pipeline. Você pode alterar isso posteriormente no nível da tabela para definir um particionamento personalizado.
Nota informativaEsta opção está disponível apenas quando Somente anexar está selecionado em Configurações de carregamento.-
Nenhuma partição: as tabelas são criadas sem partições.
-
Particionar por data de ingestão de eventos: as tabelas serão particionadas pela data em que os eventos forem ingeridos.
-
-
Gerenciamento de alteração de dados
Nota informativaEsta opção está disponível apenas quando Aplicar alterações está selecionado em Configurações de carregamento.-
Incluir exclusões flexíveis: insira uma expressão para definir quais registros marcar para exclusão.
-
Criar um armazenamento de dados históricos (Tipo 2): isso manterá as versões anteriores dos registros alterados.
-
-
Clique em Próximo.
Resumo
A tela de resumo fornece uma exibição visual do seu pipeline:
-
Opcionalmente, para a tarefa de aterrissagem de streaming e transformação de streaming, você pode clicar em Editar nome e descrição para fornecer novos valores.
-
Selecione a opção para o que você deseja que aconteça Depois que o pipeline for criado.
-
Quando você tiver configurado todas as configurações, clique em Criar para criar o pipeline de streaming.
-
Quando o projeto for exibido, você pode preparar e executar cada tarefa para começar a ingerir os dados.
-
Preparar e executar a tarefa de aterrisagem de streaming.
Para obter mais informações, consulte Aterrisando dados de streaming para o Qlik Open Lakehouse.
-
Preparar e executar a tarefa de transformação de streaming.
Para obter mais informações, consulte Armazenando conjuntos de dados de streaming.
-
Mapeamentos de tipo de dados
O esquema de origem inicial é baseado em uma amostra dos dados coletados antes da fase PREPARE ao criar seu projeto de pipeline, e a evolução do esquema é tratada no momento da leitura. Tarefas de espelhamento e outras tarefas downstream que não suportam STRUCT e ARRAY usam um tipo JSON. Os dados podem ser analisados usando SQL.
Os seguintes mapeamentos de tipo de dados se aplicam a todas as fontes de dados suportadas, mas variam de acordo com o tipo de arquivo de origem, e o seguinte deve ser observado:
-
Tipos de dados são inferidos a partir de uma amostra dos dados que estão sendo integrados. Por exemplo, se um campo contém apenas valores inteiros na amostra, ele é criado como INT8 nas tarefas de aterrisagem de streaming e transformação. Se os dados subsequentes incluírem valores fracionários de precisão dupla, os arquivos de aterrisagem contêm esses valores; no entanto, na Tarefa de transformação de streaming, se a configuração Alterar tipo de dados do campo estiver definida como Ignorar, a coluna permanece INT8 e os valores fracionários são truncados. Para evitar truncamento não intencional, certifique-se de que os dados de amostra incluam o intervalo completo de valores esperados antes da integração, ou configure Alterar tipo de dados do campo para Parar tarefa durante os estágios iniciais e ajuste os tipos de dados conforme necessário.
-
Se um campo for adicionado a uma estrutura na origem, ele é sempre adicionado ao destino de aterrisagem. Para transformação de streaming, o comportamento é aplicado de acordo com a opção escolhida em Configurações da tarefa de transformação de streaming > Evolução do esquema > Adicionar campos à estrutura (Aplicar ao destino, Ignorar, Parar tarefa).
-
Se um campo estiver ausente em um registro específico, ou um array estiver vazio, eles serão tratados como nulos.
-
Se um conjunto de dados for nivelado por um array, e um registro chegar onde esse array estiver vazio ou nulo, o sistema criará uma linha e o campo nivelado será nulo. Não será excluído automaticamente. Se você quiser excluir essas linhas, adicione manualmente um filtro, por exemplo, array_element IS NOT NULL.
-
Os tipos de dados exibidos na interface do usuário refletem a granularidade do conjunto de dados selecionado. Em arrays nivelados, o tipo de dados de cada elemento individual é exibido em vez da estrutura do array em si.
-
Um novo atributo não pode ser adicionado dentro de uma estrutura em um campo JSON aninhado, apenas no nível raiz.
-
Em tarefas de transformação de fluxo, o nivelamento é suportado apenas para um único nível de um array. Quando o nivelamento é aplicado a um array multinível, por exemplo, ARRAY<ARRAY<STRUCT>>, apenas o array externo é nivelado, resultando em ARRAY<STRUCT> em vez de uma ESTRUTURA totalmente nivelada. Além disso, a IU atual permite que o nivelamento seja configurado apenas no nível da coluna. Como resultado, selecionar um array multinível aplica implicitamente o nivelamento apenas ao primeiro nível da matriz.
-
Ao se referir a um array de primitivos, o tipo de dados do elemento é usado se a granularidade for o array. Caso contrário, o tipo de dados de array será usado.
Neste exemplo, OrderDetails tem um array de CustomerID do tipo de dados INT. OrderDetails.CustomerID significa INT se a granularidade for OrderDetails.CustomerID e ARRAY<INT> se a granularidade for OrderDetails.
JSON
Em arquivos JSON, o valor numérico na origem determina o tipo de dados de destino:
-
INT8 é usado para valores inteiros que se encaixam dentro do intervalo de inteiros suportado e não incluem um componente fracionário.
-
REAL8 (DOUBLE) é usado quando o valor contém um componente fracionário (número de ponto flutuante).
-
STRING é usado quando o valor numérico excede o intervalo máximo de inteiros suportado.
Tipos de dados são mapeados da seguinte forma:
| Tipos de dados de origem | Qlik Talend Data Integration tipos de dados |
|---|---|
| STRING | STRING |
| NUMBER | INT8 |
| NUMBER | REAL8 |
| NUMBER | STRING |
| BOOLEAN | BOOLEAN |
| ARRAY | ARRAY |
| OBJECT | STRUCT |
CSV, TSV, REGEX e SPLIT
Por padrão, todos os tipos de dados de origem são ingeridos como uma string. Use a opção Inferir tipos automaticamente para mapear tipos de origem e de destino da seguinte forma:
| Tipos de dados de origem | Qlik tipos de dados |
|---|---|
| NUMERIC | INT8/REAL8 |
| True/TRUE/true/False/FALSE/false | BOOLEAN |
| TIMESTAMP | Carimbos de data/hora no formato yyyy-MM-dd HH:mm:ss ou yyyy-MM-ddTHH:mm:ssz são analisados para um tipo de data/hora. Se um fuso horário for incluído, o valor será analisado como uma string. |
Parquet
Arquivos Parquet suportam tipos de dados físicos e lógicos. Tipos de dados físicos definem como os valores são armazenados em disco, como INT32, DOUBLE ou BYTE_ARRAY. Tipos de dados lógicos fornecem significado semântico sobre a representação física, por exemplo, identificando se um valor inteiro representa uma data. Quando um tipo lógico é anexado a uma coluna Parquet e é compatível no Qlik Open Lakehouse (conforme listado abaixo), a tarefa de aterrisagem de streaming usa o tipo lógico ao definir o esquema de destino, em vez do tipo físico subjacente. Isso garante que os dados sejam interpretados corretamente, preserva a semântica pretendida, como precisão, escala e significado temporal, e resulta em esquemas mais precisos quando os dados são gravados em formatos downstream.
Os dados originados de arquivos Parquet são mapeados da seguinte forma:
| Tipos de dados de origem | Tipos lógicos | Qlik Talend Data Integration tipos de dados |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT32 | INT8 | |
| INT64 | INT8 | |
| INT96 | DATETIME | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| BYTE_ARRAY | STRING (codificado como Base64) | |
| FIXED_LEN_BYTE_ARRAY | STRING (codificado como Base64) | |
| BYTE_ARRAY | STRING | STRING |
| BYTE_ARRAY | ENUM | STRING |
| INT32 | DECIMAL | INT8 |
| INT64 | DECIMAL | INT8 |
| FIXED_LEN_BYTE_ARRAY | DECIMAL | INT8/REAL8 (codificado como Base64) |
| BYTE_ARRAY | DECIMAL | INT8/REAL8 (codificado como Base64) |
| INT32 | DATE | DATE |
| INT32 | TIME(MILLIS,true) | INT8 |
| INT64 | TIME(MICROS,true) | TIME |
| INT64 | TIMESTAMP(MICROS,true) | DATETIME |
| INT64 | TIMESTAMP(MILLIS,true) | DATETIME |
| TIPOS ANINHADOS | STRUCT | |
| LIST | ARRAY | |
| MAP | ARRAY<STRUCT>. Matriz de estruturas representando pares chave-valor. |
Avro
Os seguintes mapeamentos se aplicam a arquivos Avro com o Schema Registry.
| Tipos de dados de origem | Tipos lógicos | Qlik Talend Data Integration tipos de dados |
|---|---|---|
| BOOLEAN | BOOLEAN | |
| INT | INT8 | |
| LONG | INT8 | |
| FLOAT | REAL8 | |
| DOUBLE | REAL8 | |
| BYTES | STRING | |
| STRING | STRING | |
| RECORD | STRUCT | |
| ENUM | STRING | |
| ARRAY | ARRAY | |
| MAP | ARRAY<STRUCT> | |
| UNION | ||
| FIXED | STRING | |
| BYTES | DECIMAL | DECIMAL |
| FIXED | DECIMAL | DECIMAL |
| INT | DATE | DATE |
| INT | TIME-MILLIS | INT8 |
| INT | TIME-MICROS | TIME |
| LONG | TIMESTAMP-MILLIS | DATETIME |
| LONG | TIMESTAMP-MICROS | DATETIME |
ORC
Os seguintes mapeamentos aplicam-se a arquivos ORC.
| Tipos de dados de origem | Qlik Talend Data Integration tipos de dados |
|---|---|
| BOOLEAN | BOOLEAN |
| BYTE | INT8 |
| SHORT | INT8 |
| INT | INT8 |
| LONG | INT8 |
| DATE | DATE |
| FLOAT | REAL8 |
| DOUBLE | REAL8 |
| TIMESTAMP | DATETIME |
| BINARY | STRING |
| DECIMAL | REAL8 |
| STRING | STRING |
| VARCHAR | STRING |
| CHAR | STRING |
| LIST | ARRAY |
| MAP | ARRAY<STRUCT>. Matriz de estruturas representando pares chave-valor. |
| STRUCT | STRUCT |
| UNION |
Limitações e considerações
-
Se uma estrutura ou array for modificado pela evolução automática do esquema na aterrisagem, as exibições downstream que não foram criadas por uma tarefa de streaming Qlik Talend Cloud podem precisar ser atualizadas para não ficarem obsoletas.
-
Se uma tarefa tiver erros de análise, ela não entrará em um estado de erro e não será marcada como exigindo atenção. Como os erros de análise são uma métrica sempre crescente, não há critério de saída para um estado de erro.
-
Remover uma capacidade de cluster só é permitido se não houver tarefas usando essa capacidade.
-
Atualizações e exclusões de um registro com a mesma chave primária não devem cruzar o limite da partição, ou seja, precisam ser mapeadas para a mesma partição.
-
Se uma origem contiver um grande número de colunas, apenas as 500 colunas principais por frequência serão exibidas nas tarefas e no catálogo. Todas as colunas são salvas nos arquivos Avro na aterrisagem do S3, mas apenas as 500 colunas principais são armazenadas em tabelas do Iceberg. Na evolução do esquema, se uma nova coluna for adicionada, ela não será adicionada às colunas principais, mesmo que seja frequente.