Salesforce Connector 10.17 を使用したイベントの処理

Salesforce 用 Anypoint Connector (Salesforce Connector) をインバウンドコネクタとして使用すれば、Salesforce からアプリケーションにデータをストリーミングできます。コネクタをこのように使用するには、次のいずれかのソースを使用します。

  • Subscribe topic listener

  • Subscribe channel listener

  • Replay topic listener

  • Replay channel listener

分散環境 (ランタイムクラスターなど) にアプリケーションをデプロイする場合、ソースはプライマリノードで実行される必要があります。ソースが複数のノードで実行されている場合、イベントが重複してコンシュームされたり、Salesforce Connector によって保存されたデータのオブジェクトストアデータが破損したりします。

フローを通過する各イベントには、変更された Salesforce データに関する情報 (データがいつどのように変更されたのかなど) が含まれています。

Salesforce はイベントを 24 時間 (大量イベントの場合は 72 時間) 保存します。トピックまたはチャネルへのサブスクライバーは、この保持期間中にそのトピックまたはチャネルに関連するイベントを取得できます。その後、サブスクライバーは、まだ有効期限が切れていない新しいイベントのみを取得できます。

Salesforce では各ブロードキャストイベントに、組織とチャネルに固有の数値 ID が割り当てられます。ID は増分されますが、必ずしも連続するとは限りません。たとえば、イベント ​999​ の後のイベントがイベント ​1025​ になる場合があります。Salesforce は、削除されたイベントの ID を再利用しません。

始める前に

Salesforce Connector を使用してイベントを処理するには、以下があることを確認します。

  • Salesforce の対象リソースへのアクセス権

  • 組織で有効になっている必要な Salesforce ストリーミング API 権限

トピックに関連付けられている Salesforce の変更に関するイベントを受信するには、新しいトピックを作成するか、既存のトピックを使用します。

イベントでの Object Store の使用法

Salesforce Connector と Mule Runtime Engine (Mule) の両方で、オブジェクトストアを使用して自動的なメッセージの返信やメッセージの再配信などの機能に関するデータを保持します。

  • Mule アプリケーションのオンプレミスデプロイメント用の Mule に含まれるオブジェクトストアにトランザクション制限はありません。

  • Mule アプリケーションの CloudHub デプロイメント用のオブジェクトストアの無料バージョンには 10 トランザクション/秒 (TPS) の制限がありますが、オブジェクトストアを内部的に使用する他の Anypoint Connector の制限は 100 TPS です。

Object Store のバージョンについての詳細は、 「Object Store に関するメモ」​を参照してください。

Replay Topic Listener ソースと Replay Channel Listener ソース

Replay topic listener​ ソースと ​Replay channel listener​ ソースは、アプリケーションの再起動前に受信した最後の再生 ID から続行できます。

Mule アプリケーションを初めて起動すると、データを保存するオブジェクトストアが作成されます。

  • 失敗したイベントがない場合、ソースは、最後に正常に処理されたイベント ID に関連付けられているイベントから開始されます。

  • 失敗したイベントが 1 つ以上ある場合、ソースは、失敗した最小の再生 ID に関連付けられているイベントから開始されます。

この変更データキャプチャをオブジェクトストアに保存することで、メッセージが 2 回処理されることがなくなり、アプリケーションを再起動したときに、失敗したメッセージを再処理することができます。ID はオブジェクトストアに 72 時間保存された後、自動的に削除されます。

コネクタには、イベント (処理に成功したイベントや処理に失敗したイベントなど) の再生 ID を保存するメモリ内構造が含まれています。 このメモリ内構造では、オブジェクトストアに保持するコネクタ設定のユーザー名を使用して、特定のキーを作成します。冗長性メカニズムでは、1 つのバックアップキーで同じデータを保存して、重複するイベントの処理を回避します。 各永続操作は、既存のキーの削除と新しい値の保存で構成されており、新しい値の保存に失敗すると、コネクタは再試行するため、キーあたり 2 TPS 以上をコンシュームします。

