Partitioning data allows you to distribute all the data from a set into distinct subsets according to the rule applied on a table or row, where these subsets form a partition of the original set with no item replicated into multiple groups. Partitioning data is an important feature for scaling up and scaling out your transformations and jobs. Scaling up makes the most of a single server with multiple CPU cores, while scaling out maximizes the resources of multiple servers operating in parallel.
By default, each step in a transformation is executed in parallel in a single separate thread. Consider, for example, the transformation below. With a single copy of each step, the data is read from the ‘CSV file input’ step and then aggregated in the ‘count by state’ step. The results of which can be verified by examining the preview data.
Partitioning during Data Processing
To take advantage of the processing resources in your server, you can scale up the transformation using the multi-threading option ‘Change Number of Copies to Start…’ to produce copies of the steps (right click the step to access the menu). As shown below, the “x2” notation indicates that two copies will be started at runtime. By default, this ‘Data Movement’ from the ‘CSV file input’ step into the ‘count by state’ step will be performed in round-robin order. This means that if there are N copies, the first copy gets the first row, the second copy gets the second row, and the Nth copy receives the Nth row. Row N+1 goes to the first copy again, and so on until there are no more rows to distribute. Reading the data from the CSV file is done in parallel. Attempting to aggregate in parallel, however, produces incorrect results because the rows are split arbitrarily (without a specific rule) over the 2 copies of the ‘count by state’ aggregation step, as shown in the preview data.
Understand Repartitioning Logic
Data distribution in the steps is shown in the following table.
As you can see, the "CSV file input" step divides the work between two step copies and each copy reads 50 rows of data. However, these 2 step copies also need to make sure that the rows end up on the correct "count by state" step copy where they arrive in a 43/57 split. Because of that, it is a general rule that the step performing the repartitioning (row redistribution) of the data (a non-partitioned step before a partitioned one) has internal buffers from every source step copy to every target step copy, as shown below.
This is where partitioning data becomes a useful concept, as it applies specific rule-based direction for aggregation, directing rows from the same state to the same step copy, so that the rows are not split arbitrarily. In the example below, a partition schema called "State" was applied to the ‘count by state’ step and the ‘Remainder of division’ partitioning rule was applied to the ‘state’ field. Now, the ‘count by state’ aggregation step produces consistent correct results because the rows were split up according to the partition schema and rule, as shown in the preview data.
Note: To view this transformation in the PDI client, open the Pentaho/…/design-tools/data-integration/samples/transformations/General - parallel reading and aggregation.ktr sample file.
Partitioning Data over Tables
The ‘Table output’ step (double-click the step to open it) supports partitioning rows of data to different tables. When configured to accept the table name from a ‘Partitioning field,’ the PDI client will output the rows to the appropriate table. You can also ‘Partition data per month’ or ‘Partition data per day.’ To ensure that all the necessary tables exist, we recommend creating them in a separate transformation.
The partitioning method you use can be based on any criteria, can include no rule (round-robin row distribution), or can be created using a partitioning method plugin. The idea is to establish a criterion by which to partition the data, so that resulting storage and processing groups are logically independent from each other.
Step One, setup the partition schema:
- First, configure a partition schema. A partition schema defines how many ways the row stream will be split. The names used for the partitions can be anything you like.
- Next, apply the partition schema to the “group by” step. By applying a partition schema to a step, a matching set of step copies is started automatically (for example, if applying a partition schema with three partitions, three step copies are launched).
Step Two, select the partitioning method:
- Establish the partitioning method for the step, which defines the rule for row distribution across the copies. The ‘Remainder of division’ rule allows rows with the same state value to be sent to the same step copy and the distribution of similar rows among the steps. If the modulo is calculated on a non-integer value, the PDI Client calculates the modulo on a checksum created from the String, Date, and Number value.
When you run the transformation, there are no guarantees as to which page name goes to which step copy, only that any page name encountered is consistently forwarded to the same step copy.
Use Data Swimlanes
When a partitioned step passes data to another partitioned step with the same partition schema, the data is kept in swimlanes because no repartitioning needs to be done. As illustrated below, no extra buffers (row sets) are allocated between the copies of steps ‘count by state’ and ‘Replace in string.’
The step copies remain isolated from one another and the rows of data travel in swimlanes. No extra work needs to be done to keep the data partitioned, so you can chain as many partitioned steps as needed. This will internally be executed as shown in the following illustration.
Rules for Partitioning
When you use partitioning, the logic used for distribution, repartitioning, and buffer allocations will be dependent upon the following rules:
- A partitioned step causes one step copy to be executed per partition in the partition schema.
- When a step needs to repartition the data, the step creates buffers (row sets) from each source step copy to each target step copy (partition).
- When rows of data pass from a non-partitioned step to a partitioned one, data is repartitioned and extra buffers are allocated.
- When rows of data, partitioned with the same partition schema, pass from a partitioned step to another partitioned step, data is not repartitioned.
- When rows of data, partitioned with a different partition schema, pass from a partitioned step to another partitioned step, data is repartitioned.
Partitioning Clustered Transformations
Partitioning data allows your transformations to scale out on a cluster of slave servers to maximize the resources of machines operating in parallel. When a step is assigned to run on a Carte master node (that is, non-clustered in a clustered transformation), the same rules apply as described above.
In case a clustered step is partitioned, the partitions are distributed over the number of slave servers. As a result, the number of partitions needs to be equal to or higher than the number of slave servers in the cluster schema. It is, therefore, recommended to allow the PDI client to create the partition schema dynamically in a clustered environment.
You should always limit repartitioning on a cluster to a minimum, as high amounts of networking and CPU overhead can be incurred, which is caused by the massive amounts of data passing from one server to another over TCP/IP sockets. Also, to get optimal performance on a cluster, try to keep the data in swimlanes, described above, for as long as possible.