
About Spark tuning in PDI

Spark tuning is the customization of PDI transformation and step parameters to improve the performance of PDI transformations executed on the Spark engine. These Spark parameters include both transformation parameters, which we call application tuning parameters when working with PDI transformations on Spark, and step-level parameters, which we call Spark tuning options. Together, the application tuning parameters and the Spark tuning options control memory storage and the number of partitions, both of which are critical to optimizing the execution of your transformation.

Audience and prerequisites

Spark tuning features are intended for ETL developers who have a solid understanding of PDI, Spark, and their system's cluster resources. For effective Spark tuning, you need to know how a transformation uses your resources, including your environment and your data size. PDI steps vary in their resource requirements, so you should tune the Spark engine to meet the needs of each transformation.

To use the Spark tuning features, you need access to the following information:

  • Cluster resources: the amount of resources available to the Spark engine during execution, including memory allotments and the number of cores.
  • Size of data.

You may want to consult your cluster administrator, who can perform the following tasks:

  • Monitor cluster resources on the YARN Resource Manager.
  • Manage Spark execution resources on the Spark History Server.

Executing on the Spark engine

The Spark engine groups data into partitions and creates executors that process the data in those partitions. In some cases, you can improve performance by adding executors or by increasing their memory size. The number and size of executors are limited by your cluster resources.

Not all the memory set aside in the Spark memory model is available for data processing. The memory allotted in the Spark memory model is broken down into the following segments:

Memory type | Amount | Note
Reserved memory | 300 MB, hard coded for Spark. | Cannot be adjusted and is not available to any executor.
User memory | 25% of the memory left over after the reserved memory. | Non-dataset storage used for the executors.
Spark memory | 75% of the memory left over after the reserved memory. | Roughly half of this memory is used for data storage, and the other half is used for execution.

The amount of memory available for partitions comes from the data storage portion of the Spark memory model; the rest of Spark memory is used for execution. ETL tasks usually require more data storage, while AI and machine learning tasks need more execution memory.
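
As a rough worked example, consider an executor with 4 GB of memory. The sketch below applies the percentages from the table above; the exact split in a real cluster is governed by Spark settings such as spark.memory.fraction and spark.memory.storageFraction, and may differ by Spark version.

    # Rough estimate of the Spark memory model for a single 4 GB executor.
    # Assumes the 300 MB reserved / 25% user / 75% Spark split described above;
    # actual fractions depend on spark.memory.fraction and spark.memory.storageFraction.
    executor_memory_mb = 4096                      # e.g. spark.executor.memory=4g
    reserved_mb = 300                              # hard coded by Spark

    usable_mb = executor_memory_mb - reserved_mb   # 3796 MB
    user_memory_mb = usable_mb * 0.25              # ~949 MB for non-dataset objects
    spark_memory_mb = usable_mb * 0.75             # ~2847 MB of unified Spark memory
    storage_mb = spark_memory_mb * 0.5             # ~1424 MB for cached partitions
    execution_mb = spark_memory_mb * 0.5           # ~1424 MB for shuffles, joins, and sorts

    print(f"storage ~ {storage_mb:.0f} MB, execution ~ {execution_mb:.0f} MB")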

The Spark engine changes the state of data in the memory model through the following types of Spark transformations:

  • Narrow Spark transformation

    A Spark task that only requires data from a single partition. The data can be input, transformed, and output all within the same partition.

  • Wide Spark transformation

    A Spark task that requires data from multiple partitions. In a wide transformation, the data must be shuffled between partitions. The partition used to input the data is not the same as the partition used to transform and output the data.

A PDI transformation may have one or more narrow or wide transformations in Spark.

Narrow Spark transformations are more efficient than wide Spark transformations. Wide Spark transformations lead to shuffling and repartitioning, which can cause slow data transfers, transfer failures, and recalculations. Examples of wide transformations are join, sort, and grouping operations. You can improve execution by coalescing partitions (reducing the number of partitions) to consolidate splits without shuffling data.
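
To make the difference concrete, the following standalone PySpark sketch (not something you write in PDI, since AEL generates the Spark code for you) shows a narrow transformation, a wide transformation, and a coalesce; the dataset and column names are hypothetical.

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    spark = SparkSession.builder.appName("narrow-vs-wide-sketch").getOrCreate()
    df = spark.range(1_000_000).withColumn("key", F.col("id") % 100)

    # Narrow: the filter is applied within each partition; no data moves between partitions.
    filtered = df.filter(F.col("id") % 2 == 0)

    # Wide: the grouping requires a shuffle, because rows with the same key
    # must be moved to the same partition before they can be aggregated.
    counts = filtered.groupBy("key").count()

    # Coalesce: collapse to fewer partitions without a full shuffle, for example before writing output.
    counts.coalesce(4).write.mode("overwrite").parquet("/tmp/counts")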

Tuning the Spark application parameters

Spark transformations are lazily evaluated: they are only executed when their results are needed, which is triggered by actions. Some examples of these actions are count, collect, take, and save.
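
For example, in the hypothetical PySpark sketch below, nothing is read or computed until the action on the last line runs.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("lazy-evaluation-sketch").getOrCreate()

    # Transformations only build an execution plan; no data is read yet.
    rows = spark.read.csv("/tmp/input.csv", header=True, inferSchema=True)
    big = rows.filter("amount > 100").select("customer", "amount")

    # count() is an action: it triggers the read, filter, and select above.
    print(big.count())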

Spark views PDI transformations as applications. If you execute a PDI transformation on the Spark engine, the PDI transformation appears in Spark as an application on your cluster.

Opening Spark application tuning

You can specify values for fine tuning the following types of Spark application parameters within either the data-integration/adaptive-execution/config/application.properties file or the Parameters tab of the PDI Transformation properties dialog box:

  • Executor and driver resource sizing
  • YARN utilization impacts
  • Default partitioning
  • Memory splits

You can also control these types of Spark application parameters through PDI environment variables.

As an example of application tuning, the default number of executors may be too low for your PDI transformation to efficiently utilize YARN capacity. You could improve utilization by increasing the number of executors, based on the available data storage memory and cluster resources.
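
In PDI these values are set in application.properties or on the Parameters tab rather than in code, but the standalone PySpark sketch below shows the equivalent Spark properties so you can see what the application tuning parameters map to; the specific values are placeholders.

    from pyspark.sql import SparkSession

    # Equivalent Spark properties; in PDI, supply these through AEL's
    # application.properties file or the transformation's Parameters tab.
    spark = (
        SparkSession.builder
        .appName("application-tuning-sketch")
        .config("spark.executor.instances", "8")       # more executors to use idle YARN capacity
        .config("spark.executor.cores", "4")           # cores per executor
        .config("spark.executor.memory", "4g")         # executor sizing
        .config("spark.driver.memory", "2g")           # driver sizing
        .config("spark.sql.shuffle.partitions", "64")  # default partitioning after shuffles
        .getOrCreate()
    )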

See Configuring application tuning parameters for Spark for more information on setting Spark application parameters in PDI. See the Spark Application Properties documentation for a full list of Spark application parameters.

Setting the Spark tuning options on PDI steps

At the PDI step level, you can fine-tune execution on the Spark engine with the following capabilities (a sketch of the equivalent Spark operations follows the list):

  • Data persistence

    The cache and persist.storageLevel options help you save the results of wide Spark transformations so that downstream computations do not need to be recalculated. See the Spark RDD Persistence documentation for a list of possible Spark storage values.

  • Repartition and coalesce

    The repartition.numPartitions, repartition.columns, and coalesce options help you increase or decrease the number of partitions. Repartition splits and shuffles data into new partitions. Coalesce collapses down the number of partitions without shuffling.

  • Broadcast join

    The join.broadcast.stepName option helps you push datasets out to executors to reduce shuffling during join operations. You should only broadcast out relatively small amounts of data. The maximum broadcast size is set by the spark.sql.autoBroadcastJoinThreshold parameter.
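
The PDI options above map onto ordinary Spark operations. The standalone PySpark sketch below shows those equivalents (repartition, broadcast join, persist, and coalesce) with hypothetical datasets and paths; note that OFF_HEAP storage also requires off-heap memory to be enabled in the Spark configuration.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast
    from pyspark import StorageLevel

    spark = SparkSession.builder.appName("step-tuning-sketch").getOrCreate()

    sales = spark.read.parquet("/tmp/sales")        # hypothetical large dataset
    regions = spark.read.parquet("/tmp/regions")    # hypothetical small lookup dataset

    # repartition.numPartitions / repartition.columns: shuffle into a new set of partitions.
    sales = sales.repartition(64, "region_id")

    # join.broadcast.stepName: ship the small dataset to every executor so the join
    # does not shuffle the large one (automatic broadcasts are capped by
    # spark.sql.autoBroadcastJoinThreshold).
    joined = sales.join(broadcast(regions), "region_id")

    # cache / persist.storageLevel: keep the result of the wide transformation so
    # downstream steps do not recompute it (OFF_HEAP needs spark.memory.offHeap.enabled).
    joined.persist(StorageLevel.OFF_HEAP)

    # coalesce: reduce the partition count without shuffling, for example before output.
    joined.coalesce(8).write.mode("overwrite").parquet("/tmp/joined")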

Opening the PDI step Spark tuning options

You can access Spark tuning from a PDI step by right-clicking on the step in the canvas and selecting Spark tuning from the menu. The Spark tuning parameters dialog box appears. Only the Spark tuning options available for that step appear in the dialog box. These options vary step-to-step, depending on the scope of the step.

See Spark Tuning for more information.

Example: Improving performance using Spark step tuning options

As an example, you may have a PDI transformation that takes many hours to run on the Spark engine. While researching this slow performance, you notice that only a small amount of YARN memory is used for the execution. Further research indicates that you have too few executors for the large number of partitions created by the default HDFS block size.

You could try the following workflow to possibly improve performance:

Procedure

  1. Determine how many executors you can feasibly add to use more of the available YARN memory.

  2. Adjust application tuning parameters in the Parameters tab of the PDI Transformation properties dialog box to increase the number of executors to the feasible amount.

    You are now using more of the YARN memory and performance is better, but you feel you need further improvement. You could start tuning at the PDI step level. Because of the large number of default partitions, you could coalesce down to a smaller number of partitions.
  3. Determine how far you can coalesce down to a target number of partitions per your memory and cluster resources.

  4. Set coalesce to true and repartition.numPartitions to your target number of partitions.

    You also notice you are joining data within your PDI transformation.
  5. Set join.broadcast.stepName to the name of the PDI step introducing the data into your join step.

    Performance has improved, but you feel it could be even better. You could persist the data after the wide Spark transformations, but before output, to save downstream computations.
  6. Set persist.storageLevel to OFF_HEAP. An in-memory storage level takes memory away from the executors and could lead to an overflow. See RDD Persistence for possible storage options.

Results

This workflow could improve performance for a PDI transformation that has too few executors for too many partitions, but every PDI transformation and cluster is different. Before applying application and step tuning, you should understand how to adjust Spark parameters effectively, how your specific PDI transformation behaves, and what cluster resources are available. Generally, 3 to 5 YARN cores per executor is a good starting point for sizing your resources. You can assess the number of partitions after you size the executors.
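
As a hypothetical sizing sketch, assume a cluster of 10 worker nodes, each with 16 YARN cores and 64 GB of YARN memory, and 4 cores per executor (within the 3 to 5 core guideline); the numbers below are placeholders to adjust for your own cluster.

    # Hypothetical cluster sizing; replace these numbers with your own YARN configuration.
    nodes = 10
    yarn_cores_per_node = 16
    yarn_memory_gb_per_node = 64

    cores_per_executor = 4                                                   # 3-5 core guideline
    executors_per_node = yarn_cores_per_node // cores_per_executor           # 4
    total_executors = nodes * executors_per_node - 1                         # 39, leaving one slot for the driver
    memory_per_executor_gb = yarn_memory_gb_per_node // executors_per_node   # 16 GB, before YARN overhead

    print(total_executors, memory_per_executor_gb)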

For more information about how to improve the performance of your transformations, see Optimizing Spark tuning.

Cautions for Spark tuning

When approaching Spark tuning in PDI, you should consider that one tuning solution may not work for all your PDI transformations. Effective tuning depends on knowing how your environment changes, how your data sizes change, which PDI steps require more resources, and how tuning those steps impacts the execution of the transformation. As a best practice, you should know how your data skews. Balanced partitions increase stability, while heavily skewed data may cause executors to overflow and fail.
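
As a quick, hedged way to see how your data skews, you can count rows per partition in standalone PySpark (outside PDI); the dataset path below is a placeholder.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import spark_partition_id

    spark = SparkSession.builder.appName("skew-check-sketch").getOrCreate()
    df = spark.read.parquet("/tmp/sales")   # hypothetical dataset

    # Count rows per partition; a few very large partitions indicate skew that can
    # overflow the executors processing them.
    df.groupBy(spark_partition_id().alias("partition")) \
      .count() \
      .orderBy("count", ascending=False) \
      .show()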

The Spark engine in PDI runs on the Adaptive Execution Layer (AEL), which does not safeguard against incorrect tuning. Some tuning may cause executors and applications to fail, and may reduce performance instead of improving it. See Optimizing Spark tuning to learn how to use PDI logging and Spark logging to monitor changes to application and step tuning parameters, and how those changes affect the execution of your transformation and its related Spark memory model.