Anypoint MQ Subscriber ソース - Mule 4

Anypoint MQ Connector の Subscriber ソースは、アプリケーションで新しいメッセージをリスンし、宛先に到達したメッセージをコンシュームできます。さまざまなリスニング戦略を設定し、パフォーマンス、予測可能性、スケジュールに合わせてコンシュームを調整できます。

最大メッセージスループット

デフォルトでは、Subscriber ソースは継続的リスニングモードで機能し、キューに到達するとすぐにメッセージをコンシュームしてスループットを最大化します。これは、​prefetch​ subscriber-type 設定と呼ばれます。​prefetch​ モードを使用する場合、Subscriber ソースはメッセージのローカルバッファが常にフルになるように試みます。これにより、アプリケーションでメッセージを受け入れ可能になり次第、Mule フローにディスパッチできます。

プリフェッチサブスクライバー種別
Figure 1. 矢印は、​[Subscriber (サブスクライバー)]​ プロパティウィンドウで ​[Prefetch (プリフェッチ)]​ に設定された ​[Subscriber type (サブスクライバー種別)]​ を示しています。

prefetch​ subscriber-type 設定で、​maxLocalMessages​ パラメーターを設定してフルバッファの対象サイズを指定し、どれだけ迅速にキューからメッセージが取得されるのかを制御します。

<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config" destination="myQueue">
</anypoint-mq:subscriber>
  • より大きなバッファサイズを使用してパフォーマンスを最大化する。他のコンシューマーはローカルで保持されるメッセージを使用できないことに留意してください。

    コンシューマーがキューからメッセージを取得してローカルバッファに保存すると、メッセージはインフライトメッセージとして他のコンシューマーに表示され、他のコンシューマーはコンシュームできなくなります。メッセージは、アプリケーションで処理を開始するまで必要に応じてこのインフライト状態のままになります。

  • より小さなバッファサイズを使用して、キューにパブリッシュされたときにメッセージを処理する。

    バッファを小さくすると、アプリケーションレベルのスループットが制限され、競合するコンシューマーがブロックされることを回避できます。

FIFO キューとプリフェッチモード

prefetch​ モードを使用する場合、​acknowledgementTimeout​ タイマーは、メッセージがキューから取得されたときに開始されます。 Anypoint MQ は、​acknowledgementTimeout​ で指定された有効期限 (時間) を順守しようとします。 ただし、メッセージがバッファに保持されている時間がタイムアウト値の 80% に達すると、Anypoint MQ は有効期限タイムアウトを延長します。

次のような状況では、​prefetch​ モードを使用しないでください。

  • maxConcurrency​ 属性を使用して同時処理を制限している、処理時間が長いアプリケーション。

    これらのオプションにより、メッセージがバッファに長く残る可能性があり、その結果メッセージが期限切れになり、キューに送り返される可能性があります。

  • FIFO キューを使用している場合

    prefetch​ モードを使用すると、Anypoint MQ が順序を保証できないため、FIFO キューでは使用しないでください。

    処理時間の長いアプリケーションで ​prefetch​ モードを使用すると、Anypoint MQ はローカルバッファにあるすべてのメッセージの ACK TTL を監視し、メッセージがキューに送り返されないように TTL を延長するため、大量のリソースがコンシュームされる可能性があります。

予測可能なメッセージのコンシューム

アプリケーションで最大スループットではなく、メッセージのコンシュームを予測して制御する必要がある場合、ポーリングソースとして Subscriber を設定できます。Subscriber ソースは、固定スケジュールレートでキューの新規メッセージをチェックします。ポーリングがトリガーされるたびに、Subscriber ソースは 1 ~ 10 個のメッセージを取得して、フローに 1 つずつ順番にディスパッチします。

メッセージは、すぐにコンシュームされません。代わりに、Subscriber ソースの次のポーリングがトリガーされるまでキューに保持されます。キューには、取得されるメッセージが最大 10 個含まれます。たとえば、キューに 15 個のメッセージがある場合、1 つのポーリングで最初の 10 個のメッセージがコンシュームされ、残りの 5 個のメッセージは次のポーリングを待機します。

