
Reading Avro data from ConsumerRecord

About this task

Configure the reading Job.

Procedure

  1. From the reading Job, double-click the tFixedFlowInput component to open its Basic settings view and specify the following parameters:
    1. Click the […] button next to Edit schema to open the Schema dialog box.
    2. Click the [+] button to add columns and give them a name. For example:
The input schema with all added columns: for example, topic (String), partition (Integer), timestamp (Long), key (byte[]), header1 and header2 (byte[]), and id and amount (String), matching the columns used in the tJavaRow code below.
    3. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.
    4. Select Use Single Table in the Mode area and specify the value for each column.
  2. Double-click the tJavaRow component to open its Basic settings view and specify the following parameter:
    1. In the Code field, enter the Java code to extract the content. For example:
      // Define the Avro schema of the value records.
      org.apache.avro.Schema valueSchema = new org.apache.avro.Schema.Parser().parse(
      "{"
          + "\"type\":\"record\","
          + "\"name\":\"testRecord\","
          + "\"fields\":["
              + "{\"name\":\"id\",\"type\":\"string\"},"
              + "{\"name\":\"amount\",\"type\":\"string\"}]"
      + "}"
      );

      // Wrap the incoming id and amount columns in an Avro generic record.
      org.apache.avro.generic.GenericData.Record valueDataRecord = new org.apache.avro.generic.GenericData.Record(valueSchema);
      valueDataRecord.put("id", input_row.id);
      valueDataRecord.put("amount", input_row.amount);

      // Build the ProducerRecord from the topic, partition, timestamp and key
      // columns, with the Avro record as the value.
      org.apache.kafka.clients.producer.ProducerRecord record = new org.apache.kafka.clients.producer.ProducerRecord(
              input_row.topic,
              input_row.partition,
              input_row.timestamp,
              input_row.key,
              valueDataRecord
      );

      // Attach the header columns as Kafka record headers (byte[] values).
      record.headers().add("header1", input_row.header1);
      record.headers().add("header2", input_row.header2);

      output_row.record = record;
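
      Note that this code runs once per row, so the schema string is parsed again for every record. As a minimal optimization sketch (not part of the original Job), you could cache the parsed schema in the Job-level globalMap; the "valueSchema_cached" key is an arbitrary name chosen for this example:

      // Sketch only: parse the value schema once and reuse it for later rows.
      org.apache.avro.Schema valueSchema = (org.apache.avro.Schema) globalMap.get("valueSchema_cached");
      if (valueSchema == null) {
          valueSchema = new org.apache.avro.Schema.Parser().parse(
          "{"
              + "\"type\":\"record\","
              + "\"name\":\"testRecord\","
              + "\"fields\":["
                  + "{\"name\":\"id\",\"type\":\"string\"},"
                  + "{\"name\":\"amount\",\"type\":\"string\"}]"
          + "}"
          );
          globalMap.put("valueSchema_cached", valueSchema); // cache for subsequent rows
      }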
  3. Double-click the tKafkaOutput component to open its Basic settings view and specify the following parameters:
    1. From the Input type drop-down list, select ProducerRecord.
    2. From the Version drop-down list, select the version of the Kafka cluster to be used.
    3. In the Broker list field, enter the address of the broker nodes of the Kafka cluster to be used.
  4. Open the Advanced settings view and specify the following parameters:
    1. Select Custom in the Key serializer list, and enter "org.apache.kafka.common.serialization.ByteArraySerializer" in the corresponding text field.

      If you want to use an Avro key serializer instead, create an org.apache.avro.Schema object for the key and replace input_row.key with the resulting Avro record in the code of the tJavaRow component, in the same way the value column is handled in this example; see the sketch below.
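
      A minimal sketch of that approach, assuming a hypothetical key schema named testKey with a single keyId field (these names are illustrative only, not part of the original Job):

      // Sketch only: testKey and keyId are hypothetical names for this example.
      org.apache.avro.Schema keySchema = new org.apache.avro.Schema.Parser().parse(
      "{"
          + "\"type\":\"record\","
          + "\"name\":\"testKey\","
          + "\"fields\":[{\"name\":\"keyId\",\"type\":\"string\"}]"
      + "}"
      );
      org.apache.avro.generic.GenericData.Record keyDataRecord = new org.apache.avro.generic.GenericData.Record(keySchema);
      keyDataRecord.put("keyId", input_row.key); // assumes the key column holds a String in this variant

      // Pass keyDataRecord instead of input_row.key when building the ProducerRecord,
      // and select Avro instead of Custom in the Key serializer list.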

    2. Select Avro in the Value serializer list.
  5. From the subJob, double-click the tKafkaInput component to open its Basic settings view and specify the following parameters:
    1. From the Output type drop-down list, select ConsumerRecord.
      When you use ConsumerRecord, the record column that carries the Avro records is typed as Object in the Talend Studio schema.
    2. From the Version drop-down list, select the version of the Kafka cluster to be used.
    3. In the Broker list field, enter the address of the broker nodes of the Kafka cluster to be used.
    4. In the Topic name field, enter the name of the topic from which tKafkaInput receives the feed of messages.
    5. In the Consumer group id field, enter the name of the consumer group to which you want tKafkaInput to belong.
  6. Double-click the tJavaRow component to open its Basic settings view and specify the following parameters:
    1. Click the […] button next to Edit schema to open the Schema dialog box.
    2. Click the [+] button to add columns and give them a name: for example, topic, partition, timestamp, key, header1, header2, id, and amount, as used in the Code field below.
    3. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.
    4. In the Code field, enter the Java code to extract the content. For example:
      // Retrieve the ConsumerRecord object received from tKafkaInput.
      org.apache.kafka.clients.consumer.ConsumerRecord record = (org.apache.kafka.clients.consumer.ConsumerRecord) input_row.record;

      // Extract the record metadata.
      output_row.topic = record.topic();
      output_row.partition = record.partition();
      output_row.timestamp = record.timestamp();

      // Extract the header values (byte[]).
      output_row.header1 = record.headers().lastHeader("header1").value();
      output_row.header2 = record.headers().lastHeader("header2").value();

      output_row.key = (byte[]) record.key();

      // The value is an Avro generic record; read its fields by name.
      org.apache.avro.generic.GenericData.Record avroRecord = (org.apache.avro.generic.GenericData.Record) record.value();
      output_row.id = avroRecord.get("id").toString();
      output_row.amount = avroRecord.get("amount").toString();
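
      Note that Headers.lastHeader(String) returns null when a header is absent, so the two header lines above would throw a NullPointerException for records that do not carry these headers. A defensive variant (an addition to the original code, assuming missing headers should yield null columns):

      // Sketch only: guard against absent headers before reading their values.
      org.apache.kafka.common.header.Header h1 = record.headers().lastHeader("header1");
      output_row.header1 = (h1 == null) ? null : h1.value();
      org.apache.kafka.common.header.Header h2 = record.headers().lastHeader("header2");
      output_row.header2 = (h2 == null) ? null : h2.value();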
  7. Double-click the tLogRow component to open its Basic settings view and select Table in the Mode area.
  8. Save your changes.

Results

The reading Job is configured.
