Salesforce Connector 10.8 を使用したイベントの処理

Salesforce 用 Anypoint Connector (Salesforce Connector) をインバウンドコネクタとして使用すれば、Salesforce からアプリケーションにデータをストリーミングできます。コネクタをこのように使用するには、次のいずれかのソース操作を使用します。

  • Subscribe topic listener

  • Subscribe channel listener

  • Replay topic listener

  • Replay channel listener

分散環境 (ランタイムクラスターなど) にアプリケーションをデプロイする場合、ソース操作はプライマリノードで実行される必要があります。同じソースが複数のノードで実行されている場合、イベントが重複してコンシュームされます。

前提条件

Salesforce Connector を使用してイベントを処理するには、次の前提条件を満たしていることを確認します。

  • Salesforce の対象リソースへのアクセス権を持っている必要があります。

  • トピックに関連付けられている Salesforce の変更に関するイベントを受信できるようにするには、事前にトピックを作成する必要があります (存在しない場合)。

  • 必要な Salesforce ストリーミング API 権限が組織で有効になっている必要があります。

オブジェクトストアの使用法

Salesforce Connector と Mule の両方で、オブジェクトストアを使用して自動的なメッセージの返信やメッセージの再配信などの機能に関するデータを保持します。

  • Mule アプリケーションのオンプレミスデプロイメント用の Mule に含まれるオブジェクトストアにトランザクション制限はありません。

  • Mule アプリケーションの CloudHub デプロイメント用の Object Store の無料バージョンには 1 秒あたり 10 件のトランザクション制限があります。

オブジェクトストアのバージョンについての詳細は、 「Object Store Notes (オブジェクトストアに関するメモ)」を参照してください。

Replay Topic 操作と Replay Channel Listener 操作

Replay topic​ 操作と ​Replay channel listener​ 操作には、アプリケーションを再起動する前に受信した最後の再生 ID から続行するオプションがあります。

Mule アプリケーションを初めて起動すると、コネクタによって正常に処理されたメッセージまたは失敗したメッセージに関連するデータを保存するために使用されるオブジェクトストアが作成されます。

  • 正常に処理された各メッセージでは、保存される情報はメッセージに関連付けられた再生 ID です。

  • 失敗した各メッセージでは、保存される情報は処理に失敗した最小の再生 ID を表す番号です。

これらの 2 つの構造により、メッセージが 2 回処理されることがなく、アプリケーションを再起動したときに、失敗したメッセージを再処理することができます。

コネクタがサブスクライブされているトピックまたはストリーミングチャネルを通じてメッセージが送信されるたびに、コネクタでは最大 4 つのトランザクションを使用してオブジェクトストア情報が更新されます。

最後の再生 ID から再開

Replay Topic Listener​ および ​Replay Channel Listener​ ソース操作で​最後の再生 ID から再開​機能を有効にするには、ソース操作を使用したフローで正常に処理されたイベントに関する一部の情報がコネクタに保存されている必要があります。コネクタは Object Store を使用してこの情報を保存します。

Object Store を使用して​最後の再生 ID から再開​で必要なデータを保存する方法は、環境によって異なります。

  • Mule アプリケーションがローカルで実行されている場合、コネクタは永続的なストアをファイルシステムに作成します。これにより、アプリケーションが停止または再起動しても、または再接続をトリガーする接続の問題が発生しても、オブジェクトストアに保存されたデータは失われず、コネクタはメッセージが停止した位置でメッセージ処理を続行できます。

  • アプリケーションをクラスターで実行する場合、同じマシン上のノードを使用しているか、異なるマシン上のノードを使用しているかにかかわらず、分散されたメモリ内オブジェクトストアが使用されます。これにより、プライマリノード (メッセージをコンシュームしているノード) がシャットダウンしても、プライマリノードを引き継いだノードは同じメッセージを再度コンシュームせず、以前のノードでメッセージ処理が停止された場所から処理を続行します。
    クラスターが停止した場合、オブジェクトストアからのデータは失われます、そのため、クラスターが再起動したときに、メッセージはソースレベルの設定に応じて再度処理される可能性があります。
    再起動時にアプリケーションがイベントの処理を続行できるようにデータを保持する必要がある場合は、データベースを永続性レイヤーとして使用するようにオブジェクトストアを設定できます。詳細は、​「クラスターの作成および管理」​を参照してください。

  • アプリケーションを CloudHub で実行しているときに、複数のワーカーを使用すると、すべてのワーカーがイベントを受信して処理します。

