Contact Free trial Login

Amazon Kinesis Data Streams Connector 1.0 Examples - Mule 4

The following example shows how to use Anypoint Connector for Amazon Kinesis Data Streams (Amazon Kinesis Data Streams Connector) to put data records and listen from an Amazon Kinesis data stream.

Before you try the example, access Anypoint Studio (Studio) and verify that the Mule Palette view displays entries for Kinesis. If not, follow the instructions in Add the Connector to Your Project.

Flows for This Example

The following screenshots show the Anypoint Studio app flows for this example:

  • This flow uses the Put Records operation to put data into the Amazon Kinesis data stream. In this flow, HTTP > Listener receives a query parameter named data that sets the payload for the Put Record operation. It uses a query parameter named partitionKey to set the partition key value for the Put Record operation.

    Put Record operation flow
  • This flow uses the Listener source to listen for new data records:

    Listener source flow
  • This flow uses the Listener source to listen for new data records. After receiving the new records, it calls the Checkpoint operation to set a manual checkpoint:

    Checkpoint operation flow

XML Code for This Example

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:os="http://www.mulesoft.org/schema/mule/os" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:kinesis="http://www.mulesoft.org/schema/mule/kinesis" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/kinesis http://www.mulesoft.org/schema/mule/kinesis/current/mule-kinesis.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd">

    <configuration-properties doc:name="Configuration properties" file="mule-artifact.properties" />

    <kinesis:config name="Kinesis_config" doc:name="Kinesis config">
        <kinesis:connection accessKey="${config.accessKey}" secretKey="${config.secretKey}" />
    </kinesis:config>
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" >
        <http:listener-connection host="0.0.0.0" port="8081" />
    </http:listener-config>

    <flow name="kinesis-demo-put-record" >
        <http:listener doc:name="Listener" config-ref="HTTP_Listener_config" path="/putrecord"/>
        <kinesis:put-record doc:name="Put Record"  config-ref="Kinesis_config" streamName="${streamName}"
                            partitionKey="#[attributes.queryParams.partitionKey]">
            <kinesis:data ><![CDATA[#[attributes.queryParams.data]]]></kinesis:data>
        </kinesis:put-record>
        <ee:transform doc:name="Transform Message" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <logger level="INFO" doc:name="Logger" message="Sent message: #[payload]"/>
    </flow>

    <!-- Stop the following flow and enable/start the other one in order to run it -->
    <flow name="kinesis-demo-listener-latest">
        <kinesis:listener doc:name="Listener" config-ref="Kinesis_config"
                          streamName="${streamName}" applicationName="${applicationName}"
                          absolutePosition="LATEST"/>
        <ee:transform doc:name="Transform Message"  >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <logger level="INFO" doc:name="Logger" message="Received message:  #[payload]" />
    </flow>

    <flow name="kinesis-demo-checkpoint" initialState="stopped">
        <kinesis:listener doc:name="Listener" config-ref="Kinesis_config"
                          streamName="${streamName}" applicationName="${applicationName}"
                          absolutePosition="LATEST" checkpointOnComplete="false"/>
        <ee:transform doc:name="Transform Message" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <logger level="INFO" doc:name="Logger" message="Received message:  #[payload]" />
        <kinesis:checkpoint doc:name="Checkpoint" config-ref="Kinesis_config" applicationName="${applicationName}" streamName="${streamName}"/>
    </flow>

</mule>