複数のワーカーが CloudHub にデプロイされている場合、​Replay topic listener​ ソースと ​Replay channel listener​ ソースでは、すべての Mule インスタンスで各メッセージが処理されます。この状況を回避するには、次のいずれかを実行できます。

  • 重複するメッセージを除外するようにアプリケーションを変更する。重複するメッセージを除外しても、ワーカーごとにクォータがコンシュームされます。

  • アプリケーションを 1 つのワーカーにデプロイし、共有キューを使用してメッセージを異なるワーカーに渡して処理する。

最後の再生 ID から再開

Replay topic listener​ および ​Replay channel listener​ ソースの「最後の再生 ID から再開」機能を有効にするには、このソースが使用されているフローで正常に処理されたイベントに関する情報をコネクタで保存している必要があります。コネクタではオブジェクトストアを使用してこの情報が保存されます。

「最後の再生 ID から再開」機能で必要なデータをオブジェクトストアに保存する方法は、環境によって異なります。

  • Mule アプリケーションがローカルで実行されている場合、コネクタは永続的なストアをファイルシステムに作成します。

    アプリケーションが停止して再起動しても、または接続の問題が発生して再接続がトリガーされても、オブジェクトストアに保存されたデータにより、コネクタはメッセージが停止した位置でメッセージ処理を続行できます。

  • Mule アプリケーションをクラスターで実行する場合、同じマシン上のノードを使用しているか、異なるマシン上のノードを使用しているかにかかわらず、分散されたメモリ内オブジェクトストアが使用されます。

    • プライマリノード (メッセージをコンシュームしているノード) がシャットダウンしても、プライマリノードを引き継いだノードは同じメッセージを再処理しません。代わりに、ノードは、前のノードでメッセージ処理が停止された位置から続行します。

    • クラスターが停止すると、オブジェクトストアのデータは失われます。クラスターが再起動すると、ソースレベルの設定によりコネクタでメッセージを再処理するかどうかが決まります。

    再起動時にアプリケーションがイベントの処理を続行できるようにデータを保持するには、​「手動によるクラスターの作成および管理」​の説明に従って永続的なオブジェクトストアを作成します。

  • アプリケーションを CloudHub で実行しているときに、複数のワーカーを使用すると、すべてのワーカーがイベントを受信して処理します。

同じソースを使用する複数のフロー

同じチャネルまたはトピックでイベントをリスンする 1 つのソースを複数のフローで使用すると、そのイベントはユーザーのクォータから 2 回以上コンシュームされます。これを回避するには、代わりにそのロジックを 1 つのフローに実装することを考慮します。

複数のソースで同じイベントをリスンする場合、​[Resume from the Last Replay Id (最後の再生 ID から再開)]​ オプションが有効になっていると、データの破損やイベントの消失が発生する可能性があります。

異なるソースを使用する複数のフロー

異なるソースを使用する複数のフローがある場合、​[Replay Failed Events If Any or Resume from Last Replay Id (存在する場合は失敗したイベントを再生または最後の再生 ID から再開)]​ オプションが有効になっていると、特定の条件でパフォーマンスに影響する可能性があります。

パフォーマンスへの影響を回避するには、Salesforce ユーザー名ごとに異なる Salesforce Connector 設定を使用します。

設定のユーザー名を変更すると、最終処理された再生 ID や失敗した再生 ID がコネクタで読み込まれなくなります。処理は、Salesforce で使用できる最初のイベントから再開します。
個人ユーザーアカウントを使用して Salesforce にアクセスしないでください。

トピックイベントの操作

トピックを作成したり、トピックをサブスクライブしたり、トピックメッセージを再生したりして、トピックイベントを操作します。

Salesforce からデータを受信するトピックの作成

トピックを作成すると、コネクタによって ​PushTopic​ が作成されます。これは Salesforce の特殊なオブジェクトで、名前 (この場合はトピックの名前) と Salesforce Object Query Language (SOQL) クエリを一緒にバインドします。トピックを作成したら、そのトピックを名前でサブスクライブできます。

