ジョブ内のKafkaとAVRO
Talendジョブでは、AVROフォーマットのデータを(非)シリアライズするためにAVROが提供するアプローチに反映されているように、Kafkaコンポーネント(通常のKafkaコンポーネント)とAVROのKafkaコンポーネントは、AVROデータを異なる方法で処理します。
- 通常のKafkaコンポーネントは、JSON形式のみを読み書きします。したがって、KafkaがAVROデータを生成または使用し、何らかの理由でAVROのKafkaコンポーネントが使えない場合は、avro-toolsライブラリーを使って、ジョブの外部でAVROとJSONの間でデータを変換する必要があります。 例:この例で使われているavro-tools-1.8.2.jarライブラリーはMVN Repository (英語のみ)からダウンロードできます。このコマンドは、out.avroファイルをjsonに変換します。
java -jar C:\2_Prod\Avro\avro-tools-1.8.2.jar tojson out.avro
Orこのコマンドは、twitter.avscからスキーマを使ってtwitter.jsonファイルをtwitter.avroに変換します。java -jar avro-tools-1.8.2.jar fromjson --schema-file twitter.avsc twitter.json > twitter.avro
- AVROのKafkaコンポーネントは、Sparkフレームワークでのみ使えます。データはAVRO形式で直接処理されます。KafkaクラスターがAVROデータを生成し、消費する場合は、tKafkaInputAvroを使ってデータをKafkaから直接読み取り、tWriteAvroFieldsを使ってAVROデータをtKafkaOutputに送信してください。
ただし、これらのコンポーネントは、avro-toolsライブラリーによって作成されたAVROデータを処理しません。これは、avro-toolsライブラリーとAVROのコンポーネントが、AVROの提供するアプローチと同じアプローチを使わないためです。
AVRO形式のデータを(逆)シリアライズするためにAVROが提供する2つのアプローチは次のとおりです。
- AVROファイルは、各ファイルに埋め込まれたAVROスキーマを使用して生成されます( org.apache.avro.file.{DataFileWriter/DataFileReader} )。avro-toolsライブラリーはこのアプローチを使います。
- AVROレコードは、各レコードにスキーマを埋め込むことなく生成されます(org.apache.avro.io.{BinaryEncoder/BinaryDecoder}を使用)。AVROのKafkaコンポーネントは、このアプローチを使います。
この方法は、AVROでエンコードされたメッセージが常にKafkaトピックに書き込まれる場合に強く推奨されます。この方法では、すべてのメッセージにAVROスキーマを再び埋め込むためのオーバーヘッドが発生しないためです。AVROスキーマのサイズは比較的大きく、そのため、各メッセージにスキーマを埋め込むのは費用対効果が低い方法になります。他方、レコード(メッセージ)は通常小さいため、Spark Streamingを使ってデータをKafkaに読み書きする場合、これは他のアプローチに比べて大きな利点です。
2つのアプローチの出力を同じ読み書きの処理で混在させることはできません。