Salesforce からデータを受信するトピックの作成

トピックを作成すると、コネクタによって ​PushTopic​ が作成されます。これは Salesforce の特殊なオブジェクトで、名前 (この場合はトピックの名前) と Salesforce Object Query Language (SOQL) クエリを一緒にバインドします。トピックを作成したら、そのトピックを名前でサブスクライブできます。

Create​ (​create​) または ​Publish topic​ (​publish-topic​) 操作を使用して、トピックを作成できます。次の例では、​publish-topic​ 操作を使用して、トピックを作成します。

<sfdc:publish-topic name="AccountUpdates" query="SELECT Id, Name FROM Account"/>

または、Salesforce で、システムログからアクセス可能な ​[Enter Apex Code (Apex コードの入力)]​ ウィンドウで次のようなコードを実行して、トピックを作成することも可能です。

PushTopic pushTopic = new PushTopic();
pushTopic.ApiVersion = 23.0;
pushTopic.Name = 'AllAccounts';
pushTopic.Description = 'All records for the Account object';
pushTopic.Query = 'SELECT Id, Name FROM Account';
insert pushTopic;
System.debug('Created new PushTopic: '+ pushTopic.Id);

Salesforce からデータを受信するストリーミングチャネルの作成

ストリーミングチャネルを作成するには、組織で適切な Salesforce ストリーミング API 権限が有効になっている必要があります。

ストリーミングチャネルを作成する手順は、次のとおりです。

  1. Salesforce Developer Edition 組織にログインします。

  2. [すべてのタブ] (+)​ で、​[ストリーミングチャネル]​ を選択します。

  3. [ストリーミングチャネル]​ タブで、​[新規]​ を選択して新しいストリーミングチャネルを作成します。

  4. [ストリーミングチャネル名]​ 項目に ​/u/notifications/ExampleUserChannel​ と入力します。

  5. 説明 (省略可能) を入力します。

コネクタの ​Create​ 操作または ​Publish streaming channel​ (​publish-streaming-channel​) 操作を使用して、ストリーミングチャネルを作成することもできます。次の例では、​publish-streaming-channel​ 操作を使用します。

<sfdc:publish-streaming-channel
    name="/u/Notifications"
    description="General notifications"/>

Subscribe to a Topic

トピックをサブスクライブするには、​Subscribe topic listener​ (​subscribe-topic-listener​) または ​Replay topic listener​ (​replay-topic-listener​) をフローの入力元として追加します。入力元はインバウンドエンドポイントとして機能します。サブスクリプションがイベントを受信するたびに、入力元が Mule アプリケーションのフローの残りの部分を実行します。

次の XML の例では、​AccountUpdates​ トピックがイベントを受信すると、Mule が INFO レベルのメッセージをログに出力します。

<flow name="accountUpdatesSubscription">
    <!-- INBOUND ENDPOINT -->
    <sfdc:subscribe-topic-listener topic="AccountUpdates"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Received an event for Salesforce Object ID #[map-payload:Id]"/>
</flow>

過去に Salesforce でパブリッシュされていないトピックをサブスクライブできます。ただし、トピックがパブリッシュされると、再度サブスクライブしないと、そのトピックの通知を受信しなくなります。

フローを通過する各イベントには、変更された Salesforce データに関する情報 (データがいつどのように変更されたのかなど) が含まれています。

Salesforce はイベントを 24 時間 (大量イベントの場合は 72 時間) 保存します。トピックまたはチャネルへのサブスクライバーは、24 時間の保持期間中にそのトピックまたはチャネルに関連するイベントを取得できます。保持期間が終了すると、サブスクライバーは、まだ有効期限が切れていない新しいイベントを取得できます。