Subscriber をポーリングソースとして設定するには、​polling​ subscriber-type を指定します。

デフォルトの ​polling​ subscriber-type の宣言は次のようになります。

ポーリングサブスクライバー種別
Figure 2. 矢印は、​[Subscriber (サブスクライバー)]​ プロパティウィンドウで ​[Polling (ポーリング)]​ に設定された ​[Subscriber type (サブスクライバー種別)]​ を示しています。
<anypoint-mq:subscriber doc:name="Subscriber" config-ref="Anypoint_MQ_Config" destination="myQueue">
  <anypoint-mq:subscriber-type >
    <anypoint-mq:polling >
      <scheduling-strategy >
        <fixed-frequency />
      </scheduling-strategy>
    </anypoint-mq:polling>
  </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>

デフォルトの ​polling​ subscriber-type の宣言では、宣言された宛先から 1 秒ごとにメッセージ 10 個の取得を試み、取得したメッセージを ​MuleMessage​ インスタンスとして 1 つずつフローにディスパッチします。

結果の ​MuleMessage​ には、以下が含まれます。

  • メッセージのペイロードとしての本文

  • メッセージ属性に含まれるメッセージのメタデータ

メッセージ出力のペイロードと属性
Figure 3. このスクリーンショットは、​[Output (出力)]​ タブのメッセージの (​1​) ​[Payload (ペイロード)]​ と (​2​) ​[Attributes (属性)]​ を示しています。

標準 Mule Runtime Engine スケジュール戦略を使用して、Subscriber ソースのポーリング戦略をカスタマイズできます。

カスタマイズされた固定ポーリング

Subscriber ソースの新規メッセージのポーリング頻度はカスタマイズできます。これを行うには、カスタマイズされた ​fixed-frequency​ スケジュール戦略を使用して、​polling​ subscriber-type 戦略を宣言します。

デフォルトの ​fixed-frequency​ スケジュール戦略宣言は次のようになります。

一定間隔のスケジュール戦略
Figure 4. 矢印は、​[Subscriber (サブスクライバー)]​ プロパティウィンドウで ​[Fixed Frequency (一定間隔)]​ に設定された ​[Scheduling Strategy (スケジュール戦略)]​ を示しています。
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config" destination="myQueue">
  <anypoint-mq:subscriber-type >
    <anypoint-mq:polling>
      <scheduling-strategy >
        <fixed-frequency/>
      </scheduling-strategy>
    </anypoint-mq:polling>
  </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>

cron ベースのポーリング

Anypoint MQ サブスクライバーは、​cron​ スケジュール戦略を使用することもできます。このスケジュール戦略を使用すると、「毎日午後 2 時から 2 時 59 分まで、毎分実行」のようなジョブのスケジュールが可能になります。このスケジュール戦略を使用するには、カスタマイズされた ​cron​ スケジュール戦略を使用して ​polling​ subscriber-type 戦略を宣言します。

Cron スケジュール戦略
Figure 5. 矢印は、​[Subscriber (サブスクライバー)]​ プロパティウィンドウで ​[Cron]​ に設定された ​[Scheduling Strategy (スケジュール戦略)]​ を示しています。
<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config" destination="myQueue">
  <anypoint-mq:subscriber-type >
    <anypoint-mq:polling >
      <scheduling-strategy >
        <cron expression="0 * 14 * * ?" timeZone="America/Los_Angeles" />
      </scheduling-strategy>
    </anypoint-mq:polling>
  </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>

メッセージ肯定応答

