Anypoint MQ Connector 3.x 移行ガイド - Mule 4

Anypoint MQ Connector バージョン 3.0 には、簡素化された設定、標準化されたエンドポイント、非ブロック機能などの新機能があります。

グローバル設定

このコネクタのバージョン 2.x では、​default-subscriber-config​ に次のパラメーターが含まれています。

<anypoint-mq:default-subscriber-config
    name="Anypoint_MQ_Default_subscriber"
    acknowledgementMode="IMMEDIATE"
    acknowledgementTimeout="10000" <!-- (1) -->
    maxRedelivery="2"              <!-- (2) -->
    pollingTime="2000"
    fetchSize="5"                  <!-- (3) -->
    fetchTimeout="5000"            <!-- (3) -->
    frequency="2000">              <!-- (3) -->
  <anypoint-mq:connection          <!-- (4) -->
    clientId="${client.id}"
    clientSecret="${client.secret}"/>
  <anypoint-mq:circuit-breaker     <!-- (5) -->
    circuitName="InvoiceSettings"
    onErrorTypes="JMS:TIMEOUT"
    errorsThreshold="10" tripTimeout="1"
    tripTimeoutUnit="MINUTES" />
</anypoint-mq:default-subscriber-config>

このコネクタのバージョン 3.x では、​config​ 要素に接続設定のみが含まれています。

<anypoint-mq:config name="Anypoint_MQ_Config">
    <anypoint-mq:connection
        clientId="${client.id}"
        clientSecret="${client.secret}" />
</anypoint-mq:config>

次の表に、2.x の ​config​ 要素とそれに対応する 3.x の動作を示します。

要素 バージョン 2.x の動作 対応するバージョン 3.x の動作

(1)​ - 肯定応答

Anypoint MQ Subscriber ソースのグローバル肯定応答設定。

サブスクライバーインスタンスごとに設定します。

(2)​ - 最大再配信数

redeliveryCount​ ヘッダーにある Anypoint MQ メッセージの再配信の検索条件。

非推奨になり、スタンドアロン検索条件として使用できなくなりました。代わりに ​"DLQ + Idempotent Filter"​ パターンを使用します。

(3)​ - メッセージフェッチ

Anypoint MQ サービスからメッセージを取得する方法に関連するすべてのパラメーター (​prefetch​ 設定など)。

Subscriber Type​ パラメーターで各サブスクライバーのすべてのフェッチ戦略パラメーターを設定します。

(4)​ - 接続

接続パラメーター。

すべての接続設定で同じ設定です。

(5)​ - サーキットブレーカー

Anypoint MQ Subscriber ソースのサーキットブレーカー設定。

各サーキットブレーカーインスタンスを、各 Anypoint MQ サブスクライバーで参照されるスタンドアロン要素、またはこれを宣言する Subscriber ソースの非公開サーキットブレーカーとして処理します。

Subscriber (サブスクライバー) リスナー

このコネクタのバージョン 3.x では、メッセージリスニングの ​Subscriber​ メッセージソース設定が変更されました。

新しいサブスクライバーについては、​「Anypoint MQ Subscriber ソース」​を参照してください。

メッセージ肯定応答

このコネクタのバージョン 2.x では、Subscriber ソースの肯定応答モードが設定レベルで定義されます。

<anypoint-mq:default-subscriber-config name="Anypoint_MQ_Default_subscriber"
       acknowledgementMode="MANUAL" acknowledgementTimeout="120000">
    <anypoint-mq:connection clientId="${client.id}" clientSecret="${client.secret}"/>
</anypoint-mq:default-subscriber-config>

バージョン 3.x では、これらのパラメーターは Subscriber ソース自体に含まれ、その 1 つのサブスクライバーインスタンスにのみ影響します。肯定応答パラメーターのセマンティクスは、変更されていません。 Subscriber ソースの対応する設定を次に示します。

<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
    destination="queue"
    acknowledgementMode="MANUAL"
    acknowledgementTimeout="2"
    acknowledgementTimeoutUnit="MINUTES"/>

プリフェッチモード

このコネクタのバージョン 2.x では、​prefetch​ モードがデフォルトで有効になり、関連するすべてのパラメーターを設定レベルで指定していました。

<anypoint-mq:default-subscriber-config
      name="Anypoint_MQ_Default_subscriber"
      pollingTime="2000" fetchSize="5"
      fetchTimeout="5000" frequency="2000">
    <anypoint-mq:connection clientId="${client.id}"
      clientSecret="${client.secret}"/>
</anypoint-mq:default-subscriber-config>

バージョン 3.x には、​prefetch​ 設定を制御する ​Subscriber Type​ という新しい概念があります。

<anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
    destination="queue">
   <anypoint-mq:subscriber-type>
     <anypoint-mq:prefetch maxLocalMessages="20"/>
   </anypoint-mq:subscriber-type>
