新規メッセージのリスン

Anypoint MQ コネクタの Subscriber ソースは、宛先に到着したメッセージをコンシュームする機能を提供します。サブスクライバの動作の調整は、コネクタの設定内から行います。

メッセージのプリフェッチ

デフォルトでは、コネクタは最大メッセージスループットを最適化する設定を提供します。つまり、メッセージの prefetch が有効になっている場合、最大 fetchSize が可能です。

prefetch モードを使用しているとき、サブスクライバはいつでも fetchSize の 3 倍のローカルバッファをいっぱいにしようと試みます。つまり、メッセージを取得し、バッファで利用できるようにするために (最大 3 つの同時呼び出しで) 必要な数のサービス呼び出しを行います。その後、メッセージは処理のためにローカルバッファからフローにディスパッチされます。

Mule は多数のメッセージを高速で受け入れ、できるだけ多くのスレッドを使用して同時処理を行うため、バッファがいっぱいになることはほとんどありません。 fetchSize で定義された最大メッセージ数が必ずしもサービスへの要求すべてで提供されるわけではないので、負荷が低い場合、バッファをいっぱいにするには、3 つを超える要求が必要になる可能性があります。

プリフェッチは、次の設定要素を使用して調整できます。

<anypoint-mq:default-subscriber-config name="defaultPrefetchConfig"
                                       fetchSize="5"
                                       fetchTimeout="1000"
                                       frequency="4000">
    <anypoint-mq:connection url="${providerUrl}" clientId="${clientId}" clientSecret="${clientSecret}"/>
</anypoint-mq:default-subscriber-config>

<flow name="prefetchedListener">
    <anypoint-mq:subscriber config-ref="defaultPrefetchConfig" destination="${invoiceQueue}"/>

    <!-- Message processing-->
</flow>

この例では、API コール 1 回あたりに取得されるメッセージ量の削減によりバッファサイズが縮小され、デフォルトで保存される 30 個の代わりに、実質的にメッセージ 15 個のバッファを持つことになります。

fetchTimeoutfrequency などの他のパラメータは、キューが空の場合とコネクタがメッセージの到着を待機中である場合のサブスクライバの動作を変更します。

各パラメータの機能についての詳細は、「コネクタリファレンス」を参照してください。

バッファ、肯定応答、肯定応答タイムアウト

バッファに入っているとき、メッセージはブローカーのインフライトとして保持されるため、メッセージがフローにディスパッチされる必要がある限り、またはサブスクライバが停止されてバッファがクリアされるまで、再配信は発生しません。 ディスパッチされると、メッセージは acknowledgementTimeout が経過するまでインフライトのままになります。

デフォルトでは、コンシュームされたメッセージは、メッセージを受信するフローの実行が正常に完了した場合にのみ肯定応答されます。フローの実行中にエラーが発生した場合、セッションは回復し、メッセージが再配信されます。

メッセージ ACK についての詳細は、「メッセージ肯定応答の処理方法」を参照してください。

新規メッセージのポーリング

各サブスクライバによってコンシュームされるメッセージ数をより詳細に制御するには、*ポーリング*設定を使用してサービスからメッセージを固定レートでポーリングすることができます。

この動作は、fetchSize を 0 に設定して prefetch を実質的に無効にすることで得られます。ポーリングモードを使用しているとき、コネクタは常に要求 1 つあたりメッセージ 10 個をフェッチしようと試みます。各要求は pollingTime で定義された固定レートで行われます。重複する要求は実行されず、ポーリング 1 回あたり 1 つの要求がサービスによって処理されます。

固定レートでキューから新規メッセージをリスンする構文は、次のとおりです。

<anypoint-mq:default-subscriber-config name="pollingConfig"
                                       fetchSize="0"
                                       pollingTime="1000"
                                       acknowledgementTimeout="5000">
    <anypoint-mq:connection url="${providerUrl}"
                                      clientId="${clientId}"
                                      clientSecret="${clientSecret}"/>
</anypoint-mq:default-subscriber-config>

<flow name="prefetchedListener">
    <anypoint-mq:subscriber config-ref="pollingConfig" destination="${invoiceQueue}"/>

    <!-- Message processing-->
</flow>

このソースは、宛先によって識別されたキューから 1 秒毎にメッセージ 10 個の取得を試み、各メッセージを MuleMessage としてフローにディスパッチします。この場合、メッセージは 5 秒間インフライトのままです。

Mule メッセージは次で構成されます。

  • メッセージのペイロードとしてのコンテンツ。

  • メッセージ属性に含まれるメッセージのメタデータ。

デフォルトでは、コンシュームされたメッセージは、メッセージを受信するフローの実行が正常に完了した場合にのみ肯定応答されます。 フローの実行中にエラーが発生した場合、セッションは回復し、メッセージが再配信されます。

メッセージ ACK についての詳細は、「メッセージ肯定応答の処理方法」を参照してください。