Salesforce は、各ブロードキャストイベントに数値の ID を割り当てます。ID は増分されますが、必ずしも連続するイベントごとに 1 つずつ増分するとは限りません。たとえば、ID 999 のイベントに続くイベントの ID は 1025 になる可能性があります。ブロードキャストイベント ID は組織とチャネルで一意です。Salesforce は、削除されたイベントの ID を再利用しません。

ストリーミングチャネルのサブスクライブ

ストリーミングチャネルを作成した後にそのチャネルをサブスクライブすれば、イベントの受信を開始できます。​subscribe-channel-listener​ 入力元はインバウンドエンドポイントのように機能します。この例では、​/u/TestStreaming​ のサブスクリプションがイベントを受信するたびに、フローの残りの部分を実行し、メッセージを INFO レベルで記録します。

<flow name="notificationsChannelSubscription">
  <!-- INBOUND ENDPOINT -->
  <sfdc:subscribe-channel-listener streamingChannel="/u/TestStreaming"/>
  <!-- REST OF YOUR FLOW -->
  <logger level="INFO" message="Received an event: #[payload]"/>
</flow>

Salesforce 環境で使用可能な変更イベントは、​Subscribe channel listener​ 操作の ​[Streaming channel (ストリーミングチャネル)]​ 項目には表示されません。ただし、コネクタはストリーミングチャネルをサブスクライブしてこの情報を取得できます。たとえば、​All Change Events​ チャネルをサブスクライブするには、サブスクライブするチャネル名として ​/data/ChangeEvents​ を使用します。

詳細は、『Salesforce Change Data Capture Developer Guide (Salesforce 変更データキャプチャ開発者ガイド)』の 「Subscription Channels (サブスクリプションチャネル)」​を参照してください。

トピックからのメッセージの再生

サブスクライバーは、どのイベントを受信するか指定できます。デフォルトでは、サブスクライバーはサブスクライブ後に発生したイベントのみを受信します。イベントは 24 時間 (大量イベントの場合は 72 時間) の保持期間を過ぎると破棄されます。

Replay Topic Listener​ 操作には次のオプションがあります。

  • ALL

    サブスクライバーは、24 時間 (または 72 時間) の保持期間内の過去のイベントと、クライアントがサブスクライブした後に送信された新規イベントを含め、すべてのイベントを受信します。

  • ONLY_NEW

    サブスクライバーが、クライアントのサブスクライブ後にブロードキャストされた新規イベントを受信します。

  • FROM_REPLAY_ID

    サブスクライバーが、指定したイベント ​replayId​ より後のすべてのイベントを受信します。

ALL​ または ​ONLY_NEW​ 再生オプションを指定すると、​replayId​ の値は無視されます。

[Resume from the Last Replay Id (最後の再生 ID から再開)]​ チェックボックスにより、コネクタが最後に処理したイベントの再生 ID に基づいて、保存されているイベントを自動再生するように指定できます。この機能は、サーバーのシャットダウンや接続の切断などでコネクタのリスンが中断しているときに使用できます。保存された再生 ID が 24 時間の保持期間を経過している場合は、再生オプションがどのイベントを再生するかを決定します。

最後の再生 ID から再開​機能をサポートするため、コネクタは永続的なオブジェクトストアを使用して、処理されたメッセージに関するさまざまな詳細を保存します。この機能は Salesforce Connector 10.x では、メッセージ損失の可能性を減らし、重複するメッセージの処理を回避するように拡張されています。Object Store の使用方法についての詳細は、​​「Object Store の使用法」​を参照してください。

次の XML の例では、​Replay topic listener​ 操作が Logger コンポーネントメッセージのインバウンドエンドポイントのように機能します。

<flow name="accountUpdatesReplay">
    <!-- INBOUND ENDPOINT -->
    <sfdc:replay-topic-listener topic="AccountUpdates" replayId="1" replayOption="ALL" autoReplay="true"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>

ストリーミングチャネルからのメッセージの再生

