メイン コンテンツをスキップする 補完的コンテンツへスキップ

AvroデータをProducerRecordに書き込み

このタスクについて

書き込みジョブを設定するために使われます。

手順

  1. 書き込みジョブでtFixedFlowInputコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します。
    1. [Edit schema] (スキーマを編集)の横にある[...]ボタンをクリックし、[Schema] (スキーマ)ダイアログボックスを開きます。
    2. [+]ボタンをクリックしてカラムを追加し、そのカラムに名前を付けます。例:
    3. [OK]をクリックしてこれらの変更を確認し、ポップアップ表示されるダイアログボックスで求められるプロパゲーションを承認します。
    4. [Mode] (モード)エリアで[Use Single Table] (単一テーブルを使用)を選択し、各カラムの値を指定します。
  2. tJavaRowコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します。
    1. [Code] (コード)フィールドにJavaコードを入力してコンテンツを抽出します。例:
      org.apache.kafka.clients.producer.ProducerRecord record = new org.apache.kafka.clients.producer.ProducerRecord(
      		input_row.topic,
      		input_row.partition,
      		input_row.timestamp,
      		input_row.key,
      		input_row.value
      );
      
      record.headers().add("header1", input_row.header1);
      record.headers().add("header2", input_row.header2);
      
      output_row.record = record;
  3. tKafkaOutputコンポーネントをダブルクリックして[Basic settings] (基本設定)ビューを開き、次のパラメーターを指定します。
    1. [Input type] (入力タイプ)ドロップダウンリストでProducerRecordを選択します。
    2. [Version] (バージョン)ドロップダウンリストで、使用するKafkaクラスターのバージョンを選択します。
    3. [Broker list] (ブローカーリスト)フィールドに、使用するKafkaクラスターのブローカーノードのアドレスを入力します。

タスクの結果

書き込みジョブが設定されます。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。