Skip to main content
Pentaho Documentation

AMQP Consumer

Parent article

The Advanced Message Queuing Protocol (AMQP) Consumer step receives streaming data from an AMQP message producer through an AMQP 0-9-1 compatible broker. You can configure this step to use an existing AMQP message queue or create a new one.

You can also set up the AMQP Consumer step to continuously ingest streaming data from either an AMQP message or broker to collect messages about monitored events, track user consumption of data streams, or monitor alerts. The parent AMQP Consumer step runs a child (sub-transformation) that executes according to the message batch size or duration, letting you process a continuous stream of records in near real-time. The child transformation must start with the Get records from stream step. Additionally, you can select a step in the child transformation to stream records back to the parent transformation, which passes the records downstream to any other steps included within the same parent transformation.

NoteSince the AMQP Consumer step continuously ingests streaming data, you may want to use the Abort step in either the parent or child transformation to stop consuming records from AMQP for specific workflows. For example, you can run the parent transformation on a timed schedule, or abort the child transformation if sensor data exceeds a preset range.

Before You begin

Before using the AMQP Consumer step, be aware of the following conditions:

  • This step uses and requires the AMQP 0-9-1 messaging protocol.
  • You must have an AMQP 0-9-1 compatible broker (such as RabbitMQ) available before you configure this step.
  • Within a transformation, you can use the AMQP Consumer step alone to ingest messages from any AMQP producer or broker. The AMQP Producer step is not required. If you want to use both steps together (whether in the same or in a separate transformation), then some of the settings you specify in the Consumer step must match certain settings defined in the Producer step. The Tab sections below explain which settings must match.
  • Although you can set up this step to work with an existing AMQP message queue, you can also use this step to create a new AMQP message queue. For more information, see Create a new AMQP Message Queue.

General

AMQP Consumer dialog box

Enter the following information for the Step Name and Transformation fields:

OptionDescription
Step nameSpecify the unique name of the AMQP Consumer step on the canvas. You can customize the name or leave it as the default.
Transformation

Specify the child transformation to execute by performing any of the following actions:

NoteThe selected child transformation must start with the Get Records from Stream step.

If you select a transformation that has the same root path as the current transformation, the variable ${Internal.Entry.Current.Directory} is automatically inserted in place of the common root path. For example, if the current transformation's path is: /home/admin/transformation.ktr and you select a transformation in the directory /home/admin/path/sub.ktr, then the path is automatically converted to: ${Internal.Entry.Current.Directory}/path/sub.ktr

If you are working with a repository, you must specify the name of the transformation. If you are not working with a repository, you must specify the XML file name of the transformation.

Transformations previously specified by reference are automatically converted to be specified by the transformation name in the Pentaho Repository.

Create and save a new child transformation

If you do not already have a child transformation, you can create one while setting up the AMQP Consumer step. When you click the New button, a new child transformation automatically generates the required Get records from stream step in a new canvas tab. All your fields and types are customized in the child transformation's Get Records from Stream step to match the fields and types specified in the Fields tab of the parent AMQP Consumer step.

Procedure

  1. In the AMQP Consumer step, click New.

    The Save As dialog box appears.
  2. Navigate to the location where you want to save your new child transformation, then type in the file name.

  3. Click Save.

    A notification box displays informing you that the child transformation has been created and opened in a new tab. If you do not want to see this notification in the future, select the Don't show me this again check box.
  4. Click the New Transformation tab to view and edit the child transformation.

    It automatically contains the Get Records from Stream step. (Optional) You can continue to build this transformation and save it.

  5. Return to the AMQP Consumer step when finished,.

Options

The AMQP Consumer step requires you to specify options and parameters in the Setup, Security, Batch, Fields, and Result fields tabs. Each tab is described below.

Setup tab

Setup tab in AMQP Consumer

In the Setup tab, specify the connections for queue names, exchange names, exchange types, and routing keys or headers. You can create a new AMQP message queue or use an existing AMQP message queue.

Create a new AMQP Message Queue

The options and settings in the AMQP Consumer Setup tab make it possible for you to create a new AMQP message queue the first time you run the AMQP Consumer step in a transformation. The new AMQP message queue will default to the following properties:

  • Durable
  • Non auto-delete
  • Non-exclusive

