Amazon SQS コネクタ

Select

Amazon SQS 用 Anypoint コネクタを使用すると、Amazon Simple Queue Service API に簡単に接続できます。これにより、Mule ユーザは API を直接処理することなく SQS キューサービスを管理できます。

Amazon Simple Queue Service (Amazon SQS) では、コンピュータ間を移動するメッセージを保存する信頼性と拡張性の高いキューがホストされます。Amazon SQS を使用すると、開発者は分散型アプリケーションコンポーネント間でデータを簡単に移動できます。このとき、メッセージは失われず、各コンポーネントを常に使用できるようにしておく必要もありません。Amazon SQS は、Amazon Elastic Compute Cloud (Amazon EC2) や他の AWS インフラストラクチャ Web サービスと緊密に連動するため、自動化されたワークフローを簡単に構築できます。

AWS SDK for Java は、AWS インフラストラクチャサービスの Java API を提供します。Amazon SQS コネクタは、SDK for Java を使用して構築されています。

Amazon SQS コネクタでは、「標準キュー」と「FIFO キュー」の 2 種類のキューがサポートされています。標準キューは高スループットですが、まれにメッセージの複数のコピーが配信されたり、送信時とは異なる順序で配信されたりすることがあります。FIFO キューの場合、スループットは制限されますが、メッセージは送信時とまったく同じ順序で配信されます。

前提条件

このドキュメントは、読者が Mule、Anypoint コネクタ、Anypoint Studio の基礎、Mule フローの要素、グローバル要素に精通していることを前提としています。

コネクタは、Anypoint Studio にプリバンドルされているため、次の作業を行う必要があります。

  • Anypoint Studio をインストールする。

  • Amazon Web Services にサインアップする。IAM 形式のログイン情報 (コネクタを使用して AWS にアクセスするために必要)

互換性

Amazon SQS コネクタ 5.0 は、以下と互換性があります。

アプリケーション/サービス バージョン

Mule Runtime

4.0.x 以降

AWS SDK for Java

1.11.79

引数のパッケージ名と大部分の操作の戻り値のデータ型は、このバージョンのコネクタで更新されています。コネクタの使用方法に応じて、更新されたパッケージを参照します。たとえば、Send Message 操作のメッセージ属性入力引数では Map<String, MessageAttributeValue> オブジェクトが想定されるため、com.amazonaws.services.sqs.model.MessageAttributeValue ではなく org.mule.extension.sqs.api.model.MessageAttributeValue パッケージを参照する必要があります。

コネクタのすべての操作が機能するように、現在の SQS ポリシーの Amazon SQS アクションのリスト全体のサブセットを新しいアクションで更新して、キューのアクションにアクセスする AWS アカウントを指定する必要があります。

sqs:GetQueueAttributes アクションはグローバル設定の [Test Connection (接続をテスト)] で使用されるため、このアクションが Amazon SQS ポリシーのテスト中のキューで有効になっていることを確認します。

FIFO キューの作成

FIFO キューを作成するには、コネクタで「create queue」操作を使用して、2 つの属性を追加する必要があります。これらの 2 つの属性は FifoQueue と ContentBasedDeduplication で、どちらも true に設定します。キュー名は .fifo サフィックスで終わる必要があります。たとえば、「MyTestFIFOQueue.fifo」のようになります。

Amazon では、FIFO キューのリージョンとして米国東部 (オハイオ) または米国西部 (オレゴン) のみがサポートされているため、FIFO キューを作成するときにこの 2 つのリージョンのいずれかを選択する必要があります。

SQS コネクタを使用してメッセージを FIFO キューに送信するときは、FIFO キューの作成時に使用したキュー URL とリージョン名を指定する必要があります。また、メッセージの送信時にコネクタ設定の [message group id (メッセージグループ ID)] 属性に値を指定する必要もあります。

このコネクタのインストール方法

  1. Anypoint Studio で、Studio タスクバーの Exchange アイコンをクリックします。

  2. Anypoint Exchange で [Login (ログイン)] をクリックします。

  3. コネクタを検索して [Install (インストール)] をクリックします。

  4. 画面の指示に従ってコネクタをインストールします。

Studio の更新がある場合、右下隅にメッセージが表示されます。メッセージをクリックすると、更新をインストールできます。

