メイン コンテンツをスキップする 補完的コンテンツへスキップ

データをKafkaトピックに送信

このシナリオは、パイプラインでコネクターを簡単にセットアップして使用できるようにすることを目的としています。お使いの環境とユースケースに適応させてください。

以下の手順で作成されるパイプラインの例。

始める前に

  • このシナリオを再現する場合は、 test-file-to-kafka.zip ファイルをダウンロードして抽出します。

手順

  1. [Connections] (接続) > [Add connection] (接続を追加)をクリックします。
  2. テスト接続を追加して[Add dataset] (データセットを追加)をクリックします。
  3. [Engine] (エンジン)リストでエンジンを選択します。
    情報メモ注:
    • データの高度処理の場合は、Cloud Engine for DesignではなくRemote Engine Gen2を使用することをお勧めします。
    • Talend Management Consoleから作成されたRemote Engine Gen2がないか、存在していても稼働中ではないステータスの場合は、リストで接続の[Connection type] (接続タイプ)を選択することも、新しい接続を保存することもできません。
    • 使用可能な接続タイプのリストは、選択したエンジンによって異なります。
  4. [Value format] (値の形式)リストでJSONを選択し、[Values] (値)フィールドにtest-file-to-kafka.jsonファイルのコンテンツを貼り付けます。
  5. 名前(action moviesなど)を付けて保存します。
  6. 同じようにしてKafkaサーバーに接続を追加します。
    1. [Connections] (接続) > [Add connection] (接続を追加)をクリックします。
    2. 表示されたパネルで、接続に名前を付け、必要に応じて説明を入力します。

      Kafka
    3. 作成する接続のタイプを選択します。
      ここではKafkaを選択します。
    4. Kafkaプロパティの説明に従って、接続のプロパティを入力してKafkaサーバーに安全にアクセスし、接続をチェックして、[Add dataset] (データセットを追加)をクリックします。
  7. [Add a new dataset] (新しいデータセットを追加)パネルで、データセットに名前を付けます。この例では、collette_movies_jsonトピックを使って映画に関するデータを公開します。

    新しいKafkaデータセットの設定。
  8. データセットに名前(Collette kafka topicなど)を付けます。
  9. [Validate] (検証)をクリックしてデータセットを保存します。
  10. [Pipelines] (パイプライン)ページで[Add pipeline] (パイプラインを追加)をクリックします。新しいパイプラインが開きます。
  11. パイプラインに意味のある名前を付けます。

    From Test to Kafka - send to Kafka topic
  12. [ADD SOURCE] (ソースを追加)をクリックし、パネルが開いたら、action moviesというソースデータセットを選択します。
  13. [Add Processor] (プロセッサーを追加)をクリックしてSplitプロセッサーをパイプラインに追加し、俳優のファーストネームとラストネームの両方が含まれているレコードを分割できるようにします。設定パネルが開きます。
  14. プロセッサーに意味のある名前を付けます。

    split actor names
  15. プロセッサーを設定します。
    1. 名前レコードに応じて値を分割する場合は、[Function name] (関数名)リストで[Split text in parts] (テキストを複数の部分に分割)を選択します。
    2. 特定のレコードの値に同じ変更を適用する場合は、[Fields to process] (処理するフィールド)リストで.detail.starringを選択します。
    3. 特定レコードの値を2つの部分に分割する場合は、[Parts] (分割数)リストに2と入力します。
    4. これらのレコードではファーストネームとラストネームがスペースで区切られているので、[Separator] (区切り)リストで[Space] (スペース)を選択します。
  16. [Save] (保存)をクリックして設定を保存します。
  17. オプションとして、プロセッサーのプレビューを表示し、分割操作後のデータを確認します。
    [Output data] (出力データ)プレビューで、星印の詳細カラムが2つに分割され、1つはファーストネーム用、もう1つはラストネーム用となっています。
  18. [Add Processor] (プロセッサーを追加)をクリックし、パイプラインにFilterプロセッサーを追加します。設定パネルが開きます。
  19. プロセッサーに意味のある名前を付けます。

    filter on movies with actor Collette
  20. プロセッサーを設定します。
    1. データセットにリスト表示されている俳優のラストネームでフィルタリングする場合は、新しいエレメントを追加し、[Input] (入力)リストで.detail.starring_split_2を選択します。
    2. [Optionally select a function to apply] (適用する関数をオプションとして選択)リストで[None] (なし)を選択します。
    3. [Operator] (オペレーター)リストで==を選択します。
    4. Colletteという名前が含まれているデータをフィルタリングする場合は、[Value] (値)フィールドに1と入力します。
    5. [Save] (保存)をクリックして設定を保存します。
  21. オプションとして、Filterプロセッサーのプレビューを表示し、フィルタリング操作後のデータサンプルを確認します。

    [Output data] (出力データ)プレビューで、3件のレコードが基準にマッチしています。
  22. パイプラインで[ADD DESTINATION] (デスティネーションを追加)アイテムをクリックしてパネルを開き、自分の出力データのロード先となるApache Kafkaトピック(Collette Kafka topic)を選択します。
  23. デスティネーションの[Configuration] (設定)タブでは、イベントの公開時に使用されるデフォルトのパーティションタイプ総当たりモデルですが、状況に応じてパーティションキーを指定することもできます。
  24. Talend Cloud Pipeline Designerの上部ツールバーで[Run] (実行)ボタンをクリックするとパネルが開き、実行プロファイルを選択できるようになります。
  25. リストで実行プロファイルを選択し(詳細は実行プロファイルをご覧ください)、[Run] (実行)をクリックしてパイプラインを実行します。

タスクの結果

パイプラインは実行中となり、テストファイルからの映画データが処理され、定義したcollette_movies_json トピックに出力フローが送信されます。

次のタスク

イベントが公開された後は、別のパイプラインでトピックのコンテンツを消費してソースとして使用できます。

ソースが前のデスティネーションパイプラインからのCollette Kafkaトピックである新しいパイプライン。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。