Salesforce Connector 10.22 を使用したイベントの処理 - Mule 4

Salesforce 用 Anypoint Connector (Salesforce Connector) をインバウンドコネクタとして使用すれば、Salesforce からアプリケーションにデータをストリーミングできます。この方法でコネクタを使用するには、任意の種別のイベント (プラットフォームイベント、変更データキャプチャイベント、従来の PushTopic や汎用イベント) で次のリソースを使用できます。

  • Subscribe topic listener

  • Subscribe channel listener

  • Replay topic listener

  • Replay channel listener

プラットフォームイベントや変更データキャプチャイベントの新しいコネクタを追加する場合は、Pub/Sub API に基づく ​『Salesforce Pub/Sub Connector』​ を使用できます。

分散環境 (ランタイムクラスターなど) にアプリケーションをデプロイする場合、ソースはプライマリノードで実行される必要があります。ソースが複数のノードで実行されている場合、イベントが重複してコンシュームされたり、Salesforce Connector によって保存されたデータのオブジェクトストアデータが破損したりします。

フローを通過する各イベントには、変更された Salesforce データに関する情報 (データがいつどのように変更されたのかなど) が含まれています。

Salesforce はイベントを 72 時間 (従来の PushTopic や汎用イベントの場合は 24 時間) 保存します。トピックまたはチャネルへのサブスクライバーは、この保持期間中にそのトピックまたはチャネルに関連するイベントを取得できます。その後、サブスクライバーは、まだ有効期限が切れていない新しいイベントのみを取得できます。

Salesforce によって再生 ID が各ブロードキャストイベントに割り当てられ、これを使用して保存されたイベントメッセージを再生できます。詳細は、『Streaming API Developer Guide (ストリーミング API 開発者ガイド)』の 「Message Durability (メッセージの永続性)」Leaving the Site​を参照してください。

Salesforce インスタンスを Hyperforce に移行すると、無効な再生 ID エラーが発生する可能性があります。これは、Mule アプリケーションが、新しい Hyperforce インスタンスで無効になったオブジェクトストアから再生 ID にアクセスしようとした場合に発生します。このエラーを回避するには、この ナレッジ記事Leaving the Site​を参照してください。

始める前に

Salesforce Connector を使用してイベントを処理するには、以下があることを確認します。

  • Salesforce の対象リソースへのアクセス権

  • 組織で有効になっている必要な Salesforce ストリーミング API 権限

トピックに関連付けられている Salesforce の変更に関するイベントを受信するには、新しいトピックを作成するか、既存のトピックを使用します。

イベントでの Object Store の使用法

Salesforce Connector と Mule Runtime Engine (Mule) の両方で、オブジェクトストアを使用して自動的なメッセージの返信やメッセージの再配信などの機能に関するデータを保持します。

  • Mule アプリケーションのオンプレミスデプロイメント用の Mule に含まれるオブジェクトストアにトランザクション制限はありません。

  • Mule アプリケーションの CloudHub および CloudHub 2.0 デプロイメント用のオブジェクトストアの無料バージョンには 10 トランザクション/秒 (TPS) の制限がありますが、オブジェクトストアを内部的に使用する他の Anypoint Connector の制限は 100 TPS です。

Object Store のバージョンについての詳細は、 「Object Store に関するメモ」​を参照してください。

Replay Topic Listener ソースと Replay Channel Listener ソース

Replay topic listener​ ソースと ​Replay channel listener​ ソースは、アプリケーションの再起動前に受信した最後の再生 ID から続行できます。

Mule アプリケーションを初めて起動すると、データを保存するオブジェクトストアが作成されます。

  • 失敗したイベントがない場合、ソースは、最後に正常に処理されたイベント ID に関連付けられているイベントから開始されます。

  • 失敗したイベントが 1 つ以上ある場合、ソースは、失敗した最小の再生 ID に関連付けられているイベントから開始されます。

この変更データキャプチャをオブジェクトストアに保存することで、メッセージが 2 回処理されることがなくなり、アプリケーションを再起動したときに、失敗したメッセージを再処理することができます。ID はオブジェクトストアに 72 時間保存された後、自動的に削除されます。

