メイン コンテンツをスキップする 補完的コンテンツへスキップ

ConsumerRecordからAvroデータを読み取り

このタスクについて

読み取りジョブを設定するために使われます。

手順

  1. 読み取りジョブでtFixedFlowInputコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します:
    1. [Edit schema] (スキーマを編集)の横にある[...]ボタンをクリックし、[Schema] (スキーマ)ダイアログボックスを開きます。
    2. [+]ボタンをクリックしてカラムを追加し、そのカラムに名前を付けます。例:
      追加されたカラムがすべて含まれている入力スキーマ。
    3. OKをクリックしてこれらの変更を確定し、ポップアップ表示されるダイアログボックスで求められるプロパゲーションを承認します。
    4. [Mode] (モード)エリアで[Use Single Table] (単一テーブルを使用)を選択し、各カラムの値を指定します。
  2. tJavaRowコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します。
    1. [Code] (コード)フィールドにJavaコードを入力してコンテンツを抽出します。例:
      org.apache.avro.Schema valueSchema = new org.apache.avro.Schema.Parser().parse(
      "{"
      	+ "\"type\":\"record\","
      	+ "\"name\":\"testRecord\","
      	+ "\"fields\":["
      		+ "{\"name\":\"id\",\"type\":\"string\"},"
      		+ "{\"name\":\"amount\",\"type\":\"string\"}]"
      + "}"
      );
      org.apache.avro.generic.GenericData.Record valueDataRecord = new org.apache.avro.generic.GenericData.Record(valueSchema);
      valueDataRecord.put("id", row1.id);
      valueDataRecord.put("amount", row1.amount);
      
      org.apache.kafka.clients.producer.ProducerRecord record = new org.apache.kafka.clients.producer.ProducerRecord(
      		input_row.topic,
      		input_row.partition,
      		input_row.timestamp,
      		input_row.key,
      		valueDataRecord
      );
      
      record.headers().add("header1", input_row.header1);
      record.headers().add("header2", input_row.header2);
      
      output_row.record = record;
  3. tKafkaOutputコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します:
    1. [Input type] (入力タイプ)ドロップダウンリストでProducerRecordを選択します。
    2. [Version] (バージョン)ドロップダウンリストで、使用するKafkaクラスターのバージョンを選択します。
    3. [Broker list] (ブローカーリスト)フィールドに、使用するKafkaクラスターのブローカーノードのアドレスを入力します。
  4. [Advanced settings] (詳細設定)ビューを開き、次のパラメーターを指定します:
    1. [Key serializer] (キーシリアライザー)リストで[Custom] (カスタム) を選択し、対応するテキストフィールドに"org.apache.kafka.common.serialization.ByteArraySerializer"と入力します。

      Avroキーシリアライザーを使用したい場合は、この例のvaueカラムと同じアプローチで、キー用にorg.apache.avro.Schemaオブジェクトを作成し、tJavaRowコンポーネント内のコードをアップデートしてinput_row.keyの代わりにこのオブジェクトを使用するようにします。

    2. [Value serializer] (値シリアライザー)リストで、Avroを選択します。
  5. サブジョブでtKafkaInputコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します:
    1. [Output type] (出力タイプ)ドロップダウンリストで、ConsumerRecordを選択します。
      ConsumerRecordを使用する場合、Avroレコードは、Talend Studioで次のように[Object] (オブジェクト)として分類されます。
    2. [Version] (バージョン)ドロップダウンリストで、使用するKafkaクラスターのバージョンを選択します。
    3. [Broker list] (ブローカーリスト)フィールドに、使用するKafkaクラスターのブローカーノードのアドレスを入力します。
    4. [Topic name] (トピック名)フィールドに、tKafkaInputがメッセージフィードを受信する元となるトピックの名前を入力します。
    5. [Consumer group id] (コンシューマーグループID)フィールドに、tKafkaInputの所属先としたいコンシューマーグループの名前を入力します。
  6. tJavaRowコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します:
    1. [Edit schema] (スキーマを編集)の横にある[...]ボタンをクリックし、[Schema] (スキーマ)ダイアログボックスを開きます。
    2. [+]ボタンをクリックしてカラムを追加し、そのカラムに名前を付けます。例:
    3. [OK]をクリックしてこれらの変更を確認し、ポップアップ表示されるダイアログボックスで求められるプロパゲーションを承認します。
    4. [Code] (コード)フィールドにJavaコードを入力してコンテンツを抽出します。例:
      org.apache.kafka.clients.consumer.ConsumerRecord record = (org.apache.kafka.clients.consumer.ConsumerRecord) input_row.record;
      
      output_row.topic = record.topic();
      output_row.partition = record.partition();
      output_row.timestamp = record.timestamp();
      
      output_row.header1 = record.headers().lastHeader("header1").value();
      output_row.header2 = record.headers().lastHeader("header2").value();
      
      output_row.key = (byte[]) record.key();
      
      org.apache.avro.generic.GenericData.Record avroRecord = (org.apache.avro.generic.GenericData.Record) record.value();
      output_row.id = avroRecord.get("id").toString();
      output_row.amount = avroRecord.get("amount").toString();
  7. tLogRowコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、[Mode] (モード)エリアで[Table] (テーブル)を選択して、結果をテーブルに表示させます。
  8. 変更を保存します。

タスクの結果

読み取りジョブが設定されます。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。