Flex Gateway新着情報
Governance新着情報
Monitoring API ManagerAnypoint MQ Connector の Subscriber ソースは、アプリケーションで新しいメッセージをリスンし、宛先に到達したメッセージをコンシュームできます。さまざまなリスニング戦略を設定し、パフォーマンス、予測可能性、スケジュールに合わせてコンシュームを調整できます。
デフォルトでは、Subscriber ソースは継続的リスニングモードで機能し、キューに到達するとすぐにメッセージをコンシュームしてスループットを最大化します。これは、prefetch
subscriber-type 設定と呼ばれます。prefetch
モードを使用する場合、Subscriber ソースはメッセージのローカルバッファが常にフルになるように試みます。これにより、アプリケーションでメッセージを受け入れ可能になり次第、Mule フローにディスパッチできます。
既知の問題のため、flow 要素で maxConcurrency="1" が設定された状態で prefetch モードを使用すると、メッセージ処理のレイテンシーが発生する可能性があります。
|
prefetch
subscriber-type 設定で、maxLocalMessages
パラメーターを設定してフルバッファの対象サイズを指定し、どれだけ迅速にキューからメッセージが取得されるのかを制御します。
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config" destination="myQueue">
</anypoint-mq:subscriber>
より大きなバッファサイズを使用してパフォーマンスを最大化する。他のコンシューマーはローカルで保持されるメッセージを使用できないことに留意してください。
コンシューマーがキューからメッセージを取得してローカルバッファに保存すると、メッセージはインフライトメッセージとして他のコンシューマーに表示され、他のコンシューマーはコンシュームできなくなります。メッセージは、アプリケーションで処理を開始するまで必要に応じてこのインフライト状態のままになります。
より小さなバッファサイズを使用して、キューにパブリッシュされたときにメッセージを処理する。
バッファを小さくすると、アプリケーションレベルのスループットが制限され、競合するコンシューマーがブロックされることを回避できます。
prefetch
モードを使用する場合、maxLocalMessages
を 1
より大きな値に設定して、メッセージ処理のレイテンシーの問題を回避してください。
maxLocalMessages
が 1
に設定されている場合、コネクタでメッセージがプリフェッチされるが、アプリケーションでコンシュームされず、メッセージが再処理のためにキューに戻ると、レイテンシーが発生する可能性があります。
prefetch
モードを使用する場合、acknowledgementTimeout
タイマーは、メッセージがキューから取得されたときに開始されます。
Anypoint MQ は、acknowledgementTimeout
で指定された有効期限 (時間) を順守しようとします。
ただし、メッセージがバッファに保持されている時間がタイムアウト値の 80% に達すると、Anypoint MQ は有効期限タイムアウトを延長します。
次のような状況では、prefetch
モードを使用しないでください。
maxConcurrency
属性を使用して同時処理を制限している、処理時間が長いアプリケーション。
これらのオプションにより、メッセージがバッファに長く残る可能性があり、その結果メッセージが期限切れになり、キューに送り返される可能性があります。
FIFO キューを使用している場合
prefetch
モードを使用すると、Anypoint MQ が順序を保証できないため、FIFO キューでは使用しないでください。
処理時間の長いアプリケーションで prefetch
モードを使用すると、Anypoint MQ はローカルバッファにあるすべてのメッセージの ACK TTL を監視し、メッセージがキューに送り返されないように TTL を延長するため、大量のリソースがコンシュームされる可能性があります。
アプリケーションで最大スループットではなく、メッセージのコンシュームを予測して制御する必要がある場合、ポーリングソースとして Subscriber を設定できます。Subscriber ソースは、固定スケジュールレートでキューの新規メッセージをチェックします。ポーリングがトリガーされるたびに、Subscriber ソースは 1 ~ 10 個のメッセージを取得して、フローに 1 つずつ順番にディスパッチします。
メッセージは、すぐにコンシュームされません。代わりに、Subscriber ソースの次のポーリングがトリガーされるまでキューに保持されます。キューには、取得されるメッセージが最大 10 個含まれます。たとえば、キューに 15 個のメッセージがある場合、1 つのポーリングで最初の 10 個のメッセージがコンシュームされ、残りの 5 個のメッセージは次のポーリングを待機します。
Subscriber をポーリングソースとして設定するには、polling
subscriber-type を指定します。
デフォルトの polling
subscriber-type の宣言は次のようになります。
<anypoint-mq:subscriber doc:name="Subscriber" config-ref="Anypoint_MQ_Config" destination="myQueue">
<anypoint-mq:subscriber-type >
<anypoint-mq:polling >
<scheduling-strategy >
<fixed-frequency />
</scheduling-strategy>
</anypoint-mq:polling>
</anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>
デフォルトの polling
subscriber-type の宣言では、宣言された宛先から 1 秒ごとにメッセージ 10 個の取得を試み、取得したメッセージを MuleMessage
インスタンスとして 1 つずつフローにディスパッチします。
結果の MuleMessage
には、以下が含まれます。
メッセージのペイロードとしての本文
メッセージ属性に含まれるメッセージのメタデータ
標準 Mule Runtime Engine スケジュール戦略を使用して、Subscriber ソースのポーリング戦略をカスタマイズできます。
Subscriber ソースの新規メッセージのポーリング頻度はカスタマイズできます。これを行うには、カスタマイズされた fixed-frequency
スケジュール戦略を使用して、polling
subscriber-type 戦略を宣言します。
デフォルトの fixed-frequency
スケジュール戦略宣言は次のようになります。
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config" destination="myQueue">
<anypoint-mq:subscriber-type >
<anypoint-mq:polling>
<scheduling-strategy >
<fixed-frequency/>
</scheduling-strategy>
</anypoint-mq:polling>
</anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>
Anypoint MQ サブスクライバーは、cron
スケジュール戦略を使用することもできます。このスケジュール戦略を使用すると、「毎日午後 2 時から 2 時 59 分まで、毎分実行」のようなジョブのスケジュールが可能になります。このスケジュール戦略を使用するには、カスタマイズされた cron
スケジュール戦略を使用して polling
subscriber-type 戦略を宣言します。
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config" destination="myQueue">
<anypoint-mq:subscriber-type >
<anypoint-mq:polling >
<scheduling-strategy >
<cron expression="0 * 14 * * ?" timeZone="America/Los_Angeles" />
</scheduling-strategy>
</anypoint-mq:polling>
</anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>
フローの実行が正常に完了した場合や、処理前にアプリケーションがメッセージをコンシュームした場合にのみ受信したメッセージに自動的に肯定応答できます。
自動
デフォルトでは、Subscriber ソースは自動肯定応答モードを使用します。このモードでは、メッセージフローの処理が成功した後で Subscriber ソースが取得するメッセージが自動的に肯定応答されます。つまり、Subscriber ソースはメッセージを取得してフローにディスパッチし、どのようにメッセージの処理が完了するのかを確認するために待機します。例外なしで処理が完了した場合にのみ ACK (肯定応答) を実行します。
例外が伝播されて処理フローの実行が完了すると、メッセージは自動的に肯定応答されず、再配信のためにキューに返されます。
詳細は、「自動肯定応答」を参照してください。
即時
IMMEDIATE
肯定応答モードを使用する場合、コンシュームされたメッセージが肯定応答 (キューから削除) され、処理のために Mule フローにディスパッチされます。
メッセージの肯定応答に失敗すると、メッセージは破棄されます。メッセージはフローにディスパッチされず、肯定応答がタイムアウトするまでインフライトのままになります。
アプリケーションが肯定応答とディスパッチの間で再起動した場合、メッセージはキューから削除されているため、再度使用できない可能性があります。
処理されるまでメッセージが削除されないようにするには、代わりに [AUTO (自動)]
または [MANUAL (手動)]
肯定応答モードを使用します。
IMMEDIATE
肯定応答モードでプリフェッチモードの Subscriber ソースを使用すると、スレッドが累積したり、アプリケーションが無応答になったりする場合があります。
詳細は、「即時肯定応答」を参照してください。
手動
手動肯定応答を実行するには、結果のメッセージ属性の一部として提供される ackToken
の値が必要です。
詳細は、「手動肯定応答」を参照してください。
肯定応答タイムアウトについての詳細は、「肯定応答タイムアウト」を参照してください。
Subscriber ソースは、サーキットブレーク機能を提供します。この機能を使用すると、コンシュームされたメッセージの処理中に発生するエラーをコネクタでどのように処理するのかを制御できます。
たとえば、外部サービスに接続している場合、サーキットブレーカーを使用してそのサービスのダウンタイムを処理できます。サーキットブレーカーでは、要求の実行を停止したり、負荷を減らして外部サービスを回復したりできます。
Mule 4 アプリケーションでの Anypoint MQ の使用は、キューからメッセージをコンシュームし、外部サービスを使用して処理を行う、MQ サブスクライバーを使用する Mule フローがあることを意味します。このサービスが使用できない場合、次のようになります。
要求が失敗する。
エラーが発生する。
メッセージの処理が失敗に終わるか、デッドレターキュー (DLQ) にメッセージを送信するなど、メッセージ処理のカスタムエラーで終了します。
外部サービスを利用できない場合、メッセージを処理するすべての試みは失敗し、アプリケーションは成功しないメッセージのコンシュームを繰り返し実行し続けます。この動作は、一定期間メッセージをさらにコンシュームしないように、サブスクライバーにエラーを通知すれば回避できます。
Subscriber ソースによって提供されるサーキットブレーカー機能は、Mule で提供されるエラー処理メカニズムにバインドされています。エラー通知メカニズムを使用してサーキット障害と呼ばれる外部サービスに関連するエラー数を記録します。サーキット障害にはあらゆるエラーをバインドできます。たとえば、HTTP:TIMEOUT
や FTP:SERVICE_NOT_AVAILABLE
、さらには ORG:EXTERNAL_ERROR
のようなアプリケーションのカスタムエラーもバインドできます。
Mule フローの実行がエラーに終わった場合、Subscriber ソースはエラーが外部サービスエラーを示す onErrorTypes
のいずれかであるかどうかをチェックし、errorsThreshold
に達するまで、連続発生数を数えます。
errorsThreshold
に達すると、サーキットがトリップし、tripTimeout
によって指定される期間中、オープン状態のままになります。
サーキットがオープン状態の間、ポーリングまたはコンシュームされたすべてのメッセージは肯定応答 (NACK) されず、Anypoint MQ は tripTimeout
が経過するまでその後の操作 (ポーリング、ACK、NACK、パブリッシュ) を停止します。
tripTimeout
が経過すると、サーキットブレーカーは新しい状態に移行します。
新しい状態が [Closed (クローズ)] の場合、クライアントは次のポーリングでメッセージをコンシュームできます。
アプリケーションを CloudHub にデプロイするワーカーが 1 つか複数かによって、この動作は異なります。
ワーカーはメッセージをキューから個別に処理し、エラー数が errorsThreshold
の値に達するとサーキットブレーカーをオープン状態にします。
各ワーカーに関連付けられたサーキットブレーカーがあり、errorsThreshold
と tripTimeout
の値を個別に追跡します。
各ワーカーはメッセージをキューから個別に処理し、エラー数がそのワーカーで設定された errorsThreshold
の値に達するとサーキットブレーカーをオープン状態にします。
デフォルトでは、サーキットブレーク機能は無効になっています。
サーキットブレーカーは、3 つの状態 (クローズド、オープン、ハーフオープン) があります。アプリケーションの動作は、現在の状態に基づいて変わります。詳細は、 「Microsoft Circuit Breaker pattern (Microsoft サーキットブレーカーパターン)」を参照してください。
クローズド
通常 Subscriber ソースが設定に基づいて MQ からメッセージを取得する開始状態。実質的にサーキットブレーカーが存在しないかのように機能します。
クローズドからオープンへの遷移
メッセージの処理中に成功することなく失敗が連続して発生し、errorsThreshold
値に達すると、サーキットブレーカーがトリップし、オープン状態に遷移します。
フローにすでにディスパッチされたメッセージは、結果が成功か失敗かに関係なく処理を完了します。
ブローカーのインフライトであるが、まだディスパッチされておらず、ローカルに保持されているメッセージは、肯定応答されず、別のコンシューマーに再配信するためにキューに戻されます。
オープン
Subscriber ソースはメッセージの取得を試みず、tripTimeout
に達するまで通知せずにイテレーションをスキップします。
ハーフオープン
tripTimeout
の経過後、Subscriber ソースはハーフオープン状態になります。次回のメッセージのポーリングで、Subscriber ソースは通常のクローズド状態に戻る前にサービスからメッセージを 1 つ取得し、そのメッセージを使用してシステムが回復したかどうかをチェックします。
Subscriber ソースが 1 つのメッセージを正常にフェッチしてフローにディスパッチし、処理が正常に完了すると、Subscriber ソースは通常の状態に戻り、すぐに追加のメッセージのフェッチを試みます。
Mule フローの処理が、予期される onErrorTypes
のいずれかで失敗すると、サーキットはオープン状態に戻り、tripTimeout
タイマーがリセットされます。
サーキットブレーカーは、グローバルサーキットブレーカーまたは非公開サーキットブレーカーとして設定できます。
どちらの方法でも、設定パラメーターは同じです。
onErrorTypes
フローの実行中に障害とみなされるエラー種別。エラーの発生は、フローがエラーの伝達によって完了した場合にのみ計数されます。デフォルトでは、すべてのエラーがサーキット障害とみなされます。
errorsThreshold
サーキットブレーカーがオープンになるのに発生する必要がある onErrorTypes
エラーの回数。
tripTimeout
errorsThreshold
に達してからサーキットがオープンのままである期間。
circuitName
この設定にバインドするサーキットブレーカーの名前。デフォルトでは、各キューに独自のサーキットブレーカーがあります。
各サブスクライバーが同じ「サーキット」の一部であるかのようにサーキットの状態を複数のサブスクライバーで共有する場合、グローバルサーキットブレーカーを使用します。
Anypoint Studio で、キャンバスの [Global Elements (グローバル要素)] タブをクリックします。
[Create (作成)] > [Component Configuration (コンポーネント設定)] > [Circuit Breaker (サーキットブレーカー)] を選択します。
設定ウィザードで、必要に応じて次の項目を入力します。設定が完了したら、Anypoint MQ サブスクライバーからこの Circuit_breaker
宣言を参照できます。
サーキットブレーカーを参照する手順は、次のとおりです。
キャンバスで Subscriber ソースを選択します。
[Advanced (詳細)] タブをクリックします。
[Circuit Breaker (サーキットブレーカー)] > [Global Reference (グローバル参照)] を選択し、リストからグローバルサーキットブレーカー設定を選択します。
非公開サーキットブレーカーは、1 つのサブスクライバーで内部的に宣言します。このサーキットの宣言は、Subscriber ソースが宣言されているフローでみ使用され、他のすべてのサーキットから分離されています。
この設定を使用する手順は、次のとおりです。
キャンバスで Subscriber ソースを選択します。
[Advanced (詳細)] タブをクリックします。
[Circuit breaker (サーキットブレーカー)] > [Edit Inline (インライン編集)] を選択し、項目に入力します。
この例では、1 つのサブスクライバーがキューからメッセージをコンシュームし、REST API を使用して別のサービスにメッセージを投稿します。外部サービスに 5 回要求してタイムアウトした後、メッセージの処理を停止できます。処理が停止したら、Subscriber ソースは新しいメッセージで再試行する前に、サービスの回復を 30 秒待機します。
この例では、設定で次のサーキットブレーカーパラメーターを指定する必要があります。
<anypoint-mq:config name="Anypoint_MQ_Config">
<anypoint-mq:connection url="${providerUrl}"
clientId="${clientId}"
clientSecret="${clientSecret}"/>
</anypoint-mq:config>
<flow name="subscribe">
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
destination="${subscribedQueue}">
<anypoint-mq:circuit-breaker
onErrorTypes="HTTP:TIMEOUT" <!-- (1) -->
errorsThreshold="5" <!-- (2) -->
tripTimeout="30" <!-- (3) -->
tripTimeoutUnit="SECONDS"/>
</anypoint-mq:subscriber>
<http:request config-ref="RequesterConfig"
path="/external" method="POST"/> <!-- (4) -->
</flow>
(1) |
サーキットをトリップさせるエラー種別を設定します。エラーが |
(2) |
サーキットが障害状態にあるとみなされるために連続して発生する必要があるメッセージ数のしきい値を設定します。 |
(3) |
|
(4) |
|
サーキットブレーカーでは、onErrorTypes
パラメーターにリストされていないすべてのエラーが無視されます。この例では、HTTP:BAD_REQUEST
などのエラーがサーキットブレーカーで無視されます。
多くの場合、1 つの共通サービスでさまざまなキューからのメッセージを処理します。この例では、両方のサブスクライバーを 1 つのサーキットにバインドするよう circuitName
パラメーターを設定します。
<anypoint-mq:config name="Anypoint_MQ_Config">
<anypoint-mq:connection url="${providerUrl}"
clientId="${clientId}"
clientSecret="${clientSecret}"/>
</anypoint-mq:config>
<anypoint-mq:circuit-breaker
name="InvoiceProcess" <!-- (1) -->
onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE" <!-- (2) -->
errorsThreshold="10"
tripTimeout="5"
tripTimeoutUnit="MINUTES"/>
<flow name="subscribe">
<anypoint-mq:subscriber destination="${reservationsQueue}"
config-ref="Anypoint_MQ_Config"
circuitBreaker="GlobalCircuit"/> <!-- (3) -->
<flow-ref name="invoiceProcess">
</flow>
<flow name="otherSubscribe">
<anypoint-mq:subscriber
destination="${paymentsQueue}"
config-ref="Anypoint_MQ_Config"
circuitBreaker="GlobalCircuit"/> <!-- (3) -->
<flow-ref name="invoiceProcess">
</flow>
<sub-flow name="invoiceProcess">
<ftp:write path="${auditFolder}" <!-- (4) -->
config-ref="ftp-config"/>
<http:request config-ref="requestConfig" <!-- (5) -->
path="/external"/>
</sub-flow>
(1) |
共通のサーキットブレーカーを複数のキューで共有するよう |
(2) |
Subscriber ソースからのメッセージの処理に影響する可能性がある 2 つのエラーを識別し、それぞれ CSV リストとして渡します。 |
(3) |
両方のサブスクライバーで、グローバルサーキットブレーカー設定を参照します。 |
(4) |
複数のエラーがスローされる場合もありますが、サーキットブレーカーに関連しているのは |
(5) |
HTTP Connector は |
このシナリオの場合、両方のサブスクライバーで FTP:RETRY_EXHAUSTED
および HTTP:SERVICE_UNAVAILABLE
の両方のエラーを数え、その数が errorsThreshold="10"
値に達すると直ちにメッセージのポーリングが停止されます。tripTimeout
値が経過すると、一方のサブスクライバーでメッセージをポーリングし、そのメッセージを使用してサーキットをテストします。そのメッセージの処理が成功すれば、両方のサブスクライバーでポーリングが有効になります。
FIFO キューは、単一コンシューマーのシナリオに最適です。1 つのコンシューマーがメッセージにアクセスしている場合、その他すべてのコンシューマーは最初のバッチが処理されるまでブロックされます。すべてのインフライトメッセージが肯定応答されるか否定応答されるまで、メッセージは配信されません。
メッセージグループを使用すると、複数のコンシューマーが FIFO キュー内のメッセージに同時にアクセスできます。この場合、あるコンシューマーがグループ内のメッセージにアクセスしているときに、別のコンシューマーは別のグループ内のメッセージにアクセスします。メッセージの順序は、各メッセージグループ内で保持されます。
FIFO キューでは、メッセージグループ ID でメッセージを取得することはできません。
メッセージの処理順序を保持するには、FIFO キューからのメッセージをコンシュームするフローで maxConcurrency の値を 1 に設定します。
|
FIFO キューは、クラスター環境と非クラスター環境で同様に動作します。
FIFO キューは、指定された順序でメッセージをコンシュームします。メッセージがコンシュームされたら、その後のメッセージ処理を他のノードに分散できます。この場合、コンシューマーがメッセージを完全に処理する前に肯定応答すると、メッセージ処理中にメッセージの順序が変更される可能性があります。
オンプレミスの高可用性クラスタリング環境では、デフォルトで Anypoint MQ Connector の Subscriber ソースがすべてのノードで実行されます。
[Advanced (詳細)] タブの [Primary node only (プライマリノードのみ)] を選択して、プライマリノードとして実行するように動作を変更できます。
複数のワーカーを使用する CloudHub では、すべてのワーカーがプライマリノードとして実行されます。
この場合、アプリケーションを実行しているすべてのワーカーが同じ FIFO キューからコンシュームします。