コネクタには、イベント (処理に成功したイベントや処理に失敗したイベントなど) の再生 ID を保存するメモリ内構造が含まれています。 このメモリ内構造では、オブジェクトストアに保持するコネクタ設定のユーザー名を使用して、特定のキーを作成します。冗長性メカニズムでは、1 つのバックアップキーで同じデータを保存して、重複するイベントの処理を回避します。 各永続操作は、既存のキーの削除と新しい値の保存で構成されており、新しい値の保存に失敗すると、コネクタは再試行するため、キーあたり 2 TPS 以上をコンシュームします。

複数のワーカーが CloudHub にデプロイされている場合、​Replay topic listener​ ソースと ​Replay channel listener​ ソースでは、すべての Mule インスタンスで各メッセージが処理されます。この状況を回避するには、アプリケーションを 1 つのワーカーにデプロイし、共有キューを使用してメッセージを異なるワーカーに渡して処理できます。

最後の再生 ID から再開

Replay topic listener​ および ​Replay channel listener​ ソースの「最後の再生 ID から再開」機能を有効にするには、このソースが使用されているフローで正常に処理されたイベントに関する情報をコネクタで保存している必要があります。コネクタではオブジェクトストアを使用してこの情報が保存されます。

「最後の再生 ID から再開」機能で必要なデータをオブジェクトストアに保存する方法は、環境によって異なります。

  • Mule アプリケーションがローカルで実行されている場合、コネクタは永続的なストアをファイルシステムに作成します。

    アプリケーションが停止して再起動しても、または接続の問題が発生して再接続がトリガーされても、オブジェクトストアに保存されたデータにより、コネクタはメッセージが停止した位置でメッセージ処理を続行できます。

  • Mule アプリケーションをクラスターで実行する場合、同じマシン上のノードを使用しているか、異なるマシン上のノードを使用しているかにかかわらず、分散されたメモリ内オブジェクトストアが使用されます。

    • プライマリノード (メッセージをコンシュームしているノード) がシャットダウンしても、プライマリノードを引き継いだノードは同じメッセージを再処理しません。代わりに、ノードは、前のノードでメッセージ処理が停止された位置から続行します。

    • クラスターが停止すると、オブジェクトストアのデータは失われます。クラスターが再起動すると、ソースレベルの設定によりコネクタでメッセージを再処理するかどうかが決まります。

    再起動時にアプリケーションがイベントの処理を続行できるようにデータを保持するには、​「手動によるクラスターの作成および管理」​の説明に従って永続的なオブジェクトストアを作成します。

  • アプリケーションを CloudHub で実行しているときに、複数のワーカーを使用すると、すべてのワーカーがイベントを受信して処理します。

[Replay Sources (ソースを再生)] 項目の設定

異なる設定を使用してセットアップされた [replay sources (ソースを再生)] 項目の優先度を処理する方法について説明します。

  • Replay failed events if any or resume from last replay id (存在する場合は失敗したイベントを再生または最後の再生 ID から再開)

    有効になっている場合、​[Replay option (再生オプション)]​ 項目で選択された要素と ​[Replay id (再生 ID)]​ で入力された要素がこの項目で上書きされます。

    [Replay failed events if any or resume from last replay id (存在する場合は失敗したイベントを再生または最後の再生 ID から再開)]​ 項目では、オブジェクトストアをチェックして、失敗したイベント ID または最後に正常に処理されたイベント ID を確認します。失敗したイベントがある場合、最も小さい失敗したイベントから処理が開始されるため、​lowestFailedEventId​ から現在までのすべてのイベントを受信します。たとえば、​FailedEventIdInObjectStore​ が ​10​ で、​LastSuccessfullyProcessedEventIdInObjectStore​ が ​15​ の場合、処理は 10 から開始され、ストリーミングクライアントはイベント 10 ~ 15 を受信してクォータをコンシュームします。ただし、これらのイベントはフローで再処理されず、コネクタによって絞り込まれます。

    失敗したイベント ID が保存されていない場合、最も大きい既知の再生 ID から処理が開始されます。オブジェクトストアに何も保存されていない場合、​[Replay option (再生オプション)]​ 項目が使用されます。

  • ReplayOption.ALL

    -2​ でサブスクライブします。ユーザーは、Salesforce で使用可能なすべてのイベントを受信します。

  • ReplayOption.ONLY_NEW

    -1​ でサブスクライブします。ユーザーは、コネクタがサブスクライブした後に作成されたイベントを受信します。

  • ReplayOption.FROM_REPLAY_ID

    ユーザーが ​[Replay id (再生 ID)]​ 項目に入力した内容でサブスクライブします。

  • ReplayOption.FROM_LAST_REPLAY_ID

    オブジェクトストアをチェックして、受信した最も大きいイベント ID を確認します。イベントが正常に処理されたかどうかは関係ありません。処理された最も大きいイベント ID よりも小さい失敗したイベント ID でソースを開始する ​[Replay failed events if any or resume from last replay id (存在する場合は失敗したイベントを再生または最後の再生 ID から再開)]​ 項目と比較すると、このオプションではクォータのコンシュームを回避できます。

  • Cache events in memory (メモリにイベントをキャッシュ)

    有効になっている場合、コネクタはアプリケーションの開始時に再生 ID からサブスクライブします。Salesforce API は、サブスクライブに使用される再生 ID から開始されるすべてのイベントをプッシュするため、クォータがコンシュームされます。

    使用可能なイベントが多い場合、Mule アプリケーションの処理が遅れ、すべてのイベントを処理するのに時間がかかる可能性があります。

    このオプションが有効になっている場合、API によってプッシュされたイベントがメモリに保存され、受信した順序で順次コンシュームされます。

    接続の問題やトークンの期限切れがあり、コネクタが再サブスクライブする必要がある場合、この設定がないとイベントは失われ、クォータがコンシュームされます。

