Bean Jarを作成
手順
- リポジトリーツリービューで[Code] (コード) > [Custom Bean Jars] (カスタムBean Jar)を右クリックし、コンテキストメニューから[Create Bean Jar] (Bean Jarを作成)を選択します。
-
[New Bean Jar] (新規Bean Jar)ダイアログボックスの[Name] (名前)フィールドにKafka_Avro_Beansと入力し、[Finish] (終了)をクリックします。
- [Custom Bean Jars] (カスタムBean Jar)の下にあるKafka_Avro_Beansを右クリックし、コンテキストメニューで[Create Bean] (Beanを作成)を選択します。
-
[New Bean] (新規Bean)ウィザードが開きます。[Name] (名前)フィールドにKafkaAvroSerializerBeanと入力し、[Finish] (終了)をクリックします。
-
デザインワークスペースに次のコードを入力します。
package org.example.local_project.beansjar.kafka_avro_beans; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.serializers.KafkaAvroSerializer; import io.confluent.kafka.serializers.KafkaAvroSerializerConfig; public class KafkaAvroSerializerBean extends KafkaAvroSerializer { @Override public void configure(KafkaAvroSerializerConfig config) { this.schemaRegistry = new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000); this.strategyUsesSchema(true); this.autoRegisterSchema = true; // this.useSchemaReflection = true; // this.normalizeSchema = true; // this.useLatestVersion = true; // this.avroReflectionAllowNull = true; } }
- [Ctrl]+[S]を押し、Beanを保存します。
-
[Custom Bean Jars] (カスタムBean Jar)ノードの下にあるKafka_Avro_Beansを再び右クリックし、KafkaAvroDeserializerBeanという名前と次の内容でBeanを作成します:
package org.example.local_project.beansjar.kafka_avro_beans; import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig; public class KafkaAvroDeserializerBean extends KafkaAvroDeserializer { @Override public void configure(KafkaAvroDeserializerConfig config) { this.schemaRegistry = new CachedSchemaRegistryClient(config.getSchemaRegistryUrls(), 1000); this.useSpecificAvroReader = false; } }
-
この操作を繰り返し、Kafka_Avro_Beans内にKafkaAvroConverterBeanという名前と次の内容でBeanを作成します:
package org.example.local_project.beansjar.kafka_avro_beans; import java.io.ByteArrayOutputStream; import java.nio.charset.StandardCharsets; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData.Record; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DatumWriter; import org.apache.avro.io.DecoderFactory; import org.apache.avro.io.EncoderFactory; import org.apache.avro.io.JsonEncoder; public class KafkaAvroConverterBean { // AVRO schema static Schema schema = new Schema.Parser().parse( "{" + "\"doc\": \"Sample schema to help you get started.\"," + "\"fields\": [" + "{" + "\"doc\": \"The int type is a 32-bit signed integer.\"," + "\"name\": \"my_field1\"," + "\"type\": \"int\"" + "}," + "{" + "\"doc\": \"The double type is a double precision (64-bit) IEEE 754 floating-point number.\"," + "\"name\": \"my_field2\"," + "\"type\": \"int\"" + "}," + "{" + "\"doc\": \"The string is a unicode character sequence.\"," + "\"name\": \"my_field3\"," + "\"type\": \"string\"" + "}" + "]," + "\"name\": \"AvroSample\"," + "\"namespace\": \"talend\"," + "\"type\": \"record\"" + "}"); public static String avroToJsonString(Object body) throws RuntimeException { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); DatumWriter<Record> writer = new GenericDatumWriter<Record>(schema); JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos, false); writer.write((Record)body, encoder); encoder.flush(); baos.flush(); return new String(baos.toByteArray(), StandardCharsets.UTF_8); } catch (Exception e) { throw new RuntimeException( String.format("Error coverting Avro to Json of schema %s", schema), e); } } public static Record jsonStringToAvro(Object jsonString) throws RuntimeException { try { DatumReader<Object> reader = new GenericDatumReader<Object>(schema); return (Record)reader.read(null, DecoderFactory.get().jsonDecoder(schema, (String)jsonString)); } catch (Exception e) { throw new RuntimeException( String.format("Error coverting json %s to Avro of schema %s", jsonString, schema), e); } } }
- [Custom Bean Jars] (カスタムBean Jar)の下にあるKafka_Avro_Beansを右クリックし、コンテキストメニューで[Edit Bean Jar Libraries] (Bean Jarライブラリーを編集)を選択します。
-
次の依存項目を追加します。
- Confluent Kafkaメッセージブローカーとのメッセージ交換用のルートを作成します。
-
作成したルートを右クリックし、コンテキストメニューから[Setup Code dependencies] (コード依存項目を設定)を選択します。
- [Setup Code dependencies] (コード依存項目を設定)ダイアログボックスの[Custom Bean Jars] (カスタムBean Jar)タブでKafka_Avro_Beansを選択し、OKをクリックします。