Créer un JAR d'un bean personnalisé
Procédure
- Dans la vue Repository, cliquez-droit sur Code > Custom Bean Jars et sélectionnez Create Bean Jar dans le menu contextuel.
-
Dans la boîte de dialogue New Bean Jar, saisissez Kafka_Avro_Beans dans le champ Name et cliquez sur Finish.
- Cliquez-droit sur Kafka_Avro_Beans dans le nœud Custom Bean Jars et sélectionnez Create Bean dans le menu contextuel.
-
L'assistant New Bean s'ouvre. Dans le champ Name, saisissez KafkaAvroSerializerBean et cliquez sur Finish.
-
Saisissez le code suivant dans l'espace de modélisation graphique.
package org.example.local_project.beansjar.kafka_avro_beans; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; public class KafkaAvroSerializerBean extends KafkaAvroSerializer { @Override public void configure(KafkaAvroSerializerConfig config) { this.schemaRegistry = new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000); this.strategyUsesSchema(true); this.autoRegisterSchema = true; // this.useSchemaReflection = true; // this.normalizeSchema = true; // this.useLatestVersion = true; // this.avroReflectionAllowNull = true; } }
- Appuyez sur Ctrl+S pour sauvegarder votre bean.
-
Cliquez-droit sur Kafka_Avro_Beans dans le nœud Custom Bean Jars à nouveau et créez un Bean du nom KafkaAvroDeserializerBean, avec la contenu suivant :
package org.example.local_project.beansjar.kafka_avro_beans; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; public class KafkaAvroDeserializerBean extends KafkaAvroDeserializer { @Override public void configure(KafkaAvroDeserializerConfig config) { this.schemaRegistry = new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000); this.useSpecificAvroReader = false; } }
-
Répétez cette opération pour créer un Bean dans Kafka_Avro_Beans, du nom KafkaAvroConverterBean, avec le contenu suivant :
package org.example.local_project.beansjar.kafka_avro_beans; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; import org.apache.avro.io.JsonEncoder; public class KafkaAvroConverterBean { // AVRO schema static Schema schema = new Schema.Parser().parse( "{" + "\"doc\": \"Sample schema to help you get started.\"," + "\"fields\": [" + "{" + "\"doc\": \"The int type is a 32-bit signed integer.\"," + "\"name\": \"my_field1\"," + "\"type\": \"int\"" + "}," + "{" + "\"doc\": \"The double type is a double precision (64-bit) IEEE 754 floating-point number.\"," + "\"name\": \"my_field2\"," + "\"type\": \"int\"" + "}," + "{" + "\"doc\": \"The string is a unicode character sequence.\"," + "\"name\": \"my_field3\"," + "\"type\": \"string\"" + "}" + "]," + "\"name\": \"AvroSample\"," + "\"namespace\": \"talend\"," + "\"type\": \"record\"" + "}"); public static String avroToJsonString(Object body) throws RuntimeException { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DatumWriter<Record> writer = new GenericDatumWriter<Record>(schema); JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos, false); writer.write((Record)body, encoder); encoder.flush(); baos.flush(); return new String(baos.toByteArray(), StandardCharsets.UTF_8); } catch (Exception e) { throw new RuntimeException( String.format("Error coverting Avro to Json of schema %s", schema), e); } } public static Record jsonStringToAvro(Object jsonString) throws RuntimeException { try { DatumReader<Object> reader = new GenericDatumReader<Object>(schema); return (Record)reader.read(null, DecoderFactory.get().jsonDecoder(schema, (String)jsonString)); } catch (Exception e) { throw new RuntimeException( String.format("Error coverting json %s to Avro of schema %s", jsonString, schema), e); } } }
- Cliquez-droit sur Kafka_Avro_Beans dans le nœud Custom Bean Jars et sélectionnez Edit Bean Jar Libraries dans le menu contextuel.
-
Ajoutez les dépendances suivantes.
- Créez une Route pour l'échange de messages avec le broker de message Confluent Kafka.
-
Cliquez-droit sur la Route créée et sélectionnez Setup Code dependencies dans le menu contextuel.
- Dans la boîte de dialogue Setup Code dependencies, sélectionnez Kafka_Avro_Beans dans l'onglet Custom Bean Jars et cliquez sur OK.
Cette page vous a-t-elle aidé ?
Si vous rencontrez des problèmes sur cette page ou dans son contenu – une faute de frappe, une étape manquante ou une erreur technique – faites-le-nous savoir.