フローの実行が正常に完了した場合や、処理前にアプリケーションがメッセージをコンシュームした場合にのみ受信したメッセージに自動的に肯定応答できます。

  • 自動

    デフォルトでは、Subscriber ソースは自動肯定応答モードを使用します。このモードでは、メッセージフローの処理が成功した後で Subscriber ソースが取得するメッセージが自動的に肯定応答されます。つまり、Subscriber ソースはメッセージを取得してフローにディスパッチし、どのようにメッセージの処理が完了するのかを確認するために待機します。例外なしで処理が完了した場合にのみ ACK (肯定応答) を実行します。

    例外が伝播されて処理フローの実行が完了すると、メッセージは自動的に肯定応答されず、再配信のためにキューに返されます。

    詳細は、​「自動肯定応答」​を参照してください。

  • 即時

    即時肯定応答モードを使用する場合、コンシュームされたメッセージが Mule フローにディスパッチされる直前に肯定応答されます。メッセージの肯定応答に失敗すると、メッセージは破棄されます。メッセージはフローにディスパッチされず、肯定応答がタイムアウトするまでインフライトのままになります。

    IMMEDIATE 肯定応答モードでプリフェッチモードの Subscriber ソースを使用すると、スレッドが累積したり、アプリケーションが無応答になったりする場合があります。

    詳細は、​「即時肯定応答」​を参照してください。

  • 手動

    手動肯定応答モードを使用する場合、​ACK​ または ​NACK​ ソースを使用して、メッセージの肯定応答を実行するタイミングをアプリケーションロジックで決定します。

    手動肯定応答を実行するには、結果のメッセージ属性の一部として提供される ​ackToken​ の値が必要です。

    詳細は、​「手動肯定応答」​を参照してください。

肯定応答タイムアウトについての詳細は、​「肯定応答タイムアウト」​を参照してください。

サーキットブレーカー機能

Subscriber ソースは、サーキットブレーク機能を提供します。この機能を使用すると、コンシュームされたメッセージの処理中に発生するエラーをコネクタでどのように処理するのかを制御できます。

たとえば、外部サービスに接続している場合、サーキットブレーカーを使用してそのサービスのダウンタイムを処理できます。サーキットブレーカーでは、要求の実行を停止したり、負荷を減らして外部サービスを回復したりできます。

Mule 4 アプリケーションでの Anypoint MQ の使用は、キューからメッセージをコンシュームし、外部サービスを使用して処理を行う、MQ サブスクライバーを使用する Mule フローがあることを意味します。このサービスが使用できない場合、次のようになります。

  1. 要求が失敗する。

  2. エラーが発生する。

  3. メッセージの処理が失敗に終わるか、デッドレターキュー (DLQ) にメッセージを送信するなど、メッセージ処理のカスタムエラーで終了します。

外部サービスを利用できない場合、メッセージを処理するすべての試みは失敗し、アプリケーションは成功しないメッセージのコンシュームを繰り返し実行し続けます。この動作は、一定期間メッセージをさらにコンシュームしないように、サブスクライバーにエラーを通知すれば回避できます。

サーキットブレーカープロセス

Subscriber ソースによって提供されるサーキットブレーカー機能は、Mule で提供されるエラー処理メカニズムにバインドされています。エラー通知メカニズムを使用してサーキット障害と呼ばれる外部サービスに関連するエラー数を記録します。サーキット障害にはあらゆるエラーをバインドできます。たとえば、​HTTP:TIMEOUT​ や ​FTP:SERVICE_NOT_AVAILABLE​、さらには ​ORG:EXTERNAL_ERROR​ のようなアプリケーションのカスタムエラーもバインドできます。

Mule フローの実行がエラーに終わった場合、Subscriber ソースはエラーが外部サービスエラーを示す ​onErrorTypes​ のいずれかであるかどうかをチェックし、​errorsThreshold​ に達するまで、連続発生数を数えます。

errorsThreshold​ に達すると、サーキットがトリップし、​tripTimeout​ によって指定される期間中、オープン状態のままになります。 サーキットがオープン状態の間、ポーリングまたはコンシュームされたすべてのメッセージは肯定応答 (NACK) されず、Anypoint MQ は ​tripTimeout​ が経過するまでその後の操作 (ポーリング、ACK、NACK、パブリッシュ) を停止します。

tripTimeout​ が経過すると、サーキットブレーカーは新しい​状態​に移行します。 新しい状態が ​[Closed (クローズ)]​ の場合、クライアントは次のポーリングでメッセージをコンシュームできます。

アプリケーションを CloudHub にデプロイするワーカーが 1 つか複数かによって、この動作は異なります。

1 つのワーカー

ワーカーはメッセージをキューから個別に処理し、エラー数が ​errorsThreshold​ の値に達するとサーキットブレーカーをオープン状態にします。

複数のワーカー