When you use the AMQP Consumer to create a new queue, the broker bindings are initialized the first time you run the AMQP Consumer step in a transformation. Once you initialize the bindings, you can then start the AMQP 0-9-1 message producer before running the AMQP Consumer step. As a recommended best practice, always run the AMQP Consumer step first, before you start producing or publishing any messages through the AMQP producer.

To create a new AMQP message queue on the broker, define the Connection, Queue name and Exchange name options in the Setup tab as follows:

OptionDescription
ConnectionSpecify the URI address of the AMQP broker to which this step connects to ingest messages into PDI. For more information see: https://www.rabbitmq.com/uri-spec.html
Queue name

Specify the name of a new AMQP message queue from which this step will ingest messages.

The new queue will be created automatically the first time you run the transformation.

The new queue and its exchange attributes will default to the following properties:

  • Durable
  • Non auto-delete
  • Non-exclusive
NoteIf you specify a queue name that already exists on the broker, but the existing queue has parameter settings that differ from these, or if the specified queue has an Exchange type (below) that is different, the transformation will abort.
Exchange name

Specify either a new exchange name or an existing exchange name from which to bind the queue.

If the exchange name does not already exist, it will default to the following properties:

  • Durable
  • Non auto-delete

Leave the Exchange name blank to use the DEFAULT as the Exchange type (below) and set the Exchange type to DIRECT. The AMQP Producer step will require a matching blank entry in its Setup tab for the Exchange name.

Set up the remaining Exchange type, Routing Keys or Headers options as explained in Use an existing AMQP Message Queue.

Use an existing AMQP Message Queue

To use an AMQP message queue that already exists on the broker, define the options in the Setup tab as follows:

OptionDescription
ConnectionSpecify the URI address of the AMQP broker to which this step connects to ingest messages into PDI. For more information see: https://www.rabbitmq.com/uri-spec.html
Queue nameSpecify the name of an existing AMQP message queue name from which this step will ingest messages. The specified queue must conform to these parameters:
  • Durable
  • Non auto-delete
  • Non-exclusive
NoteIf you specify the name of a queue which does not already exist, this step will create a new queue. The new queue will conform to the same parameters. See Create a new AMQP Message Queue.
Exchange name

Specify an existing exchange name from which to bind the queue.

Leave the Exchange name blank to use the DEFAULT as the Exchange type (below) and set the Exchange type to DIRECT. The AMQP Producer step requires a matching blank entry in its Setup tab for the Exchange name.

Exchange type

Specify the type pattern this exchange is using:

DIRECT routes messages to queues based on the message routing key.

  • DEFAULT To use the DEFAULT exchange type, do not specify an Exchange name (above) and set the Exchange type to DIRECT.

FANOUT routes messages to all the queues that are bound to the fanout exchange and the routing key is ignored.

TOPIC routes messages to one or many queues based on a match between a message routing key and the pattern used to bind a queue to an exchange.

HEADERS routes messages using one or more key/value pairs that are more easily expressed as message headers than a routing key.

Routing KeysUse the Routing Keys table to specify the routing keys that define the bindings between the exchange and the queue. See Specify Routing Keys for more information.
HeadersUse the Headers table to specify the Name and Value associated with the appropriate headers. See Specify Headers for more information.

Specify Routing Keys

When using either DIRECT or TOPIC as the Exchange type, specify the appropriate routing key (or multiple keys) in the Routing Keys table. Routing keys are input as string names.

Routing Keys table in Setup           tab of AMQP Consumer
NoteIf you select DIRECT as the Exchange type, and leave the Exchange Name blank, the queue name you specified in the Queue name option is used as the routing key, regardless of whether you specify any routing keys in the table.

Once you specify the routing key configuration for the Consumer step and run the transformation, this permanently binds the routing keys and the Consumer configuration to the specified queue. Even if you subsequently remove the routing keys from this routing key table, the binding will persist in the AMQP broker. For more information on how to verify the queue's bindings, see: https://www.rabbitmq/rabbitmqctl.8.html#list_bindings

Specify Headers

When using Headers as the Exchange type , specify the Name and Value associated with the appropriate headers in the Headers table. Only string values are accepted.

Headers table

There are two options for specifying headers:

  • Match all headers

    For a message to be delivered to the AMQP Consumer step's queue, the producer message must contain all the header key/value pairs in the AMQP Consumer step.

    Be aware that the producer may have more headers than specified in the AMQP Consumer step. Producer headers must match all the specified consumer headers; however, not all the specified consumer headers must match all producer headers.

  • Match any header

    For a message to be delivered to the AMQP Consumer step's queue, at least one header key/value pair must match on both the AMQP Consumer step and the producer.

