Configuring the components
Procedure
- Double-click the cTimer component to display its Basic settings view in the Component tab. Keep the default settings of the options.
- Double-click the first cSetBody component, which is labeled SetBodyAsJsonString, to open its Basic settings view in the Component tab.
- Select Constant from the Language drop-down list and type in the JSON string "{\"my_field1\": 1,\"my_field2\": 72438939,\"my_field3\": \"my demo Test message\"}" in the Expression field as the message body.
- Double-click the first cProcessor component, which is labeled ConvertBodyToAvro, to open its Basic settings view in the Component tab.
- In the Code field, enter the following Java code to convert the message body (a JSON string) to an Avro record.
Object body = exchange.getMessage().getBody();
exchange.getMessage().setBody(KafkaAvroConverterBean.jsonStringToAvro(body));
- Click the Advanced settings tab and enter the following statement in the Import field. Make sure you replace local_project with the name of your Talend project.
import org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroConverterBean;
- Double-click the second cProcessor component, which is labeled ConvertBodyToJsonString, to open its Basic settings view in the Component tab.
- In the Code field, enter the following Java code to convert the message body (an Avro record) back to a JSON string.
Object body = exchange.getMessage().getBody();
exchange.getMessage().setBody(KafkaAvroConverterBean.avroToJsonString(body));
- Click the Advanced settings tab and enter the following statement in the Import field. Make sure you replace local_project with the name of your Talend project.
import org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroConverterBean;
- Double-click the first cKafka component, which is labeled SendMessageToKafka, to open its Basic settings view in the Component tab.
- In the Broker List field, enter "localhost:9092". In the Topic field, enter "demo.AVRO". In the Serializer Class field, enter "org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroSerializerBean". Make sure you replace local_project with the Talend project name. Keep the default settings of the other options.
- Click the Advanced settings tab. In the Kafka Properties field, add a parameter with the name "schemaRegistryURL" and the value "localhost:8081".
- Double-click the second cKafka component, which is labeled ReceiveMessageFromKafka, to open its Basic settings view in the Component tab.
- In the Broker List field, enter "localhost:9092". In the Topic field, enter "demo.AVRO". Keep the default settings of the other options.
- Click the Advanced settings tab. In the Kafka Properties field, add two parameters: "schemaRegistryURL" with the value "localhost:8081", and "valueDeserializer" with the value "org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroDeserializerBean". Make sure you replace local_project with the Talend project name.
- Keep the default settings of the cLog components to log the message exchanges.
- Press Ctrl+S to save your Route.
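
As a recap, the values entered in the steps above can be sketched in plain Java. This is only an illustration and is not code that goes into the Route: the class RouteSettingsSketch, its helper methods, and the project name my_project are hypothetical, and the sketch assumes that the Expression field of cSetBody is read as a Java string literal, as its escaped quotes suggest. It shows how the local_project placeholder expands into full class names and which settings the two cKafka components share.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RouteSettingsSketch {

    // Hypothetical helper: builds a bean class name by substituting the
    // Talend project name for the local_project placeholder.
    static String beanClass(String projectName, String beanName) {
        return "org.example." + projectName + ".beansjar.kafka_avro_beans." + beanName;
    }

    // Assumption: the Expression field holds a Java string literal, so each \"
    // is an escaped quote and the actual message body is plain JSON text.
    static String messageBody() {
        return "{\"my_field1\": 1,\"my_field2\": 72438939,\"my_field3\": \"my demo Test message\"}";
    }

    // Values entered for SendMessageToKafka, keyed by field or parameter name.
    static Map<String, String> producerSettings(String projectName) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("Broker List", "localhost:9092");
        settings.put("Topic", "demo.AVRO");
        settings.put("Serializer Class", beanClass(projectName, "KafkaAvroSerializerBean"));
        settings.put("schemaRegistryURL", "localhost:8081");
        return settings;
    }

    // Values entered for ReceiveMessageFromKafka, keyed by field or parameter name.
    static Map<String, String> consumerSettings(String projectName) {
        Map<String, String> settings = new LinkedHashMap<>();
        settings.put("Broker List", "localhost:9092");
        settings.put("Topic", "demo.AVRO");
        settings.put("schemaRegistryURL", "localhost:8081");
        settings.put("valueDeserializer", beanClass(projectName, "KafkaAvroDeserializerBean"));
        return settings;
    }

    public static void main(String[] args) {
        String project = "my_project"; // hypothetical project name, replace with your own

        System.out.println("Message body: " + messageBody());
        System.out.println("Import: import " + beanClass(project, "KafkaAvroConverterBean") + ";");
        producerSettings(project).forEach((k, v) -> System.out.println("Producer " + k + " = " + v));
        consumerSettings(project).forEach((k, v) -> System.out.println("Consumer " + k + " = " + v));
    }
}
```

Both cKafka components must point to the same broker, topic, and schema registry; only the serializer and deserializer classes differ between the sending and the receiving side.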