Flex Gateway新着情報
Governance新着情報
Monitoring API Manager次の例は、Amazon Kinesis Data Streams 用 Anypoint Connector (Amazon Kinesis Data Streams コネクタ) を使用して、データレコードを配置し、Amazon Kinesis データストリームからリスンする方法を示しています。
この例を試す前に、Anypoint Studio (Studio) にアクセスし、[Mule Palette (Mule パレット)] ビューに Kinesis のエントリが表示されていることを確認します。表示されていない場合、「プロジェクトへのコネクタの追加」の指示に従います。
次のスクリーンショットは、この例の Anypoint Studio アプリケーションフローを示しています。
このフローは、Put Records 操作を使用して、Amazon Kinesis データストリームにデータを配置します。このフローでは、[HTTP] > [Listener] は、Put Record 操作のペイロードを設定する data という名前のクエリパラメーターを受け取ります。partitionKey という名前のクエリパラメーターを使用して、Put Record 操作のパーティションキー値を設定します。
このフローは、Listener ソースを使用して新しいデータレコードをリスンします。
このフローは、Listener ソースを使用して新しいデータレコードをリスンします。新しいレコードを受信したら、Checkpoint 操作をコールして手動チェックポイントを設定します。
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:os="http://www.mulesoft.org/schema/mule/os" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:kinesis="http://www.mulesoft.org/schema/mule/kinesis" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/kinesis http://www.mulesoft.org/schema/mule/kinesis/current/mule-kinesis.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd">
<configuration-properties doc:name="Configuration properties" file="mule-artifact.properties" />
<kinesis:config name="Kinesis_config" doc:name="Kinesis config">
<kinesis:connection accessKey="${config.accessKey}" secretKey="${config.secretKey}" />
</kinesis:config>
<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" >
<http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<flow name="kinesis-demo-put-record" >
<http:listener doc:name="Listener" config-ref="HTTP_Listener_config" path="/putrecord"/>
<kinesis:put-record doc:name="Put Record" config-ref="Kinesis_config" streamName="${streamName}"
partitionKey="#[attributes.queryParams.partitionKey]">
<kinesis:data ><![CDATA[#[attributes.queryParams.data]]]></kinesis:data>
</kinesis:put-record>
<ee:transform doc:name="Transform Message" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
</ee:message>
</ee:transform>
<logger level="INFO" doc:name="Logger" message="Sent message: #[payload]"/>
</flow>
<!-- Stop the following flow and enable/start the other one in order to run it -->
<flow name="kinesis-demo-listener-latest">
<kinesis:listener doc:name="Listener" config-ref="Kinesis_config"
streamName="${streamName}" applicationName="${applicationName}"
absolutePosition="LATEST"/>
<ee:transform doc:name="Transform Message" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
</ee:message>
</ee:transform>
<logger level="INFO" doc:name="Logger" message="Received message: #[payload]" />
</flow>
<flow name="kinesis-demo-checkpoint" initialState="stopped">
<kinesis:listener doc:name="Listener" config-ref="Kinesis_config"
streamName="${streamName}" applicationName="${applicationName}"
absolutePosition="LATEST" checkpointOnComplete="false"/>
<ee:transform doc:name="Transform Message" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
</ee:message>
</ee:transform>
<logger level="INFO" doc:name="Logger" message="Received message: #[payload]" />
<kinesis:checkpoint doc:name="Checkpoint" config-ref="Kinesis_config" applicationName="${applicationName}" streamName="${streamName}"/>
</flow>
</mule>
xml