ストリーミングチャネルは、通知を再生できます。​Replay channel listener​ 入力元はインバウンドエンドポイントとして機能します。次の例のように使用できます。

<flow name="flowStreamingChannelReplay">
    <!-- INBOUND ENDPOINT -->
    <sfdc:replay-channel-listener streamingChannel="/u/Notifications" replayId="1" replayOption="ALL"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>

ALL​ または ​ONLY_NEW​ 再生オプションを指定すると、​replayId​ の値は無視されます。

[Resume from the Last Replay Id (最後の再生 ID から再開)]​ チェックボックスにより、コネクタが最後に処理したイベントの再生 ID に基づいて、保存されているイベントを自動再生するように指定できます。この機能は、サーバーのシャットダウンや接続の切断などでコネクタのリスンが中断しているときに使用できます。保存された再生 ID が 24 時間の保持期間を経過している場合は、再生オプションがどのイベントを再生するかを決定します。

最後の再生 ID から再開​機能をサポートするため、コネクタは永続的なオブジェクトストアを使用して、処理されたメッセージに関するさまざまな詳細を保存します。この機能は Salesforce Connector 10.x では、メッセージ損失の可能性を減らし、重複するメッセージの処理を回避するように拡張されています。Object Store の使用方法についての詳細は、​​「Object Store の使用法」​を参照してください。

カスタムイベント通知

Salesforce Connector を使用すると、カスタムイベント通知を取得できます。これらの通知は、Salesforce データの変更とは関係のない一般的なイベントに適用されます。

カスタムイベント通知を取得する手順は、次のとおりです。

  1. Publish streaming channel​ 操作を使用してストリーミングチャネルを作成します。

    StreamingChannel​ は Salesforce の特殊なオブジェクトで、汎用のストリーミング API イベントをリスナーに通知するために使用するチャネルを表します。

    Salesforce または ワークベンチ​を使用してストリーミングチャネルを作成することもできます。

  2. Subscribe channel listener​ 操作を使用してチャネルをサブスクライブします。

    Salesforce Connector は、ストリーミングチャネルのカスタムイベントを Mule イベントに変換します。

ストリーミングチャネルの操作についての詳細は、​​「Salesforce からデータを受信するストリーミングチャネルの作成」​および ​​「Salesforce Connector を使用したイベントの処理」​を参照してください。

ストリーミングチャネルへのイベントのプッシュ

Salesforce では、REST API を使用してカスタムイベントを特定のストリーミングチャネルにプッシュできます。これを行うには、 ワークベンチ​またはこのコネクタを使用します。

次の例では、コネクタの ​push-generic-event​ 操作を使用して、カスタムイベントを ID ​0M6j0000000KyjBCAS​ のチャネルにプッシュします。

<flow name="flowPushGenericEvent">
    <!-- INBOUND ENDPOINT -->
    <sfdc:push-generic-event channelId="0M6j0000000KyjBCAS">
      <sfdc:events>
            <sfdc:event payload="Notification message text"/>
        </sfdc:events>
  </sfdc:push-generic-event>
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>

チャネル ID は、​publish-streaming-channel​ 操作の応答マップから取得できます。または、Salesforce ページからチャネル ID を取得することもできます。

  1. Salesforce Developer Edition 組織にログインします。

  2. [すべてのタブ] (+)​ で、​[ストリーミングチャネル]​ を選択します。

チャネルリストにチャネル ID 項目が表示されていない場合は、次の手順を実行します。

  1. [Create New View (新規ビューの作成)]​ をクリックします。

  2. [Name (名前)]​ 入力項目にビューの名前を入力します。

  3. [Available Fields (選択可能な項目)]​ リストで、​[Streaming Channel ID (ストリーミングチャネル ID)]​ を選択し、​[Add (追加)]​ をクリックします。

    リストに各ストリーミングチャネルの ID が表示されるはずです。

  4. 他の項目を追加します。

  5. [Save (保存)]​ をクリックします。

push event 操作から応答として受信した JSON は次のようになります。

[
  {
  "userOnlineStatus": {
  },
  "fanoutCount": 0
  }
]