Create​ (​create​) 操作または ​Publish topic​ (​publish-topic​) 操作を使用して、トピックを作成できます。次の例では、​publish-topic​ 操作を使用して、トピックを作成します。

<sfdc:publish-topic name="AccountUpdates" query="SELECT Id, Name FROM Account"/>

または、Salesforce で、システムログからアクセス可能な ​[Enter Apex Code (Apex コードの入力)]​ ウィンドウで次のようなコードを実行して、トピックを作成することも可能です。

PushTopic pushTopic = new PushTopic();
pushTopic.ApiVersion = 23.0;
pushTopic.Name = 'AllAccounts';
pushTopic.Description = 'All records for the Account object';
pushTopic.Query = 'SELECT Id, Name FROM Account';
insert pushTopic;
System.debug('Created new PushTopic: '+ pushTopic.Id);

Subscribe to a Topic

トピックをサブスクライブするには、​Subscribe topic listener​ (​subscribe-topic-listener​) または ​Replay topic listener​ (​replay-topic-listener​) をフローのソースとして追加します。このソースはインバウンドエンドポイントとして機能します。サブスクリプションがイベントを受信するたびに、このソースが Mule アプリケーションのフローの残りの部分を実行します。

次の XML の例では、​AccountUpdates​ トピックがイベントを受信すると、Mule が INFO レベルのメッセージをログに出力します。

<flow name="accountUpdatesSubscription">
    <!-- INBOUND ENDPOINT -->
    <sfdc:subscribe-topic-listener topic="AccountUpdates"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Received an event for Salesforce Object ID #[map-payload:Id]"/>
</flow>

過去に Salesforce でパブリッシュされていないトピックをサブスクライブできます。ただし、トピックがパブリッシュされると、再度サブスクライブしないと、そのトピックの通知を受信しなくなります。

トピックからのメッセージの再生

サブスクライバーは、どのイベントを受信するか指定できます。デフォルトでは、サブスクライバーはサブスクライブ後に発生したイベントのみを受信します。イベントは 24 時間 (大量イベントの場合は 72 時間) の保持期間を過ぎると破棄されます。

Replay topic listener​ ソースには次のオプションがあります。

  • ALL

    サブスクライバーは、24 時間 (または 72 時間) の保持期間内の過去のイベントと、クライアントがサブスクライブした後に送信された新規イベントを含め、すべてのイベントを受信します。

  • ONLY_NEW

    サブスクライバーは、クライアントがサブスクライブした後に送信された新規イベントを受信します。

  • FROM_REPLAY_ID

    サブスクライバーが、指定したイベント ​replayId​ より後のすべてのイベントを受信します。

ALL​ または ​ONLY_NEW​ 再生オプションを指定すると、​replayId​ の値は無視されます。

[Resume from the Last Replay Id (最後の再生 ID から再開)]​ チェックボックスにより、コネクタが最後に処理したイベントの再生 ID に基づいて、保存されているイベントを自動再生するように指定できます。この機能は、サーバーのシャットダウンや接続の切断などでコネクタのリスンが中断しているときに使用できます。保存された再生 ID が 24 時間の保持期間を経過している場合は、再生オプションによりどのイベントを再生するかが決まります。

「最後の再生 ID から再開」機能をサポートするため、コネクタでは永続的なオブジェクトストアを使用して、処理されたメッセージに関するさまざまな詳細を保存します。この機能によりメッセージ損失の可能性が低くなり、重複するメッセージの処理が回避されます。オブジェクトストアの使用方法についての詳細は、​「イベントでの Object Store の使用法」​を参照してください。

次の XML の例では、​Replay topic listener​ 操作 (​replay-topic-listener​) が ​Logger​ コンポーネントメッセージのインバウンドエンドポイントのように機能します。

<flow name="accountUpdatesReplay">
    <!-- INBOUND ENDPOINT -->
    <sfdc:replay-topic-listener topic="AccountUpdates" replayId="1" replayOption="ALL" autoReplay="true"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>

ストリーミングチャネルイベントの操作