各ワーカーに関連付けられたサーキットブレーカーがあり、​errorsThreshold​ と ​tripTimeout​ の値を個別に追跡します。 各ワーカーはメッセージをキューから個別に処理し、エラー数がそのワーカーで設定された ​errorsThreshold​ の値に達するとサーキットブレーカーをオープン状態にします。

デフォルトでは、サーキットブレーク機能は無効になっています。

サーキットブレーカーの状態

サーキットブレーカーは、3 つの状態 (クローズド、オープン、ハーフオープン) があります。アプリケーションの動作は、現在の状態に基づいて変わります。詳細は、 「Microsoft Circuit Breaker pattern (Microsoft サーキットブレーカーパターン)」​を参照してください。

サーキットブレーカーの状態図
  • クローズド

    通常 Subscriber ソースが設定に基づいて MQ からメッセージを取得する開始状態。実質的にサーキットブレーカーが存在しないかのように機能します。

  • クローズドからオープンへの遷移

    メッセージの処理中に成功することなく失敗が連続して発生し、​errorsThreshold​ 値に達すると、サーキットブレーカーがトリップし、オープン状態に遷移します。

    フローにすでにディスパッチされたメッセージは、結果が成功か失敗かに関係なく処理を完了します。

    ブローカーのインフライトであるが、まだディスパッチされておらず、ローカルに保持されているメッセージは、肯定応答されず、別のコンシューマーに再配信するためにキューに戻されます。

  • オープン

    Subscriber ソースはメッセージの取得を試みず、​tripTimeout​ に達するまで通知せずにイテレーションをスキップします。

  • ハーフオープン

    tripTimeout​ の経過後、Subscriber ソースはハーフオープン状態になります。次回のメッセージのポーリングで、Subscriber ソースは通常のクローズド状態に戻る前にサービスからメッセージを 1 つ取得し、そのメッセージを使用してシステムが回復したかどうかをチェックします。

    Subscriber ソースが 1 つのメッセージを正常にフェッチしてフローにディスパッチし、処理が正常に完了すると、Subscriber ソースは通常の状態に戻り、すぐに追加のメッセージのフェッチを試みます。

    Mule フローの処理が、予期される ​onErrorTypes​ のいずれかで失敗すると、サーキットはオープン状態に戻り、​tripTimeout​ タイマーがリセットされます。

サーキットブレーカーの設定

サーキットブレーカーは、グローバルサーキットブレーカーまたは非公開サーキットブレーカーとして設定できます。

どちらの方法でも、設定パラメーターは同じです。

  • onErrorTypes

    フローの実行中に障害とみなされるエラー種別。エラーの発生は、フローがエラーの伝達によって完了した場合にのみ計数されます。デフォルトでは、すべてのエラーがサーキット障害とみなされます。

  • errorsThreshold

    サーキットブレーカーがオープンになるのに発生する必要がある ​onErrorTypes​ エラーの回数。

  • tripTimeout

    errorsThreshold​ に達してからサーキットがオープンのままである期間。

  • circuitName

    この設定にバインドするサーキットブレーカーの名前。デフォルトでは、各キューに独自のサーキットブレーカーがあります。

グローバルサーキットブレーカー

各サブスクライバーが同じ「サーキット」の一部であるかのようにサーキットの状態を複数のサブスクライバーで共有する場合、グローバルサーキットブレーカーを使用します。

  1. Anypoint Studio で、キャンバスの ​[Global Elements (グローバル要素)]​ タブをクリックします。

  2. [Create (作成)] > [Component Configuration (コンポーネント設定)] > [Circuit Breaker (サーキットブレーカー)]​ を選択します。

    「Global Element Properties (グローバル要素のプロパティ)」 ウィンドウのサーキットブレーカー設定のプロパティ
    Figure 6. このスクリーンショットは、​[Global Element Properties (グローバル要素のプロパティ)]​ ウィンドウのサーキットブレーカー設定のプロパティを示しています。

    設定ウィザードで、必要に応じて次の項目を入力します。設定が完了したら、Anypoint MQ サブスクライバーからこの ​Circuit_breaker​ 宣言を参照できます。

