Processing Events With Salesforce Connector 10.10

You can use Anypoint Connector for Salesforce (Salesforce Connector) as an inbound connector to stream data from Salesforce into your application. To use the connector this way, use one of these source operations:

  • Subscribe topic listener

  • Subscribe channel listener

  • Replay topic listener

  • Replay channel listener

When deploying apps in a distributed environment (such as a runtime cluster), the source operation must run on the primary node only. If the same source runs on multiple nodes, the nodes consume duplicate events.

Before You Begin

To process events with Salesforce Connector, ensure you meet the following prerequisites:

  • You must have access to the Salesforce target resource.

  • Before you can receive events for Salesforce changes associated with a topic, you must create the topic, if it does not exist.

  • You must have the required Salesforce Streaming API permissions enabled in your organization.

Object Store Usage

Both Salesforce Connector and Mule use an object store to persist data for features such as automatic message replay and message redelivery:

  • The object store included in Mule for on-premises deployments of Mule apps has no transaction limits.

  • The free version of Object Store for CloudHub deployments of Mule apps has a limit of 10 transactions per second.

For more information about object store versions, see Object Store Notes.

Replay Topic and Replay Channel Listener Operations

The Replay topic and Replay channel listener operations have the option to continue from the last replay ID they received before restarting the application.

When a Mule app starts for the first time, the connector creates an object store that is used to save data related to messages that were processed successfully or that failed:

  • For each message that was processed successfully, the information stored consists of a replay ID associated with the message.

  • For each message that failed, the information stored consists of a number that represents the lowest replay ID for which the processing failed.

These two structures ensure that no message is processed twice and that when the application is restarted, it can reprocess the failed messages.
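
The bookkeeping described above can be pictured with a small in-memory model. This is only a sketch under stated assumptions, not the connector's implementation: the `ReplayLedger` class, its method names, and the rule used in `resume_after` (resume after the highest successful replay ID that precedes the lowest failure) are all hypothetical and chosen for illustration.

```python
from typing import Optional, Set


class ReplayLedger:
    """Toy in-memory stand-in for the two structures described above (hypothetical)."""

    def __init__(self) -> None:
        self.processed: Set[int] = set()          # replay IDs processed successfully
        self.lowest_failed: Optional[int] = None  # lowest replay ID whose processing failed

    def record_success(self, replay_id: int) -> None:
        self.processed.add(replay_id)

    def record_failure(self, replay_id: int) -> None:
        # Keep only the lowest failed replay ID, as the text describes.
        if self.lowest_failed is None or replay_id < self.lowest_failed:
            self.lowest_failed = replay_id

    def should_process(self, replay_id: int) -> bool:
        """Return False for messages that were already processed successfully."""
        return replay_id not in self.processed

    def resume_after(self) -> Optional[int]:
        """Replay ID to resume after on restart, or None if nothing usable is stored.

        Assumption: resuming just before the lowest failure lets failed
        messages be delivered again, while should_process() skips the
        successful ones that are redelivered along the way.
        """
        if self.lowest_failed is None:
            return max(self.processed, default=None)
        earlier = [r for r in self.processed if r < self.lowest_failed]
        return max(earlier, default=None)
```

With successes recorded for replay IDs 10, 12, and 15 and failures for 13 and 14, this model resumes after 12, redelivers 13 and 14, and skips 15 because it is already marked as processed.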

For each message that comes through a topic or streaming channel to which the connector is subscribed, the connector updates the object store information using up to four transactions.

If you have multiple workers deployed in CloudHub, all Mule instances process each message when using the Replay Topic or Replay Channel operations.

To avoid multiple Mule instances processing the same message, you can do one of the following:

  • Modify your app to filter out duplicate messages.

  • Designate a single worker as the receiver, and use a shared queue to pass each message to another worker for processing.
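
As a rough illustration of the first option, the following sketch filters duplicates by remembering which events have already been seen. The event shape (a dict with `channel` and `replayId` keys) and the function name `drop_duplicates` are assumptions made for this example and are not taken from the connector. In a multi-worker deployment, the `seen` set would have to live in a store shared by all workers, and the check-and-add step would need to be atomic.

```python
from typing import Dict, Iterable, Iterator, Set, Tuple

EventKey = Tuple[str, int]


def drop_duplicates(events: Iterable[Dict], seen: Set[EventKey]) -> Iterator[Dict]:
    """Yield each event only the first time its (channel, replayId) pair appears.

    `seen` is passed in by the caller so that it can outlive a single batch.
    """
    for event in events:
        key = (event["channel"], event["replayId"])  # hypothetical field names
        if key in seen:
            continue  # another delivery of the same event: skip it
        seen.add(key)
        yield event
```

The key combines the channel with the replay ID because an event ID is unique only within a given organization and channel.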

Resume from the Last Replay ID

To enable the Resume from the Last Replay Id functionality for the Replay Topic Listener and Replay Channel Listener source operations, the connector must store some information about the events that were successfully processed by the flow in which the source operation is used. The connector uses Object Store to store that information.

How the connector uses Object Store to store the data needed for the Resume from the Last Replay Id option depends on the environment:

  • When a Mule application is running locally, the connector creates a persistent store in the file system. This way, if the application is stopped and restarted, or if there was a connection issue that triggered a reconnection, the data that was saved in the object store is not lost and the connector can continue processing messages at the point where it previously stopped.

  • When running the application in a cluster, either with the nodes on the same machine, or with the nodes on different machines, a distributed in-memory object store is used. This way, if the primary node (the one consuming the messages) shuts down, the node that takes over for the primary node will not consume the same messages again, but will continue from where the previous node stopped processing the messages.
    If the cluster is stopped, the data in the object store is lost, so when the cluster is restarted, messages might be reprocessed, depending on the configuration at the source level.
    If you need the data to persist so that the application can continue processing events after a restart, you can configure the object store to use a database as the persistence layer. For more information, see Creating and Managing a Cluster.

  • When running the application in CloudHub, if you are using multiple workers, all of the workers receive and process the events.

Multiple Flows That Use the Same Source

Avoid having two or more flows use a source that listens to events on the same channel or topic, because each event is then consumed more than once from your user quota. If you have a use case that requires processing the same events in two or more different flows, consider implementing that logic in a single flow instead.

Having multiple sources listening to the same events and having the Resume from the Last Replay Id option enabled leads to data corruption and potential loss of events.

Streaming Channel with No Listener

Users can push events to a streaming channel even if the channel has no listener to read the published events. Once a listener starts for the channel, Salesforce Streaming API pushes as many messages as it can to the listener, up to the maximum daily limit.

For example, given that the maximum number of delivered event notifications within a 24-hour period for a free Salesforce organization is 10,000, suppose you publish 15,000 events to that channel. When Salesforce Connector subscribes to that channel, the Streaming API attempts to push 10,000 events, thus consuming the daily quota. The API will then attempt to push the remaining 5,000 events the next day before pushing any new events.
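
The arithmetic in this example can be checked with a few lines of code. The function name `delivery_schedule` is made up for illustration, and the model assumes that no new events are published while the backlog drains.

```python
from typing import List


def delivery_schedule(backlog: int, daily_limit: int) -> List[int]:
    """Return how many backlog events are delivered on each consecutive day."""
    if backlog < 0 or daily_limit <= 0:
        raise ValueError("backlog must be >= 0 and daily_limit must be > 0")
    schedule: List[int] = []
    while backlog > 0:
        delivered = min(backlog, daily_limit)  # a day never exceeds the quota
        schedule.append(delivered)
        backlog -= delivered
    return schedule


# 15,000 published events against a 10,000-per-day limit:
print(delivery_schedule(15_000, 10_000))  # [10000, 5000]
```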

In this scenario, the connector streams the events one by one into the Mule app. If the app takes too long to process a message, the Salesforce Streaming API might instruct the connector to reconnect. If this happens, the Streaming API drops all of the unprocessed messages.

You can avoid this situation by implementing a reliability pattern, as described in Reliability Patterns.
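
The core idea behind such a pattern is to accept each event quickly and hand it to a queue, so that slow processing does not hold up the listener. The sketch below shows only that decoupling, using Python's standard `queue` and `threading` modules. It is not Mule code, the names `on_event` and `start_worker` are hypothetical, and the in-memory queue stands in for the persistent queue that a real reliability pattern would use.

```python
import queue
import threading
from typing import Callable, Dict


def on_event(inbox: queue.Queue, event: Dict) -> None:
    """Listener side: enqueue the event and return immediately."""
    inbox.put(event)


def start_worker(inbox: queue.Queue, handle: Callable[[Dict], None]) -> threading.Thread:
    """Processing side: drain the queue on a separate thread."""

    def run() -> None:
        while True:
            event = inbox.get()
            if event is None:  # sentinel value: stop the worker
                break
            handle(event)      # slow work happens here, off the listener path

    worker = threading.Thread(target=run, daemon=True)
    worker.start()
    return worker
```

To shut the worker down in this sketch, put `None` on the queue and join the returned thread.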

Create a Topic to Receive Data from Salesforce

When you create a topic, the connector creates a PushTopic, which is a special object in Salesforce that binds a name (in this case, the topic’s name) and a Salesforce Object Query Language (SOQL) query together. After you create a topic, you can then subscribe to it by its name.

You can use either the Create (create) or Publish topic (publish-topic) operation to create a topic. The following example uses the publish-topic operation to create a topic:

<sfdc:publish-topic name="AccountUpdates" query="SELECT Id, Name FROM Account"/>

Alternatively, you can create a topic in Salesforce by executing code such as the following from an Enter Apex Code window, which is accessible through the system logs:

PushTopic pushTopic = new PushTopic();
pushTopic.ApiVersion = 23.0;
pushTopic.Name = 'AllAccounts';
pushTopic.Description = 'All records for the Account object';
pushTopic.Query = 'SELECT Id, Name FROM Account';
insert pushTopic;
System.debug('Created new PushTopic: '+ pushTopic.Id);

Create a Streaming Channel to Receive Data from Salesforce

To create a streaming channel, you must have the proper Salesforce Streaming API permissions enabled in your organization.

Follow these steps to create a streaming channel:

  1. Log in to your Salesforce Developer Edition organization.

  2. Under All Tabs (+), select Streaming Channels.

  3. On the Streaming Channels tab, select New to create a new streaming channel.

  4. Enter /u/notifications/ExampleUserChannel in the Streaming Channel Name field.

  5. Enter an optional description.

You can also create a streaming channel by using either the connector Create operation or the connector Publish streaming channel (publish-streaming-channel) operation. The following example uses the publish-streaming-channel operation:

<sfdc:publish-streaming-channel
    name="/u/Notifications"
    description="General notifications"/>

Subscribe to a Topic

To subscribe to a topic, add the Subscribe topic listener (subscribe-topic-listener) or Replay topic listener (replay-topic-listener) as an input source for your flow. The input source acts as an inbound endpoint. Every time the subscription receives an event, the input source executes the rest of the flow in your Mule app.

In the following XML example, Mule prints a message to the log at the INFO level when the AccountUpdates topic receives an event:

<flow name="accountUpdatesSubscription">
    <!-- INBOUND ENDPOINT -->
    <sfdc:subscribe-topic-listener topic="AccountUpdates"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Received an event for Salesforce Object ID #[payload.Id]"/>
</flow>

You can subscribe to a topic that was not previously published in Salesforce. However, after the topic is published, you do not receive notifications for that topic unless you resubscribe to it.

Each event that travels through your flow contains information about the Salesforce data that changed, including how the data changed and when the change occurred.

Salesforce stores events for 24 hours (or 72 hours for high-volume events). A subscriber to a topic or channel can retrieve events related to that topic or channel while the events are within this retention window. After an event's retention window ends, the subscriber can no longer retrieve that event and can retrieve only newer events that have not yet expired.

Salesforce assigns each broadcast event a numeric ID. IDs are incremented, but not necessarily by 1 for each consecutive event. For example, the event following the event with ID 999 can have an ID of 1025. A broadcast event ID is unique for the organization and channel. Salesforce does not reuse the IDs of deleted events.

Subscribe to a Streaming Channel

After you create a streaming channel, you can start receiving events by subscribing to the channel. The subscribe-channel-listener input source acts like an inbound endpoint. In this example, every time a subscription to /u/TestStreaming receives an event, it executes the rest of the flow and logs a message at the INFO level:

<flow name="notificationsChannelSubscription">
  <!-- INBOUND ENDPOINT -->
  <sfdc:subscribe-channel-listener streamingChannel="/u/TestStreaming"/>
  <!-- REST OF YOUR FLOW -->
  <logger level="INFO" message="Received an event: #[payload]"/>
</flow>

The Streaming channel field of the Subscribe channel listener operation does not display change events that are available in the Salesforce environment. However, the connector can still subscribe to a streaming channel to obtain this information. For example, to subscribe to the All Change Events channel, use /data/ChangeEvents as the channel name.

For more information, see Subscription Channels in the Salesforce Change Data Capture Developer Guide.

Replay Messages From a Topic

A subscriber can specify which events to receive. By default, a subscriber receives only the events that occur after subscribing. Events outside the 24-hour (or 72-hour for high-volume events) retention period are discarded.

The Replay Topic Listener operation provides these options:

  • ALL

    Subscriber receives all events, including past events that are within the 24-hour (or 72-hour) retention period and new events sent after the client subscribes.

  • ONLY_NEW

    Subscriber receives new events that are broadcast after the client subscribes.

  • FROM_REPLAY_ID

    Subscriber receives all events after the specified event replayId.

If you specify either the ALL replay option or ONLY_NEW replay option, the replayId value is ignored.
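
The three options can be summarized with a small model that treats events as bare replay IDs. This is a sketch for illustration only: the function `events_to_deliver` and its parameters are hypothetical, `retained_ids` stands for events still inside the retention period when the client subscribes, and `new_ids` stands for events broadcast afterward.

```python
from enum import Enum
from typing import List, Optional


class ReplayOption(Enum):
    ALL = "ALL"
    ONLY_NEW = "ONLY_NEW"
    FROM_REPLAY_ID = "FROM_REPLAY_ID"


def events_to_deliver(
    retained_ids: List[int],
    new_ids: List[int],
    option: ReplayOption,
    replay_id: Optional[int] = None,
) -> List[int]:
    """Return the replay IDs a subscriber would receive under each option."""
    if option is ReplayOption.ALL:
        return retained_ids + new_ids  # replay_id is ignored
    if option is ReplayOption.ONLY_NEW:
        return list(new_ids)           # replay_id is ignored
    if replay_id is None:
        raise ValueError("FROM_REPLAY_ID requires a replay_id")
    # Only events after the specified replay ID are delivered.
    return [r for r in retained_ids + new_ids if r > replay_id]
```

For retained events 5, 8, and 13 and a new event 21, ALL yields all four, ONLY_NEW yields only 21, and FROM_REPLAY_ID with a replay ID of 8 yields 13 and 21.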

The Resume from the Last Replay Id checkbox enables you to specify an automatic replay of stored events, based on the Replay ID of the last event processed by the connector. You can use this functionality when the connector stops listening, such as during a server shutdown or dropped connection. If the stored Replay ID is outside the 24-hour retention period, the replay option determines what events to replay.

To support the Resume from Last Replay ID feature, the connector uses a persistent object store to keep different details regarding the processed messages. This feature is enhanced in Salesforce Connector 10.x to reduce message loss probability and to avoid processing duplicate messages. For more details about how Object Store is used, check the Object Store Usage section.
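
The fallback rule for an expired stored Replay ID can be sketched as a simple decision function. Everything here is a hypothetical model: the function name, the `stored_age_hours` parameter, and the idea of comparing an age against the retention window are assumptions for illustration, because the text does not describe how the connector detects that a stored Replay ID has expired.

```python
from typing import Optional, Tuple

RETENTION_HOURS = 24  # standard retention period mentioned in the text


def subscription_for_restart(
    stored_replay_id: Optional[int],
    stored_age_hours: float,
    configured_option: str,
    configured_replay_id: Optional[int] = None,
) -> Tuple[str, Optional[int]]:
    """Pick the replay option and replay ID to use when the listener restarts."""
    # A stored replay ID that is still inside the retention window wins.
    if stored_replay_id is not None and stored_age_hours <= RETENTION_HOURS:
        return ("FROM_REPLAY_ID", stored_replay_id)
    # Otherwise fall back to whatever replay option is configured on the source.
    if configured_option == "FROM_REPLAY_ID":
        return (configured_option, configured_replay_id)
    return (configured_option, None)  # ALL and ONLY_NEW ignore the replay ID
```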

In the following XML example, the Replay topic listener operation acts as the inbound endpoint for the flow, and the Logger component logs each replayed event:

<flow name="accountUpdatesReplay">
    <!-- INBOUND ENDPOINT -->
    <sfdc:replay-topic-listener topic="AccountUpdates" replayId="1" replayOption="ALL" autoReplay="true"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>

Replay Messages From a Streaming Channel

A streaming channel can replay notifications. The Replay channel listener input source acts as an inbound endpoint. You can use it as shown in the following example:

<flow name="flowStreamingChannelReplay">
    <!-- INBOUND ENDPOINT -->
    <sfdc:replay-channel-listener streamingChannel="/u/Notifications" replayId="1" replayOption="ALL"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>

If you specify either the ALL replay option or the ONLY_NEW replay option, the replayId value is ignored.

The Resume from the Last Replay Id checkbox enables you to specify an automatic replay of stored events, based on the Replay ID of the last event processed by the connector. You can use this functionality when the connector stops listening, such as during a server shutdown or dropped connection. If the stored Replay ID is outside the 24-hour retention period, the replay option determines what events to replay.

To support the Resume from Last Replay ID feature, the connector uses a persistent object store to keep different details regarding the processed messages. This feature is enhanced in Salesforce Connector 10.x to reduce message loss probability and to avoid processing duplicate messages. For more details about how Object Store is used, check the Object Store Usage section.

Custom Event Notifications

Salesforce Connector enables you to obtain custom event notifications. These notifications apply to general events that are not tied to Salesforce data changes.

To obtain custom event notifications:

  1. Use the Publish streaming channel operation to create a streaming channel.

    StreamingChannel is a special Salesforce object that represents a channel used to notify listeners of generic Streaming API events.

    You can also create a streaming channel through Salesforce or Workbench.

  2. Use the Subscribe channel listener operation to subscribe to the channel.

    Salesforce Connector converts the custom events in your streaming channel to Mule events.

For more information about working with streaming channels, see Create a Streaming Channel to Receive Data from Salesforce.

Push Events to a Streaming Channel

Salesforce enables you to push custom events to a specific streaming channel through the REST API. To do this, use Workbench or this connector.

The following example uses the connector’s push-generic-event operation to push custom events to the channel with the ID 0M6j0000000KyjBCAS:

<flow name="flowPushGenericEvent">
    <!-- PUSH A CUSTOM EVENT TO THE CHANNEL -->
    <sfdc:push-generic-event channelId="0M6j0000000KyjBCAS">
        <sfdc:events>
            <sfdc:event payload="Notification message text"/>
        </sfdc:events>
    </sfdc:push-generic-event>
    <!-- LOG THE RESPONSE FROM THE PUSH OPERATION -->
    <logger level="INFO" message="Push response: #[payload]"/>
</flow>

You can retrieve the channel ID from the response map of the publish-streaming-channel operation. Alternatively, you can retrieve the channel ID from the Salesforce page:

  1. Log in to your Salesforce Developer Edition organization.

  2. Under All Tabs (+), select Streaming Channels.

If the channel ID field is not visible on the channel list, follow these steps:

  1. Click Create New View.

  2. Type a name for the view in the Name input field.

  3. In the Available Fields list, select Streaming Channel ID and click Add.

    You should see the channel ID for each streaming channel in the list.

  4. Add any other fields.

  5. Click Save.

The JSON received as a response from the push event operation looks something like this:

[
  {
  "userOnlineStatus": {
  },
  "fanoutCount": 0
  }
]