コンポーネントを設定
手順
-
cTimerコンポーネントをダブルクリックし、[Component] (コンポーネント)タブで[Basic settings] (基本設定)ビューを開きます。下のように、オプションはデフォルト設定のままにしておきます。
-
SetBodyAsJsonStringというラベルが付けられている最初のcSetBodyコンポーネントをダブルクリックし、[Component] (コンポーネント)タブで[Basic settings] (基本設定)ビューを開きます。
- [Language] (言語)ドロップダウンリストで[Constant] (定数)を選択し、メッセージボディとして[Expression] (式)フィールドに"{\"my_field1\": 1,\"my_field2\": 72438939,\"my_field3\": \"my demo Test message\"}"というJSON文字列を入力します。
-
ConvertBodyToAvroというラベルが付けられている最初のcProcessorコンポーネントをダブルクリックし、[Component] (コンポーネント)タブで[Basic settings] (基本設定)ビューを開きます。
-
[Code] (コード)フィールドに、メッセージボディ(JSON文字列)をAvroレコードに変換する以下のJavaコードを設定します。
Object body = exchange.getMessage().getBody(); exchange.getMessage().setBody(KafkaAvroConverterBean.jsonStringToAvro(body));
-
[Advanced settings] (詳細設定)タブをクリックし、[Import] (インポート)フィールドに次のステートメントを入力します。local_projectをTalendのプロジェクト名に置き換えます。
import org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroConverterBean;
-
ConvertBodyToJsonStringというラベルが付けられている2番目のcProcessorコンポーネントをダブルクリックし、[Component] (コンポーネント)タブで[Basic settings] (基本設定)ビューを開きます。
-
[Code] (コード)フィールドに、メッセージボディ(Avroレコード)をJSON文字列に変換する以下のJavaコードを設定します。
Object body = exchange.getMessage().getBody(); exchange.getMessage().setBody(KafkaAvroConverterBean.avroToJsonString(body));
-
[Advanced settings] (詳細設定)タブをクリックし、[Import] (インポート)フィールドに次のステートメントを入力します。local_projectをTalendのプロジェクト名に置き換えます。
import org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroConverterBean;
-
SendMessageToKafkaというラベルが付けられている最初のcKafkaコンポーネントをダブルクリックし、[Component] (コンポーネント)タブで[Basic settings] (基本設定)ビューを開きます。
-
[Broker List] (ブローカーリスト)フィールドに"localhost:9092"と入力します。
[Topic] (トピック)フィールドに"demo.AVRO"と入力します。[Serializer Class] (シリアライザークラス)フィールドに、"org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroSerializerBean"と入力します。local_projectをTalendのプロジェクト名に置き換えます。その他のオプションはデフォルト設定のままにしておきます。
-
[Advanced settings] (詳細設定)タブをクリックします。[Kafka Properties] (Kafkaのプロパティ)フィールドで、"schemaRegistryURL"という名前と"localhost:8081"という値でパラメーターを追加します。
-
ReceiveMessageFromKafkaというラベルが付けられている2番目のcKafkaコンポーネントをダブルクリックし、[Component] (コンポーネント)タブで[Basic settings] (基本設定)ビューを開きます。
-
[Broker List] (ブローカーリスト)フィールドに"localhost:9092"と入力します。
[Topic] (トピック)フィールドに"demo.AVRO"と入力します。その他のオプションはデフォルト設定のままにしておきます。
-
[Advanced settings] (詳細設定)タブをクリックします。[Kafka Properties] (Kafkaのプロパティ)フィールドに、"schemaRegistryURL"という名前と"localhost:8081"という値を持つパラメーターを1つ、そして"valueDeserializer"という名前と"org.example.local_project.beansjar.kafka_avro_beans.KafkaAvroDeserializerBean"という値を持つパラメーターを1つ追加します。local_projectをTalendのプロジェクト名に置き換えます。
- メッセージ交換をログするcLogコンポーネントはデフォルト設定のままにしておきます。
- Ctrl + Sを押してジョブを保存します。