サーキットブレーカーを参照する手順は、次のとおりです。

  1. キャンバスで Subscriber ソースを選択します。

  2. [Advanced (詳細)]​ タブをクリックします。

  3. [Circuit Breaker (サーキットブレーカー)] > [Global Reference (グローバル参照)]​ を選択し、リストからグローバルサーキットブレーカー設定を選択します。

    「Subscriber (サブスクライバー)」 プロパティウィンドウの 「Advanced (詳細)」 タブのサーキットブレーカーグローバル参照
    Figure 7. 矢印は、[Subscriber (サブスクライバー)] プロパティウィンドウの ​[Advanced (詳細)]​ タブのサーキットブレーカーグローバル参照を示しています。

非公開サーキットブレーカー

非公開サーキットブレーカーは、1 つのサブスクライバーで内部的に宣言します。このサーキットの宣言は、Subscriber ソースが宣言されているフローでみ使用され、他のすべてのサーキットから分離されています。

この設定を使用する手順は、次のとおりです。

  1. キャンバスで Subscriber ソースを選択します。

  2. [Advanced (詳細)]​ タブをクリックします。

  3. [Circuit breaker (サーキットブレーカー)] > [Edit Inline (インライン編集)]​ を選択し、項目に入力します。

    「Subscriber (サブスクライバー)」 プロパティウィンドウの 「Advanced (詳細)」 タブで 「Edit inline (インライン編集)」 が選択されたサーキットブレーカー
    Figure 8. 矢印は、[Subscriber (サブスクライバー)] プロパティウィンドウの ​[Advanced (詳細)]​ タブで ​[Edit inline (インライン編集)]​ が選択されたサーキットブレーカーを示しています。

サーキットブレーカーの例

1 つのサブスクライバーのサーキット設定

この例では、1 つのサブスクライバーがキューからメッセージをコンシュームし、REST API を使用して別のサービスにメッセージを投稿します。外部サービスに 5 回要求してタイムアウトした後、メッセージの処理を停止できます。処理が停止したら、Subscriber ソースは新しいメッセージで再試行する前に、サービスの回復を 30 秒待機します。

この例では、設定で次のサーキットブレーカーパラメーターを指定する必要があります。

<anypoint-mq:config name="Anypoint_MQ_Config">
    <anypoint-mq:connection url="${providerUrl}"
    clientId="${clientId}"
    clientSecret="${clientSecret}"/>
</anypoint-mq:config>

<flow name="subscribe">
   <anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
      destination="${subscribedQueue}">
        <anypoint-mq:circuit-breaker
          onErrorTypes="HTTP:TIMEOUT"   <!-- (1) -->
          errorsThreshold="5"           <!-- (2) -->
          tripTimeout="30"              <!-- (3) -->
          tripTimeoutUnit="SECONDS"/>
   </anypoint-mq:subscriber>
    <http:request config-ref="RequesterConfig"
       path="/external" method="POST"/> <!-- (4) -->
</flow>

(1)

サーキットをトリップさせるエラー種別を設定します。エラーが ​errorsThreshold​ の回数発生すると、ポーリングが停止されます。

(2)

サーキットが障害状態にあるとみなされるために連続して発生する必要があるメッセージ数のしきい値を設定します。

(3)

errorsThreshold​ に達したためにサーキットブレーカーがトリップした後、新しいメッセージのポーリングを再開する前に待機する期間を指定します。

(4)

onErrorTypes​ パラメーターで予期されるエラーをスローする操作を定義します。

サーキットブレーカーでは、​onErrorTypes​ パラメーターにリストされていないすべてのエラーが無視されます。この例では、​HTTP:BAD_REQUEST​ などのエラーがサーキットブレーカーで無視されます。

さまざまなキューからのサーキットの共有

多くの場合、1 つの共通サービスでさまざまなキューからのメッセージを処理します。この例では、両方のサブスクライバーを 1 つのサーキットにバインドするよう ​circuitName​ パラメーターを設定します。

<anypoint-mq:config name="Anypoint_MQ_Config">
    <anypoint-mq:connection url="${providerUrl}"
       clientId="${clientId}"
       clientSecret="${clientSecret}"/>
</anypoint-mq:config>