このコネクタは Object Store とやりとりして ​[Replay Option (再生オプション)]​ 項目が ​[FROM_LAST_REPLAY_ID]​ に設定されているか ​[Replay failed events if any or resume from last replay id (存在する場合は失敗したイベントを再生または最後の再生 ID から再開)]​ チェックボックスがオンになっている場合のみデータを保存して取得します。

CloudHub および CloudHub 2.0 で ​[Replay Option (再生オプション)]​ 項目が ​[FROM_LAST_REPLAY_ID]​ に設定されているか ​[Replay failed events if any or resume from last replay id (存在する場合は失敗したイベントを再生または最後の再生 ID から再開)]​ チェックボックスがオンになっている場合、最後の既知の再生 ID が Object Store で見つからない可能性があり、この場合はアプリケーションが再起動または再デプロイされたときにすべてのイベントが再生されます。これは、Object Store v2 に分散ロックメカニズムがないためです。詳細は、 「Last known replayId not found in ObjectStore (ObjectStore で最後の既知の replayId が見つからない)」Leaving the Site​を参照してください。

同じソースを使用する複数のフロー

同じチャネルまたはトピックでイベントをリスンする 1 つのソースを複数のフローで使用すると、そのイベントはユーザーのクォータから 2 回以上コンシュームされます。これを回避するには、代わりにそのロジックを 1 つのフローに実装することを考慮します。

複数のソースで同じイベントをリスンする場合、​[Resume from the Last Replay Id (最後の再生 ID から再開)]​ オプションが有効になっていると、データの破損やイベントの消失が発生する可能性があります。

異なるソースを使用する複数のフロー

異なるソースを使用する複数のフローがある場合、​[Replay Failed Events If Any or Resume from Last Replay Id (存在する場合は失敗したイベントを再生または最後の再生 ID から再開)]​ オプションが有効になっていると、特定の条件でパフォーマンスに影響する可能性があります。

パフォーマンスへの影響を回避するには、Salesforce ユーザー名ごとに異なる Salesforce Connector 設定を使用します。

設定のユーザー名を変更すると、最終処理された再生 ID や失敗した再生 ID がコネクタで読み込まれなくなります。処理は、Salesforce で使用できる最初のイベントから再開します。
個人ユーザーアカウントを使用して Salesforce にアクセスしないでください。

プラットフォームイベントを操作する

Salesforce Connector を使用して、プラットフォームイベントをサブスクライブします。次の例は、 「プラットフォームイベントの定義およびパブリッシュ」Leaving the Site​ Trailhead のプラットフォームイベントに基づきます。

