Salesforce Pub/Sub Connector 1.0 - Salesforce プラットフォームイベントの追跡例

Salesforce Pub/Sub Connector を使用する方法の 1 つとして、Salesforce プラットフォームイベントが MuleSoft から受信され、処理されたかどうかを追跡することができます。Salesforce と Mule Runtime Engine (Mule) の組み合わせを使用して、処理されたメッセージを追跡できます。

メッセージ追跡の詳細な例については、​「Keeping Track of Processed and Received Messages (処理および受信したメッセージの追跡)」​を参照してください。

Replay ID and MuleSoft Object Store (再生 ID と MuleSoft オブジェクトストア)

メッセージ追跡を実装するには、Salesforce Streaming API、Replay ID 属性、および MuleSoft オブジェクトストアの中核的な概念を理解することが重要です。

Replay ID (再生 ID)

Salesforce Event Bus では、各公開イベントメッセージに不透明な ID が割り当てられます。この ID は Replay (​replay_id​) 属性に含まれます。イベントメッセージがサブスクライバーに配信される際に、[Replay ID (再生 ID)] 項目値が入力され、イベントストリーム内のイベントメッセージの位置が参照されます。

イベントストリームは、大量のプラットフォームイベントに対してメッセージを 72 時間永続させるため、再生 ID を使用して、その期間内にあるイベントストリームからイベントメッセージを受信することが可能です。MuleSoft サブスクライバーは、処理された再生 ID を慎重に追跡することで、意図的または非意図的なダウンタイムや再起動によってプロセスイベントが欠落したり、重複したりしないようにする必要があります。

高度な再生 ID 追跡を有効にしてイベントメッセージが正しく受信および処理されるようにするため、コネクタ設定 UI で Salesforce Pub/Sub コネクタの再生動作を制御できます。

次の設定例では、​[Replay (再生)] オプション​のプロパティに ​[Replay id from object store (オブジェクトストアの再生 ID)]​ オプションを使用して、​[Object Store Name (オブジェクトストア名)]​ 項目で専用のオブジェクトストアを指定しています。

Subscribe channel listener の [General (一般)] プロパティ設定パネル

次の表に [Replay ID (再生 ID)] のオプションを示します。

再生オプション 説明 使用方法

Custom replay id (カスタム再生 ID)

サブスクライバーは、再生 ID 値で指定されたイベントより後のすべての保存済みイベントと新規イベントを受信します。

たとえば、接続に失敗した後など、特定のイベントメッセージの後で欠落したイベントを捕捉します。特定の再生 ID でサブスクライブするには、保存済みイベントを取得するための最後のイベントメッセージの再生 ID を保存しておき、再サブスクライブする際にその再生 ID を使用します。

Latest (最新)

再生オプションが指定されていない場合は、これがデフォルトです。サブスクライバーが、クライアントのサブスクライブ後にブロードキャストされた新規イベントを受信します。

新しいイベントメッセージを受信するには、​[Latest (最新)]​ オプションで購読するのが最適です。さらに早期のイベントメッセージを受け取るには、このオプションは使用しないでください。

Earliest (最古)

サブスクライバーが、72 時間の保持期間中の過去のイベントを含め、すべてのイベントを受信します。

接続エラーの後などに、欠落したイベントを捕捉し、すべての保存済みイベントを取得します。イベントメッセージが大量に保存されている場合は、このオプションを使用するとパフォーマンスが低下することがあります。

Replay id from object store (オブジェクトストアの再生 ID)

サブスクライバーは、オブジェクトストア内で再生 ID の値が指定した値より大きいイベントのみを受信します。値が指定されていない場合、デフォルトの ​[Earliest (最古)]​ になります。​[Object Store Name (オブジェクトストア名)]​ および ​[Object Store Key (オブジェクトストアキー)]​ 項目の値を指定して、目的のオブジェクトストアに接続し、再生 ID 値を取得します。

[Replay id from object store (オブジェクトストアの再生 ID)]​ を使用する方法についての詳細は、​「オブジェクトストアの再生 ID を保存する」​を参照してください。

MuleSoft Object Store (MuleSoft オブジェクトストア)

Mule Runtime Engine (Mule) はオブジェクトストアを使用して、以降に取得するデータを保持します。Mule は、さまざまな検索条件、ルーター、およびメッセージ間で状態を保存する必要があるその他のメッセージプロセッサーでオブジェクトストアを使用します。

CloudHub デプロイメントでは、無料の Object Store v2 では、アプリケーションごとに 1 秒あたり 10 件のトランザクション (TPS) に使用が制限されます。Object Store v2 のプレミアムアドオンでは、アプリケーションごとの上限が 100 TPS に増えます。

メッセージの追跡例

Salesforce が提供するイベントベースのインテグレーションの課題の 1 つは、特定のメッセージがサブスクライバーによって受信および処理されたかどうかを知ることです。この課題をクリアするためには、追加のロジックが必要です。

メッセージ追跡を実装するためのロジックの詳細については、​「Keeping Track of Processed and Received Messages (処理および受信したメッセージの追跡)」​を参照してください。

Batch Event Size (バッチイベントサイズ)

Salesforce Pub/Sub Connector の重要な特長として、​Subscribe Channel Listener​ 操作の ​Batch event size​ (​BatcheventSize​) 属性を使用してサーバーバッチに含まれるイベントの総数を指定することで、一定の期間内に受信および処理されるメッセージの数を制御できるという点があります。

Batch Event Size Example (バッチイベントサイズの例)

テストケースでは、Apex コードを使用して Salesforce で 500 件のレコードが作成されました。Salesforce ですべての処理が完了した後、レコード作成時のログエントリから確認メッセージのログエントリまでの「差分」となる時間を分析し、MuleSoft ワーカーの統計情報を確認しました。

その結果を要約すると次のようになります。

  • BatcheventSize​ を 1 に設定した場合と最大値の 100 に設定した場合を比較すると、Salesforce が処理を記録するまでの時間が 3 倍になりますが、それでもミリ秒単位の時間です。

  • この負荷には、CloudHub 0.1 vCore ワーカーで十分対応できます。0.2 コアワーカーを使用しても、パフォーマンスの向上はありません。

このテストケースの詳細については、​「BatcheventSize - しくみ」​を参照してください。