<anypoint-mq:circuit-breaker
    name="InvoiceProcess"                    <!-- (1) -->
    onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE" <!-- (2) -->
    errorsThreshold="10"
    tripTimeout="5"
    tripTimeoutUnit="MINUTES"/>

<flow name="subscribe">
    <anypoint-mq:subscriber destination="${reservationsQueue}"
       config-ref="Anypoint_MQ_Config"
        circuitBreaker="GlobalCircuit"/>     <!-- (3) -->
    <flow-ref name="invoiceProcess">
</flow>

<flow name="otherSubscribe">
    <anypoint-mq:subscriber
      destination="${paymentsQueue}"
        config-ref="Anypoint_MQ_Config"
        circuitBreaker="GlobalCircuit"/>    <!-- (3) -->
    <flow-ref name="invoiceProcess">
</flow>

<sub-flow name="invoiceProcess">
  <ftp:write path="${auditFolder}"          <!-- (4) -->
	   config-ref="ftp-config"/>
  <http:request config-ref="requestConfig"  <!-- (5) -->
	   path="/external"/>
</sub-flow>

(1)

共通のサーキットブレーカーを複数のキューで共有するよう ​name​ パラメーターを設定します。

(2)

Subscriber ソースからのメッセージの処理に影響する可能性がある 2 つのエラーを識別し、それぞれ CSV リストとして渡します。

(3)

両方のサブスクライバーで、グローバルサーキットブレーカー設定を参照します。

(4)

複数のエラーがスローされる場合もありますが、サーキットブレーカーに関連しているのは ​FTP:RETRY_EXHAUSTED​ のみです。

(5)

HTTP Connector は ​HTTP:SERVICE_UNAVAILABLE​ エラーをスローすることがあり、その場合はメッセージの処理が阻止されます。

このシナリオの場合、両方のサブスクライバーで ​FTP:RETRY_EXHAUSTED​ および ​HTTP:SERVICE_UNAVAILABLE​ の両方のエラーを数え、その数が ​errorsThreshold="10"​ 値に達すると直ちにメッセージのポーリングが停止されます。​tripTimeout​ 値が経過すると、一方のサブスクライバーでメッセージをポーリングし、そのメッセージを使用してサーキットをテストします。そのメッセージの処理が成功すれば、両方のサブスクライバーでポーリングが有効になります。

FIFO キュー

FIFO キューは、単一コンシューマーのシナリオに最適です。1 つのコンシューマーがメッセージにアクセスしている場合、その他すべてのコンシューマーは最初のバッチが処理されるまでブロックされます。すべてのインフライトメッセージが肯定応答されるか否定応答されるまで、メッセージは配信されません。

メッセージグループを使用すると、複数のコンシューマーが FIFO キュー内のメッセージに同時にアクセスできます。この場合、あるコンシューマーがグループ内のメッセージにアクセスしているときに、別のコンシューマーは別のグループ内のメッセージにアクセスします。メッセージの順序は、各メッセージグループ内で保持されます。

FIFO キューでは、メッセージグループ ID でメッセージを取得することはできません。

メッセージの処理順序を保持するには、FIFO キューからのメッセージをコンシュームするフローで ​maxConcurrency​ の値を ​1​ に設定します。

FIFO キューとクラスタリング

FIFO キューは、クラスター環境と非クラスター環境で同様に動作します。

FIFO キューは、指定された順序でメッセージをコンシュームします。メッセージがコンシュームされたら、その後のメッセージ処理を他のノードに分散できます。この場合、コンシューマーがメッセージを完全に処理する前に肯定応答すると、メッセージ処理中にメッセージの順序が変更される可能性があります。

  • オンプレミスの高可用性クラスタリング環境​では、デフォルトで Anypoint MQ Connector の Subscriber ソースがすべてのノードで実行されます。

    [Advanced (詳細)]​ タブの ​[Primary node only (プライマリノードのみ)]​ を選択して、プライマリノードとして実行するように動作を変更できます。

  • 複数のワーカーを使用する ​CloudHub​ では、すべてのワーカーがプライマリノードとして実行されます。

    この場合、アプリケーションを実行しているすべてのワーカーが同じ FIFO キューからコンシュームします。