Flex Gateway新着情報
Governance新着情報
Monitoring API ManagerSalesforce 用 Anypoint Connector (Salesforce Connector) をインバウンドコネクタとして使用すれば、Salesforce からアプリケーションにデータをストリーミングできます。コネクタをこのように使用するには、次のいずれかのソース操作を使用します。
Subscribe topic listener
Subscribe channel listener
Replay topic listener
Replay channel listener
| 分散環境 (ランタイムクラスターなど) にアプリケーションをデプロイする場合、ソース操作はプライマリノードで実行される必要があります。同じソースが複数のノードで実行されている場合、イベントが重複してコンシュームされます。 |
Salesforce Connector を使用してイベントを処理するには、次の前提条件を満たしていることを確認します。
Salesforce の対象リソースへのアクセス権を持っている必要があります。
トピックに関連付けられている Salesforce の変更に関するイベントを受信できるようにするには、事前にトピックを作成する必要があります (存在しない場合)。
必要な Salesforce ストリーミング API 権限が組織で有効になっている必要があります。
Salesforce Connector と Mule の両方で、オブジェクトストアを使用して自動的なメッセージの返信やメッセージの再配信などの機能に関するデータを保持します。
Mule アプリケーションのオンプレミスデプロイメント用の Mule に含まれるオブジェクトストアにトランザクション制限はありません。
Mule アプリケーションの CloudHub デプロイメント用の Object Store の無料バージョンには 1 秒あたり 10 件のトランザクション制限があります。
オブジェクトストアのバージョンについての詳細は、 「Object Store Notes (オブジェクトストアに関するメモ)」を参照してください。
Replay topic 操作と Replay channel listener 操作には、アプリケーションを再起動する前に受信した最後の再生 ID から続行するオプションがあります。
Mule アプリケーションを初めて起動すると、コネクタによって正常に処理されたメッセージまたは失敗したメッセージに関連するデータを保存するために使用されるオブジェクトストアが作成されます。
正常に処理された各メッセージでは、保存される情報はメッセージに関連付けられた再生 ID です。
失敗した各メッセージでは、保存される情報は処理に失敗した最小の再生 ID を表す番号です。
これらの 2 つの構造により、メッセージが 2 回処理されることがなく、アプリケーションを再起動したときに、失敗したメッセージを再処理することができます。
コネクタがサブスクライブされているトピックまたはストリーミングチャネルを通じてメッセージが送信されるたびに、コネクタでは最大 4 つのトランザクションを使用してオブジェクトストア情報が更新されます。
Replay Topic Listener および Replay Channel Listener ソース操作で最後の再生 ID から再開機能を有効にするには、ソース操作を使用したフローで正常に処理されたイベントに関する一部の情報がコネクタに保存されている必要があります。コネクタは Object Store を使用してこの情報を保存します。
Object Store を使用して最後の再生 ID から再開で必要なデータを保存する方法は、環境によって異なります。
Mule アプリケーションがローカルで実行されている場合、コネクタは永続的なストアをファイルシステムに作成します。これにより、アプリケーションが停止または再起動しても、または再接続をトリガーする接続の問題が発生しても、オブジェクトストアに保存されたデータは失われず、コネクタはメッセージが停止した位置でメッセージ処理を続行できます。
アプリケーションをクラスターで実行する場合、同じマシン上のノードを使用しているか、異なるマシン上のノードを使用しているかにかかわらず、分散されたメモリ内オブジェクトストアが使用されます。これにより、プライマリノード (メッセージをコンシュームしているノード) がシャットダウンしても、プライマリノードを引き継いだノードは同じメッセージを再度コンシュームせず、以前のノードでメッセージ処理が停止された場所から処理を続行します。
クラスターが停止した場合、オブジェクトストアからのデータは失われます、そのため、クラスターが再起動したときに、メッセージはソースレベルの設定に応じて再度処理される可能性があります。
再起動時にアプリケーションがイベントの処理を続行できるようにデータを保持する必要がある場合は、データベースを永続性レイヤーとして使用するようにオブジェクトストアを設定できます。詳細は、「クラスターの作成および管理」を参照してください。
アプリケーションを CloudHub で実行しているときに、複数のワーカーを使用すると、すべてのワーカーがイベントを受信して処理します。
トピックを作成すると、コネクタによって PushTopic が作成されます。これは Salesforce の特殊なオブジェクトで、名前 (この場合はトピックの名前) と Salesforce Object Query Language (SOQL) クエリを一緒にバインドします。トピックを作成したら、そのトピックを名前でサブスクライブできます。
Create (create) または Publish topic (publish-topic) 操作を使用して、トピックを作成できます。次の例では、publish-topic 操作を使用して、トピックを作成します。
<sfdc:publish-topic name="AccountUpdates" query="SELECT Id, Name FROM Account"/>
または、Salesforce で、システムログからアクセス可能な [Enter Apex Code (Apex コードの入力)] ウィンドウで次のようなコードを実行して、トピックを作成することも可能です。
PushTopic pushTopic = new PushTopic();
pushTopic.ApiVersion = 23.0;
pushTopic.Name = 'AllAccounts';
pushTopic.Description = 'All records for the Account object';
pushTopic.Query = 'SELECT Id, Name FROM Account';
insert pushTopic;
System.debug('Created new PushTopic: '+ pushTopic.Id);
ストリーミングチャネルを作成するには、組織で適切な Salesforce ストリーミング API 権限が有効になっている必要があります。
ストリーミングチャネルを作成する手順は、次のとおりです。
Salesforce Developer Edition 組織にログインします。
[すべてのタブ] (+) で、[ストリーミングチャネル] を選択します。
[ストリーミングチャネル] タブで、[新規] を選択して新しいストリーミングチャネルを作成します。
[ストリーミングチャネル名] 項目に /u/notifications/ExampleUserChannel と入力します。
説明 (省略可能) を入力します。
コネクタの Create 操作または Publish streaming channel (publish-streaming-channel) 操作を使用して、ストリーミングチャネルを作成することもできます。次の例では、publish-streaming-channel 操作を使用します。
<sfdc:publish-streaming-channel
name="/u/Notifications"
description="General notifications"/>
トピックをサブスクライブするには、Subscribe topic listener (subscribe-topic-listener) または Replay topic listener (replay-topic-listener) をフローの入力元として追加します。入力元はインバウンドエンドポイントとして機能します。サブスクリプションがイベントを受信するたびに、入力元が Mule アプリケーションのフローの残りの部分を実行します。
次の XML の例では、AccountUpdates トピックがイベントを受信すると、Mule が INFO レベルのメッセージをログに出力します。
<flow name="accountUpdatesSubscription">
<!-- INBOUND ENDPOINT -->
<sfdc:subscribe-topic-listener topic="AccountUpdates"/>
<!-- REST OF YOUR FLOW -->
<logger level="INFO" message="Received an event for Salesforce Object ID #[map-payload:Id]"/>
</flow>
過去に Salesforce でパブリッシュされていないトピックをサブスクライブできます。ただし、トピックがパブリッシュされると、再度サブスクライブしないと、そのトピックの通知を受信しなくなります。
フローを通過する各イベントには、変更された Salesforce データに関する情報 (データがいつどのように変更されたのかなど) が含まれています。
Salesforce はイベントを 24 時間 (大量イベントの場合は 72 時間) 保存します。トピックまたはチャネルへのサブスクライバーは、24 時間の保持期間中にそのトピックまたはチャネルに関連するイベントを取得できます。保持期間が終了すると、サブスクライバーは、まだ有効期限が切れていない新しいイベントを取得できます。
Salesforce は、各ブロードキャストイベントに数値の ID を割り当てます。ID は増分されますが、必ずしも連続するイベントごとに 1 つずつ増分するとは限りません。たとえば、ID 999 のイベントに続くイベントの ID は 1025 になる可能性があります。ブロードキャストイベント ID は組織とチャネルで一意です。Salesforce は、削除されたイベントの ID を再利用しません。
ストリーミングチャネルを作成した後にそのチャネルをサブスクライブすれば、イベントの受信を開始できます。subscribe-channel-listener 入力元はインバウンドエンドポイントのように機能します。この例では、/u/TestStreaming のサブスクリプションがイベントを受信するたびに、フローの残りの部分を実行し、メッセージを INFO レベルで記録します。
<flow name="notificationsChannelSubscription">
<!-- INBOUND ENDPOINT -->
<sfdc:subscribe-channel-listener streamingChannel="/u/TestStreaming"/>
<!-- REST OF YOUR FLOW -->
<logger level="INFO" message="Received an event: #[payload]"/>
</flow>
Salesforce 環境で使用可能な変更イベントは、Subscribe channel listener 操作の [Streaming channel (ストリーミングチャネル)] 項目には表示されません。ただし、コネクタはストリーミングチャネルをサブスクライブしてこの情報を取得できます。たとえば、All Change Events チャネルをサブスクライブするには、サブスクライブするチャネル名として /data/ChangeEvents を使用します。
詳細は、『Salesforce Change Data Capture Developer Guide (Salesforce 変更データキャプチャ開発者ガイド)』の 「Subscription Channels (サブスクリプションチャネル)」を参照してください。
サブスクライバーは、どのイベントを受信するか指定できます。デフォルトでは、サブスクライバーはサブスクライブ後に発生したイベントのみを受信します。イベントは 24 時間 (大量イベントの場合は 72 時間) の保持期間を過ぎると破棄されます。
Replay Topic Listener 操作には次のオプションがあります。
ALL
サブスクライバーは、24 時間 (または 72 時間) の保持期間内の過去のイベントと、クライアントがサブスクライブした後に送信された新規イベントを含め、すべてのイベントを受信します。
ONLY_NEW
サブスクライバーが、クライアントのサブスクライブ後にブロードキャストされた新規イベントを受信します。
FROM_REPLAY_ID
サブスクライバーが、指定したイベント replayId より後のすべてのイベントを受信します。
ALL または ONLY_NEW 再生オプションを指定すると、replayId の値は無視されます。
[Resume from the Last Replay Id (最後の再生 ID から再開)] チェックボックスにより、コネクタが最後に処理したイベントの再生 ID に基づいて、保存されているイベントを自動再生するように指定できます。この機能は、サーバーのシャットダウンや接続の切断などでコネクタのリスンが中断しているときに使用できます。保存された再生 ID が 24 時間の保持期間を経過している場合は、再生オプションがどのイベントを再生するかを決定します。
最後の再生 ID から再開機能をサポートするため、コネクタは永続的なオブジェクトストアを使用して、処理されたメッセージに関するさまざまな詳細を保存します。この機能は Salesforce Connector 10.x では、メッセージ損失の可能性を減らし、重複するメッセージの処理を回避するように拡張されています。Object Store の使用方法についての詳細は、「Object Store の使用法」を参照してください。
次の XML の例では、Replay topic listener 操作が Logger コンポーネントメッセージのインバウンドエンドポイントのように機能します。
<flow name="accountUpdatesReplay">
<!-- INBOUND ENDPOINT -->
<sfdc:replay-topic-listener topic="AccountUpdates" replayId="1" replayOption="ALL" autoReplay="true"/>
<!-- REST OF YOUR FLOW -->
<logger level="INFO" message="Replayed events: #[payload]"/>
</flow>
ストリーミングチャネルは、通知を再生できます。Replay channel listener 入力元はインバウンドエンドポイントとして機能します。次の例のように使用できます。
<flow name="flowStreamingChannelReplay">
<!-- INBOUND ENDPOINT -->
<sfdc:replay-channel-listener streamingChannel="/u/Notifications" replayId="1" replayOption="ALL"/>
<!-- REST OF YOUR FLOW -->
<logger level="INFO" message="Replayed events: #[payload]"/>
</flow>
ALL または ONLY_NEW 再生オプションを指定すると、replayId の値は無視されます。
[Resume from the Last Replay Id (最後の再生 ID から再開)] チェックボックスにより、コネクタが最後に処理したイベントの再生 ID に基づいて、保存されているイベントを自動再生するように指定できます。この機能は、サーバーのシャットダウンや接続の切断などでコネクタのリスンが中断しているときに使用できます。保存された再生 ID が 24 時間の保持期間を経過している場合は、再生オプションがどのイベントを再生するかを決定します。
最後の再生 ID から再開機能をサポートするため、コネクタは永続的なオブジェクトストアを使用して、処理されたメッセージに関するさまざまな詳細を保存します。この機能は Salesforce Connector 10.x では、メッセージ損失の可能性を減らし、重複するメッセージの処理を回避するように拡張されています。Object Store の使用方法についての詳細は、「Object Store の使用法」を参照してください。
Salesforce Connector を使用すると、カスタムイベント通知を取得できます。これらの通知は、Salesforce データの変更とは関係のない一般的なイベントに適用されます。
カスタムイベント通知を取得する手順は、次のとおりです。
Publish streaming channel 操作を使用してストリーミングチャネルを作成します。
StreamingChannel は Salesforce の特殊なオブジェクトで、汎用のストリーミング API イベントをリスナーに通知するために使用するチャネルを表します。
Salesforce または ワークベンチを使用してストリーミングチャネルを作成することもできます。
Subscribe channel listener 操作を使用してチャネルをサブスクライブします。
Salesforce Connector は、ストリーミングチャネルのカスタムイベントを Mule イベントに変換します。
ストリーミングチャネルの操作についての詳細は、「Salesforce からデータを受信するストリーミングチャネルの作成」および 「Salesforce Connector を使用したイベントの処理」を参照してください。
Salesforce では、REST API を使用してカスタムイベントを特定のストリーミングチャネルにプッシュできます。これを行うには、 ワークベンチまたはこのコネクタを使用します。
次の例では、コネクタの push-generic-event 操作を使用して、カスタムイベントを ID 0M6j0000000KyjBCAS のチャネルにプッシュします。
<flow name="flowPushGenericEvent">
<!-- INBOUND ENDPOINT -->
<sfdc:push-generic-event channelId="0M6j0000000KyjBCAS">
<sfdc:events>
<sfdc:event payload="Notification message text"/>
</sfdc:events>
</sfdc:push-generic-event>
<logger level="INFO" message="Replayed events: #[payload]"/>
</flow>
チャネル ID は、publish-streaming-channel 操作の応答マップから取得できます。または、Salesforce ページからチャネル ID を取得することもできます。
Salesforce Developer Edition 組織にログインします。
[すべてのタブ] (+) で、[ストリーミングチャネル] を選択します。
チャネルリストにチャネル ID 項目が表示されていない場合は、次の手順を実行します。
[Create New View (新規ビューの作成)] をクリックします。
[Name (名前)] 入力項目にビューの名前を入力します。
[Available Fields (選択可能な項目)] リストで、[Streaming Channel ID (ストリーミングチャネル ID)] を選択し、[Add (追加)] をクリックします。
リストに各ストリーミングチャネルの ID が表示されるはずです。
他の項目を追加します。
[Save (保存)] をクリックします。
push event 操作から応答として受信した JSON は次のようになります。
[
{
"userOnlineStatus": {
},
"fanoutCount": 0
}
]