Pentaho Documentation

MQTT Consumer

The PDI client can pull streaming data from an MQTT broker or its clients through an MQTT transformation. The parent MQTT Consumer step runs a child transformation each time a batch of messages is ready, as determined by the batch size or duration, so you can process a continuous stream of records in near real time. The child transformation must start with the Get records from stream step.

Additionally, from the MQTT Consumer step, you can select a step in the child transformation to stream records back to the parent transformation. This capability allows records processed by an MQTT Consumer step in a parent transformation to be passed downstream to any other steps included within the same parent transformation. 

General 

The MQTT Consumer step requires Setup, Security, Batch, Fields, Result fields, and Options definitions to stream messages. 

PDI_TransStep_Dialog_MQTTConsumer.png

Enter the following information for the Step Name and Transformation fields:

Option Description
Step name – Specifies the unique name of the step on the canvas. The Step name is set to ‘MQTT Consumer’ by default.
Transformation

Specify the child transformation to execute by performing any of the following actions:

•    Entering its path
•    Clicking Browse to select an existing child transformation
•    Clicking New to create and save a new child transformation. See Create and Save a New Child Transformation for more details.

The selected child transformation must start with the Get Records from Stream step.

If you select a transformation that has the same root path as the current transformation, the variable ${Internal.Entry.Current.Directory} is automatically inserted in place of the common root path. For example, if the current transformation's path is /home/admin/transformation.ktr and you select the transformation /home/admin/path/sub.ktr, the path is automatically converted to ${Internal.Entry.Current.Directory}/path/sub.ktr

If you are working with a repository, you must specify the name of the transformation. If you are not working with a repository, you must specify the XML file name of the transformation.

Transformations previously specified by reference are automatically converted to be specified by the transformation name in the Pentaho Repository.
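
The path conversion described above can be illustrated with a short Python sketch. It is not PDI code: the function name to_variable_path is hypothetical, and the sketch assumes forward-slash paths and a simple prefix comparison against the directory of the current transformation.

```python
import posixpath

# Variable name taken from the description above.
CURRENT_DIR_VAR = "${Internal.Entry.Current.Directory}"


def to_variable_path(current_ktr, child_ktr):
    """Replace the shared root directory with the PDI variable (illustrative only)."""
    current_dir = posixpath.dirname(current_ktr)
    prefix = current_dir.rstrip("/") + "/"
    if child_ktr.startswith(prefix):
        # Keep only the part of the child path below the current directory.
        return CURRENT_DIR_VAR + "/" + child_ktr[len(prefix):]
    # No common root path: the child path is left unchanged.
    return child_ktr


print(to_variable_path("/home/admin/transformation.ktr",
                       "/home/admin/path/sub.ktr"))
```

With the paths from the example above, the sketch prints ${Internal.Entry.Current.Directory}/path/sub.ktr.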

Create and Save a New Child Transformation

If you do not already have a child transformation, you can create one while setting up the MQTT Consumer step. When you click the New button, PDI creates a new child transformation that contains the required Get Records from Stream step and opens it in a new canvas tab. The fields and types in the child transformation's Get Records from Stream step are set to match the fields and types specified in the Fields tab of the parent MQTT Consumer step.

1.    In the MQTT Consumer step, click New. The Save As dialog box appears.
2.    Navigate to the location where you want to save your new child transformation, then type in the file name.
3.    Click Save. A notification box appears, informing you that the child transformation has been created and opened in a new tab. If you do not want to see this notification in the future, select the Don't show me this again check box.
4.    Click the new transformation tab to view and edit the child transformation. It automatically contains the Get Records from Stream step. Optionally, you can continue to build this transformation and save it.
5.    When finished, return to the MQTT Consumer step.

Options

The MQTT Consumer step includes several tabs. Each tab is described below.

Setup Tab

PDI_TransStep_Tab_MQTTConsumer_Setup.png

In the Setup tab, define the connection used for receiving messages, the topics to which you want to subscribe, and the Quality of Service (QoS) level for message delivery.

Option Description
Connection – Specify the address of the MQTT server to which this step will connect for sending or retrieving messages.
Topics – Specify the MQTT topic(s) to be subscribed to.
Quality of Service (QoS)

Quality of Service (QoS) is a level of guarantee for message delivery. Select one of the following options. 

•    At most once (0) - Default
•    At least once (1)
•    Exactly once (2)
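
As a rough illustration of how the three Setup tab values fit together, the following Python sketch models them as a small validated record. The class names, the host:port form of the connection value, and the topic name are assumptions made for this example and are not part of PDI.

```python
from dataclasses import dataclass
from enum import IntEnum


class QoS(IntEnum):
    """The three delivery guarantees listed above."""
    AT_MOST_ONCE = 0
    AT_LEAST_ONCE = 1
    EXACTLY_ONCE = 2


@dataclass
class SetupTab:
    """Hypothetical stand-in for the Setup tab values."""
    connection: str                 # assumed host:port form
    topics: list
    qos: QoS = QoS.AT_MOST_ONCE     # "At most once (0)" is the default

    def __post_init__(self):
        if not self.connection:
            raise ValueError("Connection must not be empty")
        if not self.topics:
            raise ValueError("At least one topic is required")
        # QoS(...) raises ValueError for anything other than 0, 1, or 2.
        self.qos = QoS(self.qos)


setup = SetupTab("broker.example.com:1883", ["sensors/temperature"])
print(setup.qos.name)
```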

Security Tab

PDI_TransStep_Tab_MQTTConsumer_Security.png

The Security tab allows you to define authentication credentials for the MQTT server. This tab includes the following options:

Option Description
Username – Specify the user name required to access the MQTT server.
Password – Specify the password associated with the Username.
Use secure protocol

Select this option if you want to define SSL properties for the connection.

This security protocol setting is used only on Kettle. It is not used on AEL Spark.

SSL Properties

ssl.contextProvider – Specify the underlying JSSE provider.

ssl.enabledCipherSuites – Specify which ciphers are enabled. Values are dependent on the provider. 

ssl.keyManager – Specify the algorithm that will be used to create a KeyManagerFactory object instead of using the default algorithm available in the platform.

ssl.keyStore – Specify the name of the file that contains the KeyStore object that you want the KeyManager to use.

ssl.keyStorePassword – Specify the password for the KeyStore object that you want the KeyManager to use.

ssl.keyStoreProvider – Specify the identifying name or string for the key store provider. 

ssl.keyStoreType – Specify the identifying name or string for the type of key store. 

ssl.protocol – Specify the type of SSL protocol to use.

ssl.trustManager – Specify the algorithm that will be used to create a TrustManagerFactory object, instead of using the default algorithm available in the platform. 

ssl.trustStore – Specify the name of the file that contains the KeyStore object that you want the TrustManager to use.

ssl.trustStorePassword – Specify the password for the TrustStore object that you want the TrustManager to use.

ssl.trustStoreProvider – Specify the identifier or string for the trust store provider. 

ssl.trustStoreType – Specify the type of KeyStore object that you want the TrustManager to use.
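
To show what a filled-in set of SSL properties might look like, the following Python sketch holds a handful of them in a dictionary and checks the keys against the property names listed above. All values (file paths, password, protocol, and store type) are placeholders chosen for illustration, and the helper unknown_ssl_keys is hypothetical.

```python
# Property names exactly as listed in the Security tab description above.
DOCUMENTED_SSL_KEYS = {
    "ssl.contextProvider", "ssl.enabledCipherSuites", "ssl.keyManager",
    "ssl.keyStore", "ssl.keyStorePassword", "ssl.keyStoreProvider",
    "ssl.keyStoreType", "ssl.protocol", "ssl.trustManager",
    "ssl.trustStore", "ssl.trustStorePassword", "ssl.trustStoreProvider",
    "ssl.trustStoreType",
}

# Placeholder values only; substitute the files and passwords for your broker.
example_ssl_properties = {
    "ssl.protocol": "TLSv1.2",
    "ssl.keyStore": "/path/to/client-keystore.jks",
    "ssl.keyStorePassword": "changeit",
    "ssl.keyStoreType": "JKS",
    "ssl.trustStore": "/path/to/truststore.jks",
    "ssl.trustStorePassword": "changeit",
    "ssl.trustStoreType": "JKS",
}


def unknown_ssl_keys(properties):
    """Return property names that are not in the documented list (catches typos)."""
    return sorted(set(properties) - DOCUMENTED_SSL_KEYS)


print(unknown_ssl_keys(example_ssl_properties))
```

Property names are case-sensitive in this check, so a misspelling such as ssl.keystore is reported as unknown.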

Batch Tab

PDI_TransStep_Tab_MQTTConsumer_Batch.png

Use this tab to determine how many messages to collect before processing. You can specify a message count, an amount of time, or both.

If you set both options, whichever is satisfied first starts the child transformation for that batch.

If you are using Spark as the processing engine, you must execute the child transformation according to 'Duration (ms)' only. 

Option Description
Duration (ms)

Specify a time in milliseconds. This value is the amount of time the step will spend collecting records prior to the execution of the transformation.

You must set this field if you are using Spark as your processing engine.

If set to a value of ‘0’, then Number of records triggers consumption.

Number of records

Specify a number. After every ‘X’ number of records, the specified transformation will be executed and these ‘X’ records will be passed to the transformation.

If set to a value of ‘0’ then Duration triggers consumption.

Either Number of records or Duration must contain a value greater than ‘0’ to run the transformation.
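
The interaction between Duration (ms) and Number of records can be sketched in Python as follows. This is a simplified model rather than the PDI implementation: the function name batch_records is hypothetical, messages carry simulated arrival times in milliseconds, and a batch window is assumed to restart at the arrival that triggered the previous flush.

```python
def batch_records(events, duration_ms, max_records):
    """Group (arrival_ms, message) pairs into batches.

    A batch is closed when either limit is reached first; a limit of 0
    disables that trigger, and at least one limit must be greater than 0.
    """
    if duration_ms <= 0 and max_records <= 0:
        raise ValueError("Set Duration or Number of records to a value greater than 0")

    batches, current, window_start = [], [], None
    for arrival_ms, message in events:
        if window_start is None:
            window_start = arrival_ms
        # Duration trigger: the window expired before this message arrived.
        if duration_ms > 0 and arrival_ms - window_start >= duration_ms:
            if current:
                batches.append(current)
                current = []
            window_start = arrival_ms
        current.append(message)
        # Count trigger: the batch is full.
        if max_records > 0 and len(current) >= max_records:
            batches.append(current)
            current = []
            window_start = arrival_ms
    if current:
        batches.append(current)  # flush whatever is left at end of input
    return batches


events = [(0, "a"), (10, "b"), (20, "c"), (1500, "d")]
print(batch_records(events, duration_ms=1000, max_records=2))
```

In this run, the first batch closes on the record count (two messages within 10 ms), while the second closes on the duration, because more than 1000 ms pass before the next message arrives.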

Fields Tab

PDI_TransStep_Tab_MQTTConsumer_Fields.png

Use this tab to define the fields in the record format.

Option Description
Input name

The input name is received from the MQTT streams. The following are received by default:

•    message: The individual message contained in a record. 
•    topic: The category to which records are published.

Output name – The Output name can be mapped to subscriber and member requirements.
Type – This will always be a String. This field applies to the ‘message’ and ‘topic’ input names.
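
As a small illustration of the record format, the Python sketch below builds one row from an incoming message, renaming the default input names to custom output names. The function to_row, the UTF-8 decoding of the payload, and the example topic are assumptions made for this sketch.

```python
def to_row(topic, payload, output_names=None):
    """Build one record with the two default inputs, both as strings."""
    # Default output names equal the input names unless remapped.
    names = {"message": "message", "topic": "topic"}
    names.update(output_names or {})
    if isinstance(payload, bytes):
        payload = payload.decode("utf-8")  # assumption: UTF-8 text payloads
    return {names["message"]: str(payload), names["topic"]: str(topic)}


print(to_row("sensors/temperature", b"21.5", {"message": "reading"}))
```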

Result fields Tab

PDI_TransStep_Tab_MQTTConsumer_Result_fields.png

Use this tab to select the step from the child transformation that will stream records back to the parent transformation. This capability allows records processed by an MQTT Consumer step in the parent transformation to be passed downstream to any other steps included within the same parent transformation.

Option Description
Return fields from – Select the name of the step (from the child transformation) that will stream fields back to the parent transformation. The data values in these returned fields will be available to any subsequent downstream steps in the parent transformation.

Options Tab

PDI_TransStep_Tab_MQTTConsumer_Options.png

The Options tab includes the following MQTT-specific parameters:

Parameter Description
Keep Alive Interval – Specify the maximum interval, in seconds, that is permitted to elapse between the point at which the client finishes transmitting one control packet and the point at which it starts sending the next.
Max Inflight – Specify the maximum number of messages that can be in process at any given time.
Connection Timeout – Specify the time, in seconds, to disconnect if a message is not received.
Clean Session

Specify if the broker will store or purge messages for a session.  Select one of the following.

  • True:  When set to True, the broker will not store any information for the client.  All information from a previous persistent session will be purged.
  • False: When set to False, the broker will store all subscriptions for the client. When the QoS (Quality of Service) parameter is set to either ‘1’ or ‘2’, all missed messages will be stored. For more information, see the Quality of Service parameter in the Setup Tab.
Storage Level

Indicates if messages are stored in memory or on disk.  

•    The default (blank) is memory. 

•    For disk, enter a valid path. 

This setting is used only on Kettle. It is not used on AEL Spark, which uses its own configuration.

Server URIs – Specify the MQTT server’s uniform resource identifier (URI).
MQTT Version – Specify the version of the MQTT protocol that this step uses to connect.
Automatic Reconnect

Enables the client to attempt to reconnect automatically to the server if it becomes disconnected. Select True or False:

  • True: Yes - attempt to reconnect to the server. 
  • False: No - do not attempt to reconnect to the server.
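
The effect of the Clean Session setting described above, together with the QoS level, can be illustrated with a toy broker model in Python. It is a deliberately simplified sketch and not how a real MQTT broker or PDI is implemented: the ToyBroker class is hypothetical, live delivery is not modeled, and only the QoS of the subscription is considered.

```python
class ToyBroker:
    """Minimal model of session storage on the broker side."""

    def __init__(self):
        self.sessions = {}  # client_id -> session state

    def connect(self, client_id, clean_session):
        # Clean Session = True discards anything stored for this client.
        if clean_session or client_id not in self.sessions:
            self.sessions[client_id] = {"subs": {}, "queue": []}
        session = self.sessions[client_id]
        session["online"] = True
        session["clean"] = clean_session
        missed, session["queue"] = session["queue"], []
        return missed  # messages stored while the client was offline

    def subscribe(self, client_id, topic, qos):
        self.sessions[client_id]["subs"][topic] = qos

    def disconnect(self, client_id):
        session = self.sessions[client_id]
        if session["clean"]:
            del self.sessions[client_id]   # nothing is kept
        else:
            session["online"] = False      # subscriptions are kept

    def publish(self, topic, message):
        for session in self.sessions.values():
            qos = session["subs"].get(topic)
            if qos is None or session["online"]:
                continue  # not subscribed, or delivered live (not modeled)
            if qos >= 1:
                session["queue"].append(message)  # stored only for QoS 1 or 2


broker = ToyBroker()
broker.connect("pdi-consumer", clean_session=False)
broker.subscribe("pdi-consumer", "sensors/temperature", qos=1)
broker.disconnect("pdi-consumer")
broker.publish("sensors/temperature", "21.5")
print(broker.connect("pdi-consumer", clean_session=False))
```

In this model, a client that reconnects with Clean Session set to False and a QoS 1 subscription receives the message it missed, whereas the same sequence with Clean Session set to True, or with a QoS 0 subscription, returns nothing.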

Metadata Injection Support

All fields of this step support metadata injection. You can use this step with ETL Metadata Injection to pass metadata to your transformation at runtime.

Metadata injection is not supported for steps running on the Adaptive Execution Layer (AEL).

See Also

MQTT Producer