Once you specify the headers configuration for the AMQP Consumer step and run the transformation, this permanently binds the headers and the consumer configuration to the specified queue. Even if you subsequently remove the headers from this table, the binding will persist in the AMQP broker. For more information on how to verify the queue's bindings, see: https://www.rabbitmq/rabbitmqctl.8.html#list_bindings.

Security tab

Security tab

The Security tab allows you to define authentication credentials for the AMPQ broker. This tab includes the following options:

OptionDescription
UsernameSpecify the user name required to access the AMQP broker.
PasswordSpecify the password associated with the Username.
Use secure protocol

Select this option if you want to define SSL properties for the connection.

NoteThis security protocol setting is used only in PDI. It is not used on AEL Spark.
SSL Properties

Context Algorithm: Specify the name of the secure protocol you are using.

Key Store Password: Specify the password for the key store you want this connection to use.

Key Store Path: Specify the file path location of the key store you want this secure connection to use.

Key Store Type: Specify the identifying name or string for the type of key store.

Trust Store Password: Specify the password for the trust store object that you want this secure connection to use.

Trust Store Path: Specify the file path location of the trust store certificates you want this secure connection to use.

Trust Store Type: Specify the format of the trust store.

Batch tab

Batch tab in AMQP Consumer

Use this tab to designate how many messages to consume before processing. You can specify message count and/or a specific amount of time.

How many messages consumed before processing is defined by either the Duration (ms) or the Number of records option. Messages are consumed when either the specified duration or number of records occur. For example, if Duration (ms) is set to 1000 milliseconds and Number of records is 1000, messages are consumed for processing whenever time intervals of 1000 milliseconds are reached or 1000 records have been received. If you set either option to zero, PDI will ignore that parameter.

The AMQP Consumer step only supports Auto-Ack messages. When the auto-ack messages are received by the step, they are removed from the queue. If the transformation stops before all the messages are ingested, you will lose the remaining messages.

You can also specify the maximum number of batches used to collect records at the same time.

OptionDescription
Duration (ms)Specify a time in milliseconds. This value is the amount of time the step will spend collecting records prior to the execution of the transformation.

If this option set to a value of 0, then Number of records triggers consumption. Either the Duration or the Number of records option must contain a value greater than 0 to run the transformation.

NoteYou must set this field if you are using Spark as your processing engine.
Number of recordsSpecify a number. After every ‘X’ number of records, the specified transformation will be executed and these ‘X’ records will be passed to the transformation.

If this option set to a value of 0 then Duration triggers consumption. Either the Duration or the Number of records option must contain a value greater than 0 to run the transformation.

Maximum concurrent batchesSpecify the maximum number of batches used to collect records at the same time. The default value is 1, which indicates a single batch is used for collecting records.

This option should only be used when your consumer step cannot keep pace with the speed at which the data is streaming. Your computing environment must have adequate CPU and memory for this implementation. An error will occur if your environment cannot handle the maximum number of concurrent batches specified.

Fields tab

Fields tab in AMQP Consumer

Use this tab to define the fields in the record format.

OptionDescription
Input name

The input name is specified by the AMQP Consumer step. The following input names are assigned by default:

  • message: The individual message contained in a record.
  • queue name: The queue to which records are published and received.
  • routing key: The routing key associated with the dependent pattern type.
  • exchange name: The exchange name from which messages are received.
Output nameSpecify a substitute output name to use a field name that is different from the Input name. The data type of the output name must match the Type of the Input name.
TypeThis field identifies the data type of the Input name and cannot be changed.

Result Fields tab

Result Fields tab in AMQP Consumer

Use this tab to select the step from the child transformation that will stream records back to the parent transformation. This capability allows records processed by an AMQP Consumer step in the parent transformation to be passed downstream to any other steps included within the same parent transformation.

OptionDescription
Return fields from:Select the name of the step (from the child transformation) that will stream fields back to the parent transformation. The data values in these returned fields will be available to any subsequent downstream steps in the parent transformation.

Metadata injection support

All fields of this step support metadata injection. You can use this step with ETL metadata injection to pass metadata to your transformation at runtime.