
Hitachi Vantara Lumada and Pentaho Documentation

Kinesis Consumer

The Kinesis Consumer step gets and processes data records from Amazon Kinesis Data Streams (KDS). This step is useful for managing your Amazon KDS Applications. When you set up an Amazon KDS application in the Kinesis Consumer step, the name property uniquely identifies the application that is associated with your AWS account and geographical region of the data stream. Then your consumer is ready to get and process data records from the indicated Kinesis data stream.

In the PDI Kinesis Consumer step itself, you can define the location for processing, as well as the specific data formats to stream data and system metrics. You can set up this step to collect monitored events, track user consumption of data streams, and monitor alerts.

The Kinesis Consumer step pulls streaming data from Amazon Kinesis Data Streams (KDS) through a PDI transformation. The parent Kinesis Consumer step runs a child transformation that executes according to message batch size or duration, so you can process a continuous stream of records in near real-time. The child transformation must start with the Get records from stream step.

You can configure the Kinesis Consumer step to continuously ingest streaming data from Kinesis Data Streams. Depending on your setup, you can execute the transformation within PDI or within the Adaptive Execution Layer (AEL), using Spark as the processing engine. When using Spark, you must trigger the child transformation by Duration (ms) only.

Additionally, in the Kinesis Consumer step, you can select a step in the child transformation to stream records back to the parent transformation. Records processed by a Kinesis Consumer step in a parent transformation can then be passed downstream to any other steps included within the same parent transformation.

Note: Since the Kinesis Consumer step continuously ingests streaming data, you may want to use the Abort step in either the parent or child transformation to stop consuming records from Kinesis Data Streams for specific workflows. For example, you can run the parent transformation on a timed schedule, or abort the child transformation if sensor data exceeds a preset range.

AEL considerations

When using the Kinesis Consumer step with the Adaptive Execution Layer, the following may affect performance and results.

  • When running the Kinesis Consumer step on AEL Spark, use HDP 3.x. Earlier versions of HDP are not supported.

General

The Kinesis Consumer step requires definitions for setup, batch, fields, result fields, and Kinesis Data Streams specific options to consume messages.

Kinesis Consumer step

Enter the following information in the Step name and Transformation fields:

Step name

Specifies the unique name of the step on the canvas. The Step name is set to Kinesis Consumer by default.

Transformation

Specify the child transformation to execute. You can select an existing transformation or create a new one (see Create and save a new child transformation below).

The selected child transformation must start with the Get Records from Stream step.

If you select a transformation that has the same root path as the current transformation, the variable ${Internal.Entry.Current.Directory} is automatically inserted in place of the common root path. For example, if the current transformation's path is /home/admin/transformation.ktr and you select a transformation in the directory /home/admin/path/sub.ktr, then the path is automatically converted to ${Internal.Entry.Current.Directory}/path/sub.ktr.

If you are working with a repository, you must specify the name of the transformation. If you are not working with a repository, you must specify the XML file name of the transformation.

Transformations previously specified by reference are automatically converted to be specified by the transformation name in the Pentaho Repository.
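The path substitution described above can be sketched in Python. This is an illustrative approximation of the rule using the documentation's own example, not PDI's actual implementation:

```python
def substitute_internal_dir(current_ktr_path, selected_ktr_path):
    """Illustrative sketch: replace the current transformation's directory
    (the common root path) with ${Internal.Entry.Current.Directory}."""
    root = current_ktr_path.rsplit("/", 1)[0]   # e.g. /home/admin
    if selected_ktr_path.startswith(root + "/"):
        return "${Internal.Entry.Current.Directory}" + selected_ktr_path[len(root):]
    return selected_ktr_path  # no common root path: leave the path untouched

# The example from this documentation:
print(substitute_internal_dir("/home/admin/transformation.ktr",
                              "/home/admin/path/sub.ktr"))
# ${Internal.Entry.Current.Directory}/path/sub.ktr
```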

Create and save a new child transformation

If you do not already have a child transformation, you can create one while setting up the Kinesis Consumer step. When you click New, a child transformation containing the required Get Records from Stream step is generated in a new canvas tab.

Procedure

  1. In the Kinesis Consumer step, click New.

    The Save As dialog box appears.
  2. Navigate to the location where you want to save your new child transformation, then type in the file name.

  3. Click Save.

    A notification box displays informing you that the child transformation has been created and opened in a new tab. If you do not want to see this notification in the future, select the Don't show me this again check box.
  4. Click the new transformation tab to view and edit the child transformation.

    It automatically contains the Get Records from Stream step. Optionally, you can continue to build this transformation and save it.
  5. When finished, return to the Kinesis Consumer step.

Options

The Kinesis Consumer step features several tabs. Each tab is described below.

Setup tab


In this tab, define your specific data stream within Amazon Kinesis Data Streams and the related starting location for consuming records. The records in the Amazon Kinesis Data Streams are stored by the data stream name and geographical areas known as regions. A Kinesis Data Streams application reads the records from the data stream. The name property of the Kinesis application specifies a consumer of the data stream and uniquely identifies the last point at which this consumer has read from the data stream.

Groups of records in Amazon Kinesis Data Streams are known as shards. The shard iterator property narrows the location of the records in a shard if a given application name has never been used to read from a data stream before. See https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html and https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-implementation-app-java.html for more information on Amazon Kinesis Data Streams applications and shards.

Region

Specify the Amazon geographical area where the data stream is located. You can only select one region.

Stream name

Specify the name of the specific data stream within Amazon Kinesis Data Streams that contains the records to be consumed.

Application Name

Specify the name property of the existing Amazon Kinesis Data Streams application used to get records from a data stream. A given Application Name must not be used with more than one Stream name. Do not consume records from more than one data stream at a time.

Shard iterator

If you are using an application (as specified in Application Name) for the first time to locate records to consume, select how to further refine the starting location for consuming data. The following iterator options are available:
  • TRIM_HORIZON

    The last untrimmed record (the oldest data record).

  • LATEST

    The most recent records.

  • AT_TIMESTAMP

    A specified point in time. If you select this option, use Date and Time to specify that point in time.

The default value is LATEST.
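To make the iterator options concrete, the following Python sketch builds the parameters a consumer would pass to the public Kinesis GetShardIterator API, which uses the same iterator type names. PDI performs this internally; the function here is only illustrative:

```python
def shard_iterator_request(stream_name, shard_id, iterator_type, timestamp=None):
    """Build the parameters for a Kinesis GetShardIterator call.
    TRIM_HORIZON starts at the oldest untrimmed record, LATEST at the
    most recent records, and AT_TIMESTAMP at a specified point in time."""
    if iterator_type not in {"TRIM_HORIZON", "LATEST", "AT_TIMESTAMP"}:
        raise ValueError("unknown shard iterator type: %s" % iterator_type)
    request = {
        "StreamName": stream_name,
        "ShardId": shard_id,
        "ShardIteratorType": iterator_type,
    }
    if iterator_type == "AT_TIMESTAMP":
        if timestamp is None:
            raise ValueError("AT_TIMESTAMP requires a Date and Time value")
        request["Timestamp"] = timestamp  # the point in time to start from
    return request
```

Note that AT_TIMESTAMP is the only iterator type that requires the extra timestamp value, which mirrors the Date and Time fields in the Setup tab.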

Batch tab


Use this tab to designate how many messages to consume before processing. You can specify a message count, an amount of time, or both.

The batch size is defined by the Duration (ms) and Number of records options. A batch is passed to the child transformation when either threshold is reached, whichever occurs first. For example, if Duration (ms) is set to 1000 and Number of records is set to 1000, a batch is processed whenever 1000 milliseconds have elapsed or 1000 records have been received. If you set either option to zero, PDI ignores that trigger.

You can also specify the maximum number of batches used to collect records at the same time.

Duration (ms)

Specify a time in milliseconds. This value is the amount of time the step spends collecting records before executing the child transformation.

If this option is set to 0, then Number of records triggers consumption. Either the Duration (ms) or the Number of records option must contain a value greater than 0 to run the transformation.

Note: You must set this field if you are using Spark as your processing engine.

Number of records

Specify a number of records. After every 'X' records, the child transformation is executed and those 'X' records are passed to it.

If this option is set to 0, then Duration (ms) triggers consumption. Either the Duration (ms) or the Number of records option must contain a value greater than 0 to run the transformation.

Maximum concurrent batches

Specify the maximum number of batches used to collect records at the same time. The default value is 1, which indicates a single batch is used for collecting records.

Use this option only when your consumer step cannot keep pace with the speed at which the data is streaming. Your computing environment must have adequate CPU and memory for this implementation. An error occurs if your environment cannot handle the maximum number of concurrent batches specified.

Message prefetch limit

Specify a limit for how many incoming messages this step will queue for processing as they are received. Setting this value forces the broker to manage the backpressure of messages exceeding the specified limit. The default number of messages to queue is 100000.
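The either/or batch trigger described above can be sketched as follows. This is an illustrative simulation of the batching rule, not PDI's implementation:

```python
import time

def batch_records(records, duration_ms, number_of_records, clock=time.monotonic):
    """Yield batches of records, flushing when either the Duration (ms)
    or the Number of records threshold is reached, whichever comes first.
    A value of 0 disables that trigger, mirroring the step's behavior."""
    if duration_ms <= 0 and number_of_records <= 0:
        raise ValueError("Duration (ms) or Number of records must be > 0")
    batch, started = [], clock()
    for record in records:
        batch.append(record)
        elapsed_ms = (clock() - started) * 1000
        if (number_of_records > 0 and len(batch) >= number_of_records) or \
           (duration_ms > 0 and elapsed_ms >= duration_ms):
            yield batch  # hand this batch to the child transformation
            batch, started = [], clock()
    if batch:
        yield batch  # flush the remainder when the stream stops
```

For example, with Duration (ms) set to 0 and Number of records set to 3, seven incoming records produce batches of 3, 3, and 1.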

Fields tab


Use this tab to define the fields in the record format.

Input name

The input name is received from the Kinesis stream. The following are received by default:

  • message

    The individual message contained in a record. Each record consists of a sequence number, a partition key, and a message.

  • partition key

    Used to group data by shard within a stream. The partition key is associated with each data record to determine which shard a given data record belongs to.

  • sequence number

    A sequential ID number assigned to each record. It uniquely identifies each record within the partition.

  • subsequence number

    Because all subrecords within a record have the same sequence number, additional data is stored with the checkpoint if you need to distinguish between subrecords. This additional data is the subsequence number.

  • shard id

    The unique identifier of the shard within the stream.

Output name

The Output name can be mapped to subscriber and member requirements.
Type

The Type field defines the data format for streaming the record. You must choose the same data type that was used to produce the records. This field applies to the message input name. Options include:

  • String
  • Boolean
  • Number
  • Integer
  • Binary

The default value is String.
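The default input names map directly onto the fields of a Kinesis record. As a hedged sketch, assuming a record shaped like the raw GetRecords API response (where Data is Base64-encoded; some SDKs decode it to bytes for you):

```python
import base64

def record_to_fields(record, shard_id):
    """Illustrative mapping of one Kinesis record onto the step's default
    input names. Assumes the raw GetRecords API shape, where the Data
    payload is Base64-encoded."""
    return {
        "message": base64.b64decode(record["Data"]).decode("utf-8"),
        "partition key": record["PartitionKey"],
        "sequence number": record["SequenceNumber"],
        # subrecords share a sequence number; the subsequence number,
        # when present, distinguishes them
        "subsequence number": record.get("SubSequenceNumber", 0),
        "shard id": shard_id,  # supplied by the shard being read
    }

fields = record_to_fields(
    {"Data": "aGVsbG8=", "PartitionKey": "sensor-1", "SequenceNumber": "49590"},
    "shardId-000000000000",
)
print(fields["message"])  # hello
```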

Result fields tab


Use this tab to select the step from the child transformation that will stream records back to the parent transformation. Records processed by a Kinesis Consumer step in the parent transformation can then be passed downstream to any other steps included within the same parent transformation.

  • Return fields from

    Select the name of the step from the child transformation to stream fields back to the parent transformation. The data values in these returned fields are available to any subsequent downstream steps in the parent transformation.

Options tab


Use this tab to configure additional properties for ingesting records from a data stream within Amazon Kinesis Data Streams. You should have advance understanding of Amazon Kinesis Data Streams before considering how to best adjust these properties from their default values. See https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-implementation-app-java.html for more details.

Write Timeout Seconds

Amount of time to wait for PDI to acknowledge back to Amazon Kinesis Data Streams that the last point in a data stream has been reached before an exception is thrown.

Connection Timeout Seconds

Amount of time to wait when initially establishing a connection before giving up and timing out.

Connection Acquisition Timeout

Amount of time to wait when acquiring a connection from the pool before giving up and timing out.

Use Enhanced Fanout

If true, consumers receive records from a stream with a throughput of up to 2 MiB of data per second per shard. The default is false.

Setting Use Enhanced Fanout to true incurs additional costs with Amazon Web Services (AWS). See https://docs.aws.amazon.com/streams/latest/dev/introduction-to-enhanced-consumers.html and https://aws.amazon.com/kinesis/data-streams/pricing/ for further details.

Max Concurrency

Maximum number of allowed concurrent requests.

Max Pending Connection Acquires

Maximum number of requests allowed to queue up once Max Concurrency is reached.

Read Timeout

Amount of time to wait for a read on a socket before an exception is thrown.

Max Http2 Streams

Maximum number of concurrent streams for an HTTP/2 connection.

Max Initialization Attempts

Maximum number of attempts to initialize.

Poll Interval (ms)

Interval in milliseconds between attempts to initialize.

Metadata injection support

All fields of this step support metadata injection. You can use this step with ETL metadata injection to pass metadata to your transformation at runtime.