</anypoint-mq:subscriber>

次の表に、2.x の ​prefetch​ パラメーターとそれに対応する 3.x のパラメーターを示します。

バージョン 2.x のパラメーター 説明 対応するバージョン 3.x の動作 説明

fetchSize

config​ レベルで宣言されます。​fetchSize​ 値を 3 倍して計算されるローカルメッセージバッファのサイズを制御します。

maxLocalMessages

Subscriber​ レベルで宣言されます。ローカルメッセージバッファに含めることができる最大サイズを明示的に制御します。このパラメーターは、バッファをフルにするために必要な数のメッセージをフェッチするときの目標値として機能します。

fetchTimeout

サービスからの応答を待機する時間を制御します。

非推奨

このパラメーターは使用できなくなりました。コネクタは、常にロングポーリングの最大値を使用します。

frequency

キューが空になった後のメッセージの再確認のタイミングを制御します。

非推奨

このパラメーターは使用できなくなりました。コネクタは、内部で頻度を処理します (1 秒で固定)。

pollingTime

fetchSize​ が ​0​ の場合にのみ使用されます。実質的には ​prefetch​ モードを無効にして ​polling​ モードを有効にします。

非推奨

このパラメーターは、より強力な ​polling​ subscriber type に置き換わっています。

ポーリングモード

Anypoint MQ Connector のバージョン 2.x では、​fetchSize​ パラメーターを ​0​ に設定し、​pollingTime​ を固定頻度のポーリングスケジューラーとして使用することで ​prefetch​ モードを無効化できました。

このコネクタのバージョン 3.x では、Subscriber ソースのポーリングモードが簡素化および正規化されています。Mule Runtime Engine で標準で提供されるスケジューラーを ​fixed-frequency​ または ​cron​ として使用できます。

  • fixed-frequency

    <anypoint-mq:subscriber config-ref="Anypoint_MQ_Config"
      destination="queue">
       <anypoint-mq:subscriber-type >
          <anypoint-mq:polling fetchSize="9">
             <scheduling-strategy >
                <fixed-frequency frequency="1" timeUnit="SECONDS" />
             </scheduling-strategy>
          </anypoint-mq:polling>
       </anypoint-mq:subscriber-type>
    </anypoint-mq:subscriber>
  • cron

    <anypoint-mq:subscriber destination="queue"
        config-ref="Anypoint_MQ_Config">
        <anypoint-mq:subscriber-type>
            <anypoint-mq:polling fetchSize="9">
                <scheduling-strategy>
                    <cron expression="0 * 14 * * ?"
                    timeZone="America/Los_Angeles"/>
                </scheduling-strategy>
            </anypoint-mq:polling>
        </anypoint-mq:subscriber-type>
    </anypoint-mq:subscriber>

次の表に、2.x の ​polling​ パラメーターとそれに対応する 3.x のパラメーターを示します。

バージョン 2.x のパラメーター 説明 対応するバージョン 3.x の動作 説明

fetchSize

config​ レベルで宣言されます。​fetchSize​ 値を 3 倍して計算されるローカルメッセージバッファのサイズを制御します。

fetchSize

Subscriber​ レベルで宣言され、ポーリングの実行ごとにフェッチするメッセージの最大数 (1 ~ 10) を設定します。 デフォルトは 10 です。

fetchTimeout

サービスからの応答の待機時間を制限します。

非推奨

このパラメーターは使用できなくなりました。コネクタは、常にロングポーリングの最大値を使用します。

frequency

キューが空になった後のメッセージの再確認のタイミングを制御します。

非推奨

このパラメーターは使用できなくなりました。コネクタは、1 秒の一定間隔を使用します。

pollingTime

fetchSize​ が ​0​ の場合にのみ使用されます。実質的には ​prefetch​ モードを無効にして ​polling​ モードを有効にします。

非推奨

このパラメーターは、より強力な ​polling​ subscriber type に置き換わっています。

サーキットブレーカー

サーキットブレーカーの宣言は ​default-subscriber-config​ で行われなくなり、グローバル要素宣言またはインライン宣言のいずれかになります。

このコネクタのバージョン 2.x では、サブスクライバー設定の一部としてサーキットブレーカーを宣言し、複数の Subscriber ソースで参照していました。

<anypoint-mq:default-subscriber-config name="ConfigWithCircuit" >
     <anypoint-mq:connection url="${providerUrl}"
       clientId="${clientId}"
      clientSecret="${clientSecret}"/>
   <anypoint-mq:circuit-breaker
       circuitName="InvoiceProcess"
       onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE"
       errorsThreshold="10"
       tripTimeout="5"
       tripTimeoutUnit="MINUTES"/>
</anypoint-mq:default-subscriber-config>