POM ファイルの更新

新しいバージョンのコネクタがリリースされたら、POM 連動関係を Mule アプリケーションの POM.xml ファイルに追加する必要があります。次に例を示します。

<dependency>
    <groupId>org.mule.connectors</groupId>
    <artifactId>mule-sqs-connector</artifactId>
    <version>5.0.0</version>
    <classifier>mule-plugin</classifier>
</dependency>

新しいプロジェクトを作成する

  1. Studio で、[File (ファイル)] > [New (新規)] > [Mule Project (Mule プロジェクト)] をクリックします。

  2. 新しいプロジェクトの名前を入力し、残りのオプションをデフォルト値のままにします。

    amazon sqs new project
  3. Git を使用する場合、[Create a default .gitignore file (デフォルトの gitignore ファイルを作成)] を選択し (Studio プロジェクトの場合は無視)、[Next (次へ)] をクリックします。

  4. [Finish (完了)] をクリックして、プロジェクトを作成します。

Amazon SQS コネクタのグローバル要素を設定する

Mule アプリケーションで Amazon SQS コネクタを使用するには、アプリケーションのすべての Amazon SQS コネクタで使用できるグローバル要素を設定します。

グローバル Amazon SQS コネクタ設定を作成する手順は、次のとおりです。

  1. キャンバスの下部にある [Global Elements (グローバル要素)] タブをクリックします。

  2. [Global Configuration Elements (グローバル設定要素)] 画面で、[Create (作成)] をクリックします。

  3. [Choose Global Type (グローバル種別の選択)] ウィザードで、[Connector Configuration (コネクタ設定)] を展開し、[Amazon SQS: Configuration (Amazon SQS: 設定)] を選択します。

    amazon sqs global type
  4. [OK] をクリックします。

  5. グローバル要素のプロパティを入力します。

    amazon sqs config pic
    項目 説明

    Access Key (アクセスキー)

    アカウントを所有するユーザを一意に識別する英数字のテキスト文字列。

    Secret Key (シークレットキー)

    パスワードの役割を果たすキー。

    Try AWS Credentials Provider Chain (AWS ログイン情報プロバイダチェーンを試す)

    一時ログイン情報を使用する必要があるかどうかを制御するドロップダウン。

    Queue Name (キュー名)

    デフォルトのキュー名。存在しない場合、Mule は自動的にキューを作成します。

    Queue URL (キュー URL)

    アクションを実行する Amazon SQS キューの URL。

    Region Endpoint (リージョンエンドポイント)

    要求を処理するリージョンエンドポイント。

    グローバル要素で [Queue Name (キュー名)] が指定されている場合、コネクタは自動的にキューを作成し、このキューの URL を [Queue URL (キュー URL)] として設定します。グローバル要素を参照するすべての Amazon SQS メッセージプロセッサは、この [Queue URL (キュー URL)] を使用してを操作を実行します。

フローの特定のメッセージプロセッサで別の [Queue URL (キュー URL)] を参照する必要がある場合、メッセージプロセッサの [Queue URL (キュー URL)] 属性を使用して操作を実行できます。

+ . [Proxy (プロキシ)] タブと [Advanced (詳細)] タブは、デフォルトエントリのままにします。 . [Test Connection (接続をテスト)] をクリックして、グローバル設定のパラメータが正しいことと、Mule から Amazon SQS のインスタンスに正常に接続できることを確認します。 . [OK] をクリックして、グローバルコネクタ設定を保存します。

コネクタの使用

Amazon SQS コネクタは、操作ベースのコネクタです。つまり、コネクタをフローに追加するときに、コネクタで実行させる特定の操作を設定する必要があります。Amazon SQS コネクタは次の操作をサポートしています。

  • Add Permission

  • Change message visibility

  • Change message visibility batch

  • Create queue

  • Delete message

  • Delete message batch

  • Delete queue

  • Get approximate number of messages

  • Get queue attributes

  • Get queue URL

  • List dead letter source queues

  • List queues

  • Purge Queue

  • Read (Receive Messages)

  • Remove permission

  • Send message batch

  • Send message

  • Set Queue Attributes

