メイン コンテンツをスキップする 補完的コンテンツへスキップ

AvroデータをProducerRecordに書き込み

このタスクについて

書き込みジョブを設定するために使われます。

手順

  1. 書き込みジョブでtFixedFlowInputコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します:
    1. [Edit schema] (スキーマを編集)の横にある[...]ボタンをクリックし、[Schema] (スキーマ)ダイアログボックスを開きます。
    2. [+]ボタンをクリックしてカラムを追加し、そのカラムに名前を付けます。例:
      追加されたカラムがすべて含まれている入力スキーマ。
    3. OKをクリックしてこれらの変更を確定し、ポップアップ表示されるダイアログボックスで求められるプロパゲーションを承認します。
    4. [Mode] (モード)エリアで[Use Single Table] (単一テーブルを使用)を選択し、各カラムの値を指定します。
  2. tJavaRowコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します。
    1. [Code] (コード)フィールドにJavaコードを入力してコンテンツを抽出します。例:
      org.apache.avro.Schema valueSchema = new org.apache.avro.Schema.Parser().parse(
      "{"
      	+ "\"type\":\"record\","
      	+ "\"name\":\"testRecord\","
      	+ "\"fields\":["
      		+ "{\"name\":\"id\",\"type\":\"string\"},"
      		+ "{\"name\":\"amount\",\"type\":\"string\"}]"
      + "}"
      );
      org.apache.avro.generic.GenericData.Record valueDataRecord = new org.apache.avro.generic.GenericData.Record(valueSchema);
      valueDataRecord.put("id", row1.id);
      valueDataRecord.put("amount", row1.amount);
      
      org.apache.kafka.clients.producer.ProducerRecord record = new org.apache.kafka.clients.producer.ProducerRecord(
      		input_row.topic,
      		input_row.partition,
      		input_row.timestamp,
      		input_row.key,
      		valueDataRecord
      );
      
      record.headers().add("header1", input_row.header1);
      record.headers().add("header2", input_row.header2);
      
      output_row.record = record; 
  3. tKafkaOutputコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します:
    1. [Input type] (入力タイプ)ドロップダウンリストでProducerRecordを選択します。
    2. [Version] (バージョン)ドロップダウンリストで、使用するKafkaクラスターのバージョンを選択します。
    3. [Broker list] (ブローカーリスト)フィールドに、使用するKafkaクラスターのブローカーノードのアドレスを入力します。
  4. [Advanced settings] (詳細設定)ビューを開き、次のパラメーターを指定します:
    1. [Key serializer] (キーシリアライザー)リストで[Custom] (カスタム) を選択し、対応するテキストフィールドに"org.apache.kafka.common.serialization.ByteArraySerializer"と入力します。

      Avroキーシリアライザーを使用したい場合は、この例のvaueカラムと同じアプローチで、キー用にorg.apache.avro.Schemaオブジェクトを作成し、tJavaRowコンポーネント内のコードをアップデートしてinput_row.keyの代わりにこのオブジェクトを使用するようにします。

    2. [Value serializer] (値シリアライザー)リストで、Avroを選択します。
  5. 変更を保存します。

タスクの結果

書き込みジョブが設定されます。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。