ストリーミングチャネルを作成したり、ストリーミングチャネルをサブスクライブしたり、ストリーミングチャネルイベントを再生したり、ストリーミングチャネルにイベントをプッシュしたりして、ストリーミングチャネルイベントを操作します。

ストリーミングチャネルの作成

ストリーミングチャネルを作成するには、組織で適切な Salesforce ストリーミング API 権限が有効になっている必要があります。

ストリーミングチャネルを作成する手順は、次のとおりです。

  1. Salesforce Developer Edition 組織にログインします。

  2. [すべてのタブ] (+)​ で、​[ストリーミングチャネル]​ を選択します。

  3. [Streaming Channels (ストリーミングチャネル)]​ タブで ​[New (新規)]​ を選択します。

  4. [ストリーミングチャネル名]​ 項目に ​/u/notifications/ExampleUserChannel​ と入力します。

  5. 説明 (省略可能) を入力します。

コネクタの ​Create​ 操作または ​Publish streaming channel​ (​publish-streaming-channel​) 操作を使用して、ストリーミングチャネルを作成することもできます。次の例では、​publish-streaming-channel​ 操作を使用します。

<sfdc:publish-streaming-channel
    name="/u/Notifications"
    description="General notifications"/>

ストリーミングチャネルのサブスクライブ

ストリーミングチャネルを作成した後にそのチャネルをサブスクライブすれば、イベントの受信を開始できます。​Subscribe channel listener​ (​subscribe-channel-listener​) ソースはインバウンドエンドポイントのように機能します。この例では、​/u/TestStreaming​ のサブスクリプションがイベントを受信するたびに、フローの残りの部分を実行し、メッセージを INFO レベルで記録します。

<flow name="notificationsChannelSubscription">
  <!-- INBOUND ENDPOINT -->
  <sfdc:subscribe-channel-listener streamingChannel="/u/TestStreaming"/>
  <!-- REST OF YOUR FLOW -->
  <logger level="INFO" message="Received an event: #[payload]"/>
</flow>

Salesforce 環境で使用可能な変更イベントは、​Subscribe channel listener​ 操作の ​[Streaming channel (ストリーミングチャネル)]​ 項目には表示されません。ただし、コネクタはストリーミングチャネルをサブスクライブしてこの情報を取得できます。たとえば、​All Change Events​ チャネルをサブスクライブするには、サブスクライブするチャネル名として ​/data/ChangeEvents​ を使用します。

詳細は、『Salesforce Change Data Capture Developer Guide (Salesforce 変更データキャプチャ開発者ガイド)』の 「Subscription Channels (サブスクリプションチャネル)」​を参照してください。

ストリーミングチャネルからのメッセージの再生

ストリーミングチャネルは、通知を再生できます。​Replay channel listener​ (​replay-channel-listener​) ソースは、次の例のようにインバウンドエンドポイントとして機能します。

<flow name="flowStreamingChannelReplay">
    <!-- INBOUND ENDPOINT -->
    <sfdc:replay-channel-listener streamingChannel="/u/Notifications" replayId="1" replayOption="ALL"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>

ALL​ または ​ONLY_NEW​ 再生オプションを指定すると、​replayId​ の値は無視されます。

[Resume from the Last Replay Id (最後の再生 ID から再開)]​ チェックボックスにより、コネクタが最後に処理したイベントの再生 ID に基づいて、保存されているイベントを自動再生するように指定できます。この機能は、サーバーのシャットダウンや接続の切断などでコネクタのリスンが中断しているときに使用できます。保存された再生 ID が 24 時間の保持期間を経過している場合は、再生オプションによりどのイベントを再生するかが決まります。

「最後の再生 ID から再開」機能をサポートするため、コネクタでは永続的なオブジェクトストアを使用して、処理されたメッセージに関する詳細を保存します。この機能によりメッセージ損失の可能性が低くなり、重複するメッセージの処理が回避されます。オブジェクトストアの使用方法についての詳細は、​「イベントでの Object Store の使用法」​を参照してください。

