tDeltaLakeOutput properties for Apache Spark Batch

These properties are used to configure tDeltaLakeOutput running in the Spark Batch Job framework.

The Spark Batch tDeltaLakeOutput component belongs to the Technical family.

The component in this framework is available in all subscription-based Talend products with Big Data and Talend Data Fabric.

Basic settings

Define how to save the dataset

Either Metastore, Files, or Merge.

 

Metastore: Stores data in table format in a metastore.

 

Files: Stores data in Delta format in files.

 

Merge: Stores data by merging it into an existing Delta table.

For more information about merging data in Delta tables, see Upsert into a table using merge in the Databricks documentation.
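As a rough illustration only, the three modes correspond to the following Spark and Delta Lake API calls. This is a sketch, not the code the component generates; the table name, path, and column names are hypothetical, and df and spark stand for the incoming data and the active Spark session.

    // Illustrative Scala sketch of the three modes (hypothetical names).
    // Metastore: store the data as a Delta table registered in the metastore.
    df.write.format("delta").saveAsTable("flightDb.flightEvents")

    // Files: store the data as Delta files at a given path.
    df.write.format("delta").save("/user/talend/out/flightEvents")

    // Merge: upsert the data into an existing Delta table.
    io.delta.tables.DeltaTable.forName(spark, "flightDb.flightEvents").as("flightEvents")
      .merge(df.as("updates"), "flightEvents.flightId = updates.flightId")
      .whenMatched().updateAll()
      .whenNotMatched().insertAll()
      .execute()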

Define a storage configuration component

Select the configuration component to be used to provide the configuration information for the connection to the target file system.

If you leave this check box clear, the target file system is the local system.

The configuration component to be used must be present in the same Job. For example, if you have dropped a tS3Configuration component in the Job, you can select it to write the result in a given S3 storage system.

This field is available only when you select Files from the Define how to save the dataset drop-down list in the Basic settings view.

Property type

Either Built-In or Repository.

 

Built-In: No property data stored centrally.

 

Repository: Select the repository file where the properties are stored.

The properties are stored centrally under the Hadoop Cluster node of the Repository tree.

The fields that follow are automatically pre-filled with the fetched data.

For further information about the Hadoop Cluster node, see Managing Hadoop connection metadata.

Schema and Edit Schema

A schema is a row description. It defines the number of fields (columns) to be processed and passed on to the next component. When you create a Spark Job, avoid the reserved word line when naming the fields.

Click Edit schema to make changes to the schema. If the current schema is of the Repository type, three options are available:

  • View schema: choose this option to view the schema only.

  • Change to built-in property: choose this option to change the schema to Built-in for local changes.

  • Update repository connection: choose this option to change the schema stored in the repository and decide whether to propagate the changes to all the Jobs upon completion.

    If you just want to propagate the changes to the current Job, you can select No upon completion and choose this schema metadata again in the Repository Content window.

Spark automatically infers data types for the columns in a PARQUET schema. In a Talend Job for Apache Spark, the Date type is inferred and stored as int96.

This component offers the advantage of the dynamic schema feature. This allows you to retrieve unknown columns from source files or to copy batches of columns from a source without mapping each column individually. For further information about dynamic schemas, see Dynamic schema.

This dynamic schema feature is designed for the purpose of retrieving unknown columns of a table and should be used for that purpose only; it is not recommended for creating tables.

 

Built-In: You create and store the schema locally for this component only.

 

Repository: You have already created the schema and stored it in the Repository. You can reuse it in various projects and Job designs.

Folder/File

Browse to, or enter the path pointing to the data to be used in the file system.

If the path you set points to a folder, this component reads all of the files stored in that folder, for example, /user/talend/in; if sub-folders exist, they are automatically ignored unless you set the property spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive to true in the Advanced properties table in the Spark configuration tab (an equivalent setting is sketched below).
  • Depending on the filesystem to be used, configure the corresponding configuration component placed in your Job, for example, a tS3Configuration component for S3 and a tAzureFSConfiguration for Azure Storage and Azure Data Lake Storage.
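For reference, the sketch below shows the equivalent setting if you were configuring the Spark session by hand; it is illustrative only, since in Talend Studio you enter the key/value pair in the Advanced properties table of the Spark configuration tab instead.

    // Illustrative only: equivalent of adding the property in the Advanced properties table.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .config("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")
      .getOrCreate()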

The button for browsing does not work with the Spark Local mode; if you are using the other Spark Yarn modes that Talend Studio supports with your distribution, ensure that you have properly configured the connection in a configuration component in the same Job. Use the configuration component that corresponds to the filesystem to be used.

This field is available only when you select Files from the Define how to save the dataset drop-down list in the Basic settings view.

Action

Select an operation for writing data to the filesystem to which the configuration component in your Job provides the connection information:
  • Create: Creates the table specified in the Target table field or the directory specified in the Folder/File field and writes data to it.
  • Append: Adds the incoming records to the data that already exists in that table or directory.
  • Overwrite: Overwrites that table or directory with the incoming data.
  • Drop table: Deletes the table and the directory associated with the table from the filesystem, then recreates the table.
  • Drop table if exists: Deletes the table and the directory associated with the table from the filesystem, then recreates the table. If the table does not exist, nothing happens.
  • Truncate table: Deletes all rows in the table; the schema of the table remains.
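As a rough illustration, these actions map to the following Spark write modes and SQL statements. This is a sketch, not the code the component generates; the path and table name are hypothetical, and df and spark stand for the incoming data and the active Spark session.

    // Illustrative Scala equivalents of the Action values (hypothetical names).
    df.write.format("delta").save("/user/talend/out/flightEvents")                    // Create
    df.write.format("delta").mode("append").save("/user/talend/out/flightEvents")     // Append
    df.write.format("delta").mode("overwrite").save("/user/talend/out/flightEvents")  // Overwrite
    spark.sql("DROP TABLE flightDb.flightEvents")            // Drop table (the table is then recreated)
    spark.sql("DROP TABLE IF EXISTS flightDb.flightEvents")  // Drop table if exists (the table is then recreated)
    spark.sql("TRUNCATE TABLE flightDb.flightEvents")        // Truncate table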

Delta Lake systematically introduces slight differences between the upload time of a file and the metadata timestamp of that file. Bear these differences in mind when you need to filter data.

This option is available only when you select Files or Metastore from the Define how to save the dataset drop-down list in the Basic settings view.

Source type

The source type of the input data, which can be either Dataset or SQL.

This field is available only when you select Merge from the Define how to save the dataset drop-down list in the Basic settings view.

 

Dataset: select this value if the source data is a dataset that comes from a flow of data.

 

SQL: select this value if you want to use a SQL statement to retrieve the source data from a Delta table.

In the SQL field, enter the SQL statement that defines the source data to retrieve.
Note: The Delta table from which the data is retrieved can come either from a metastore or from the component connected to the input.
In the Table alias field, enter the alias of the table that contains the source data. Surround the alias with double quotation marks.
Note: The table alias is the name given to the table that results from the SELECT statement used in the SQL query.
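For illustration, with the SQL source type the SQL statement selects the source rows and the Table alias is the name under which the merge conditions refer to them. The sketch below uses hypothetical table and alias names and assumes spark is the active Spark session.

    // Illustrative only: the SQL statement retrieves the source rows, and the
    // Table alias ("updates" here) is how the merge conditions reference their
    // columns, for example updates.flightId.
    val updates = spark.sql("SELECT * FROM flightDb.flight_updates")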

Database

Enter, in double quotation marks, the name of the Delta Lake database to be used.

This field is available only when you select Metastore or Merge from the Define how to save the dataset drop-down list in the Basic settings view.

Target table

Enter, in double quotation marks, the name of the table to be used.

This field is available only when you select Metastore or Merge from the Define how to save the dataset drop-down list in the Basic settings view.

External path

Enter, in double quotation marks, the path pointing to a filesystem other than DBFS in which to store the data. It can be either an ADLS Gen2 filesystem or an S3 filesystem.
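For illustration, an external path is typically an ADLS Gen2 (abfss://) or S3 (s3a://) URI; the values below are hypothetical examples.

    "abfss://mycontainer@mystorageaccount.dfs.core.windows.net/delta/flightEvents"
    "s3a://my-bucket/delta/flightEvents"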

This field is available only when you select Metastore from the Define how to save the dataset drop-down list in the Basic settings view.

Optimize

Enter the SQL statement to optimize the layout of the data.

For more information about optimizing Delta Lake data, see Optimize (Delta Lake on Databricks) in the Databricks documentation.
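For illustration, such a statement typically looks like the following; the table and column names are hypothetical.

    OPTIMIZE flightDb.flightEvents ZORDER BY (flightId)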

This field is available only when you select Metastore from the Define how to save the dataset drop-down list in the Basic settings view.

Merge on

Indicate the input and output columns on which to apply the merge operation. For each pair of input and output columns, you must specify a function. The resulting clause determines which merge actions are performed; these merge actions are defined in the When matched and When not matched options.

For example, suppose an airline company works with data related to flight events. They regularly handle changes in flight departure dates and need to update their current data with the new dates. To merge this new data into existing events based on the ID of the flight, they might indicate flightId as the input and output columns and the == function. Then, they can define the merge actions to perform when the result of this condition returns true. In this case, the condition is true when the values of flightId in the input and output columns are equal.
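As a sketch of this flight example, the resulting merge condition corresponds to the following Delta Lake call. It is illustrative only, not the code the component generates; it assumes spark is the active Spark session and updates is the incoming source data.

    // Illustrative only: merge condition built from flightId and the == function.
    import io.delta.tables.DeltaTable

    DeltaTable.forName(spark, "flightDb.flightEvents").as("flightEvents")
      .merge(updates.as("updates"), "flightEvents.flightId = updates.flightId")
      .whenMatched().updateAll()      // actions defined in the When matched option
      .whenNotMatched().insertAll()   // actions defined in the When not matched option
      .execute()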

This field is available only when you select Merge from the Define how to save the dataset drop-down list in the Basic settings view.

When matched

Select this check box to define merge actions to perform when the functions defined in the Merge on table return true. You can define one merge action for each of the two When matched options. If you define two When matched options, the first one must have a condition clause, and the two options are evaluated in the order in which they are defined.

In the condition field, you can enter a condition statement to further refine the data on which to apply the merge action. If you specify a condition, the merge action is performed on a given row only if the condition is true for that row. Using conditions can speed up the merge. The condition must follow the format targetTable.column = this.column, where targetTable is the name of the target table and this refers to the source: with the Dataset source type, this is the name of the input connection; with the SQL source type, replace this with the value of the Table alias field.

In the Merge action drop-down list, select the action to perform.
  • DELETE: deletes the row in the target table.
  • UPDATE SET *: replaces all the values in the output row with the values from the input row. This action requires that the source table has the same columns as the target table.
  • UPDATE: replaces some values in the output row with the corresponding values from the input row. In this case, you must specify the columns on which to apply the update.

For example, suppose that an airline data table contains the number of passengers of a flight and a Boolean that indicates whether a flight has become more crowded. The following condition limits the scope of the merge action to the flights that have become more crowded based on the number of passengers: flightEvents.nbOfPeople < this.nbOfPeople. The merge action could then be to update the value of isFlightMoreCrowded to true with the UPDATE SET * action.
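As a sketch of this crowded-flight example, a conditional When matched clause corresponds to something like the following. It is illustrative only, with hypothetical column names, and the source is referenced through the updates alias rather than a connection name.

    // Illustrative only: conditional "when matched" clause with an update action.
    DeltaTable.forName(spark, "flightDb.flightEvents").as("flightEvents")
      .merge(updates.as("updates"), "flightEvents.flightId = updates.flightId")
      .whenMatched("flightEvents.nbOfPeople < updates.nbOfPeople")  // condition field
      .updateAll()                                                  // UPDATE SET *
      .execute()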

This field is available only when you select Merge from the Define how to save the dataset drop-down list in the Basic settings view.

When not matched

Select this check box to define merge actions to perform when the functions defined in the Merge on table return false.

In the condition field, you can enter a condition statement to further refine the data on which to apply the merge action. If you specify a condition, the merge action is performed on a given row only if the condition is true for that row. Using conditions can speed up the merge. The condition must follow the format targetTable.column = this.column, where targetTable is the name of the target table and this refers to the source: with the Dataset source type, this is the name of the input connection; with the SQL source type, replace this with the value of the Table alias field.

In the Merge action drop-down list, select the action to perform.
  • DELETE: deletes the row in the target table.
  • INSERT SET *: inserts all the columns from the input data into the corresponding output columns. This action requires that the source table has the same columns as the target table.
  • INSERT: inserts some of the columns from the input data into the corresponding output columns. In this case, you must specify the columns to insert.

For example, suppose that an airline data table is partitioned by date and the merge action has the following condition: flightEvents.date = current_date(). The merge action then applies when the functions defined in the Merge on table return false, and only to the data that corresponds to flights that occurred on the current date.
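As a sketch, a conditional When not matched clause corresponds to something like the following. It is illustrative only, with hypothetical names; the condition here refers to the source columns through the updates alias.

    // Illustrative only: conditional "when not matched" clause with an insert action.
    DeltaTable.forName(spark, "flightDb.flightEvents").as("flightEvents")
      .merge(updates.as("updates"), "flightEvents.flightId = updates.flightId")
      .whenNotMatched("updates.date = current_date()")  // condition field
      .insertAll()                                      // INSERT SET *
      .execute()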

This field is available only when you select Merge from the Define how to save the dataset drop-down list in the Basic settings view.

Advanced settings

Define column partitions

Select this check box and complete the table that is displayed with columns from the schema of the incoming data. The records of the selected columns are used as keys to partition your data.

Sort columns alphabetically

Select this check box to sort the schema columns in alphabetical order. If you leave this check box clear, these columns keep the order defined in the schema editor.

Merge Schema

The schema of your datasets often evolves through time. Select this check box to merge the schemas of the incoming data and the existing data when their schemas are different.

If you leave this check box and the Overwrite Schema check box clear, only the columns of the existing data are used.

Overwrite Schema

The schema of your datasets often evolves through time. Select this check box to use the schemas of the incoming data to overwrite the schemas of the existing data.

If you leave this check box and the Merge Schema check box clear, only the columns of the existing data are used.
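As a rough illustration, these Advanced settings correspond to the following Spark and Delta Lake write options. This is a sketch, not the code the component generates; the column name and path are hypothetical, and note that Sort columns alphabetically is a Talend Studio behavior with no direct Spark option.

    // Illustrative Scala equivalents of the Advanced settings (hypothetical names).
    df.write.format("delta")
      .partitionBy("eventDate")               // Define column partitions
      .option("mergeSchema", "true")          // Merge Schema
      // .option("overwriteSchema", "true")   // Overwrite Schema (used with mode("overwrite"))
      .mode("append")
      .save("/user/talend/out/flightEvents")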

Usage

Usage rule

This component is used as an end component and requires an input link.

Delta Lake systematically introduces slight differences between the upload time of a file and the metadata timestamp of that file. Bear these differences in mind when you need to filter data.

This Delta Lake layer is built on top of your Data Lake system and is therefore connected as part of that system, using the configuration component that corresponds to your Data Lake system, for example, tAzureFSConfiguration.

Spark Connection

In the Spark Configuration tab in the Run view, define the connection to a given Spark cluster for the whole Job. In addition, since the Job expects its dependent jar files for execution, you must specify the directory in the file system to which these jar files are transferred so that Spark can access these files:
  • Yarn mode (Yarn client or Yarn cluster):
    • When using Google Dataproc, specify a bucket in the Google Storage staging bucket field in the Spark configuration tab.

    • When using HDInsight, specify the blob to be used for Job deployment in the Windows Azure Storage configuration area in the Spark configuration tab.

    • When using Altus, specify the S3 bucket or the Azure Data Lake Storage for Job deployment in the Spark configuration tab.
    • When using on-premises distributions, use the configuration component corresponding to the file system your cluster is using.

  • Standalone mode: use the configuration component corresponding to the file system your cluster is using.

    If you are using Databricks without any configuration component present in your Job, your business data is written directly in DBFS (Databricks Filesystem).

This connection is effective on a per-Job basis.