<flow name="subscribe">
    <anypoint-mq:subscriber
        config-ref="ConfigWithCircuit"
        destination="${reservationsQueue}"/>
    <flow-ref name="invoiceProcess">
</flow>

<flow name="otherSubscribe">
    <anypoint-mq:subscriber
        config-ref="ConfigWithCircuit"
        destination="${paymentsQueue}"/>
    <flow-ref name="invoiceProcess">
</flow>

<sub-flow name="invoiceProcess">
  <ftp:write path="${auditFolder}" config-ref="ftp-config"/>
  <http:request config-ref="requestConfig" path="/external"/>
</sub-flow>

このコネクタのバージョン 3.x では、1 つのスタンドアロングローバル要素を宣言し、各サブスクライバーからそのグローバル要素を参照します。接続をアプリケーションロジックサーキットにバインドすることはなくなりました。

<anypoint-mq:config name="Anypoint_MQ_Config">
    <anypoint-mq:connection url="${providerUrl}"
       clientId="${clientId}"
       clientSecret="${clientSecret}"/>
</anypoint-mq:config>

<anypoint-mq:circuit-breaker
    name="InvoiceProcess"
    onErrorTypes="FTP:RETRY_EXHAUSTED, HTTP:SERVICE_UNAVAILABLE"
    errorsThreshold="10"
    tripTimeout="5"
    tripTimeoutUnit="MINUTES"/>

<flow name="subscribe">
    <anypoint-mq:subscriber
        destination="${reservationsQueue}"
        config-ref="Anypoint_MQ_Config"
        circuitBreaker="GlobalCircuit"/>
    <flow-ref name="invoiceProcess">
</flow>

<flow name="otherSubscribe">
    <anypoint-mq:subscriber
        destination="${paymentsQueue}"
        config-ref="Anypoint_MQ_Config"
        circuitBreaker="GlobalCircuit"/>
    <flow-ref name="invoiceProcess">
</flow>

<sub-flow name="invoiceProcess">
  <ftp:write path="${auditFolder}" config-ref="ftp-config"/>
  <http:request config-ref="requestConfig" path="/external"/>
</sub-flow>

Publish 操作

このコネクタのバージョン 2.x では、Publish 操作のプロパティは次のように宣言されます。

<anypoint-mq:publish config-ref="Anypoint_MQ_Default_subscriber"
       destination="queue"
       messageId="#[vars.messageId]" sendContentType="false">
   <anypoint-mq:body >#[vars.messageBody]</anypoint-mq:body>
   <anypoint-mq:properties >
      <anypoint-mq:property key="MSG_TYPE" value="My Value"/>
   </anypoint-mq:properties>
</anypoint-mq:publish>

バージョン 3.x では、同じプロパティを動的なマップとして宣言するため、変換されたり、メッセージごとにキーが固定されずにキー数が動的になったりする可能性があります。

<anypoint-mq:publish config-ref="Anypoint_MQ_Config"
     destination="queue"
     messageId="#[vars.currentId]" sendContentType="false">
   <anypoint-mq:body >#[vars.messageBody]</anypoint-mq:body>
   <anypoint-mq:properties ><![CDATA[#[output application/java -----
{
   "MSG_TYPE" : vars.msgType
}]]]></anypoint-mq:properties>
</anypoint-mq:publish>

次の表に、2.x の Publish 操作とそれに対応する 3.x の操作を示します。

変更 バージョン 2.x バージョン 3.x

実行種別

Blocking

Non-Blocking

プロパティパラメーター

固定されたキー - 値のマップ

動的な「コンテンツ」パラメーターのマップ

詳細は、​「Anypoint MQ Publish 操作」​を参照してください。

Consume 操作

次の表に、2.x の Consume 操作とそれに対応する 3.x の操作を示します。

変更 バージョン 2.x バージョン 3.x

実行種別

Blocking

Non-Blocking

デフォルトの肯定応答モード

MANUAL

IMMEDIATE

詳細は、​「Anypoint MQ Consume 操作」​を参照してください。

ACK および NACK 操作

ACK および NACK 操作は、​ackToken​ 文字列を使用します。

ackToken​ は、​attributes​ 要素から取得できます。​ackToken​ 値は、​MANUAL​ 肯定応答モードのメッセージでのみ使用できます。

次の表に、2.x の ACK および NACK 操作とそれに対応する 3.x の操作を示します。

バージョン 2.x バージョン 3.x

messageContext​ パラメーターから受け取る操作の値は、特定のメッセージの ​attributes​ 要素である必要があります。

メッセージの ACK または NACK を実行するためのメッセージの ​ackToken​ 文字列。

詳細は、​「Anypoint MQ ACK および NACK 操作」​を参照してください。

プラットフォームの互換性

ソフトウェア バージョン

Mule Runtime Engine

4.1.1 以降

Anypoint Studio

v7 以降