Send and Receive Large Messages Example - Mule 4
This example of sending and receiving large messages shows two flows:
-
The first flow sends a large message to the Amazon queue that references the AWS S3 bucket to use for storing large message payloads. Creating the flow involves creating a new Mule project and configuring the following components and operations:
-
HTTP > Listener component to initiate the flow
-
File > Read component to read the file
-
Transform Message component to convert the input data to a new output structure or format
-
Amazon SQS > Send Message operation to send the message to the specified queue
-
Logger component to display the response in the Mule console
-
Amazon SQS > Get approximate number of messages operation to retrieve an approximate number of visible messages for the queue
-
A second Logger component to display the approximate number of visible messages for the queue
-
-
After you complete the flow to send a large message to the Amazon queue that references the AWS S3 bucket to use for storing large message payloads, you next create a flow to receive the message and log the message body.
Enabling large payload support incurs additional charges associated with using AWS S3.
This example uses variables for some field values. You can either replace the variables with their values in the code, or you can provide the values for each variable in the src/main/resources/mule-artifact.properties file.
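As an alternative to a properties file, the same placeholders could be defined directly in the Mule configuration. The following is a minimal sketch, assuming the property names that appear in the configuration XML of this example (sqs.queueUrl, sqs.accessKey, sqs.secretKey, and sqs.sessionToken). The values shown are hypothetical placeholders, not working credentials:

```xml
<!-- Sketch only: place these as direct children of the <mule> element. -->
<!-- Every value below is a placeholder; substitute values from your own AWS account. -->
<global-property name="sqs.queueUrl" value="YOUR_QUEUE_URL" />
<global-property name="sqs.accessKey" value="YOUR_ACCESS_KEY" />
<global-property name="sqs.secretKey" value="YOUR_SECRET_KEY" />
<global-property name="sqs.sessionToken" value="YOUR_SESSION_TOKEN" />
```

Keeping secrets out of version-controlled XML is generally preferable, so this form is probably best reserved for local testing.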
Before You Begin
You must have:
-
Access to the Amazon SQS target resource and Anypoint Platform
-
A file containing a payload larger than 256 KB
-
An AWS S3 bucket created on AWS S3 to use for storing the large payload messages
-
AWS Identity and Access Management (IAM) credentials
Send a Large Message
Create a Mule Project
In Studio, create a new Mule project in which to add and configure the connector:
-
In Studio, select File > New > Mule Project.
-
Enter a name for your Mule project and click Finish.
Add and Configure a Listener
-
In the Mule Palette view, search for HTTP and select the Listener operation:
-
Drag the Listener operation onto the canvas.
-
In the Listener configuration, click + next to the Connector configuration field to add a global element.
-
Accept the defaults.
-
Set the Path field to /largePayload.
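In the configuration XML, the Listener steps above would produce elements along these lines. This is a sketch that assumes the default host and port (0.0.0.0 and 8081) and the flow name used in the complete application XML at the end of this example:

```xml
<!-- Global element created by clicking + next to Connector configuration -->
<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config">
	<http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>

<flow name="sqs-send-LargeMessageFlow">
	<!-- A request to port 8081 with the path /largePayload starts the flow -->
	<http:listener doc:name="Listener" config-ref="HTTP_Listener_config" path="/largePayload" />
</flow>
```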
Add and Configure a File Read Component
Add and configure a File > Read component to read the large payload:
-
In the Mule Palette view, search for File and select the File Read operation:
-
Drag the File Read component onto the canvas, to the right of the Listener component.
-
In the File Read configuration, click + next to the File Configuration field to add a global element.
-
In the File Config window, select the Connection checkbox.
-
In Working Directory, enter the directory that serves as the root directory for every relative path used with this connector.
The default is your user home directory. In this example, the file is read from src/main/resources of the Mule application.
-
Configure the following fields in the General properties window:
-
Display Name
Name to display in the UI for the connector operation
-
Connector Configuration
Global configuration you previously created for the File > Read operation
-
File Path
Path to the file to read
The following image shows example values for the fields:
-
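The File configuration and Read operation described above might look like this in the configuration XML. The sketch reuses the names from the complete application XML at the end of this example; largePayload.txt is that example's file name, so substitute your own file:

```xml
<file:config name="File_Config" doc:name="File Config">
	<!-- Working directory: ${app.home} is assumed to resolve to the root of the deployed application, -->
	<!-- which is where files from src/main/resources are packaged -->
	<file:connection workingDir="${app.home}" />
</file:config>

<!-- Inside the flow, after the Listener; path is relative to the working directory -->
<file:read doc:name="Read" config-ref="File_Config" path="largePayload.txt" />
```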
Add and Configure a Transform Message Component
Add a Transform Message component to build the SQS message, including its metadata (message attributes), from the file content:
-
In the Mule Palette view, search for Transform Message:
-
Drag the Transform Message component onto the canvas, to the right of the Read component.
-
In the Transform Message configuration, replace the braces in the Output section with this DataWeave script:
{
	delaySeconds: 0,
	body: payload,
	messageAttributes: {
		"AccountId": {
			"stringValue" : "000123456",
			"dataType" : "String.AccountId"
		} as Object {
			class: "org.mule.extension.sqs.api.model.MessageAttributeValue"
		},
		"NumberId": {
			"stringValue" : "230.000000000000000001",
			"dataType" : "Number"
		} as Object {
			class : "org.mule.extension.sqs.api.model.MessageAttributeValue"
		}
	} as Object {
		class: "java.util.HashMap"
	}
} as Object {
	class: "org.mule.extension.sqs.api.model.Message"
}
The following screenshot shows the script as it appears in the Output section of Studio:
Add and Configure the SQS Send Message Operation
-
In the Mule Palette view, search for Amazon SQS and select the Send message operation:
-
Drag the Send message operation onto the canvas, to the right of the Transform Message component.
-
In the Send message configuration, click + next to the Connector configuration field to add a global element.
-
Configure the global element for Send message:
-
Name
Name used to reference the configuration. This example uses Amazon_SQS_Large_Payload_Configuration.
-
Session Token
Session token used to validate the temporary security credentials.
-
Access Key
Alphanumeric text string that uniquely identifies the user who owns the account.
-
Secret Key
Key that acts as a password.
-
Region Endpoint
Queue region.
-
Default Global Queue URL
Default Amazon SQS queue URL, used when an operation does not set its own queue URL.
The following image shows example values for the Send message global elements:
-
-
In the Connection section, click the Advanced tab.
-
In the Large Payload Support field, choose either:
-
Expression or Bean Reference
-
Edit Inline
-
Configure the fields as follows:
-
-
Bucket
Name of the AWS S3 bucket to use for storing large message payloads. The bucket must already be created and configured in AWS S3. Enabling this feature incurs additional charges for using AWS S3.
-
Message Size Threshold
Message size above which the payload is stored in the AWS S3 bucket. The default threshold is 256 KB, which is also the maximum threshold value. The maximum message size is 2 GB.
-
Message Size Threshold Unit
Data unit for the message size threshold.
The following image shows example values for the Large payload support fields:
-
-
At the base of the Studio canvas, click Configuration XML to view the corresponding XML:
<sqs:config name="Amazon_SQS_Large_Payload_Configuration" doc:name="Amazon SQS Large Configuration" defaultQueueUrl="${sqs.queueUrl}">
	<sqs:basic-connection accessKey="${sqs.accessKey}" secretKey="${sqs.secretKey}" region="us-east-1" sessionToken="${sqs.sessionToken}">
		<sqs:large-payload-support bucket="large-sqs-payload-bucket" messageSizeThreshold="256" messageSizeThresholdUnit="KB" />
	</sqs:basic-connection>
</sqs:config>
-
Configure the following fields in the properties window:
-
Display Name
Name to display in the UI for the connector operation
-
Connector Configuration
Global configuration you previously created for the Send message operation, named Amazon_SQS_Large_Payload_Configuration
-
Message
payload
-
Queue URL
Amazon SQS queue URL. If provided, the value of this field takes precedence over the value of the Default Global Queue URL field on the Global Configuration Elements window.
The following image shows the General properties configuration fields:
-
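Expressed as XML, the Send message operation configured above could look like the following sketch. The config-ref value comes from this example. The queueUrl attribute is an assumption made by analogy with the Get approximate number of messages operation in the complete application XML, and can be omitted to fall back on the Default Global Queue URL:

```xml
<!-- config-ref points at the global element that enables large payload support -->
<!-- queueUrl is optional here (assumed attribute name); when set, it overrides defaultQueueUrl -->
<sqs:send-message doc:name="Send message"
	config-ref="Amazon_SQS_Large_Payload_Configuration"
	queueUrl="${sqs.queueUrl}" />
```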
Add and Configure a Logger Component
Add and configure a Logger component to display the message response in the Mule console:
-
In the Mule Palette view, search for Logger.
-
Drag the Logger component onto the canvas, to the right of the Send Message component.
-
Configure the following fields:
-
Display Name
Name for the Logger component
-
Message
String or DataWeave expression that specifies the Mule log message
-
Level
Configures the logging level. The default is INFO.
The following image shows example values for the fields:
-
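A Logger configured this way corresponds to a single XML element. In this sketch, the display name Log Response is taken from the complete application XML, and the message is written as a DataWeave expression so that the response from Send message is logged rather than the literal word "payload":

```xml
<!-- #[...] marks a DataWeave expression; without it the text is logged as a plain string -->
<logger level="INFO" doc:name="Log Response" message="#[payload]" />
```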
Obtain the Number of Messages in the Queue
Add the Get approximate number of messages operation and a second Logger component to the flow to obtain and log the approximate number of messages in the queue:
-
In the Mule Palette view, search for Amazon SQS.
-
Select the Get approximate number of messages operation and drag it onto the canvas, to the right of the Logger component.
-
Configure the Queue url field, for example, ${sqs.queueUrl}.
-
Add a second Logger component to the right of the Get approximate number of messages operation and configure the following fields:
-
Display Name
Name for the Logger component
-
Message
String or DataWeave expression that specifies the Mule log message
-
Level
Configures the logging level. The default is INFO.
The following image shows example values for the fields:
-
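Together, the operation and the second Logger could be sketched in XML as follows. The element and attribute names are those used in the complete application XML at the end of this example; the wording of the log message is illustrative only:

```xml
<!-- Replaces the payload with the approximate number of visible messages in the queue -->
<sqs:get-approximate-number-of-messages
	doc:name="Get approximate number of messages"
	config-ref="Amazon_SQS_Large_Payload_Configuration"
	queueUrl="${sqs.queueUrl}" />
<!-- Logs the count returned by the previous operation -->
<logger level="INFO" doc:name="Log Count"
	message="Approximate number of messages: #[payload]" />
```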
Create a Flow to Receive Messages
Finish this example by creating another flow to receive messages and log them before they are deleted from the queue.
-
In the Mule Palette view, search for SQS and select the Receive messages operation:
-
Drag the Receive messages operation onto the canvas.
-
Configure the following fields in the General properties window:
-
Display Name
Name that displays for the connector operation
-
Connector Configuration
Global configuration you created previously. For this example, use Amazon_SQS_Large_Payload_Configuration.
-
Number of Messages
Number of messages to receive. For this example, it is 10.
-
Queue url
Amazon SQS queue URL. If provided, the value of this field takes precedence over the value of the Default Global Queue URL field on the Global Configuration Elements window.
-
-
Add a Logger component to display the message in the Mule console.
-
Configure the Logger with these field values:
-
Display Name
Name for the Logger component
-
Message
String or DataWeave expression that specifies the Mule log message
-
Level
Configures the logging level. The default is INFO.
-
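The receive flow described above might be written as the following sketch. The flow name, source element, and config-ref come from the complete application XML below. The numberOfMessages attribute name is an assumption inferred from the Number of Messages field and is not confirmed by this example, so verify it against the connector reference before relying on it:

```xml
<flow name="sqs-receive-large-message-flow">
	<!-- Message source: polls the queue set in defaultQueueUrl of the referenced configuration -->
	<!-- numberOfMessages is an assumed attribute name for the Number of Messages field -->
	<sqs:receivemessages doc:name="Receive messages"
		config-ref="Amazon_SQS_Large_Payload_Configuration"
		numberOfMessages="10" />
	<!-- Logs the body of each received message -->
	<logger level="INFO" doc:name="Log Receipt" message="#[payload]" />
</flow>
```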
Example Mule Application XML Code
Paste this code into your XML editor to load the flow for this example use case into your Mule application. If needed, change the values to reflect your environment.
<mule xmlns:sqs="http://www.mulesoft.org/schema/mule/sqs" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:file="http://www.mulesoft.org/schema/mule/file"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/sqs http://www.mulesoft.org/schema/mule/sqs/current/mule-sqs.xsd">
<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" >
<http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<sqs:config name="Amazon_SQS_Configuration" doc:name="Amazon SQS Configuration"
defaultQueueUrl="${sqs.queueUrl}" >
<sqs:basic-connection accessKey="${sqs.accessKey}" secretKey="${sqs.secretKey}" region="us-east-1" />
</sqs:config>
<sqs:config name="Amazon_SQS_Large_Payload_Configuration" doc:name="Amazon SQS Configuration"
defaultQueueUrl="${sqs.queueUrl}" >
<sqs:basic-connection accessKey="${sqs.accessKey}" secretKey="${sqs.secretKey}" region="us-east-1" >
<sqs:large-payload-support bucket="large-sqs-payload-bucket" messageSizeThreshold="256" messageSizeThresholdUnit="KB" />
</sqs:basic-connection>
</sqs:config>
<file:config name="File_Config" doc:name="File Config" >
<file:connection workingDir="${app.home}" />
</file:config>
<flow name="sqs-send-LargeMessageFlow" >
<http:listener doc:name="Listener"
config-ref="HTTP_Listener_config"
path="/largePayload"/>
<file:read doc:name="Read" config-ref="File_Config" path="largePayload.txt"/>
<ee:transform doc:name="Transform Message" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/java
---
{
delaySeconds: 0,
body: payload,
messageAttributes: {
"AccountId": {
"stringValue" : "000123456",
"dataType" : "String.AccountId"
} as Object {
class: "org.mule.extension.sqs.api.model.MessageAttributeValue"
},
"NumberId": {
"stringValue" : "230.000000000000000001",
"dataType" : "Number"
} as Object {
class : "org.mule.extension.sqs.api.model.MessageAttributeValue"
}
} as Object {
class: "java.util.HashMap"
}
} as Object {
class: "org.mule.extension.sqs.api.model.Message"
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<sqs:send-message doc:name="Send message" config-ref="Amazon_SQS_Large_Payload_Configuration"/>
<logger level="INFO"
doc:name="Log Response"
message="#[payload]"/>
<sqs:get-approximate-number-of-messages
doc:name="Get approximate number of messages"
config-ref="Amazon_SQS_Large_Payload_Configuration"
queueUrl="${sqs.queueUrl}"/>
<logger level="INFO" doc:name="Log Count"
message="Approximate number of messages: #[payload]"/>
</flow>
<flow name="sqs-receive-large-message-flow" >
<sqs:receivemessages doc:name="Receive messages"
config-ref="Amazon_SQS_Large_Payload_Configuration"/>
<logger level="INFO" doc:name="Log Receipt" message="#[payload]" />
</flow>
</mule>