メイン コンテンツをスキップする 補完的コンテンツへスキップ

変換済みジョブを編集

コンポーネントを更新し、Spark Streamingフレームワーク内で実行されるデータ変換プロセスを確定します。

DBFSシステムの代わりにKafkaクラスターを使用して、ストリーミング映画データをジョブに提供します。ディレクターデータは、ルックアップフローでDBFSから引き続き取り込まれます。

始める前に

  • 使用するDatabricksクラスターが正しく設定され、実行されています。

  • クラスターの管理者が、読み書き権限と、ユーザー名をDBFSおよびAzure ADLS Gen2ストレージシステム内の関連データとディレクトリーへのアクセスに使用する権限を付与していること。

手順

  1. [Repository] (リポジトリー)aggregate_movie_director_spark_streamingジョブをダブルクリックしてワークスペース内に開きます。

    アイコンは、元のジョブに使用されていたコンポーネントが現在のジョブフレームワーク(Spark Batch)内に存在しないことを示します。この例ではtHDFSInputtHDFSOutputです。

  2. tHDFSInputをクリックして選択し、[Warning] (警告)ポップアップウィンドウで[OK]をクリックしてこのウィンドウを閉じます。
  3. キーボードのDeleteキーを押してtHDFSInputを削除します。
  4. ジョブのワークスペース内でtFileInputDelimitedと入力し、表示されるリストからこのコンポーネントを選択します。

    tFileInputDelimitedがワークスペースに追加されます。

  5. 同様に、tHDFSOutputtFileOutputDelimitedに置き換えます。
  6. [Repository] (リポジトリー)[Metadata] (メタデータ)ノードの下で、[Hadoop cluster] (Hadoopクラスター)ノードに続いてmy_cdh接続ノードとその子ノードを展開して、HDFSフォルダーの下で設定したmoviesスキーマメタデータノードを表示します。
  7. ジョブのワークスペースで、このスキーマメタデータノードを新しいtFileInputDelimitedコンポーネントにドロップします。
  8. このtFileInputDelimitedコンポーネントを右クリックし、コンテキストメニューから[Row] (行) > [Main] (メイン)の順に選択し、tMapをクリックしてtMapに接続します。
  9. tMapを右クリックし、コンテキストメニューから[Row] (行) > [out1]の順に選択し、新しいtFileOutputDelimitedをクリックして、tMapをこのコンポーネントに接続します。
  10. 新しいtFileOutputDelimitedコンポーネントをダブルクリックし、その[Component] (コンポーネント)ビューを開きます。
  11. [Folder] (フォルダー)フィールドで、結果を書き込む必要のあるディレクトリーを入力するか、そこに移動します。このシナリオでは/user/ychen/output_data/spark_batch/outです。映画のディレクターの名前を含むレコードがここに入ります。
  12. [Merge result to single file] (結果を1つのファイルにマージする)チェックボックスをオンにして、part-ファイル(通常はSparkで生成)を1つのファイルにマージします。

    [Merge file path] (ファイルパスのマージ)フィールドが表示されます。

  13. [Merge file path] (ファイルパスのマージ)フィールドに、part-ファイルをマージするファイルを入力するか、またはそこに移動します。

    このシナリオでは、このファイルは/user/ychen/output_data/spark_batch/out/mergedです。

  14. tMapから[reject] (リジェクト)リンクを受け取る別のtFileOutputDelimitedコンポーネントをダブルクリックし、その[Component] (コンポーネント)ビューを開きます。
  15. [Folder] (フォルダー)フィールドで/user/ychen/output_data/spark_batch/rejectへのディレクトリーを設定します。
  16. [Run] (実行)ビューで[Spark configuration] (Spark設定)タブをクリックし、Hadoop/Spark接続メタデータが元のジョブから適切に継承されていることを確認します。

    Spark Batchジョブに対する所定のHadoop/Sparkディストリビューションへの接続を定義するには、常にこの[Spark configuration] (Spark設定)タブを使用する必要があります。また、この接続はジョブごとに有効になります。

  17. ジョブの実行されるマシンのホスト名をSparkクラスターが解決できるかどうかが不明な場合は、[Define the driver hostname or IP address] (ドライバーホスト名またはIPアドレスを指定)チェックボックスをオンにして、フィールドが表示されたら、このマシンのIPアドレスを入力します。

    このチェックボックスをオフのままにしておくと、Sparkクラスターは、127.0.0.1に置かれているマシン(クラスター自体の中にあるマシン)の中でSparkドライバーを探します。

  18. [F6]を押してジョブを実行します。

タスクの結果

[Run] (実行)ビューがStudioの下側に自動的に開き、このジョブの実行の進行状況を示します。

ジョブが完了すると、たとえばHDFSシステムのWebコンソール内で、出力がHDFSに書き込まれていることを確認できます。

このページは役に立ちましたか?

このページまたはコンテンツにタイポ、ステップの省略、技術的エラーなどの問題が見つかった場合はお知らせください。