Amazon SQS コネクタを Studio フローに追加する

  1. Anypoint Studio で新しい Mule プロジェクトを作成します。

  2. Amazon SQS コネクタを選択して Read 操作をキャンバスにドラッグし、それを選択してプロパティエディタを開きます。

  3. 操作パラメータを設定します。

    amazon sqs demo receive messages
    項目

    Display Name (表示名)

    アプリケーションのコネクタ操作の一意の表示ラベルを入力します。

    Connector Configuration (コネクタ設定)

    ドロップダウンからグローバル Amazon SQS コネクタ要素を選択します。

    Queue URL (キュー URL)

    操作のパラメータを選択します。

    Max no of messages (メッセージの最大数)

    操作のパラメータに値を指定します。

  4. コネクタ設定を保存します。

ユースケースの例

メタデータと共にメッセージを Amazon SQS キューに送信し、キューから受信します。これは、次の 2 つのフローに分割できます。

  1. メタデータと共にメッセージを送信し、キューのメッセージ数を取得して、メッセージが送信されたことを確認する。

  2. メッセージを受信し、メッセージ本文を記録する。

Studio ビジュアルエディタ

Send Message 操作のフロー
Receive および Delete Message 操作のフロー

メッセージを送信するフローを作成する

メッセージをキューに送信して、フローを開始します。

  1. Anypoint Studio で新しい Mule プロジェクトを作成します。

  2. HTTP コネクタの Listener 操作をキャンバスにドラッグし、それを選択してプロパティエディタコンソールを開きます。

  3. 新しい HTTP リスナの設定グローバル要素を追加します。

  4. 項目の [General (一般)] グループで、[Path (パス)] に / という値を設定します。

  5. [Basic Settings (基本設定)] で、プラスボタンをクリックします。

    amazon sqs http config
  6. 次の HTTP パラメータを設定します。その他の項目はデフォルト値のままにします。

    amazon sqs http params
    項目

    Host (ホスト)

    0.0.0.0

    Port (ポート)

    8081

  7. Transform Message コンポーネントを追加して、メタデータを添付します。

    Transform Message コンポーネント
    %dw 2.0
    output application/java
    ---
    {
    	delaySeconds: 0,
    	body: "Hello World",
    	messageAttributes: {
    		"AccountId": {
    			"stringValue" : "000123456",
    			"dataType" : "String.AccountId"
    		} as Object {
    			class: "org.mule.extension.sqs.api.model.MessageAttributeValue"
    		},
    		"NumberId": {
    			"stringValue" : "230.000000000000000001",
    			"dataType" : "Number"
    		} as Object {
    			class : "org.mule.extension.sqs.api.model.MessageAttributeValue"
    		}
    	} as Object {
    		class: "java.util.HashMap"
    	}
    } as Object {
    	class: "org.mule.extension.sqs.api.model.Message"
    }
  8. Amazon SQS コネクタの send message 操作をフローにドラッグし、コネクタをダブルクリックしてそのプロパティエディタを開きます。

  9. [Extension configuration (拡張機能設定)] 項目の右側の [Basic Settings (基本設定)] で、プラスアイコンをクリックし、[Access Key (アクセスキー)]、[Secret Key (シークレットキー)]、[Queue Name (キュー名)] の値を追加します。

  10. コネクタの残りのパラメータを設定します。

    [Send Message (メッセージを送信)] のパラメータ
    項目

    Display Name (表示名)

    コネクタ操作の名前を入力します。

    Connector Configuration (コネクタ設定)

    作成したグローバル設定を選択します。

    Message (メッセージ)

    #[payload]

  11. Mule コンソールに応答を出力するロガーを追加します。

    amazon sqs demo logger
    項目

    Display Name (表示名)

    ロガーの名前を入力します。

    Message (メッセージ)

    Sent Message: [payload] (メッセージを送信: [payload])

    Level (レベル)

    INFO (Default) (INFO (デフォルト))

  12. 別の Amazon SQS コネクタを追加し、[Get approximate number of messages (おおよそのメッセージ数を取得)] を選択してキューのメッセージ数を取得します。

    amazon sqs demo get message count
    項目

    Display Name (表示名)

    コネクタ操作の名前を入力します。

    Connector Configuration (コネクタ設定)

    作成したグローバル設定を選択します。

  13. Mule コンソールに数を出力するロガーを追加します。

    amazon sqs demo logger2

