Pentaho Documentation

Kafka Consumer

The PDI client can pull streaming data from Kafka through a Kafka transformation. The Kafka Consumer step runs a sub-transformation that executes according to message batch size or duration, letting you process a continuous stream of records in near-real-time. This sub-transformation must start with the Get records from stream step.

You can configure this step to continuously ingest streaming data from your Kafka server. Depending on your setup, you can execute the transformation within PDI or within the Adaptive Execution Layer (AEL), using Spark as the processing engine. In the Kafka Consumer step itself, you can define the number of messages to accept for processing, as well as the specific data formats to stream activity data and system metrics. You can set up this step to collect monitored events, track user consumption of data streams, and monitor alerts.

If you are using Spark as the processing engine, you must execute the sub-transformation according to ‘Duration (ms)’ only.

Kafka records are stored within topics, and consist of a category to which the records are published. Topics are divided into a set of logs known as partitions. Kafka scales topic consumption by distributing partitions among a consumer group. A consumer group is a set of consumers sharing a common group identifier.
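The partition-sharing behavior described above can be sketched as a simple round-robin assignment. This is an illustrative model only, not PDI or Kafka client code; the function and names are assumptions for the example.

```python
# Illustrative sketch: distributing a topic's partitions across the members
# of a consumer group using round-robin assignment (names are hypothetical).
def assign_partitions(partitions, consumers):
    """Return a mapping of each consumer to the partitions it owns."""
    assignment = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment

# A topic with six partitions shared by a group of two consumers:
result = assign_partitions(
    ["sensors-0", "sensors-1", "sensors-2", "sensors-3", "sensors-4", "sensors-5"],
    ["consumer-a", "consumer-b"],
)
```

Each consumer ends up owning a disjoint subset of the partitions, which is why adding consumers to a group (up to the partition count) scales consumption.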

Before using the Kafka Consumer step, you must select and configure the shim for your distribution. For information on configuring a shim for a specific distribution, see Set Up Pentaho to Connect to a Hadoop Cluster.

Since the Kafka Consumer step continuously ingests streaming data, you may want to use the Abort step in your parent or sub-transformation to stop consuming records from Kafka for specific workflows. For example, you can run the parent transformation on a timed schedule, or abort the sub-transformation if sensor data exceeds a preset range.
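The sensor-range abort condition mentioned above can be sketched as a simple predicate. The threshold values and function name are illustrative assumptions, not PDI defaults; in a real transformation this decision would route rows to an Abort step.

```python
# Hypothetical sketch of an abort condition: stop consuming when a sensor
# reading falls outside a preset range. Thresholds here are illustrative.
def should_abort(reading, low=0.0, high=100.0):
    """Return True when the flow should route to an Abort step."""
    return reading < low or reading > high

readings = [42.0, 87.5, 103.2]
abort_flags = [should_abort(r) for r in readings]
```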

General

The Kafka Consumer step requires setup, batch, and field definitions to stream messages. 

PDITrans_KafkaConsumer_SetupTab.png

Enter the following information in the transformation step fields.

Option Description

Step name

Specifies the unique name of the transformation step on the canvas. The step name is set to ‘Kafka Consumer’ by default.

Transformation

Specify the transformation to execute by entering its path or clicking Browse and selecting the path. Note that this transformation must start with the Get records from stream step.

If you select a transformation that has the same root path as the current transformation, the variable ${Internal.Entry.Current.Directory} is automatically inserted in place of the common root path. For example, if the current transformation's path is /home/admin/transformation.ktr and you select a transformation in the folder /home/admin/path/sub.ktr, then the path is automatically converted to ${Internal.Entry.Current.Directory}/path/sub.ktr.
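The substitution described above amounts to replacing a shared directory prefix with the variable. The following is a minimal sketch under that assumption (a plain prefix match); the function name is hypothetical, not a PDI API.

```python
# Minimal sketch of the ${Internal.Entry.Current.Directory} substitution,
# assuming a plain prefix match on the current transformation's directory.
def substitute_current_directory(current_ktr_path, selected_path):
    """Replace the common root with ${Internal.Entry.Current.Directory}."""
    root = current_ktr_path.rsplit("/", 1)[0]  # directory of the current .ktr
    if selected_path.startswith(root + "/"):
        return "${Internal.Entry.Current.Directory}" + selected_path[len(root):]
    return selected_path

converted = substitute_current_directory(
    "/home/admin/transformation.ktr", "/home/admin/path/sub.ktr"
)
```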

If you are working with a repository, you must specify the name of the transformation. If you are not working with a repository, you must specify the XML file name of the transformation.

Transformations previously specified by reference are automatically converted to be specified by the transformation name in the Pentaho Repository.

Setup Tab

PDITrans_KafkaConsumer_SetupTabCropped.png

In this tab, define the connections used for receiving messages, topics to which you want to subscribe, and the consumer group for the topics.

Option Description

Connection

Select a connection type:

  • Direct - Specify the Bootstrap servers from which you want to receive the Kafka streaming data.
  • Cluster - Specify the Hadoop cluster configuration from which you want to retrieve the Kafka streaming data. In a Hadoop cluster configuration, you can specify information like host names and ports for HDFS, Job Tracker, security, and other big data cluster components. Multiple servers can be specified if they are part of the same cluster. For information on Hadoop clusters, see Set Up Pentaho to Connect to a Hadoop Cluster.

Topics

Enter the name of each Kafka topic from which you want to consume streaming data (messages). You must include all topics that you want to consume.

Consumer group

Enter the name of the group of which you want this consumer to be a member. Each instance of the Kafka Consumer step runs a single consumer thread.

When part of a consumer group, each consumer is assigned a subset of the partitions from the topics it has subscribed to, which locks those partitions.

Batch Tab

PDITrans_KafkaConsumer_BatchTab.png

Use this tab to determine how many messages to consume before processing. You can specify a message count, a duration, or both.

Note: Either option can trigger consumption; whichever condition is satisfied first starts the transformation for that batch.

Note: If you are using Spark as the processing engine, you must execute the sub-transformation according to ‘Duration (ms)’ only.

Option Description

Number of records

Specify a number of records. After every ‘X’ records, the specified transformation is executed and those ‘X’ records are passed to it.

If set to a value of ‘0’ then Duration triggers consumption.

Duration (ms)

Specify a time in milliseconds. This value is the amount of time the step will spend collecting records prior to the execution of the transformation.

Note: You must set this field if you are using Spark as your processing engine.

If set to a value of ‘0’, then Number of records triggers consumption.

Note: Either Number of records or Duration must contain a value greater than ‘0’ to run the transformation.
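The trigger rules above (first satisfied condition wins; a value of ‘0’ disables that trigger) can be sketched as a single predicate. Function and variable names are illustrative assumptions, not PDI internals.

```python
# Hedged sketch of the batch-trigger rule: a batch executes when either the
# record count or the duration threshold is reached first; 0 disables a trigger.
def batch_ready(records_collected, elapsed_ms, number_of_records, duration_ms):
    """Return True when the sub-transformation should execute for this batch."""
    count_trigger = number_of_records > 0 and records_collected >= number_of_records
    time_trigger = duration_ms > 0 and elapsed_ms >= duration_ms
    return count_trigger or time_trigger

# 500 records in 800 ms, thresholds of 1000 records or 1000 ms: neither met.
r1 = batch_ready(500, 800, 1000, 1000)
# The 1000 ms duration elapses before 1000 records arrive: time trigger fires.
r2 = batch_ready(500, 1000, 1000, 1000)
```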

Fields Tab

PDITrans_KafkaConsumer_FieldsTab.png

Use this tab to define the fields in the record format.

Option Description

Input name

The input name is received from the Kafka streams. The following are received by default:

  • key: Determines message distribution to partitions. If no key is present, messages are distributed across partitions.
  • message: The individual message contained in a record. Each record consists of a key, a value, and a timestamp.
  • topic: The category to which records are published.
  • partition: An ordered sequence of records that is continuously appended. You cannot have more consumers than the number of partitions.
  • offset: A sequential ID number assigned to each record. It uniquely identifies each record within the partition.
  • timestamp: The time the message is received on the server.

Output name

The output name is the field name passed to the sub-transformation. You can map each input name to an output name that meets your requirements.

Type

The Type field defines the data format for streaming the record. You must choose the same data type that was used to produce the records. This field applies to the ‘key’ and ‘message’ input names. Options include:

  • String
  • Boolean
  • Number
  • Integer
  • Binary
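The default input fields listed above can be pictured as one row per consumed record. The following is an illustrative sketch only (a plain dictionary, not a PDI API); the sample values are assumptions.

```python
# Illustrative sketch: shaping one consumed Kafka record into the step's
# default input fields (plain Python, not a PDI or Kafka client API).
def record_to_row(key, message, topic, partition, offset, timestamp):
    """Return one record as a mapping of the step's default field names."""
    return {
        "key": key,              # drives message distribution to partitions
        "message": message,      # the individual message in the record
        "topic": topic,          # category the record was published to
        "partition": partition,  # log the record was appended to
        "offset": offset,        # sequential ID, unique within the partition
        "timestamp": timestamp,  # time the message was received on the server
    }

row = record_to_row("sensor-17", '{"temp": 21.4}', "sensors", 2, 1045,
                    1700000000000)
```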

Options Tab

PDITrans_KafkaConsumer_OptionsTab.png

Use this tab to configure properties for the Kafka consumer. A few of the most common properties have been included for your convenience, and you can enter any other Kafka consumer property. For further information on these properties, see the Apache Kafka documentation site: https://kafka.apache.org/documentation/.
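As an example, the entries below use standard Apache Kafka consumer configuration keys; the values shown are illustrative assumptions, not recommended defaults for your environment.

```python
# Example Options tab entries as key/value pairs. The keys are standard
# Apache Kafka consumer configuration properties; values are illustrative.
consumer_options = {
    "auto.offset.reset": "latest",   # where to start when no committed offset exists
    "enable.auto.commit": "true",    # commit offsets automatically
    "session.timeout.ms": "10000",   # group-membership session timeout
    "max.poll.records": "500",       # upper bound on records returned per poll
}
```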

Metadata Injection Support

All fields of this step support metadata injection. You can use this step with ETL Metadata Injection to pass metadata to your transformation at runtime.