Flex Gateway新着情報
Governance新着情報
Monitoring API Managerサポートカテゴリ: 選択
Azure Service Bus 2.0.0 用 Anypoint Connector
Azure Service Bus 用 Anypoint Connector (Azure Service Bus Connector) を使用すると、クラウドの Azure Service Bus とのメッセージインテグレーションが可能になります。このコネクタでは、キューおよびトピックとの通信がサポートされます。また、組み込みの管理 API を使用して、Azure Service Bus オブジェクトを自動的に検出してプロビジョニングできます。
Azure Service Bus Connector を使用するには、Mule、Anypoint Connector、Anypoint Studio、Mule の概念、Mule フローの要素、グローバル要素に精通していることが必要です。
対象リソースへの接続をテストするには、ログイン情報が必要です。基本認証を使用するか、ログイン情報を接続文字列で渡すことができます。
ソフトウェアの要件および互換性情報については、コネクタのリリースノートを参照してください。
Anypoint Studio で、Studio タスクバーの Exchange アイコンをクリックします。
Anypoint Exchange で [Login (ログイン)] をクリックします。
コネクタを検索して [Install (インストール)] をクリックします。
画面の指示に従ってコネクタをインストールします。
コネクタの設定を開始するには、まず入力元を選択して設定する必要があります。Azure Service Bus Connector では 3 つの入力元がサポートされます。
HTTP Connector。Web サーバーからメッセージを受信します。HTTP Connector を使用すると、コネクタをテストできます。
Queue listener。Azure キューからメッセージを受信します。
Subscription listener。Azure サブスクリプションからメッセージを受信します。
次の手順では、HTTP Connector を入力元として使用します。Queue listener と Subscription listener の使用方法ついての情報は、「入力元」を参照してください。
[Mule Palette (Mule パレット)] で「HTTP」を検索します。
HTTP コネクタを Studio キャンバスにドラッグします。
[Connector Configuration (コネクタ設定)] 項目の横にある [+] をクリックして、コネクタのグローバル要素を設定します。
[OK] をクリックして、デフォルトを受け入れます。
[Mule Palette (Mule パレット)] で「Azure」を検索します。
Azure Service Bus コネクタを選択します。
コネクタをキャンバスの HTTP コネクタの横にドラッグします。
[Connector Configuration (コネクタ設定)] 項目の横にある [+] をクリックして、コネクタのグローバル要素を設定します。
認証種別を選択します。
Basic authentication (基本認証)
[Basic authentication (基本認証)] を使用した場合、Azure Service Bus 名前空間に共有アクセスキーを提供することで、アプリケーションで Azure Service Bus リソースにアクセスできます。共有アクセスキーを使用すると、ユーザーアカウントのアクセスキーをアプリケーションに保存しなくても、ユーザーは Azure Service Bus リソースにアクセスできます。
| 項目 | 説明 |
|---|---|
Name (名前) |
共有アクセス署名の名前。 |
Service namespace (サービス名前空間) |
アプリケーション内の Service Bus リソースに対応するサービス名前空間の名前。 |
Shared access key name (共有アクセスキー名) |
Azure Service Bus 名前空間内で設定するアクセスキーの名前。下位レベルに作成されたアクセスキー (トピックレベルの共有キーなど) は、スタートアップ時の接続性テストを無効にしない限り、このオプションには使用できません。 |
Shared access key (共有アクセスキー) |
ストレージリソースへのアクセスを認証するために使用する 56 ビットの主キー。 |
| Name (名前) | 型 | 説明 | デフォルト値 | 必須 |
|---|---|---|---|---|
Operation Timeout (操作タイムアウト) |
Number (数値) |
エラーがスローされるまでの操作の実行の待機時間 (ミリ秒) |
30000 |
|
Minimum Retry Backoff (最小再試行バックオフ) |
Number (数値) |
再試行の最小バックオフ間隔 (ミリ秒)。 |
0 |
|
Maximum Retry Backoff (最大再試行バックオフ) |
Number (数値) |
再試行の最大バックオフ間隔 (ミリ秒)。 |
30 |
|
Retries (再試行回数) |
Number (数値) |
コネクタの最大再試行回数 |
10 |
|
Thread pool size (スレッドプールサイズ) |
Number (数値) |
接続プール内の最大スレッド数 |
20 |
| Name (名前) | 型 | 説明 | デフォルト値 | 必須 |
|---|---|---|---|---|
Frequency(ms) (頻度 (ms)) |
Number (数値) |
再接続の試行間の時間 |
20000 |
|
Reconnection attempts (再接続試行数) |
Number (数値) |
再接続の試行回数 |
2 |
|
Reconnection forever (繰り返し再接続) |
Boolean (ブール) |
再接続を試行するかどうかを示すフラグ |
false |
Connection string (接続文字列)
*[Connection string (接続文字列)] *認証を使用した場合、アプリケーションは、Azure Service Bus リソースへのアクセスに使用する情報を含む文字列を渡します。
| 項目 | 説明 |
|---|---|
Name (名前) |
接続文字列の名前。 |
Connection string (接続文字列) |
Endpoint、SharedAccessKeyName、および SharedAccessKey パラメーターの値を含む Microsoft Azure ポータルにより提供されるリンク。次に例を示します。
|
Service namespace (サービス名前空間) |
アプリケーション内の Service Bus リソースに対応するサービス名前空間の名前。 |
| Name (名前) | 型 | 説明 | デフォルト値 | 必須 |
|---|---|---|---|---|
Frequency(ms) (頻度 (ms)) |
Number (数値) |
再接続の試行間の時間 |
20000 |
|
Reconnection attempts (再接続試行数) |
Number (数値) |
再接続の試行回数 |
2 |
|
Reconnection forever (繰り返し再接続) |
Boolean (ブール) |
再接続を試行するかどうかを示すフラグ |
false |
再試行と再接続の違い
再試行は、再試行ポリシーがライブラリレベルで使用されるという点で再接続とは異なります。接続の問題があると、Azure は再試行パラメーターを使用して内部的に接続の再確立を試行します。再確立できない場合、ライブラリはコネクタに例外を送信し、コネクタは再接続を試行します。
Azure Service Bus Connector には 2 つの入力元操作があります。
アプリケーションで Azure キューからメッセージを受信する場合、Queue listener 入力元を使用します。ソースを次のように設定します。
| 項目 | 説明 |
|---|---|
Queue name (キュー名) |
イベントを受信するキュー。デッドレターキューからイベントを受信するには、この項目に |
Receive mode (受信モード) |
|
Server timeout (ms) (サーバータイムアウト (ms)) |
ハンドラーがメッセージの処理を完了していない場合にクライアントがメッセージの更新をロックする最大期間 (ミリ秒)。 |
Prefetch count (プリフェッチ数) |
公式の Service Bus クライアントのいずれかで |
CRON expression (CRON 式) |
(省略可能) レシーバーアクションをトリガーするタイミングを指定する UNIX CRON 式。たとえば、この項目を |
Max. messages to receive (受信するメッセージの最大数) |
スケジュールされた操作中に受信するメッセージの最大数。 |
アプリケーションで Azure サブスクリプションからメッセージを受信する場合、Subscription listener 入力元を使用します。ソースを次のように設定します。
| 項目 | 説明 |
|---|---|
Topic (トピック) |
接続先のトピック名。 |
Subscription (サブスクリプション) |
イベントを受信するサブスクリプション。デッドレターキューからイベントを受信するには、 |
Receive mode (受信モード) |
|
Server timeout (ms) (サーバータイムアウト (ms)) |
ハンドラーがメッセージの処理を完了していない場合にクライアントがメッセージの更新をロックする最大期間。 |
Prefetch count (プリフェッチ数) |
公式の Service Bus クライアントのいずれかで |
CRON Expression (CRON 式) |
(省略可能) レシーバーアクションをトリガーするタイミングを指定する UNIX CRON 式。たとえば、この項目を |
Max. messages to receive (受信するメッセージの最大数) |
スケジュールされた操作中に受信するメッセージの最大数。 |
[][examples]] == 例
このトピックには、次の例が含まれています。
次の例では、Azure Service Bus キューまたはトピックにメッセージを送信するように Mule アプリケーションを設定します。次の図は、この例のフローを示しています。
Studio で新しい Mule アプリケーションを作成します。
[Mule Palette (Mule パレット)] で「HTTP」を検索します。
HTTP コネクタを Studio キャンバスにドラッグします。
HTTP コネクタを選択します。
[Connector Configuration (コネクタ設定)] 項目の横にある [+] をクリックして、コネクタのグローバル要素を設定します。
デフォルト項目値のままにして、[OK] をクリックします。
[Mule Palette (Mule パレット)] で「Set Payload」を検索します。
Set Payload トランスフォーマーを Studio キャンバスの HTTP コネクタの横にドラッグします。
[Value (値)] 項目にキューのメッセージ値を入力します。
[Mule Palette (Mule パレット)] で「Azure」を検索します。
Microsoft Azure Service Bus コネクタを Studio キャンバスの Set Payload トランスフォーマーの横にドラッグします。
Microsoft Azure Service Bus コネクタの [Connector Configuration (コネクタ設定)] 項目の横にある [+] をクリックして、コネクタのグローバル要素を設定します。 「コネクタの設定」の説明に従って、コネクタの認証を選択し、設定します。
Microsoft Azure Bus サービスプロパティダイアログで、次のようにコネクタを設定します。
[Operation (操作)] の値を「Publish message」に設定し、[Destination type (宛先種別)] を QUEUE または TOPIC に設定します。
[Destination name (宛先名)] の値を入力します。
Publish message 操作に戻り値はないため、別の Set Payload トランスフォーマーを使用してメッセージ (#["message sent"]) を返します。
http://localhost:8081/send をコールすると、コネクタはメッセージをキューにアップロードします。
次の例では、キューまたはトピックからメッセージを受信した後で肯定応答を送信するように Mule アプリケーションを設定します。次の図は、この例のフローを示しています。
Studio で新しい Mule アプリケーションを作成します。
Microsoft Azure Service Bus コネクタをフロー提供元として追加し、コネクタを設定します。
[Operation field (操作項目)] で [Queue listener] または [Subscription listener] を選択します。
[Connector Configuration (コネクタ設定)] 項目の横にある [+] をクリックして、コネクタのグローバル要素を設定します。グローバル要素の設定についての詳細は、コネクタの設定 を参照してください。
メッセージをキューに送信するには、キューの名前を [Queue name (キュー名)] 項目に入力します。
メッセージをトピックに送信するには、トピックの名前を [Topic (トピック)] 項目に入力し、サブスクリプションの名前を [Subscription (サブスクリプション)] 項目に入力します。
[Receive mode (受信モード)] 項目を MANUAL に設定します。
Record Variable、Session Variable、または Variable トランスフォーマーをキャンバスの Microsoft Azure Service Bus の横にドラッグします。
次の式を使用して、受信したメッセージからのロックトークン値を保存します。
#[message.inboundProperties.get('lockToken')]
テスト目的で、メッセージを使用してアクションを実行します。たとえば、Logger コンポーネントを使用してメッセージを記録します。
別の Microsoft Azure Service Bus コネクタをキャンバスの変数コンポーネントの横に追加して、Acknowledge message 操作を選択します。
[Lock (ロック)] 項目値では、保存された lockToken 値を取得します。
コネクタは受信したすべてのメッセージを処理し、正常に処理された各メッセージに肯定応答します。
コネクタでメッセージに自動的に肯定応答するには、[Receive mode (受信モード)] 項目を AUTOMATIC に設定します。
次の例では、指定されたスケジュールでキューまたはトピックからメッセージを受信する Mule アプリケーションを設定します。
キューまたはトピックへのメッセージの送信 の説明に従って、キューまたはトピックからメッセージを受信するように Mule アプリケーションを作成します。
アプリケーションを作成する場合、次の操作を行います。
[CRON Expression (CRON 式)] 項目を、UNIX 標準に従う CRON 式に設定します。たとえば、毎日午前 8:00 に接続してメッセージを受信するには、0 8 * * * を使用します。
[Max. Messages to receive (受信するメッセージの最大数)] 項目を、CRON 式で受信がトリガーされるたびにコネクタで受信するメッセージの最大数に設定します。
CRON 式で命令されるたびにコネクタは一括受信を実行します。設定された最大数まで、使用可能な任意の数のメッセージがフローにより取得され、処理されます。
次の例では、キューに複数のメッセージを送信するように Mule アプリケーションを設定します。
キューまたはトピックへのメッセージの送信 の説明に従って、キューまたはトピックからメッセージを受信するように Mule アプリケーションを作成します。
Publish message 操作を Publish batch of messages 操作に置換えます。[Destination type (宛先種別)] と [Destination name (宛先名)] が正しく設定されていることを確認します。
アプリケーションは複数のメッセージを同時に送信するため、メッセージを作成する場合は、文字列のカンマ区切りリストを必ず作成します。
メッセージのカンマ区切り文字列を別々のメッセージに分割するには、Transform message トランスフォーマーをキャンバスにドラッグして次のスクリプトを使用します。
%dw 1.0
%output application/java
payload splitBy ','
次の例では、指定した時間にすべてのリスナーにメッセージを送信するように Mule アプリケーションを設定します。
キューまたはトピックへのメッセージの送信 の説明に従って、キューまたはトピックからメッセージを受信するように Mule アプリケーションを作成します。
Azure Service Bus Connector のプロパティウィンドウで、[Scheduled enqueue time UTC (スケジュールされたエンキュー時間 UTC)] 項目に、メッセージをディスパッチする日付または時間の値を入力します。たとえば、2019-06-27T21:16:46.866Z のようにします。
メッセージが宛先に送信され、宛先では指定された時間にメッセージがディスパッチされます。
Azure Service Bus Connector では、次の Azure Service Bus REST API 操作がサポートされます。
Queue (キュー)
Create (作成)
Get
Get Size (サイズを取得)
List
Update (更新)
Delete
Topic (トピック)
Create (作成)
Get
List
Update (更新)
Delete
Subscription
Create (作成)
Get
Get Size (サイズを取得)
List
Update (更新)
Delete
Rule (ルール)
Create (作成)
Get
List
Update (更新)
Delete
Queue listener 操作および Subscription listener 操作では、出力ペイロードは、メッセージを含む文字列になります。コネクタは追加の属性をアウトバウンドプロパティで返します。
リソースが amqp メッセージのみを送受信できる状況では、[Advanced (詳細)] タブ内のオプション [AMQP-only credentials (AMPQ 専用ログイン情報)] を有効にします。
Mule が大量のメッセージ (1 秒あたり数千件など) を処理する場合 (特に 1 つの Mule アプリケーションで同じ宛先に対して公開と読み取りを行う場合)、ログに次のようなメッセージが表示されることがあります。
[ERROR 2019-05-22 00:15:26,362 [ForkJoinPool.commonPool-worker-3]
com.microsoft.azure.servicebus.MessageAndSessionPump: onMessage with
message containing sequence number '95' threw exception com.mulesoft.connectors.microsoft.azure.servicebus.internal.error.exception.ConsumerException: Failed
setting attributes from original API message.]
デフォルトでは Mule はエラーを非同期で処理するため、エラーが発生する可能性があります。このエラーを修正するには、フローでメッセージリスナーを設定するときに queue-asynchronous を設定します。
メッセージがフローで処理されるまで待機している時間が 30,000 ミリ秒を超えた場合、Mule は例外をスローし、エラー (Mule SEDA キューのタイムアウト) が発生します。
このエラーを回避するには、カスタムのキュー非同期設定を作成し、次のいずれかまたは両方を行います。
maxThreads プロパティでスレッド数を増加します (デフォルトは 16)。
threadWaitTimeout プロパティで待機時間を増加します (デフォルトは 30,000 ミリ秒)。
設定を変更する手順は、次のとおりです。
フローを選択します。
[General (一般)] → [Optional Processing Strategy (省略可能な処理戦略)] → [Processing Strategy Ref (処理戦略参照)] を選択します。
[+] をクリックします。
新しい [Queued Asynchronous Processing Strategy (キューに登録された非同期処理戦略)] を追加します。
詳細は、「フロー処理戦略」を参照してください。
手動 ACK を使用しているときに大量のメッセージを操作していると、次のようなエラーを受け取ることがあります。
[com.mulesoft.connectors.microsoft.azure.servicebus.internal.error.exception.ConsumerException: Message
with lockToken 2dc1312f-b263-4282-bbb0-566998eff6e6 could not be ACK at com.mulesoft.connectors.microsoft.azure.servicebus.internal.connection.Connection.ack(Connection.java:192)]
[ERROR 2019-05-22 00:17:30,822 [ReactorThread6f355ff5-5a36-487b-bb70-1a995a6ddf74]
com.microsoft.azure.servicebus.primitives.CoreMessageReceiver: Completing pending
updateState operation for delivery '? ? &??I??=????' with exception com.microsoft.azure.servicebus.primitives.MessageLockLostException: The lock supplied
is invalid. Either the lock expired, or the message has already been removed from the queue.]
コネクタでプリフェッチが実行されると、lockToken の有効期間は、その瞬間を基準にして固定されます。lockToken の有効期間にレコードのバッチ全体を処理するための十分な長さがない場合、問題が発生します。この場合、肯定応答を実行する前に lockToken が期限切れになるため、Mule はエラーをスローする可能性があります。
この問題を回避するには、プリフェッチのサイズ (デフォルトは 100) を減らすか、ロックトークンの有効期間を長くするか、その両方を行います。この操作は、コネクタまたは Azure ポータル (既存のキューの場合) からキューまたはサブスクリプションを作成するときに [lock duration (ロック期間)] プロパティで行うことができます。Azure のロックトークンの最大期間の値は 5 分 (300 秒) です。
場合によって、コネクタで使用されている基盤となるライブラリが WARN レベルの完全なスタックトレースを定期的に記録することがあります。このメッセージは問題を表すものではありませんが、ログがわかりづらくなる可能性があります。ログのノイズを減らすには、src/main/resources/log4j2.xml ファイルに次のいずれかの変更を加えます。
ライブラリのパッケージに対して AsyncLogger を指定して、ログのレベルを上げます。
<!-- Recommended for packages not belonging to Mule -->
<AsyncLogger name="com.microsoft.azure.servicebus" level="ERROR"/>
既存のアペンダー (つまり RollingFile) に RegexFilter を追加します。
<!-- Avoids the log of messages that contain the specified regexp -->
<RegexFilter regex = ".message to filter.*" onMatch="DENY" onMismatch="ACCEPT"/>
重要: メッセージが複数行の場合、RegexFilter は正しく機能しないことがあります。
ネットワーク接続の問題がある場合、再接続戦略を指定して、Azure Service Bus Connector のソースコンポーネントに、再接続を自動的に試行するように指示できます。再接続戦略は、コネクタ設定を作成または編集するときに指定できます。
コネクタ設定の [General (一般)] タブで、[Connector Configuration (コネクタ設定)] 項目の横にある [+] をクリックします。
[Connector Configuration (コネクタ設定)] オプションのいずれかを選択します。
[Reconnection (再接続)] タブで、[Standard Reconnection (標準再接続)] を選択します。
必要に応じて、[Frequency (ms) (頻度 (ミリ秒))] 項目と [Reconnection Attempts (再接続試行回数)] 項目にデフォルト以外の値を指定します。
ネットワーク接続に微細な切断がある場合、非同期メッセージ送信フローで例外が発生し、メッセージが失われることがあります。これが発生するのは、Azure TimeoutException 例外 (com.microsoft.azure.servicebus.primitives.TimeoutException) がコネクタ再接続戦略の影響を受けないためです。このため、コールを非同期で実行するスレッドプール内で例外が生成された場合、その例外ではキャッチ例外戦略は使用されれません。この戦略を実装するには、XML を使用して Mule アプリケーションフロー内でソリューションをプログラムする必要があります。
次の例は、送信操作タイムアウトを処理する 1 つの方法を示しています。例について説明します。
アプリケーションは、トピックが宛先として使用されていることと、冗長なサブスクリプションをトピックに追加する権限がユーザーにあることを想定しています。
アプリケーションは MongoDB 用 Anypoint Connector (MongoDB Connector) を使用して、未送信メッセージを収集します。
15 分ごとに、cron ジョブ (ポーリングスコープ) は、未送信メッセージの MongoDB コレクションに 1 時間以上保持されているメッセージを Azure Service Bus に戻します。
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
xmlns:tracking="http://www.mulesoft.org/schema/mule/ee/tracking"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:microsoft-azure-service-bus="http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus" xmlns:mongo="http://www.mulesoft.org/schema/mule/mongo"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/mongo
http://www.mulesoft.org/schema/mule/mongo/current/mule-mongo.xsd
http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus
http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus/current/mule-microsoft-azure-service-bus.xsd
http://www.mulesoft.org/schema/mule/ee/dw
http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd
http://www.mulesoft.org/schema/mule/ee/tracking
http://www.mulesoft.org/schema/mule/ee/tracking/current/mule-tracking-ee.xsd">
<http:listener-config
name="HTTP_Listener_Configuration"
host="0.0.0.0" port="8081"
doc:name="HTTP Listener Configuration"/>
<mongo:config
name="Mongo_DB__Configuration"
username="${mongodb.config.username}"
password="${mongodb.config.password}"
database="${mongodb.config.database}"
host="${mongodb.config.server}"
authenticationDatabase="${mongodb.config.database.auth}"
doc:name="Mongo DB: Configuration"/>
<microsoft-azure-service-bus:basic-authentication-config
name="Microsoft_Azure_Service_Bus__Basic_authentication"
sharedAccessKeyName="${azure.config.key.name}"
sharedAccessKey="${azure.config.shared.access.key}"
namespace="${azure.config.service.namespace}"
doc:name="Microsoft Azure Service Bus: Basic authentication"/>
<flow name="Flow_Send_Async">
<http:listener config-ref="HTTP_Listener_Configuration"
path="/send-async"
doc:name="HTTP"/>
<byte-array-to-string-transformer
doc:name="Byte Array to String"/>
<set-variable
variableName="azure_message"
value="#[payload]"
doc:name="Set Payload in Variable"/>
<dw:transform-message
doc:name="Format Document - Transform Message">
<dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
message : flowVars.azure_message,
timestamp: now as :string
}]]></dw:set-payload>
</dw:transform-message>
<mongo:insert-document
config-ref="Mongo_DB__Configuration"
collection="${mongodb.collection}"
doc:name="Insert Document - Mongo DB"/>
<microsoft-azure-service-bus:publish
config-ref="Microsoft_Azure_Service_Bus__Basic_authentication"
destination="TOPIC||${azure.topic}" body="#[flowVars.azure_message]"
doc:name="Microsoft Azure Service Bus"/>
<dw:transform-message doc:name="Transform Message">
<dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
id_azure: payload,
message_azure: flowVars.azure_message
}]]></dw:set-payload>
</dw:transform-message>
</flow>
<!-- Messages sent successfully are removed from MongoDB. -->
<!-- Only messages not received by Azure should remain, to retry later. -->
<flow name="Flow_Remove_Sent_Messages">
<microsoft-azure-service-bus:subscription-listener
config-ref="Microsoft_Azure_Service_Bus__Basic_authentication"
topic="${azure.topic}"
subscription="${azure.topic.suscription.redundant}"
doc:name="Microsoft Azure Service Bus (Streaming)"/>
<byte-array-to-string-transformer doc:name="Byte Array to String"/>
<dw:transform-message doc:name="Transform Message">
<dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
message : payload
}]]></dw:set-payload>
</dw:transform-message>
<mongo:remove-documents
config-ref="Mongo_DB__Configuration"
collection="${mongodb.collection}"
doc:name="Mongo DB"/>
<logger
message="#["Document sent successfully. The Mongo document is removed (from retry collection):" + payload]"
level="INFO" doc:name="Logger"/>
</flow>
<!-- Messages that are in MongoDB and are still alive after 1 hour are -->
<!-- sent again (a time is expected to avoid a race condition with the removal process). -->
<flow name="Flow_Retry_Send_Message">
<poll doc:name="Poll - Retry">
<fixed-frequency-scheduler frequency="15" timeUnit="MINUTES"/>
<dw:transform-message doc:name="Transform Message">
<dw:set-payload><![CDATA[%dw 1.0
%output application/json
---
{
}]]></dw:set-payload>
</dw:transform-message>
</poll>
<mongo:find-documents
config-ref="Mongo_DB__Configuration"
collection="${mongodb.collection}"
doc:name="Mongo DB"/>
<foreach collection="#[payload]" doc:name="For Each">
<choice doc:name="Choice">
<when expression="#[new org.mule.el.datetime.DateTime(payload.timestamp).plusHours(1).isBefore(server.dateTime)]">
<microsoft-azure-service-bus:publish
config-ref="Microsoft_Azure_Service_Bus__Basic_authentication"
destination="TOPIC||${azure.topic}"
body="#[payload.message]"
doc:name="Microsoft Azure Service Bus"/>
</when>
<otherwise>
<logger message="#["The message shouldn't be resent yet: " + payload]"
level="INFO"
doc:name="Logger"/>
</otherwise>
</choice>
</foreach>
</flow>
</mule>
Anypoint Studio でアプリケーションをデザインする場合、コネクタを [Mule Palette (Mule パレット)] ビューから Anypoint Studio キャンバスにドラッグすると、コネクタの名前空間とスキーマの場所が XML コードに自動的に入力されます。
名前空間: http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus
スキーマの場所: http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus/current/mule-microsoft-azure-service-bus.xsd
Studio XML エディターまたはその他のテキストエディターで Mule アプリケーションを手動でコーディングする場合は、次のステートメントを設定 XML のヘッダーの <mule> タグの内部に貼り付けてください:
<mule
...
xmlns:microsoft-azure-service-bus="http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus"
...
http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus
http://www.mulesoft.org/schema/mule/microsoft-azure-service-bus/current/mule-microsoft-azure-service-bus.xsd
...
...">