Creating a Bean Jar
Procedure
- From the repository tree view, right-click Code > Custom Bean Jars and select Create Bean Jar from the contextual menu.
- In the New Bean Jar dialog box, enter Kafka_Avro_Beans in the Name field and click Finish.
- Right-click Kafka_Avro_Beans under the Custom Bean Jars node and select Create Bean from the contextual menu.
- The New Bean wizard opens. In the Name field, enter KafkaAvroSerializerBean and click Finish.
- Enter the following code in the design workspace.
package org.example.local_project.beansjar.kafka_avro_beans;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;

public class KafkaAvroSerializerBean extends KafkaAvroSerializer {

    @Override
    public void configure(KafkaAvroSerializerConfig config) {
        // Cache up to 1000 schemas fetched from the Schema Registry.
        this.schemaRegistry = new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000);
        this.strategyUsesSchema(true);
        // Register the schema with the Schema Registry if it is not known there yet.
        this.autoRegisterSchema = true;
        // this.useSchemaReflection = true;
        // this.normalizeSchema = true;
        // this.useLatestVersion = true;
        // this.avroReflectionAllowNull = true;
    }
}
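The serializer is not instantiated directly; the Kafka endpoint of the Route loads it by class name. As a minimal sketch (the topic, broker address, and registry URL are placeholders; valueSerializer and additionalProperties are options of recent Apache Camel Kafka component versions, while older versions use serializerClass instead), a producer endpoint URI could look like this:

kafka:my_topic?brokers=localhost:9092
    &valueSerializer=org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroSerializerBean
    &additionalProperties.schema.registry.url=http://localhost:8081

The schema.registry.url property is what configure(KafkaAvroSerializerConfig) reads through config.getSchemaRegistryUrls().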
- Press Ctrl+S to save your bean.
- Right-click Kafka_Avro_Beans under the Custom Bean Jars node again and create a Bean with the name KafkaAvroDeserializerBean and the following content:
package org.example.local_project.beansjar.kafka_avro_beans;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;

public class KafkaAvroDeserializerBean extends KafkaAvroDeserializer {

    @Override
    public void configure(KafkaAvroDeserializerConfig config) {
        // Cache up to 1000 schemas fetched from the Schema Registry.
        this.schemaRegistry = new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000);
        // Deliver GenericData.Record instances rather than generated SpecificRecord classes.
        this.useSpecificAvroReader = false;
    }
}
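On the consuming side the bean is referenced the same way. A minimal sketch, with the same placeholder topic, broker, and registry URL as above (valueDeserializer is the Camel Kafka component consumer option):

kafka:my_topic?brokers=localhost:9092
    &valueDeserializer=org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroDeserializerBean
    &additionalProperties.schema.registry.url=http://localhost:8081

Because useSpecificAvroReader is set to false, message bodies arrive in the Route as GenericData.Record objects, which is what the converter bean created in the next step expects.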
- Repeat this operation to create a Bean inside Kafka_Avro_Beans with the name KafkaAvroConverterBean and the following content:
package org.example.local_project.beansjar.kafka_avro_beans;

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;

public class KafkaAvroConverterBean {

    // Avro schema
    static Schema schema = new Schema.Parser().parse(
        "{"
        + "\"doc\": \"Sample schema to help you get started.\","
        + "\"fields\": ["
        + "{"
        + "\"doc\": \"The int type is a 32-bit signed integer.\","
        + "\"name\": \"my_field1\","
        + "\"type\": \"int\""
        + "},"
        + "{"
        + "\"doc\": \"The double type is a double precision (64-bit) IEEE 754 floating-point number.\","
        + "\"name\": \"my_field2\","
        + "\"type\": \"double\""
        + "},"
        + "{"
        + "\"doc\": \"The string is a unicode character sequence.\","
        + "\"name\": \"my_field3\","
        + "\"type\": \"string\""
        + "}"
        + "],"
        + "\"name\": \"AvroSample\","
        + "\"namespace\": \"talend\","
        + "\"type\": \"record\""
        + "}");

    // Serializes an Avro record to a JSON string using the schema above.
    public static String avroToJsonString(Object body) throws RuntimeException {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DatumWriter<Record> writer = new GenericDatumWriter<Record>(schema);
            JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos, false);
            writer.write((Record) body, encoder);
            encoder.flush();
            baos.flush();
            return new String(baos.toByteArray(), StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new RuntimeException(
                String.format("Error converting Avro to JSON with schema %s", schema), e);
        }
    }

    // Parses a JSON string into an Avro record using the schema above.
    public static Record jsonStringToAvro(Object jsonString) throws RuntimeException {
        try {
            DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
            return (Record) reader.read(null,
                DecoderFactory.get().jsonDecoder(schema, (String) jsonString));
        } catch (Exception e) {
            throw new RuntimeException(
                String.format("Error converting JSON %s to Avro with schema %s", jsonString, schema), e);
        }
    }
}
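Both methods are static, so they can be called from a cProcessor or cBean component in the Route to translate between the GenericData.Record bodies handled by the beans above and plain JSON strings. A minimal sketch, assuming the exchange body already holds a record produced by the deserializer:

exchange.getIn().setBody(
    KafkaAvroConverterBean.avroToJsonString(exchange.getIn().getBody()));

and, in the opposite direction, before the message reaches the Kafka producer endpoint:

exchange.getIn().setBody(
    KafkaAvroConverterBean.jsonStringToAvro(exchange.getIn().getBody()));

Note that the hard-coded schema in this bean must match the schema of the messages actually exchanged on the topic.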
- Right-click Kafka_Avro_Beans under the Custom Bean Jars node and select Edit Bean Jar Libraries from the contextual menu.
- Add the following dependencies.
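The exact jar list depends on your Confluent Platform and Avro versions. Judging from the imports used by the three beans above, the bean jar needs at least the following libraries (Maven coordinates given for reference; versions are illustrative assumptions, not prescriptions):

io.confluent:kafka-avro-serializer
io.confluent:kafka-schema-registry-client
org.apache.avro:avro
org.apache.kafka:kafka-clients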
- Create a Route for the message exchange with the Confluent Kafka message broker.
- Right-click the Route you just created and select Setup Code dependencies from the contextual menu.
- In the Setup Code dependencies dialog box, select Kafka_Avro_Beans in the Custom Bean Jars tab and click OK.
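The Route can now use the beans. Talend Studio assembles Routes graphically, but expressed in Camel's Java DSL the producing flow amounts to something like the following sketch (the timer, topic, broker, and registry URL are placeholder assumptions):

from("timer:avroDemo?period=5000")
    .setBody(constant("{\"my_field1\": 1, \"my_field2\": 2.5, \"my_field3\": \"sample\"}"))
    // JSON string -> GenericData.Record, using the schema in KafkaAvroConverterBean
    .bean(KafkaAvroConverterBean.class, "jsonStringToAvro")
    // The custom serializer registers/fetches the schema and writes Avro bytes.
    .to("kafka:my_topic?brokers=localhost:9092"
        + "&valueSerializer=org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroSerializerBean"
        + "&additionalProperties.schema.registry.url=http://localhost:8081");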