Salesforce Connector を使用して Salesforce からプラットフォームイベントを受信する手順は、次のとおりです。

  1. Salesforce にログインし、​Cloud_News__e​ などの Salesforce プラットフォームイベントを作成します。

  2. Anypoint Studio に移動して Mule アプリケーションを作成します。

  3. [Subscribe Channel Listener]​ ソースをキャンバスにドラッグし、​[ストリーミングチャネル]​ 項目で ​/event/Cloud_News__e​ を指定します。

  4. Logger​ コンポーネントをキャンバスにドラッグしてペイロードを表示し、次の例のように受信したメッセージをキャンバスで確認できるようにします。

    <flow name="SampleFlow" doc:id="d25ff96a-aec9-45ee-89f2-74080fb83b45" >
      <salesforce:subscribe-channel doc:name="Subscribe channel" doc:id="cb21f452-9280-41f8-ba52-93c49a03ea38" config-ref="Salesforce_Config"
       streamingChannel="/event/Cloud_News__e"/>
      <logger level="INFO" doc:name="Logger" doc:id="5ebd77bf-87de-4f55-ab81-2af3abbe2bee" message="#[payload]"/>
    </flow>
    xml
  5. プラットフォームイベントメッセージを ​Cloud_News__e​ にパブリッシュします。Apex コード、プロセス、Salesforce フロー、または Salesforce API を使用できます。たとえば、次のように POST 要求を送信できます。

    {
       "Location__c" : "Mountain City",
       "Urgent__c" : true,
       "News_Content__c" : "Lake Road is closed due to mudslides."
    }
    text

    コンソールには次のように表示されます。

    INFO  2019-10-26 16:11:50,483 [[MuleRuntime].cpuLight.05: [test].SampleFlow.CPU_LITE @2b42bef0]
    [event: e00096e0-f7bf-11e9-b534-8c85907d741e] org.mule.runtime.core.internal.processor.LoggerMessageProcessor:
    {data={schema=eGRz2Sfoy-YO9mVvH8J4fg,
    payload=
    {News_Content__c=Lake Road is closed due to mudslides.,
     CreatedById=0050o00000U3Q8vAAF,
     CreatedDate=2019-10-26T07:12:01.026Z,
     Location__c=Mountain City, Urgent__c=true},
     event={replayId=49544589}},
     channel=/event/Cloud_News__e}

変更データキャプチャイベントを操作する