メッセージを受信するフローを作成する

これにより、ユースケースの最初の部分が完了します。次に、メッセージを受信して記録し、キューから削除する別のフローを作成します。

  1. Amazon SQS コネクタをドラッグし、インバウンドエンドポイントとして設定します。

    amazon sqs demo receive messages
    項目

    Display Name (表示名)

    コネクタ操作の名前を入力します。

    Connector Configuration (コネクタ設定)

    作成したグローバル設定を選択します。

    Number of Messages (メッセージ数)

    10

    メッセージプロセッサの [Queue URL (キュー URL)] 属性は、[Global Element Properties (グローバル要素のプロパティ)] の [Queue URL (キュー URL)] よりも優先されます。[Global Element Properties (グローバル要素のプロパティ)] に属する属性 ([Queue Name (キュー名)] や [Queue URL (キュー URL)] など) やメッセージプロセッサの [Queue URL (キュー URL)] が指定されていない場合、コネクタは例外をスローします。

  2. Mule コンソールにメッセージを出力するロガーを追加します。

    項目

    Display Name (表示名)

    任意の名前を入力します。

    Message (メッセージ)

    #[payload]

    Level (レベル)

    INFO (Default) (INFO (デフォルト))

Studio XML エディタ

Anypoint Studio でこのコードが機能するように、Amazon Web Services ログイン情報を提供する必要があります。 コードで変数をそれらの値に置き換えるか、src/main/resources/mule-artifact.properties ファイルで各変数の値を指定できます。

<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core" xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:sqs="http://www.mulesoft.org/schema/mule/sqs" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/sqs http://www.mulesoft.org/schema/mule/sqs/current/mule-sqs.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd">
  <configuration-properties file="mule-artifact.properties"/>
  <sqs:config name="Amazon_SQS_Configuration" doc:name="Amazon SQS Configuration" doc:id="ID_VALUE">
    <sqs:basic-connection accessKey="${sqs.accessKey}" secretKey="${sqs.secretKey}" defaultQueueName="${sqs.queueName}"/>
  </sqs:config>
  <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="ID_VALUE">
    <http:listener-connection host="0.0.0.0" port="8081"/>
  </http:listener-config>
  <flow name="sqs-send-message-operation-demo-flow" doc:id="ID_VALUE">
    <http:listener config-ref="HTTP_Listener_config" path="/" doc:name="Listener" doc:id="ID_VALUE"/>
    <ee:transform doc:name="Transform Message" doc:id="ID_VALUE">
      <ee:message>
        <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
{
	delaySeconds: 0,
	body: "Hello World",
	messageAttributes: {
		"AccountId": {
			"stringValue" : "000123456",
			"dataType" : "String.AccountId"
		} as Object {
			class: "org.mule.extension.sqs.api.model.MessageAttributeValue"
		},
		"NumberId": {
			"stringValue" : "230.000000000000000001",
			"dataType" : "Number"
		} as Object {
			class : "org.mule.extension.sqs.api.model.MessageAttributeValue"
		}
	} as Object {
		class: "java.util.HashMap"
	}
} as Object {
	class: "org.mule.extension.sqs.api.model.Message"
}]]></ee:set-payload>
      </ee:message>
    </ee:transform>
    <sqs:send-message config-ref="Amazon_SQS_Configuration" doc:name="Send message" doc:id="ID_VALUE" message="#[payload]"/>
    <logger level="INFO" doc:name="Logger" doc:id="ID_VALUE" message="#[payload]"/>
    <sqs:get-approximate-number-of-messages config-ref="Amazon_SQS_Configuration" doc:name="Get approximate number of messages" doc:id="ID_VALUE"/>
    <logger level="INFO" doc:name="Logger" doc:id="ID_VALUE" message="#[payload]"/>
  </flow>
  <flow name="sqs-receive-delete-message-operations-demo-flow" doc:id="ID_VALUE">
    <sqs:receivemessages config-ref="Amazon_SQS_Configuration" doc:name="Receivemessages" doc:id="ID_VALUE"/>
    <logger level="INFO" doc:name="Logger" doc:id="ID_VALUE message="#[payload]"/>
  </flow>
</mule>

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub