Accéder au contenu principal Passer au contenu complémentaire

Lire des données Avro depuis ConsumerRecord

Pourquoi et quand exécuter cette tâche

Configurez le Job de lecture.

Procédure

  1. Depuis le Job de lecture, double-cliquez sur le composant tFixedFlowInput pour ouvrir sa vue Basic settings et configurez les paramètres suivants :
    1. Cliquez sur le bouton [...] à côté du champ Edit schema pour ouvrir l'éditeur du Schema.
    2. Cliquez sur le bouton [+] pour ajouter des colonnes et donnez-leur un nom. Par exemple :
      Schéma d'entrée avec toutes les colonnes ajoutées.
    3. Cliquez sur OK pour valider ces modifications et acceptez la propagation proposée par la boîte de dialogue qui s'ouvre.
    4. Dans la zone Mode, sélectionnez Use Single Table et spécifiez la valeur pour chaque colonne.
  2. Double-cliquez sur le tJavaRow pour ouvrir sa vue Basic settings et configurez le paramètre suivant :
    1. Dans le champ Code, saisissez le code Java pour extraire le contenu. Par exemple :
      org.apache.avro.Schema valueSchema = new org.apache.avro.Schema.Parser().parse(
      "{"
      	+ "\"type\":\"record\","
      	+ "\"name\":\"testRecord\","
      	+ "\"fields\":["
      		+ "{\"name\":\"id\",\"type\":\"string\"},"
      		+ "{\"name\":\"amount\",\"type\":\"string\"}]"
      + "}"
      );
      org.apache.avro.generic.GenericData.Record valueDataRecord = new org.apache.avro.generic.GenericData.Record(valueSchema);
      valueDataRecord.put("id", row1.id);
      valueDataRecord.put("amount", row1.amount);
      
      org.apache.kafka.clients.producer.ProducerRecord record = new org.apache.kafka.clients.producer.ProducerRecord(
      		input_row.topic,
      		input_row.partition,
      		input_row.timestamp,
      		input_row.key,
      		valueDataRecord
      );
      
      record.headers().add("header1", input_row.header1);
      record.headers().add("header2", input_row.header2);
      
      output_row.record = record;
  3. Double-cliquez sur le tKafkaOutput pour ouvrir sa vue Basic settings et configurez les paramètres suivants :
    1. Dans la liste déroulante Input type, sélectionnez ProducerRecord.
    2. Dans la liste déroulante Version, sélectionnez la version du cluster Kafka à utiliser.
    3. Dans le champ Broker list, saisissez l'adresse des nœuds du broker du cluster Kafka à utiliser.
  4. Ouvrez la vue Advanced settings et spécifiez les paramètres suivants :
    1. Sélectionnez Custom dans la liste Key serializer et saisissez "org.apache.kafka.common.serialization.ByteArraySerializer" dans le champ correspondant.

      Si vous souhaitez utiliser un sérialiseur de clé Avro, vous pouvez créer un objet org.apache.avro.Schema pour la clé et mettre à jour le code dans le composant tJavaRow pour utiliser cet objet au lieu de input_row.key, avec la même approche que pour la colonne value dans cet exemple.

    2. Sélectionnez Avro dans la liste Value serializer.
  5. Dans le sous-Job, double-cliquez sur le composant tKafkaInput pour ouvrir sa vue Basic settings et configurez les paramètres suivants :
    1. Dans la liste déroulante Output type, sélectionnez ConsumerRecord.
      Lorsque vous utilisez ConsumerRecord, les enregistrements Avro sont classifiés comme Object dans le Studio Talend, comme suit :
    2. Dans la liste déroulante Version, sélectionnez la version du cluster Kafka à utiliser.
    3. Dans le champ Broker list, saisissez l'adresse des nœuds du broker du cluster Kafka à utiliser.
    4. Dans le champ Topic name, saisissez le nom du topic duquel le tKafkaInput reçoit le flux de messages.
    5. Dans le champ Consumer group id, saisissez le nom du groupe de consommation auquel vous souhaitez que le tKafkaInput appartienne.
  6. Double-cliquez sur le tJavaRow pour ouvrir sa vue Basic settings et configurez les paramètres suivants :
    1. Cliquez sur le bouton [...] à côté du champ Edit schema pour ouvrir l'éditeur du Schema.
    2. Cliquez sur le bouton [+] pour ajouter une colonne et nommez cette colonne. Par exemple :
    3. Cliquez sur OK pour valider ces modifications et acceptez la propagation proposée par la boîte de dialogue qui s'ouvre.
    4. Dans le champ Code, saisissez le code Java pour extraire le contenu. Par exemple :
      org.apache.kafka.clients.consumer.ConsumerRecord record = (org.apache.kafka.clients.consumer.ConsumerRecord) input_row.record;
      
      output_row.topic = record.topic();
      output_row.partition = record.partition();
      output_row.timestamp = record.timestamp();
      
      output_row.header1 = record.headers().lastHeader("header1").value();
      output_row.header2 = record.headers().lastHeader("header2").value();
      
      output_row.key = (byte[]) record.key();
      
      org.apache.avro.generic.GenericData.Record avroRecord = (org.apache.avro.generic.GenericData.Record) record.value();
      output_row.id = avroRecord.get("id").toString();
      output_row.amount = avroRecord.get("amount").toString();
  7. Double-cliquez sur le composant tLogRow pour ouvrir sa vue Basic settings, sélectionnez Table, dans la zone Mode, pour afficher les résultats sous forme de tableau.
  8. Enregistrez vos modifications.

Résultats

Le Job de lecture est configuré.

Cette page vous a-t-elle aidé ?

Si vous rencontrez des problèmes sur cette page ou dans son contenu – une faute de frappe, une étape manquante ou une erreur technique – faites-le-nous savoir.