サーキットブレーカ機能

バージョン 2.1.0 から、MQ コネクタサブスクライバでは標準でサーキットブレーカ機能が提供され、コンシュームしたメッセージの処理中にエラーが発生した場合、コネクタでどのように対処するかをより詳細に制御できます。

サーキットブレーカ

サービスに接続する必要があるシナリオでは、外部サービスが失敗した場合どうなるかを検討します。外部サービスのダウンタイムへの一般的な対処パターンはサーキットブレーカです。この機能では、失敗する可能性が高い要求を停止したり、負荷を減らして外部サービスを回復したりできます。 サーキットブレーカは、3 つの状態 (クローズド、オープン、ハーフオープン) を経て、現在の状態に基づいてアプリケーションの動作を変更します。詳細は、 「Microsoft Circuit Breaker Pattern (Microsoft サーキットブレーカーパターン)」 を参照してください。

Mule 4 アプリケーションでは、Anypoint MQ の使用は、キューからメッセージをコンシュームし、外部サービスを使用して処理を行う、MQ サブスクライバを使用する Mule フローがあることを意味します。このサービスを利用できず要求が失敗すると、エラーが伝達され、メッセージの処理は失敗に終わるか、DLQ にメッセージを送信するなど、メッセージ処理のカスタムエラーで終了します。 外部サービスを利用できない場合、メッセージを処理するすべての試みは失敗し、アプリケーションは成功しないメッセージのコンシュームを繰り返し実行し続けます。これは、一定期間メッセージをさらにコンシュームしないように、サブスクライバにエラーが通知されれば回避できます。

サーキットブレーカプロセス

anypoint-mq:subscriber によって提供されるサーキットブレーカ機能は、Mule が標準で提供するエラー処理メカニズムにバインドされており、エラー通知メカニズムを使用してサーキット障害と呼ばれる外部サービスに関連するエラー数を記録します。サーキット障害にはあらゆるエラーをバインドできます。たとえば、HTTP:TIMEOUTFTP:SERVICE_NOT_AVAILABLE、さらには ORG:EXTERNAL_ERROR のようなアプリケーションのカスタムエラーもバインドできます。

Mule フローの実行がエラーに終わった場合、サブスクライバはエラーが外部サービスエラーを示す onErrorTypes のいずれかであるかどうかをチェックし、errorsThreshold に達するまで、連続発生数をカウントします。errorsThreshold に達すると、サーキットがトリップし、設定可能な tripTimeout の期間中、新規メッセージのポーリングを停止します。tripTimeout の経過後、メッセージは次回のポーリングで再びコンシュームされます。 デフォルトでは、サーキットブレーク機能は無効になっています。

サーキットブレーカの状態

mq subscriber states view
  • クローズド

    通常サブスクライバが設定に基づいて MQ からメッセージを取得する開始状態。実質的にサーキットブレーカが存在しないかのように機能します。

  • クローズドからオープンへの遷移

    メッセージの処理中に失敗が連続して発生し、その間まったく成功がなく、errorsThreshold 値に達すると、サーキットブレーカがトリップし、オープン状態に変わります。

    フローにすでにディスパッチされたメッセージは、結果が成功か失敗かに関係なく処理を完了します。

    ブローカーのインフライトであるが、まだディスパッチされておらず、ローカルに保持されているメッセージは、否定応答され、別のコンシューマに再配信するためにキューに戻されます。

  • オープン

    サブスクライバはメッセージの取得を試みず、tripTimeout に達するまで通知せずにイテレーションをスキップします。

  • ハーフオープン

    tripTimeout の経過後、サブスクライバはハーフオープン状態になります。つまり、次回のメッセージのポーリングで、通常のクローズド状態に戻る前にサービスからメッセージを 1 つ取得し、そのメッセージを使用してシステムが回復したかどうかをチェックします。

    1 つのメッセージが正常にフェッチされ、フローにディスパッチされ、処理が正常に完了すると、サブスクライバは通常の状態に戻り、すぐに追加のメッセージのフェッチを試みます。

    Mule フローの処理が、予期される onErrorTypes のいずれかで失敗すると、サーキットはオープン状態に戻り、tripTimeout タイマーがリセットされます。

サーキットブレーカの設定

anypoint-mq:default-subscriber-config の一部としてサーキットブレーカを設定できます。

Anypoint Studio の [Advanced (詳細)] タブで、[Circuit Breaker group (サーキットブレーカグループ)] を有効にし、必要に応じて次の項目に入力します。

  • onErrorTypes - フローの実行中に発生して、サーキットで障害としてカウントされるエラーの種類。エラーの発生は、フローがエラーの伝達によって完了した場合にのみカウントされます。デフォルトでは、すべてのエラーがサーキット障害としてカウントされます。

  • errorsThreshold - サーキットブレーカがオープンになるのに発生する必要がある onErrorTypes エラーの回数。

  • tripTimeout - errorsThreshold に達してからサーキットがオープンのままである期間。

  • circuitName - この設定にバインドするサーキットブレーカの名前。デフォルトでは、各キューに独自のサーキットブレーカがあります。