リスナーを使用しないストリーミングチャネルへのイベントのプッシュ

パブリッシュされたイベントを読み取るリスナーがストリーミングチャネルになくても、ユーザーはストリーミングチャネルにイベントをプッシュできます。チャネルのリスナーが開始されると、1 日の上限に基づいて Salesforce ストリーミング API で任意の数のメッセージをリスナーにプッシュできます。

たとえば、無料の Salesforce 組織の 24 時間以内に配信されるイベント通知の最大数が 10,000 で、15,000 件のイベントをそのチャネルにパブリッシュするとします。Salesforce Connector がそのチャネルをサブスクライブしている場合、ストリーミング API は 10,000 件のイベントをプッシュしようとして、日次クォータがコンシュームされます。その後、API は、翌日に新しいイベントをプッシュする前に残りの 5,000 件のイベントをプッシュしようとします。

この場合、コネクタはイベントを 1 つずつ Mule アプリケーションにストリーミングします。アプリケーションのメッセージ処理時間が長すぎる場合、ストリーミング API がコネクタに再接続するよう指示する可能性があります。その場合、ストリーミング API は未処理のメッセージをすべて削除します。このような状況を回避するには、​「信頼性パターン」​の説明に従って信頼性パターンを実装します。

カスタムイベントの操作

カスタムイベント通知を取得したり、ストリーミングチャネルにカスタムイベントをプッシュしたりして、カスタムイベントを操作します。

カスタムイベント通知の取得

Salesforce Connector を使用すると、Salesforce データの変更とは関係のない一般的なイベントに適用されるカスタムイベント通知を取得できます。

カスタムイベント通知を取得する手順は、次のとおりです。

  1. Publish streaming channel​ 操作を使用してストリーミングチャネルを作成します。

    StreamingChannel​ は Salesforce の特殊なオブジェクトで、汎用のストリーミング API イベントをリスナーに通知するために使用するチャネルを表します。

    Salesforce または ワークベンチ​を使用してストリーミングチャネルを作成することもできます。

  2. Subscribe channel listener​ 操作を使用してチャネルをサブスクライブします。

    Salesforce Connector は、ストリーミングチャネルのカスタムイベントを Mule イベントに変換します。

ストリーミングチャネルへのカスタムイベントのプッシュ

Salesforce では、REST API を使用してカスタムイベントを特定のストリーミングチャネルにプッシュできます。これを行うには、 ワークベンチ​またはこのコネクタを使用します。

次の例では、コネクタの ​Push generic event​ (​push-generic-event​) 操作を使用して、カスタムイベントを ID ​0M6j0000000KyjBCAS​ のチャネルにプッシュします。

<flow name="flowPushGenericEvent">
    <!-- INBOUND ENDPOINT -->
    <sfdc:push-generic-event channelId="0M6j0000000KyjBCAS">
      <sfdc:events>
            <sfdc:event payload="Notification message text"/>
        </sfdc:events>
  </sfdc:push-generic-event>
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>

チャネル ID は、​Publish streaming channel​ 操作の応答マップから取得できます。または、Salesforce ページからチャネル ID を取得することもできます。

  1. Salesforce Developer Edition 組織にログインします。

  2. [すべてのタブ] (+)​ で、​[ストリーミングチャネル]​ を選択します。

チャネルリストにチャネル ID 項目が表示されていない場合は、次の手順を実行します。

  1. [Create New View (新規ビューの作成)]​ をクリックします。

  2. [Name (名前)]​ 入力項目にビューの名前を入力します。

  3. [Available Fields (選択可能な項目)]​ リストで、​[Streaming Channel ID (ストリーミングチャネル ID)]​ を選択し、​[Add (追加)]​ をクリックします。

    リストに各ストリーミングチャネルの ID が表示されるはずです。

  4. 他の項目を追加します。

  5. [Save (保存)]​ をクリックします。

push event 操作から応答として受信した JSON は次のようになります。

[
  {
  "userOnlineStatus": {
  },
  "fanoutCount": 0
  }
]