Salesforce Connector を使用して、Salesforce オブジェクトの作成、更新、削除などの変更データキャプチャイベントをサブスクライブします。この例では、イベントを再生できる ​Replay Channel Listener​ ソースを使用しています。イベントを再生しない場合は、代わりに ​Subscribe Channel Listener​ ソースを使用できます。

  1. Salesforce にログインします。

  2. [設定] > [インテグレーション] > [変更データキャプチャ]​ に進み、取引先などの Salesforce オブジェクトを選択します。

  3. Anypoint Studio に移動して Mule アプリケーションを作成します。

  4. [Replay Channel Listener]​ ソースをキャンバスにドラッグし、​[ストリーミングチャネル]​ 項目で ​/data/AccountChangeEvent​ を指定します。チャネル名のリストは、 「Subscription Channels (登録チャネル)」Leaving the Site​を参照してください。

  5. Logger​ コンポーネントをキャンバスにドラッグしてペイロードを表示し、次の例のように受信したメッセージをキャンバスで確認できるようにします。

    <flow name="ytaoka-salesforce-replaychannelFlow" doc:id="01d0fd5c-f777-4eda-a167-a931ef240f65" >
    		<salesforce:replay-channel streamingChannel="/data/AccountChangeEvent" replayOption="ONLY_NEW" doc:name="Replay channel" doc:id="c036e7c5-86ed-4904-ae34-185ea42319e9" config-ref="Salesforce_Config" replayId="-1" autoReplay="true"/>
    		<logger level="INFO" doc:name="Logger" doc:id="978f0aad-ab09-4910-bc54-a7c3dcc5935c" message="#[payload]"/>
    </flow>
    xml
  6. Salesforce にログインして任意の取引先レコードを更新します。たとえば、取引先の名前を ​TestName2​ に更新します。コンソールには次のように表示されます。

    INFO  2019-11-12 08:17:27,496 [[MuleRuntime].cpuLight.05: [sample].sampleFlow.CPU_LITE @14741f50] [event: xxx]
    org.mule.runtime.core.internal.processor.LoggerMessageProcessor: {data={schema=CEjkFTwpfASSecY9UGNoOg,
    payload={LastModifiedDate=2019-11-11T23:17:30.000Z, ChangeEventHeader={commitNumber=10743571519745, commitUser=0050o00000XTesxAAD,
    sequenceNumber=1, entityName=Account, changeType=UPDATE,
    changedFields=[Ljava.lang.Object;@4f738b9d, changeOrigin=com/salesforce/api/soap/47.0;client=SfdcInternalAPI/,
    transactionKey=0002463d-1e88-1d80-5638-15c821f06b79, commitTimestamp=1573514251000, recordIds=[Ljava.lang.Object;@6e812151},
    Name=TestName2},
    event={replayId=1065378}}, channel=/data/AccountChangeEvent}

PushTopic イベントの操作 (レガシー)

PushTopic を作成したり、PushTopic をサブスクライブしたり、PushTopic メッセージを再生したりして、PushTopic イベントを操作します。

PushTopic イベントはレガシー製品です。Salesforce では今後新機能によって PushTopic イベントが強化されることはなく、この製品に対して提供されるサポートは限定的です。PushTopic イベントの代わりに、 「Change Data Capture Developer Guide (変更データキャプチャ開発ガイド)」Leaving the Site​および 「変更データキャプチャの基礎」Leaving the Site​ Trailhead モジュールで説明されているように変更データキャプチャイベントを使用してください。

Salesforce からデータを受信する PushTopic の作成

トピックを作成すると、コネクタによって ​PushTopic​ が作成されます。これは Salesforce の特殊なオブジェクトで、名前 (この場合はトピックの名前) と Salesforce Object Query Language (SOQL) クエリを一緒にバインドします。PushTopic を作成したら、その PushTopic を名前でサブスクライブできます。

Create​ (​create​) 操作または ​Publish topic​ (​publish-topic​) 操作を使用して、PushTopic を作成できます。次の例では、​publish-topic​ 操作を使用して、PushTopic を作成します。

<sfdc:publish-topic name="AccountUpdates" query="SELECT Id, Name FROM Account"/>

または、Salesforce で、システムログからアクセス可能な ​[Enter Apex Code (Apex コードの入力)]​ ウィンドウで次のようなコードを実行して、トピックを作成することも可能です。

PushTopic pushTopic = new PushTopic();
pushTopic.ApiVersion = 50.0;
pushTopic.Name = 'AllAccounts';
pushTopic.Description = 'All records for the Account object';
pushTopic.Query = 'SELECT Id, Name FROM Account';
insert pushTopic;
System.debug('Created new PushTopic: '+ pushTopic.Id);
text

PushTopic のサブスクライブ

PushTopic をサブスクライブするには、​Subscribe topic listener​ (​subscribe-topic-listener​) または ​Replay topic listener​ (​replay-topic-listener​) をフローのソースとして追加します。このソースはインバウンドエンドポイントとして機能します。サブスクリプションがイベントを受信するたびに、このソースが Mule アプリケーションのフローの残りの部分を実行します。

次の XML の例では、​AccountUpdates​ PushTopic がイベントを受信すると、Mule が INFO レベルのメッセージをログに出力します。

<flow name="accountUpdatesSubscription">
    <!-- INBOUND ENDPOINT -->
    <sfdc:subscribe-topic-listener topic="AccountUpdates"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Received an event for Salesforce Object ID #[map-payload:Id]"/>
</flow>
xml

過去に Salesforce でパブリッシュされていない PushTopic をサブスクライブできます。ただし、PushTopic がパブリッシュされると、再度サブスクライブしないと、その PushTopic の通知を受信しなくなります。

PushTopic からのメッセージの再生

サブスクライバーは、どのイベントを受信するか指定できます。デフォルトでは、サブスクライバーはサブスクライブ後に発生したイベントのみを受信します。イベントは保持期間を過ぎると破棄されます。

Replay topic listener​ ソースには次のオプションがあります。

  • ALL

    サブスクライバーは、保持期間内の過去のイベントと、クライアントがサブスクライブした後に送信された新規イベントを含め、すべてのイベントを受信します。

  • ONLY_NEW

    サブスクライバーは、クライアントがサブスクライブした後に送信された新規イベントを受信します。

  • FROM_REPLAY_ID

    サブスクライバーが、指定したイベント ​replayId​ より後のすべてのイベントを受信します。

  • FROM_LAST_REPLAY_ID

    サブスクライバーが、処理に成功したかどうかにかかわらず、オブジェクトストアに保存されている最も高い再生 ID を使用します。

ALL​ または ​ONLY_NEW​ 再生オプションを指定すると、​replayId​ の値は無視されます。

[Resume from the Last Replay Id (最後の再生 ID から再開)]​ チェックボックスにより、コネクタが最後に処理したイベントの再生 ID に基づいて、保存されているイベントを自動再生するように指定できます。この機能は、サーバーのシャットダウンや接続の切断などでコネクタのリスンが中断しているときに使用できます。保存された再生 ID が保持期間を経過している場合は、再生オプションによりどのイベントを再生するかが決まります。

「最後の再生 ID から再開」機能をサポートするため、コネクタでは永続的なオブジェクトストアを使用して、処理されたメッセージに関するさまざまな詳細を保存します。この機能によりメッセージ損失の可能性が低くなり、重複するメッセージの処理が回避されます。オブジェクトストアの使用方法についての詳細は、​「イベントでの Object Store の使用法」​を参照してください。

次の XML の例では、​Replay topic listener​ 操作 (​replay-topic-listener​) が ​Logger​ コンポーネントメッセージのインバウンドエンドポイントのように機能します。

<flow name="accountUpdatesReplay">
    <!-- INBOUND ENDPOINT -->
    <sfdc:replay-topic-listener topic="AccountUpdates" replayId="1" replayOption="ALL" autoReplay="true"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>
xml

汎用イベントの操作 (レガシー)

ストリーミングチャネルを作成したり、ストリーミングチャネルをサブスクライブしたり、汎用イベントを再生したり、ストリーミングチャネルにイベントをプッシュしたりして、汎用イベントを操作します。

汎用イベントはレガシー製品です。Salesforce では今後新機能によって汎用イベントが強化されることはなく、この製品に対して提供されるサポートは限定的です。汎用イベントの代わりに、 「Platform Events Developer Guide (プラットフォームイベント開発ガイド)」Leaving the Site​および 「プラットフォームイベントの基礎」Leaving the Site​ Trailhead モジュールで説明されているようにプラットフォームイベントを使用してください。

ストリーミングチャネルの作成

ストリーミングチャネルを作成するには、組織で適切な Salesforce ストリーミング API 権限が有効になっている必要があります。

ストリーミングチャネルを作成する手順は、次のとおりです。

  1. Salesforce Developer Edition 組織にログインします。

  2. [すべてのタブ] (+)​ で、​[ストリーミングチャネル]​ を選択します。

  3. [Streaming Channels (ストリーミングチャネル)]​ タブで ​[New (新規)]​ を選択します。

  4. [ストリーミングチャネル名]​ 項目に ​/u/notifications/ExampleUserChannel​ と入力します。

  5. 説明 (省略可能) を入力します。

コネクタの ​Create​ 操作または ​Publish streaming channel​ (​publish-streaming-channel​) 操作を使用して、ストリーミングチャネルを作成することもできます。次の例では、​publish-streaming-channel​ 操作を使用します。

<sfdc:publish-streaming-channel
    name="/u/Notifications"
    description="General notifications"/>
xml

ストリーミングチャネルのサブスクライブ

ストリーミングチャネルを作成した後にそのチャネルをサブスクライブすれば、イベントの受信を開始できます。​Subscribe channel listener​ (​subscribe-channel-listener​) ソースはインバウンドエンドポイントのように機能します。この例では、​/u/TestStreaming​ のサブスクリプションがイベントを受信するたびに、フローの残りの部分を実行し、メッセージを INFO レベルで記録します。

<flow name="notificationsChannelSubscription">
  <!-- INBOUND ENDPOINT -->
  <sfdc:subscribe-channel-listener streamingChannel="/u/TestStreaming"/>
  <!-- REST OF YOUR FLOW -->
  <logger level="INFO" message="Received an event: #[payload]"/>
</flow>
xml

ストリーミングチャネルからのメッセージの再生

ストリーミングチャネルは、通知を再生できます。​Replay channel listener​ (​replay-channel-listener​) ソースは、次の例のようにインバウンドエンドポイントとして機能します。

<flow name="flowStreamingChannelReplay">
    <!-- INBOUND ENDPOINT -->
    <sfdc:replay-channel-listener streamingChannel="/u/Notifications" replayId="1" replayOption="ALL"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>
xml

ALL​ または ​ONLY_NEW​ 再生オプションを指定すると、​replayId​ の値は無視されます。

[Resume from the Last Replay Id (最後の再生 ID から再開)]​ チェックボックスにより、コネクタが最後に処理したイベントの再生 ID に基づいて、保存されているイベントを自動再生するように指定できます。この機能は、サーバーのシャットダウンや接続の切断などでコネクタのリスンが中断しているときに使用できます。保存された再生 ID が保持期間を経過している場合は、再生オプションによりどのイベントを再生するかが決まります。

「最後の再生 ID から再開」機能をサポートするため、コネクタでは永続的なオブジェクトストアを使用して、処理されたメッセージに関する詳細を保存します。この機能によりメッセージ損失の可能性が低くなり、重複するメッセージの処理が回避されます。オブジェクトストアの使用方法についての詳細は、​「イベントでの Object Store の使用法」​を参照してください。

リスナーを使用しないストリーミングチャネルへのイベントのプッシュ

パブリッシュされたイベントを読み取るリスナーがストリーミングチャネルになくても、ユーザーはストリーミングチャネルにイベントをプッシュできます。チャネルのリスナーが開始されると、1 日の上限に基づいて Salesforce ストリーミング API で任意の数のメッセージをリスナーにプッシュできます。

たとえば、無料の Salesforce 組織の 24 時間以内に配信されるイベント通知の最大数が 10,000 で、15,000 件のイベントをそのチャネルにパブリッシュするとします。Salesforce Connector がそのチャネルをサブスクライブしている場合、ストリーミング API は 10,000 件のイベントをプッシュしようとして、日次クォータがコンシュームされます。その後、API は、翌日に新しいイベントをプッシュする前に残りの 5,000 件のイベントをプッシュしようとします。

この場合、コネクタはイベントを 1 つずつ Mule アプリケーションにストリーミングします。アプリケーションのメッセージ処理時間が長すぎる場合、ストリーミング API がコネクタに再接続するよう指示する可能性があります。その場合、ストリーミング API は未処理のメッセージをすべて削除します。このような状況を回避するには、​「信頼性パターン」​の説明に従って信頼性パターンを実装します。

汎用イベント通知の取得

Salesforce Connector を使用すると、Salesforce データの変更とは関係のない一般的なイベントに適用されるカスタムイベント通知を取得できます。

汎用イベント通知を取得する手順は、次のとおりです。

  1. Publish streaming channel​ 操作を使用してストリーミングチャネルを作成します。

    StreamingChannel​ は Salesforce の特殊なオブジェクトで、汎用のストリーミング API イベントをリスナーに通知するために使用するチャネルを表します。

    Salesforce を使用してストリーミングチャネルを作成することもできます。

  2. Subscribe channel listener​ 操作を使用してチャネルをサブスクライブします。

    Salesforce Connector は、ストリーミングチャネルのカスタムイベントを Mule イベントに変換します。

ストリーミングチャネルへの汎用イベントのプッシュ

Salesforce では、REST API を使用してカスタムイベントを特定のストリーミングチャネルにプッシュできます。これを行うには、このコネクタを使用します。

次の例では、コネクタの ​Push generic event​ (​push-generic-event​) 操作を使用して、カスタムイベントを ID ​0M6j0000000KyjBCAS​ のチャネルにプッシュします。

<flow name="flowPushGenericEvent">
    <!-- INBOUND ENDPOINT -->
    <sfdc:push-generic-event channelId="0M6j0000000KyjBCAS">
      <sfdc:events>
            <sfdc:event payload="Notification message text"/>
        </sfdc:events>
  </sfdc:push-generic-event>
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>
xml

チャネル ID は、​Publish streaming channel​ 操作の応答マップから取得できます。または、Salesforce ページからチャネル ID を取得することもできます。

  1. Salesforce Developer Edition 組織にログインします。

  2. [すべてのタブ] (+)​ で、​[ストリーミングチャネル]​ を選択します。

チャネルリストにチャネル ID 項目が表示されていない場合は、次の手順を実行します。

  1. [Create New View (新規ビューの作成)]​ をクリックします。

  2. [Name (名前)]​ 入力項目にビューの名前を入力します。

  3. [Available Fields (選択可能な項目)]​ リストで、​[Streaming Channel ID (ストリーミングチャネル ID)]​ を選択し、​[Add (追加)]​ をクリックします。

    リストに各ストリーミングチャネルの ID が表示されるはずです。

  4. 他の項目を追加します。

  5. [Save (保存)]​ をクリックします。

push event 操作から応答として受信した JSON は次のようになります。

[
  {
  "userOnlineStatus": {
  },
  "fanoutCount": 0
  }
]
json