mq subscriber cb config tab

1 つのサブスクライバのサーキット設定

キューからメッセージをコンシュームし、REST API を使用して別のサービスにメッセージを投稿する anypoint-mq:subscriber が 1 つあるシナリオの例では、外部サービスに 5 回要求した後、メッセージの処理を停止できます。そうなった後、この例では新しいメッセージで再試行する前に、サービスの回復を 30 秒待機します。

この場合、設定で次のサーキットブレーカパラメータを指定する必要があります。

<anypoint-mq:default-subscriber-config name="ConfigWithCircuit" >
   	<anypoint-mq:connection url="${providerUrl}" clientId="${clientId}" clientSecret="${clientSecret}"/>
<anypoint-mq:circuit-breaker
           onErrorTypes="HTTP:TIMEOUT" (1)
           errorsThreshold="5" (2)
           tripTimeout="30" (3)
           tripTimeoutUnit="SECONDS"/>
</anypoint-mq:default-subscriber-config>

<flow name="subscribe">
   <anypoint-mq:subscriber config-ref="ConfigWithCircuit" destination="${subscriberQueue}"/> (4)
    <http:request config-ref="RequesterConfig" path="/external" method="POST"/> (5)
</flow>
1 サーキットをトリップさせるエラーの種類を設定します。エラーが errorsThreshold の回数発生すると、ポーリングが停止されます。
2 サーキットが障害状態にあるとみなされるために連続して発生する必要があるメッセージ数のしきい値を設定します。
3 errorsThreshold に達したためにサーキットブレーカがトリップした後、新しいメッセージのポーリングを再開する前に待機する期間を設定します。
4 サブスクライバで必要なのは、サーキットブレーカを使用する設定を参照することだけです。
5 onErrorTypes パラメータで予期されるエラーをスローする操作を定義します。

onErrorTypes パラメータにリストされないその他のエラーは、サーキットブレーカでは無視されることに留意してください。この例では、HTTP:BAD_REQUEST のようなエラーは無視されます。

さまざまなキューからサーキットを共有する

多くの場合、さまざまなキューからのメッセージを処理する 1 つの共通サービスがあります。この例では、両方のサブスクライバを 1 つのサーキットにバインドするよう circuitName パラメータを設定します。

<anypoint-mq:default-subscriber-config name="ConfigWithCircuit" >
   	<anypoint-mq:connection url="${providerUrl}" clientId="${clientId}" clientSecret="${clientSecret}"/>
<anypoint-mq:circuit-breaker
           circuitName="InvoiceProcess" (1)
           onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE" (2)
           errorsThreshold="10"
           tripTimeout="5"
           tripTimeoutUnit="MINUTES"/>
</anypoint-mq:default-subscriber-config>

<flow name="subscribe">
    <anypoint-mq:subscriber  config-ref="ConfigWithCircuit" destination="${reservationsQueue}"/> (3)
    <flow-ref name="invoiceProcess">
</flow>

<flow name="otherSubscribe">
    <anypoint-mq:subscriber  config-ref="ConfigWithCircuit" destination="${paymentsQueue}"/> (3)
    <flow-ref name="invoiceProcess">
</flow>

<sub-flow name="invoiceProcess">
  <ftp:write path="${auditFolder}" config-ref="ftp-config"/> (4)
  <http:request config-ref="requestConfig" path="/external"/> (5)
</sub-flow>
1 共通のサーキットブレーカを複数のキューで共有するよう circuitName パラメータを設定します。
2 サブスクライバからのメッセージの処理に影響する可能性があるエラーを 2 つ考え、それぞれ CSV リストとして渡します。
3 両方のサブスクライバで、サーキットブレーカ設定を使用する設定を参照します。
4 このコンポーネントは、他の多数のエラーとともに FTP:RETRY_EXHAUSTED エラーをスローすることがあります。サーキットブレーカでは FTP:RETRY_EXHAUSTED エラーのみが考慮されます。
5 HTTP コネクタは HTTP:SERVICE_UNAVAILABLE をスローすることがあり、その場合はメッセージの処理が阻止されます。

このシナリオの場合、両方のサブスクライバで FTP:RETRY_EXHAUSTED および HTTP:SERVICE_UNAVAILABLE の両方のエラーをカウントして、その数が errorsThreshold="10" 値に達すると直ちにメッセージのポーリングが停止されます。tripTimeout が経過すると、一方のサブスクライバでメッセージをポーリングし、そのメッセージを使用してサーキットをテストします。そのメッセージの処理が成功すれば、両方のサブスクライバでポーリングが有効になります。

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub