メイン コンテンツをスキップする 補完的コンテンツへスキップ

Apache Pulsarトピックにメッセージを公開

このシナリオは、パイプラインでコネクターを簡単にセットアップして使用できるようにすることを目的としています。お使いの環境とユースケースに適応させてください。

以下の手順で作成されるパイプラインの例。

手順

  1. [Connections] (接続) > [Add connection] (接続を追加)をクリックします。
  2. 開いたパネルで、作成する接続のタイプを選択します。

    data generator
  3. [Engine] (エンジン)リストでエンジンを選択します。
    情報メモ注:
    • データの高度処理の場合は、Cloud Engine for DesignではなくRemote Engine Gen2を使用することをお勧めします。
    • Talend Management Consoleから作成されたRemote Engine Gen2がないか、存在していても稼働中ではないステータスの場合は、リストで接続の[Connection type] (接続タイプ)を選択することも、新しい接続を保存することもできません。
    • 使用可能な接続タイプのリストは、選択したエンジンによって異なります。
  4. 作成する接続のタイプを選択します。
    ここで、[Data generator] (データジェネレーター)を選択します。
  5. [Add dataset] (データセットを追加)をクリックし、Dataジェネレータープロパティの説明に従ってデータセットプロパティを入力します。
  6. [Add a new dataset] (新しいデータセットを追加)パネルで、データセットに名前を付けます。

    customer generated data
  7. プロパティを入力して、目的のテスト顧客データを生成します。この例では、単純なLDAPプロトコルを使用しています。
    1. テストレコードを100件生成したいので、[Rows] (行)フィールドに100を入力します。
    2. [Add] (追加)フィールドをクリックし、エレメントの[Name] (名前)フィールドにfirstnameを入力し、[Type] (タイプ)リストで[First Name] (ファーストネーム)を選択して、空のフィールドを含まないランダムなファーストネームを生成したいので、[Blank %] (空白%)フィールド内に0を入力します。
    3. [Add] (追加)フィールドをクリックし、エレメントの[Name] (名前)フィールドにlastnameを入力し、[Type] (タイプ)リストで[Last Name] (ラストネーム)を選択して、空のフィールドを含まないランダムなラストネームを生成したいので、[Blank %] (空白%)フィールド内に0を入力します。
    4. [Add] (追加)フィールドをクリックし、エレメントの[Name] (名前)フィールドにageを入力し、[Type] (タイプ)リストで[Age] (年齢)を選択して、空のフィールドを含まない18歳から99歳の年齢を生成したいので、[Min] (最小)フィールドに18[Max] (最大)フィールドに99[Blank %] (空白%)フィールドに0を入力します。
    ファーストネーム、ラストネーム、年齢のカラムを持つデータが作成された顧客のヘッドサンプル。
  8. [Connections] (接続) > [Add connection] (接続を追加)をクリックします。
  9. 作成する接続のタイプを選択します。
    ここではApache Pulsarを選択します。
  10. Apache Pulsarプロパティの説明に従って、接続のプロパティを入力し、Apache Pulsarブローカーにアクセスし、接続をチェックして、[Add dataset] (データセットを追加)をクリックします。
  11. [Add a new dataset] (新しいデータセットを追加)パネルで、データセットに名前を付けます。この例では、今のところは空であるcustomer-ageトピックを使って処理済み顧客情報に関するデータを公開します。
    新しいPulsarデータセットの設定。
  12. データセットに名前(Customers on Pulsarなど)を付けます。
  13. [Validate] (検証)をクリックしてデータセットを保存します。
  14. [Pipelines] (パイプライン)ページで[Add pipeline] (パイプラインを追加)をクリックします。新しいパイプラインが開きます。
  15. パイプラインに意味のある名前を付けます。

    From Data generator to Pulsar - publish msg to Pulsar
  16. [ADD SOURCE] (ソースを追加)をクリックし、パネルが開いたら、customer generated dataというソースデータセットを選択します。
  17. [Add Processor] (プロセッサーを追加)をクリックし、Type converterプロセッサーをパイプラインに追加して、ageフィールドのデータ型を変更してフィールド値に計算を実行できるようにします。設定パネルが開きます。
  18. プロセッサーに意味のある名前を付けます。

    convert age data type
  19. [Converters] (コンバーター)エリアで次の操作を行います。
    1. 特定レコードの値のデータ型を変更する場合は、[Field path] (フィールドパス)リストで.ageを選択します。
    2. データ型を整数からダブルに変更する場合は、[Output type] (出力タイプ)リストでDoubleを選択します。
  20. [Save] (保存)をクリックして設定を保存します。
  21. オプションとして、プロセッサーのプレビューを表示し、タイプ変換後のデータを確認します。
    [Output data] (出力データ)プレビューで、Age Data型がDouble型になっています。
  22. [Add Processor] (プロセッサーを追加)をクリックしてパイプラインにAggregateプロセッサーを追加し、顧客の平均年齢を計算できるようにします。設定パネルが開きます。
  23. プロセッサーに意味のある名前を付けます。

    calculate average age
  24. [Operations] (操作)エリアで次の操作を行います。
    1. 特定レコードの値の平均値を計算する場合は、[Field path] (フィールドパス)リストで.ageを選択します。
    2. [Operation] (操作)リストでAverageを選択します。
    3. 新しく生成されたフィールドの名前を変更する場合は、[Output field name] (出力フィールド名)フィールドにavg_ageと入力します。
  25. [Save] (保存)をクリックして設定を保存します。
  26. オプションとして、プロセッサーのプレビューを表示し、集計操作後のデータを確認します。
    [Output data] (出力データ)プレビューで、平均年齢のレコードがDouble型で表示されています。
  27. パイプラインで[ADD DESTINATION] (デスティネーションを追加)アイテムをクリックしてパネルを開き、自分の出力データのロード先となるApache Pulsar (Customers on Pulsar)を選択します。
  28. デスティネーションの[Configuration] (設定)タブで、[Producer name] (プロデューサー名)にチェックを入れ、データのロード先となるトピックを選択します。
    デスティネーション項目の設定。
  29. Talend Cloud Pipeline Designerの上部ツールバーで[Run] (実行)ボタンをクリックするとパネルが開き、実行プロファイルを選択できるようになります。
  30. リストで実行プロファイルを選択し(詳細は実行プロファイルをご覧ください)、[Run] (実行)をクリックしてパイプラインを実行します。

タスクの結果

パイプラインは実行中となり、ローカルデータからの平均年齢データが処理され、定義したApache Pulsarトピックに出力フローが送信されます。

次のタスク

イベントが公開された後は、別のパイプラインでPulsarメッセージを消費してソースデータセットとして使用できます。

ソースが前のデスティネーションパイプラインからの顧客年齢のデータである新しいパイプライン。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。