
Writing Avro data to ProducerRecord

About this task

Configure the writing Job.

Procedure

  1. From the writing Job, double-click the tFixedFlowInput component to open its Basic settings view and specify the following parameters:
    1. Click the […] button next to Edit schema to open the Schema dialog box.
    2. Click the [+] button to add the columns and name them. For example:
      (Screenshot: the input schema with the added columns. Based on the tJavaRow code below, these presumably include topic, partition, timestamp, key, header1, header2, id, and amount.)
    3. Click OK to validate these changes and accept the propagation prompted by the pop-up dialog box.
    4. Select Use Single Table in the Mode area and specify the value for each column.
  2. Double-click the tJavaRow component to open its Basic settings view and specify the following parameter:
    1. In the Code field, enter the Java code that builds the Avro value record and wraps it in a ProducerRecord. For example (a standalone sketch of the equivalent producer logic is given after this procedure):
      // Define the Avro schema of the record value.
      org.apache.avro.Schema valueSchema = new org.apache.avro.Schema.Parser().parse(
      "{"
      	+ "\"type\":\"record\","
      	+ "\"name\":\"testRecord\","
      	+ "\"fields\":["
      		+ "{\"name\":\"id\",\"type\":\"string\"},"
      		+ "{\"name\":\"amount\",\"type\":\"string\"}]"
      + "}"
      );
      
      // Populate an Avro record with the id and amount values of the incoming row.
      org.apache.avro.generic.GenericData.Record valueDataRecord = new org.apache.avro.generic.GenericData.Record(valueSchema);
      valueDataRecord.put("id", input_row.id);
      valueDataRecord.put("amount", input_row.amount);
      
      // Build the ProducerRecord from the topic, partition, timestamp and key
      // columns of the incoming row, with the Avro record as the value.
      org.apache.kafka.clients.producer.ProducerRecord record = new org.apache.kafka.clients.producer.ProducerRecord(
      		input_row.topic,
      		input_row.partition,
      		input_row.timestamp,
      		input_row.key,
      		valueDataRecord
      );
      
      // Attach the header columns as Kafka record headers.
      record.headers().add("header1", input_row.header1);
      record.headers().add("header2", input_row.header2);
      
      // Pass the ProducerRecord on to tKafkaOutput.
      output_row.record = record;
  3. Double-click the tKafkaOutput component to open its Basic settings view and specify the following parameters:
    1. From the Input type drop-down list, select ProducerRecord.
    2. From the Version drop-down list, select the version of the Kafka cluster to be used.
    3. In the Broker list field, enter the addresses of the broker nodes of the Kafka cluster to be used, as a comma-separated list of host:port entries.
  4. Open the Advanced settings view and specify the following parameters:
    1. Select Custom in the Key serializer list, and enter "org.apache.kafka.common.serialization.ByteArraySerializer" in the corresponding text field.

      If you would like to use an Avro key serializer instead, you can create an org.apache.avro.Schema object for the key and replace input_row.key with the resulting Avro record in the code of the tJavaRow component, the same way it is done for the value column in this example. A sketch of this variant is given after this procedure.

    2. Select Avro in the Value serializer list.
  5. Save your changes.
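
For reference, the record-building and serializer configuration that the Job assembles is roughly equivalent to the following standalone Java sketch. It is illustrative only: the broker address, topic name, sample values, and the use of Confluent's io.confluent.kafka.serializers.KafkaAvroSerializer (which also needs a schema.registry.url) as the Avro value serializer are assumptions made for this sketch, not settings generated by the components.

  import java.util.Properties;
  
  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericData;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.ProducerRecord;
  
  public class AvroProducerSketch {
      public static void main(String[] args) {
          // Producer configuration mirroring the tKafkaOutput settings:
          // raw bytes for the key, Avro for the value.
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
          props.put("key.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
          props.put("value.serializer",
                  "io.confluent.kafka.serializers.KafkaAvroSerializer"); // assumed Avro serializer
          props.put("schema.registry.url", "http://localhost:8081");     // required by that serializer
  
          // Same value schema as in the tJavaRow code of step 2.
          Schema valueSchema = new Schema.Parser().parse(
              "{"
              + "\"type\":\"record\","
              + "\"name\":\"testRecord\","
              + "\"fields\":["
              +   "{\"name\":\"id\",\"type\":\"string\"},"
              +   "{\"name\":\"amount\",\"type\":\"string\"}]"
              + "}");
  
          GenericData.Record value = new GenericData.Record(valueSchema);
          value.put("id", "1");        // sample values standing in for the
          value.put("amount", "42.0"); // id and amount columns of the flow
  
          // Topic, partition, timestamp, key and headers correspond to the
          // columns defined in the tFixedFlowInput schema of step 1.
          ProducerRecord<byte[], Object> record = new ProducerRecord<>(
                  "testTopic", 0, System.currentTimeMillis(), "key1".getBytes(), value);
          record.headers().add("header1", "h1".getBytes());
          record.headers().add("header2", "h2".getBytes());
  
          try (KafkaProducer<byte[], Object> producer = new KafkaProducer<>(props)) {
              producer.send(record);
          }
      }
  }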
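
If you opt for the Avro key serializer mentioned in step 4, the tJavaRow code can build a second Avro record for the key. The following is a minimal sketch of that variant, reusing valueDataRecord from step 2; the testKey schema and its single id field are hypothetical and should be adapted to your key column(s).

  // Hypothetical schema of the record key.
  org.apache.avro.Schema keySchema = new org.apache.avro.Schema.Parser().parse(
  "{"
  	+ "\"type\":\"record\","
  	+ "\"name\":\"testKey\","
  	+ "\"fields\":["
  		+ "{\"name\":\"id\",\"type\":\"string\"}]"
  + "}"
  );
  
  // Populate the key record from the incoming row.
  org.apache.avro.generic.GenericData.Record keyDataRecord = new org.apache.avro.generic.GenericData.Record(keySchema);
  keyDataRecord.put("id", input_row.id);
  
  // Pass the Avro key record instead of input_row.key, and select Avro
  // (instead of Custom) in the Key serializer list of tKafkaOutput.
  org.apache.kafka.clients.producer.ProducerRecord record = new org.apache.kafka.clients.producer.ProducerRecord(
  		input_row.topic,
  		input_row.partition,
  		input_row.timestamp,
  		keyDataRecord,
  		valueDataRecord
  );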

Results

The writing Job is configured.
