Amazon Kinesis Data Streams Connector 1.0 の例

次の例は、Amazon Kinesis Data Streams 用 Anypoint Connector (Amazon Kinesis Data Streams コネクタ) を使用して、データレコードを配置し、Amazon Kinesis データストリームからリスンする方法を示しています。

Amazon Kinesis Data Streams Connector は、デフォルトで 拡張ファンアウト (EFO)​ をサポートしています。これは、コネクタが EFO を使用して設定済みストリームのすべてのシャードに自動的にサブスクライブすることを意味します。

コネクタの動作は、AWS Kinesis Client Library (KCL) バージョン 2.0 以降に準拠しています。コネクタは、 「Java で KCL 2.x を使用して拡張ファンアウトコンシューマーを開発する」​に示されている実装を使用します。

この例を試す前に、Anypoint Studio (Studio) にアクセスし、​[Mule Palette (Mule パレット)]​ ビューに Kinesis のエントリが表示されていることを確認します。表示されていない場合、​「プロジェクトへのコネクタの追加」​の指示に従います。

この例のフロー

次のスクリーンショットは、この例の Anypoint Studio アプリケーションフローを示しています。

  • このフローは、​Put Records​ 操作を使用して、Amazon Kinesis データストリームにデータを配置します。このフローでは、​[HTTP] > [Listener]​ は、​Put Record​ 操作のペイロードを設定する ​data​ という名前のクエリパラメーターを受け取ります。​partitionKey​ という名前のクエリパラメーターを使用して、​Put Record​ 操作のパーティションキー値を設定します。

    Put Record 操作フロー
  • このフローは、​Listener​ ソースを使用して新しいデータレコードをリスンします。

    Listener ソースフロー
  • このフローは、​Listener​ ソースを使用して新しいデータレコードをリスンします。新しいレコードを受信したら、​Checkpoint​ 操作をコールして手動チェックポイントを設定します。

    Checkpoint 操作フロー

この例の XML コード

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:os="http://www.mulesoft.org/schema/mule/os" xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns:kinesis="http://www.mulesoft.org/schema/mule/kinesis" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/kinesis http://www.mulesoft.org/schema/mule/kinesis/current/mule-kinesis.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/os http://www.mulesoft.org/schema/mule/os/current/mule-os.xsd">

    <configuration-properties doc:name="Configuration properties" file="mule-artifact.properties" />

    <kinesis:config name="Kinesis_config" doc:name="Kinesis config">
        <kinesis:connection accessKey="${config.accessKey}" secretKey="${config.secretKey}" />
    </kinesis:config>
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" >
        <http:listener-connection host="0.0.0.0" port="8081" />
    </http:listener-config>

    <flow name="kinesis-demo-put-record" >
        <http:listener doc:name="Listener" config-ref="HTTP_Listener_config" path="/putrecord"/>
        <kinesis:put-record doc:name="Put Record"  config-ref="Kinesis_config" streamName="${streamName}"
                            partitionKey="#[attributes.queryParams.partitionKey]">
            <kinesis:data ><![CDATA[#[attributes.queryParams.data]]]></kinesis:data>
        </kinesis:put-record>
        <ee:transform doc:name="Transform Message" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <logger level="INFO" doc:name="Logger" message="Sent message: #[payload]"/>
    </flow>

    <!-- Stop the following flow and enable/start the other one in order to run it -->
    <flow name="kinesis-demo-listener-latest">
        <kinesis:listener doc:name="Listener" config-ref="Kinesis_config"
                          streamName="${streamName}" applicationName="${applicationName}"
                          absolutePosition="LATEST"/>
        <ee:transform doc:name="Transform Message"  >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <logger level="INFO" doc:name="Logger" message="Received message:  #[payload]" />
    </flow>

    <flow name="kinesis-demo-checkpoint" initialState="stopped">
        <kinesis:listener doc:name="Listener" config-ref="Kinesis_config"
                          streamName="${streamName}" applicationName="${applicationName}"
                          absolutePosition="LATEST" checkpointOnComplete="false"/>
        <ee:transform doc:name="Transform Message" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <logger level="INFO" doc:name="Logger" message="Received message:  #[payload]" />
        <kinesis:checkpoint doc:name="Checkpoint" config-ref="Kinesis_config" applicationName="${applicationName}" streamName="${streamName}"/>
    </flow>

</mule>