Anypoint Connector for Azure Service Bus - Mule 3

Support Category: Select

Anypoint Connector for Azure Service Bus 2.0.0

Anypoint Connector for Azure Service Bus (Azure Service Bus Connector) enables message integration with Azure Service Bus on the cloud. This connector supports communication with queues and topics. In addition, you can use the built-in management API to discover Azure Service Bus objects automatically and to provision them.

Prerequisites

To use Azure Service Bus Connector, you must be familiar with Mule, Anypoint Connectors, Anypoint Studio, Mule concepts, elements in a Mule flow, and global elements.

You need login credentials to test your connection with your target resource. You can use Basic Authentication, or you can pass credentials in a connection string.

For software requirements and compatibility information, see the connector’s release notes.

Install the Connector in Anypoint Studio

  1. In Anypoint Studio, click the Exchange icon in the Studio task bar.

  2. Click Login in Anypoint Exchange.

  3. Search for the connector and click Install.

  4. Follow the prompts to install the connector.

Configure the Connector

To start configuring the connector, you first need to select and configure an input source. Azure Service Bus Connector supports three input sources:

  • HTTP connector, which receives messages from a web server. Using the HTTP connector enables you to test the connector.

  • Queue listener, which receives messages from an Azure queue.

  • Subscription listener, which receives messages from an Azure subscription.

The following steps use an HTTP connector as an input source. For information on using the Queue listener and Subscription listener, see Input Sources.

  1. In the Mule Palette, search for HTTP.

  2. Drag the HTTP connector onto the Studio canvas.

  3. Click + next to the Connector Configuration field to configure a global element for the connector.

  4. Accept the defaults by clicking OK.

  5. In the Mule Palette, search for Azure.

  6. Select the Azure Service Bus Connector.

  7. Drag the connector next to the HTTP connector on the canvas.

  8. Click + next to the Connector Configuration field to configure a global element for the connector.

  9. Select an authentication type:

    • Basic authentication

      With Basic authentication, applications gain access to Azure Service Bus resources by providing a shared access key in the Azure Service Bus namespace. The shared access key enables user access to Azure Service Bus resources without having to store user account access keys in the application.

      Field Description

      Name

      Name of the shared access signature.

      Service namespace

      Name of the service namespace to address Service Bus resources within your application.

      Shared access key name

      Name of the access key configured in the Azure Service Bus namespace. An access key created at a lower level, such as a topic-level shared key, does not work with this option unless you disable the connectivity test at startup.

      Shared access key

      256-bit primary key used to authorize access to Azure Service Bus resources.

    • Connection string

      With Connection string authorization, the application passes a string that includes the information used to access Azure Service Bus resources.

      Field Description

      Name

      Name of the connection string.

      Connection string

      Connection string provided by the Microsoft Azure portal that contains values for the Endpoint, SharedAccessKeyName, and SharedAccessKey parameters. For example:

      Endpoint=sb://samplenamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=kHuIoiu79jbjuNgHYJKbn7698BtjKohGuKMouGHyJkX=

      Service namespace

      Name of the service namespace to address Service Bus resources within your application.
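In the XML view, a Basic authentication global element might look like the following sketch. The element and attribute names match the full flow example later in this document. The ${azure.config.*} property placeholders are assumptions that stand in for values defined in your own properties file, and the fragment belongs inside a <mule> element that declares the connector namespace.

```xml
<!-- Global element for Basic authentication. The property placeholders are
     illustrative; define them in your app's properties file. -->
<microsoft-azure-service-bus:basic-authentication-config
    name="Microsoft_Azure_Service_Bus__Basic_authentication"
    sharedAccessKeyName="${azure.config.key.name}"
    sharedAccessKey="${azure.config.shared.access.key}"
    namespace="${azure.config.service.namespace}"
    doc:name="Microsoft Azure Service Bus: Basic authentication"/>
```

Keeping the key in a property placeholder rather than inline avoids committing the shared access key with the app's XML.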

Input Sources

Azure Service Bus Connector has two input source operations, the Queue listener and the Subscription listener, which are described in the following sections.

Queue Listener Configuration

Use the Queue listener input source when you want the app to receive messages from an Azure queue. Configure the source as follows:

Field Description

Queue name

Queue to receive events. To receive events from the dead-letter queue, enter QueueName/$deadletterqueue in this field. The dead-letter queue is a queue that holds messages that cannot be processed or delivered.

Receive mode

  • AUTOMATIC (Default): Message is acknowledged from the queue and then deleted from the queue after it is received. If a message is not acknowledged, it times out and goes to the end of the queue.

  • MANUAL: User must manually acknowledge messages.

Server timeout (ms)

Maximum duration, in milliseconds, within which the client keeps renewing the message lock if the processing of the message is not completed by the handler.

Prefetch count

When Prefetch is enabled in any of the official Service Bus clients, the receiver acquires additional messages, up to the PrefetchCount limit. This can be more messages than what the application initially requested. Set this field to 0 to disable prefetching.

CRON expression

(Optional) UNIX CRON expression that specifies when to trigger the receiver action. For example, setting this field to 0 0 * * * triggers the receiver action at midnight (00:00) every day.

Max. messages to receive

Maximum number of messages to receive during the scheduled operation.

Subscription Listener Configuration

Use the Subscription listener input source when you want the app to receive messages from an Azure subscription. Configure the source as follows:

Field Description

Topic

Name of the topic to connect to.

Subscription

Subscription to receive events. To receive events from the dead-letter queue, specify QueueName/$deadletterqueue. The dead-letter queue is a queue that holds messages that cannot be processed or delivered.

Receive mode

  • AUTOMATIC (Default): Message is acknowledged from the queue and then deleted from the queue after it is received. If a message is not acknowledged, it times out and goes to the end of the queue.

  • MANUAL: Specifies that the user must manually acknowledge messages.

Server timeout (ms)

Maximum duration, in milliseconds, within which the client keeps renewing the message lock if the processing of the message is not completed by the handler.

Prefetch count

When Prefetch is enabled in any of the official Service Bus clients, the receiver acquires additional messages, up to the PrefetchCount limit. This can be more messages than what the application initially requested. Set this field to 0 to disable prefetching.

CRON Expression

(Optional) UNIX CRON expression that specifies when to trigger the receiver action. For example, setting this field to 0 0 * * * triggers the receiver action at midnight (00:00) every day.

Max. messages to receive

Maximum number of messages to receive during the scheduled operation.

Examples

This topic contains the following examples:

Send a Message to a Queue or Topic

This example configures a Mule app to send a message to an Azure Service Bus queue or topic. The following diagram shows the flow for this example:

Sending a message to a queue or topic
  1. Create a new Mule app in Studio.

  2. In the Mule Palette, search for HTTP.

  3. Drag the HTTP connector onto the Studio canvas.

  4. Select the HTTP connector.

  5. Click + next to the Connector Configuration field to configure a global element for the connector.

  6. Leave the default field values and click OK.

  7. In the Mule Palette, search for Set Payload.

  8. Drag the Set Payload transformer next to the HTTP connector on the Studio canvas.

  9. In the Value field, enter a message value for the queue.

  10. In the Mule Palette, search for Azure.

  11. Drag the Microsoft Azure Service Bus Connector next to the Set Payload transformer on the Studio canvas.

  12. Click + next to the Connector Configuration field for the Microsoft Azure Service Bus connector to configure a global element for it. Select and configure authentication for the connector, as described in Configure the Connector.

  13. On the Microsoft Azure Service Bus properties dialog, configure the connector as follows:

    • Set the Operation value to 'Publish message' and the Destination type to QUEUE or TOPIC.

    • Enter the value for Destination name.

  14. Because the Publish message operation has no return value, use another Set Payload transformer to return a message: #["message sent"].

When you call http://localhost:8081/send, the connector uploads the message to the queue.
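The steps above can be sketched in the XML view roughly as follows. This is a minimal sketch, not the exact XML that Studio generates. It assumes the HTTP and Azure Service Bus global elements already exist under the names used in config-ref. The QUEUE|| destination prefix is assumed by analogy with the TOPIC|| prefix used in the full example later in this document, and ${azure.queue} is a hypothetical property.

```xml
<!-- Goes inside a <mule> element that declares the http and
     microsoft-azure-service-bus namespaces. -->
<flow name="Flow_Send_Message">
    <!-- Receives the request at http://localhost:8081/send -->
    <http:listener config-ref="HTTP_Listener_Configuration"
        path="/send"
        doc:name="HTTP"/>
    <!-- Message body to publish; the text is a placeholder -->
    <set-payload value="Hello from Mule" doc:name="Set Payload"/>
    <!-- The QUEUE|| prefix and the azure.queue property are assumptions -->
    <microsoft-azure-service-bus:publish
        config-ref="Microsoft_Azure_Service_Bus__Basic_authentication"
        destination="QUEUE||${azure.queue}"
        body="#[payload]"
        doc:name="Microsoft Azure Service Bus"/>
    <!-- Publish message has no return value, so set a response payload -->
    <set-payload value="#['message sent']" doc:name="Set Response"/>
</flow>
```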

Receive Messages from a Queue or Topic with a Manual Acknowledge Message

This example configures a Mule app to send an acknowledgment after receiving a message from a queue or topic. The following diagram shows the flow for this example:

Sending a message to a queue or topic
  1. Create a new Mule app in Studio.

  2. Add a Microsoft Azure Service Bus connector as the flow source, and configure the connector:

    • In the Operation field, select Queue listener or Subscription listener.

    • Click + next to the Connector Configuration field to configure a global element for the connector. For more information about configuring a global element, see Configure the Connector.

    • To receive messages from a queue, enter the name of the queue in the Queue name field.

    • To receive messages from a topic, enter the name of the topic in the Topic field and the name of the subscription in the Subscription field.

  3. Set the Receive mode field to MANUAL.

  4. Drag a Record Variable, Session Variable, or Variable transformer next to the Microsoft Azure Service Bus connector on the canvas.

  5. Store the lock token value from the received message using this expression:

    #[message.inboundProperties.get('lockToken')]

  6. For testing purposes, perform an action with the message, such as logging it by using a Logger component.

  7. Add another Microsoft Azure Service Bus connector next to the variable component on the canvas, and select the Acknowledge message operation.

  8. For the Lock field value, retrieve the stored lockToken value.

The connector processes every message received and acknowledges each message that is processed successfully.

To have the connector automatically acknowledge messages, set the Receive mode field to AUTOMATIC.
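As a partial sketch of the manual acknowledgment flow, the following fragment shows the listener, the stored lock token, and a logger. The listener element and its attributes are taken from the full example later in this document. The ${azure.subscription} property and the flow and variable names are hypothetical. This document does not show the XML for the Receive mode field or for the Acknowledge message operation, so both are left as comments rather than guessed.

```xml
<flow name="Flow_Receive_Manual_Ack">
    <!-- Source: subscription listener. Set Receive mode to MANUAL in the
         properties editor; its XML attribute is not shown in this document,
         so it is omitted here. -->
    <microsoft-azure-service-bus:subscription-listener
        config-ref="Microsoft_Azure_Service_Bus__Basic_authentication"
        topic="${azure.topic}"
        subscription="${azure.subscription}"
        doc:name="Microsoft Azure Service Bus (Streaming)"/>
    <!-- Store the lock token so that it can be used for the acknowledgment -->
    <set-variable variableName="lockToken"
        value="#[message.inboundProperties.get('lockToken')]"
        doc:name="Store Lock Token"/>
    <logger message="#['Received: ' + payload]" level="INFO" doc:name="Logger"/>
    <!-- Add the Acknowledge message operation here in Studio and set its
         Lock field to #[flowVars.lockToken]. -->
</flow>
```

Storing the token in a flow variable before any other processing keeps it available even if later components replace the payload or the inbound properties.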

Schedule the Reception of Messages

The following example configures a Mule app that receives messages from a queue or topic on a specified schedule:

Create a Mule app to receive messages from a queue or topic, as described in Receive Messages from a Queue or Topic with a Manual Acknowledge Message.

When you create the app:

  • Set the CRON Expression field to a CRON expression that follows the UNIX standard. For example, to connect and receive messages every day at 8 AM, use 0 8 * * *.

  • Set the Max. Messages to receive field to the maximum number of messages for the connector to receive every time the CRON expression triggers a receive.

The connector executes a batch receive each time the CRON expression triggers. The flow retrieves and processes the available messages, up to the configured maximum.

Send Multiple Messages to a Queue

The following example configures a Mule app to send multiple messages to a queue:

  1. Create a Mule app to send messages to a queue or topic, as described in Send a Message to a Queue or Topic.

  2. Replace the Publish message operation with the Publish batch of messages operation. Make sure the Destination type and Destination name are properly set.

  3. When crafting the message, be sure to create a comma-separated list of strings, because the app sends many messages simultaneously.

    To split a comma-separated string into separate messages, drag a Transform message transformer to the canvas and use the following script:

%dw 1.0
%output application/java
---
payload splitBy ','
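In the XML view, the same script might be embedded in a Transform Message component as sketched below. The wrapper elements follow the pattern used in the full example later in this document, and the doc:name value is arbitrary. The sketch assumes that the payload is already a String when it reaches the transformer.

```xml
<!-- Splits a comma-separated String payload into a list of messages.
     For example, the payload "first,second,third" becomes
     ["first", "second", "third"]. -->
<dw:transform-message doc:name="Split Messages">
    <dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
payload splitBy ','
]]></dw:set-payload>
</dw:transform-message>
```

Note that splitBy does not trim whitespace, so "first, second" yields a second element with a leading space.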

Schedule the Logical Enqueuing of a Message

The following example configures a Mule app to send a message to all listeners at a specified time:

  1. Create a Mule app to send messages to a queue or topic, as described in Send a Message to a Queue or Topic.

  2. In the properties window for Azure Service Bus Connector, set the Scheduled enqueue time UTC field to the date and time when you want the message to be dispatched. For example: 2019-06-27T21:16:46.866Z.

    The message is sent to the destination, which will dispatch the message at the specified time.

Supported REST Operations

Azure Service Bus Connector supports the following Azure Service Bus REST API operations:

  • Queue

    • Create

    • Get

    • Get Size

    • List

    • Update

    • Delete

  • Topic

    • Create

    • Get

    • List

    • Update

    • Delete

  • Subscription

    • Create

    • Get

    • Get Size

    • List

    • Update

    • Delete

  • Rule

    • Create

    • Get

    • List

    • Update

    • Delete

Additional Configuration Information

Queue and Subscription Listener Operations

For the Queue listener and Subscription listener operations, the output payload is a string with the message. The connector returns additional attributes in the outbound properties.

Restricted Access Policies

If your resources can send and receive only AMQP messages, enable the AMQP-only credentials option in the Advanced tab.

Handling a Very High Number of Messages

If Mule processes a very high number of messages (such as thousands per second), especially if it publishes to and reads from the same destination in one Mule app, you might see messages like the following in the log:

[ERROR 2019-05-22 00:15:26,362 [ForkJoinPool.commonPool-worker-3]
com.microsoft.azure.servicebus.MessageAndSessionPump: onMessage with
message containing sequence number '95' threw exception com.mulesoft.connectors.microsoft.azure.servicebus.internal.error.exception.ConsumerException: Failed
setting attributes from original API message.]

The error can occur because Mule processes errors asynchronously by default. To fix this error, select the queued-asynchronous processing strategy when you configure the message listener in a flow. With this strategy, if a message waits for more than 30,000 ms to be processed by the flow, Mule throws an exception that causes an error (timeout in the Mule SEDA queue).

To avoid this error, create a custom queued-asynchronous configuration and do either or both of the following:

  • Increase the number of threads in the maxThreads property (default 16).

  • Increase the waiting time in the threadWaitTimeout property (default 30,000 ms).

To modify the configuration:

  1. Select the flow.

  2. Select General > Optional Processing Strategy > Processing Strategy Ref.

  3. Click +.

  4. Add a new Queued Asynchronous Processing Strategy.

For more information, see Flow Processing Strategies.
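In the XML view, a custom strategy of this kind might look like the following sketch. It uses the Mule 3 queued-asynchronous-processing-strategy element and the flow's processingStrategy attribute. The names and the values (double the defaults noted above) are illustrative assumptions, not recommendations; tune them to your own load.

```xml
<!-- Custom processing strategy with more threads and a longer wait time.
     The values are illustrative. -->
<queued-asynchronous-processing-strategy
    name="Custom_Queued_Async_Strategy"
    maxThreads="32"
    threadWaitTimeout="60000"
    doc:name="Queued Asynchronous Processing Strategy"/>

<!-- The flow references the strategy by name. -->
<flow name="Flow_Receive_Messages"
    processingStrategy="Custom_Queued_Async_Strategy">
    <microsoft-azure-service-bus:subscription-listener
        config-ref="Microsoft_Azure_Service_Bus__Basic_authentication"
        topic="${azure.topic}"
        subscription="${azure.subscription}"
        doc:name="Microsoft Azure Service Bus (Streaming)"/>
    <logger message="#[payload]" level="INFO" doc:name="Logger"/>
</flow>
```

Here ${azure.subscription} is a hypothetical property, and the global element and flow both belong inside the <mule> root element.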

Random Errors When Using a Manual ACK

You might receive errors like this one when working with a high number of messages while using a manual ACK:

[com.mulesoft.connectors.microsoft.azure.servicebus.internal.error.exception.ConsumerException: Message
with lockToken 2dc1312f-b263-4282-bbb0-566998eff6e6 could not be ACK at com.mulesoft.connectors.microsoft.azure.servicebus.internal.connection.Connection.ack(Connection.java:192)]
[ERROR 2019-05-22 00:17:30,822 [ReactorThread6f355ff5-5a36-487b-bb70-1a995a6ddf74]
com.microsoft.azure.servicebus.primitives.CoreMessageReceiver: Completing pending
updateState operation for delivery '?     ? &??I??=????' with exception com.microsoft.azure.servicebus.primitives.MessageLockLostException: The lock supplied
is invalid. Either the lock expired, or the message has already been removed from the queue.]

When the connector performs a prefetch, the lockToken’s validity time is fixed in relation to that moment. The problem arises when the lockToken’s validity time is not long enough to process the entire batch of records. In these cases, Mule might throw an error because the lockToken expired before you do an ACK.

To prevent this issue, reduce the size of the prefetch (default 100), increase the validity time of the lock token, or both. You can set the validity time in the lock duration property when creating the queue or subscription from the connector, or from the Azure portal (for existing queues). The maximum duration value for the lock token in Azure is 5 minutes (300 seconds).

Reduce Noise in Mule Apps Logs

In some circumstances, the underlying library used by the connector might regularly log complete stack traces with level WARN. These messages do not represent an issue, but they can clutter the logs. To reduce the noise in the logs, make either of the following changes to the src/main/resources/log4j2.xml file:

  • Specify an AsyncLogger for the library’s package, raising the level of the log:

<!-- Recommended for packages not belonging to Mule -->
<AsyncLogger name="com.microsoft.azure.servicebus" level="ERROR"/>

  • Add a RegexFilter to the existing appender (that is, RollingFile):

<!-- Suppresses log messages that match the specified regular expression -->
<RegexFilter regex=".*message to filter.*" onMatch="DENY" onMismatch="ACCEPT"/>

If the message is multiline, RegexFilter might not work correctly.
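To show where each element sits, here is a minimal sketch of a complete log4j2.xml file that applies both options. The appender name, file paths, layout pattern, and size limits are placeholders, not the values from your app's generated file. Adapt your existing file rather than replacing it with this one.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
    <Appenders>
        <!-- File name, file pattern, and size limits are placeholders -->
        <RollingFile name="file"
            fileName="logs/my-app.log"
            filePattern="logs/my-app-%i.log">
            <PatternLayout pattern="%d [%t] %-5p %c - %m%n"/>
            <SizeBasedTriggeringPolicy size="10 MB"/>
            <DefaultRolloverStrategy max="10"/>
            <!-- RegexFilter option: drop messages that match the expression -->
            <RegexFilter regex=".*message to filter.*" onMatch="DENY" onMismatch="ACCEPT"/>
        </RollingFile>
    </Appenders>
    <Loggers>
        <!-- AsyncLogger option: log only ERROR and above for the Azure library -->
        <AsyncLogger name="com.microsoft.azure.servicebus" level="ERROR"/>
        <AsyncRoot level="INFO">
            <AppenderRef ref="file"/>
        </AsyncRoot>
    </Loggers>
</Configuration>
```

The AsyncLogger option silences the whole library package below ERROR, while the RegexFilter option targets individual messages, so the filter is the narrower change when only one recurring message is a problem.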

Listeners (Sources) in Network Disconnection and Reconnection Scenarios

When there are network connection problems, you can direct the source components for the Azure Service Bus connector to automatically execute reconnection attempts by specifying a reconnection strategy. You can specify a reconnection strategy when you create or edit the connector configuration:

  1. In the General tab of the connector configuration, select + next to the Connector Configuration field.

  2. Select one of the Connector Configuration options.

  3. In the Reconnection tab, select Standard Reconnection.

  4. Optionally, specify a nondefault value for the Frequency (ms) and Reconnection Attempts fields.
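In the XML view, Standard Reconnection might correspond to a reconnect child element on the connector configuration, as in the following sketch. The reconnect element with its frequency and count attributes is the standard Mule 3 reconnection strategy. Its placement inside this connector's configuration element is an assumption based on how Studio usually writes reconnection settings, so check the XML that Studio generates for your app. The values are illustrative.

```xml
<microsoft-azure-service-bus:basic-authentication-config
    name="Microsoft_Azure_Service_Bus__Basic_authentication"
    sharedAccessKeyName="${azure.config.key.name}"
    sharedAccessKey="${azure.config.shared.access.key}"
    namespace="${azure.config.service.namespace}"
    doc:name="Microsoft Azure Service Bus: Basic authentication">
    <!-- Assumed placement: retry every 5,000 ms, up to 10 attempts -->
    <reconnect frequency="5000" count="10"/>
</microsoft-azure-service-bus:basic-authentication-config>
```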

Eventual Send Operation Timed Out

When there are micro-cuts in the network connection, asynchronous message-sending flows might experience exceptions and message losses. This happens because Azure TimeoutException exceptions (com.microsoft.azure.servicebus.primitives.TimeoutException) are not affected by connector reconnection strategies. Therefore, if an exception is generated within a thread pool that executes calls asynchronously, the exception is not handled by a catch exception strategy. To handle these timeouts, you must use XML to program a solution in the Mule app flow.

The following example shows one way of handling send operation timeouts. In this example:

  • The app assumes that a topic is used as a destination and that you have permission to add a redundant subscription to the topic.

  • The app uses Anypoint Connector for MongoDB (MongoDB Connector) to collect unsent messages.

  • Every 15 minutes, the cron job (Poll scope) sends messages held for more than an hour in the MongoDB collection of unsent messages back to the Azure Service Bus.

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:microsoft-azure-service-bus="http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus" xmlns:mongo="http://www.mulesoft.org/schema/mule/mongo"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/mongo
http://www.mulesoft.org/schema/mule/mongo/current/mule-mongo.xsd
http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus
http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus/current/mule-microsoft-azure-service-bus.xsd
http://www.mulesoft.org/schema/mule/ee/dw
http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.mulesoft.org/schema/mule/ee/tracking
http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd">
    <http:listener-config
        name="HTTP_Listener_Configuration"
        host="0.0.0.0" port="8081"
        doc:name="HTTP Listener Configuration"/>
    <mongo:config
        name="Mongo_DB__Configuration"
        username="${mongodb.config.username}"
        password="${mongodb.config.password}"
        database="${mongodb.config.database}"
        host="${mongodb.config.server}"
        authenticationDatabase="${mongodb.config.database.auth}"
        doc:name="Mongo DB: Configuration"/>
    <microsoft-azure-service-bus:basic-authentication-config
        name="Microsoft_Azure_Service_Bus__Basic_authentication"
        sharedAccessKeyName="${azure.config.key.name}"
        sharedAccessKey="${azure.config.shared.access.key}"
        namespace="${azure.config.service.namespace}"
        doc:name="Microsoft Azure Service Bus: Basic authentication"/>
    <flow name="Flow_Send_Async">
        <http:listener config-ref="HTTP_Listener_Configuration"
            path="/send-async"
            doc:name="HTTP"/>
        <byte-array-to-string-transformer
            doc:name="Byte Array to String"/>
        <set-variable
            variableName="azure_message"
            value="#[payload]"
            doc:name="Set Payload in Variable"/>
        <dw:transform-message
            doc:name="Format Document - Transform Message">
            <dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
	message : flowVars.azure_message,
	timestamp: now as :string
}]]></dw:set-payload>
        </dw:transform-message>
        <mongo:insert-document
            config-ref="Mongo_DB__Configuration"
            collection="${mongodb.collection}"
            doc:name="Insert Document - Mongo DB"/>
        <microsoft-azure-service-bus:publish
            config-ref="Microsoft_Azure_Service_Bus__Basic_authentication"
            destination="TOPIC||${azure.topic}" body="#[flowVars.azure_message]"
            doc:name="Microsoft Azure Service Bus"/>
        <dw:transform-message doc:name="Transform Message">
            <dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
	id_azure: payload,
	message_azure: flowVars.azure_message
}]]></dw:set-payload>
        </dw:transform-message>
    </flow>
    <!-- Messages sent successfully are removed from MongoDB. -->
    <!-- Only messages not received by Azure should remain, to retry later. -->
    <flow name="Flow_Remove_Sent_Messages">
        <microsoft-azure-service-bus:subscription-listener
            config-ref="Microsoft_Azure_Service_Bus__Basic_authentication"
            topic="${azure.topic}"
            subscription="${azure.topic.suscription.redundant}"
            doc:name="Microsoft Azure Service Bus (Streaming)"/>
        <byte-array-to-string-transformer doc:name="Byte Array to String"/>
        <dw:transform-message doc:name="Transform Message">
            <dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
	message : payload
}]]></dw:set-payload>
        </dw:transform-message>
        <mongo:remove-documents
            config-ref="Mongo_DB__Configuration"
            collection="${mongodb.collection}"
            doc:name="Mongo DB"/>
        <logger
            message="#[&quot;Document sent successfully. The Mongo document is removed (from retry collection):&quot; + payload]"
            level="INFO" doc:name="Logger"/>
    </flow>
    <!-- Messages that remain in MongoDB after 1 hour are sent again. -->
    <!-- The delay avoids a race condition with the removal process. -->
    <flow name="Flow_Retry_Send_Message">
        <poll doc:name="Poll - Retry">
            <fixed-frequency-scheduler frequency="15" timeUnit="MINUTES"/>
            <dw:transform-message doc:name="Transform Message">
                <dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
}]]></dw:set-payload>
            </dw:transform-message>
        </poll>
        <mongo:find-documents
            config-ref="Mongo_DB__Configuration"
            collection="${mongodb.collection}"
            doc:name="Mongo DB"/>
        <foreach collection="#[payload]" doc:name="For Each">
            <choice doc:name="Choice">
                <when expression="#[new org.mule.el.datetime.DateTime(payload.timestamp).plusHours(1).isBefore(server.dateTime)]">
                    <microsoft-azure-service-bus:publish
                        config-ref="Microsoft_Azure_Service_Bus__Basic_authentication"
                        destination="TOPIC||${azure.topic}"
                        body="#[payload.message]"
                        doc:name="Microsoft Azure Service Bus"/>
                </when>
                <otherwise>
                    <logger message="#[&quot;The message shouldn't be resent yet: &quot; + payload]"
                        level="INFO"
                        doc:name="Logger"/>
                </otherwise>
            </choice>
        </foreach>
    </flow>
</mule>

Connector Namespace and Schema

When designing your application in Anypoint Studio, dragging the connector from the Mule Palette view onto the Anypoint Studio canvas automatically populates the XML code with the connector namespace and schema location:

  • Namespace: http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus

  • Schema location: http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus/current/mule-microsoft-azure-service-bus.xsd

If you are manually coding the Mule application in the Studio XML editor or another text editor, paste these statements into the header of your configuration XML, inside the <mule> tag:

<mule
...
xmlns:microsoft-azure-service-bus="http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus"
...
http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus
http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus/current/mule-microsoft-azure-service-bus.xsd
...
...">