Processing Events With Salesforce Connector 10.8

You can use Anypoint Connector for Salesforce (Salesforce Connector) as an inbound connector to stream data from Salesforce into your application. To use the connector this way, use one of these source operations:

  • Subscribe topic listener

  • Subscribe channel listener

  • Replay topic listener

  • Replay channel listener

When deploying apps in a distributed environment (such as a runtime cluster), the source operation must run on the primary node only. If the same source runs on multiple nodes, each node consumes the same events, which results in duplicate event processing.
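
As a rough sketch of how this might look in a flow, the following fragment assumes that the listener exposes Mule's standard primaryNodeOnly source attribute. The flow name and topic are illustrative, so check the connector reference for your version before relying on it:

```xml
<flow name="primaryNodeSubscription">
    <!-- Assumption: primaryNodeOnly is available on this source; it asks Mule to start the listener on the primary node only -->
    <sfdc:subscribe-topic-listener topic="AccountUpdates" primaryNodeOnly="true"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Received an event: #[payload]"/>
</flow>
```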

Before You Begin

To process events with Salesforce Connector, ensure you meet the following prerequisites:

  • You must have access to the Salesforce target resource.

  • Before you can receive events for Salesforce changes associated with a topic, you must create the topic, if it does not exist.

  • You must have the required Salesforce Streaming API permissions enabled in your organization.

Object Store Usage

Both Salesforce Connector and Mule use an object store to persist data for features such as automatic message replay and message redelivery:

  • The object store included in Mule for on-premises deployments of Mule apps has no transaction limits.

  • The free version of Object Store for CloudHub deployments of Mule apps has a limit of 10 transactions per second.

For more information about object store versions, see Object Store Notes.

Replay Topic and Replay Channel Listener Operations

The Replay topic and Replay channel listener operations have the option to continue from the last replay ID they received before restarting the application.

When a Mule app starts for the first time, the connector creates an object store that is used to save data related to messages that were processed successfully or that failed:

  • For each message that was processed successfully, the information stored consists of a replay ID associated with the message.

  • For each message that failed, the information stored consists of a number that represents the lowest replay ID for which the processing failed.

These two structures ensure that no message is processed twice and that when the application is restarted, it can reprocess the failed messages.

For each message that comes through a topic or streaming channel to which the connector is subscribed, the connector updates the object store information using up to four transactions.
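
To see what this can mean for throughput, combine two figures from this section: a worst case of four transactions per message, and the free CloudHub Object Store limit of 10 transactions per second. The result is only a rough upper bound, and it assumes that nothing else in the app uses the object store:

```latex
\[
\text{messages per second} \;\lesssim\; \frac{10\ \text{transactions/s}}{4\ \text{transactions/message}} = 2.5
\]
```

Under these assumptions, a sustained rate of more than two or three events per second could exceed the free version's limit.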

Resume from the Last Replay ID

To enable the Resume from the Last Replay Id functionality for the Replay Topic Listener and Replay Channel Listener source operations, the connector must store some information about the events that were successfully processed by the flow in which the source operation is used. The connector uses Object Store to store that information.

The way Object Store is used to store the necessary data for the Resume from the Last Replay Id varies depending on the environment:

  • When a Mule application is running locally, the connector creates a persistent store in the file system. This way, if the application is stopped and restarted, or if there was a connection issue that triggered a reconnection, the data that was saved in the object store is not lost and the connector can continue processing messages at the point where it previously stopped.

  • When running the application in a cluster, either with the nodes on the same machine, or with the nodes on different machines, a distributed in-memory object store is used. This way, if the primary node (the one consuming the messages) shuts down, the node that takes over for the primary node will not consume the same messages again, but will continue from where the previous node stopped processing the messages.
    If the cluster is stopped, the data from the object store is lost, so when the cluster restarts, the messages might be reprocessed, depending on the configuration at the source level.
    If you need the data to be persisted so that the application can continue processing events after a restart, you can configure the object store to use a database as the persistence layer. For more information, see Creating and Managing a Cluster.

  • When running the application in CloudHub, if you are using multiple workers, all of the workers receive and process the events.

Create a Topic to Receive Data from Salesforce

When you create a topic, the connector creates a PushTopic, which is a special object in Salesforce that binds a name (in this case, the topic’s name) and a Salesforce Object Query Language (SOQL) query together. After you create a topic, you can then subscribe to it by its name.

You can use either the Create (create) or Publish topic (publish-topic) operation to create a topic. The following example uses the publish-topic operation to create a topic:

<sfdc:publish-topic name="AccountUpdates" query="SELECT Id, Name FROM Account"/>

Alternatively, you can create a topic in Salesforce by executing code such as the following in an Enter Apex Code window, which is accessible through the system logs:

PushTopic pushTopic = new PushTopic();
pushTopic.ApiVersion = 23.0;
pushTopic.Name = 'AllAccounts';
pushTopic.Description = 'All records for the Account object';
pushTopic.Query = 'SELECT Id, Name FROM Account';
insert pushTopic;
System.debug('Created new PushTopic: '+ pushTopic.Id);

Create a Streaming Channel to Receive Data from Salesforce

To create a streaming channel, you must have the proper Salesforce Streaming API permissions enabled in your organization.

Follow these steps to create a streaming channel:

  1. Log in to your Salesforce Developer Edition organization.

  2. Under All Tabs (+), select Streaming Channels.

  3. On the Streaming Channels tab, select New to create a new streaming channel.

  4. Enter /u/notifications/ExampleUserChannel in the Streaming Channel Name field.

  5. Enter an optional description.

You can also create a streaming channel by using either the connector Create operation or the connector Publish streaming channel (publish-streaming-channel) operation. The following example uses the publish-streaming-channel operation:

<sfdc:publish-streaming-channel
    name="/u/Notifications"
    description="General notifications"/>

Subscribe to a Topic

To subscribe to a topic, add the Subscribe topic listener (subscribe-topic-listener) or Replay topic listener (replay-topic-listener) as an input source for your flow. The input source acts as an inbound endpoint. Every time the subscription receives an event, the input source executes the rest of the flow in your Mule app.

In the following XML example, Mule prints a message to the log at the INFO level when the AccountUpdates topic receives an event:

<flow name="accountUpdatesSubscription">
    <!-- INBOUND ENDPOINT -->
    <sfdc:subscribe-topic-listener topic="AccountUpdates"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Received an event for Salesforce Object ID #[payload.Id]"/>
</flow>

You can subscribe to a topic that was not previously published in Salesforce. However, after the topic is published, you do not receive notifications for that topic unless you resubscribe to it.

Each event that travels through your flow contains information about the Salesforce data that changed, including how the data changed and when the change occurred.

Salesforce stores events for 24 hours (or 72 hours for high-volume events). A subscriber to a topic or channel can retrieve any event for that topic or channel while the event is within its retention window. After an event's retention window ends, Salesforce discards the event, and the subscriber can retrieve only newer events that have not yet expired.

Salesforce assigns each broadcast event a numeric ID. IDs are incremented, but not necessarily by 1 for each consecutive event. For example, the event following the event with ID 999 can have an ID of 1025. A broadcast event ID is unique for the organization and channel. Salesforce does not reuse the IDs of deleted events.

Subscribe to a Streaming Channel

After you create a streaming channel, you can start receiving events by subscribing to the channel. The subscribe-channel-listener input source acts like an inbound endpoint. In this example, every time a subscription to /u/TestStreaming receives an event, it executes the rest of the flow and logs a message at the INFO level:

<flow name="notificationsChannelSubscription">
  <!-- INBOUND ENDPOINT -->
  <sfdc:subscribe-channel-listener streamingChannel="/u/TestStreaming"/>
  <!-- REST OF YOUR FLOW -->
  <logger level="INFO" message="Received an event: #[payload]"/>
</flow>

The Streaming channel field of the Subscribe channel listener operation does not display the change event channels that are available in the Salesforce environment. However, the connector can still subscribe to these channels if you enter the channel name manually. For example, to subscribe to the All Change Events channel, use /data/ChangeEvents as the channel name.

For more information, see Subscription Channels in the Salesforce Change Data Capture Developer Guide.
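
As an illustration, a subscription to the All Change Events channel might look like the following fragment. It reuses the subscribe-channel-listener source shown earlier; the flow name is hypothetical, and the sketch assumes that Change Data Capture is enabled for the relevant objects in your organization:

```xml
<flow name="allChangeEventsSubscription">
    <!-- INBOUND ENDPOINT: the channel name is typed in manually -->
    <sfdc:subscribe-channel-listener streamingChannel="/data/ChangeEvents"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Received a change event: #[payload]"/>
</flow>
```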

Replay Messages From a Topic

A subscriber can specify which events to receive. By default, a subscriber receives only the events that occur after subscribing. Events outside the 24-hour (or 72-hour for high-volume events) retention period are discarded.

The Replay Topic Listener operation provides these options:

  • ALL

    Subscriber receives all events, including past events that are within the 24-hour (or 72-hour) retention period and new events sent after the client subscribes.

  • ONLY_NEW

    Subscriber receives new events that are broadcast after the client subscribes.

  • FROM_REPLAY_ID

    Subscriber receives all events after the specified event replayId.

If you specify either the ALL replay option or ONLY_NEW replay option, the replayId value is ignored.
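
For comparison, the following sketch shows how the FROM_REPLAY_ID option might be configured so that the replayId value is actually used. The flow name and the replay ID 1025 are hypothetical; in practice, the ID would come from an event that you received earlier and that is still within the retention period:

```xml
<flow name="accountUpdatesReplayFromId">
    <!-- INBOUND ENDPOINT: receives the events that follow the event with replay ID 1025 -->
    <sfdc:replay-topic-listener topic="AccountUpdates" replayId="1025" replayOption="FROM_REPLAY_ID"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Replayed event: #[payload]"/>
</flow>
```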

The Resume from the Last Replay Id checkbox enables you to specify an automatic replay of stored events, based on the Replay ID of the last event processed by the connector. You can use this functionality when the connector stops listening, such as during a server shutdown or a dropped connection. If the stored Replay ID is outside the 24-hour retention period, the replay option determines what events to replay.

To support the Resume from the Last Replay Id feature, the connector uses a persistent object store to keep details about the processed messages. This feature is enhanced in Salesforce Connector 10.x to reduce the probability of message loss and to avoid processing duplicate messages. For more details about how Object Store is used, see the Object Store Usage section.

In the following XML example, the Replay topic listener operation acts as the inbound endpoint of a flow that logs each replayed event:

<flow name="accountUpdatesReplay">
    <!-- INBOUND ENDPOINT -->
    <sfdc:replay-topic-listener topic="AccountUpdates" replayId="1" replayOption="ALL" autoReplay="true"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>

Replay Messages From a Streaming Channel

A streaming channel can replay notifications. The Replay channel listener input source acts as an inbound endpoint. You can use it as shown in the following example:

<flow name="flowStreamingChannelReplay">
    <!-- INBOUND ENDPOINT -->
    <sfdc:replay-channel-listener streamingChannel="/u/Notifications" replayId="1" replayOption="ALL"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>

If you specify either the ALL replay option or the ONLY_NEW replay option, the replayId value is ignored.

The Resume from the Last Replay Id checkbox enables you to specify an automatic replay of stored events, based on the Replay ID of the last event processed by the connector. You can use this functionality when the connector stops listening, such as during a server shutdown or a dropped connection. If the stored Replay ID is outside the 24-hour retention period, the replay option determines what events to replay.

To support the Resume from the Last Replay Id feature, the connector uses a persistent object store to keep details about the processed messages. This feature is enhanced in Salesforce Connector 10.x to reduce the probability of message loss and to avoid processing duplicate messages. For more details about how Object Store is used, see the Object Store Usage section.
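
The following fragment sketches how resuming from the last replay ID might be enabled on a streaming channel. It assumes that the Replay channel listener accepts the same autoReplay attribute that the Replay topic listener example uses; the flow name is hypothetical:

```xml
<flow name="flowStreamingChannelAutoReplay">
    <!-- INBOUND ENDPOINT -->
    <!-- Assumption: autoReplay corresponds to the Resume from the Last Replay Id checkbox -->
    <sfdc:replay-channel-listener streamingChannel="/u/Notifications" replayOption="ONLY_NEW" autoReplay="true"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>
```

In this sketch, ONLY_NEW is the fallback that would apply when no usable stored replay ID exists, for example on the first start or after the stored ID falls outside the retention period.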

Custom Event Notifications

Salesforce Connector enables you to obtain custom event notifications. These notifications apply to general events that are not tied to Salesforce data changes.

To obtain custom event notifications:

  1. Use the Publish streaming channel operation to create a streaming channel.

    StreamingChannel is a special Salesforce object that represents a channel used to notify listeners of generic Streaming API events.

    You can also create a streaming channel through Salesforce or Workbench.

  2. Use the Subscribe channel listener operation to subscribe to the channel.

    Salesforce Connector converts the custom events in your streaming channel to Mule events.

For more information about working with streaming channels, see Create a Streaming Channel to Receive Data from Salesforce.
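
Put together, the two steps above might be sketched as the following pair of flows. The flow names are hypothetical, the channel name and description are borrowed from the earlier publish-streaming-channel example, and the first flow has no source, so it is assumed to be invoked once from elsewhere in the app (for example, through a flow-ref):

```xml
<!-- Step 1: create the streaming channel -->
<flow name="createNotificationsChannel">
    <sfdc:publish-streaming-channel
        name="/u/Notifications"
        description="General notifications"/>
</flow>

<!-- Step 2: subscribe to the channel; each custom event arrives as a Mule event -->
<flow name="notificationsChannelListener">
    <!-- INBOUND ENDPOINT -->
    <sfdc:subscribe-channel-listener streamingChannel="/u/Notifications"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Received a custom event: #[payload]"/>
</flow>
```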

Push Events to a Streaming Channel

Salesforce enables you to push custom events to a specific streaming channel through the REST API. To do this, use Workbench or this connector.

The following example uses the connector’s push-generic-event operation to push custom events to the channel with the ID 0M6j0000000KyjBCAS:

<flow name="flowPushGenericEvent">
    <!-- PUSH A CUSTOM EVENT TO THE CHANNEL -->
    <sfdc:push-generic-event channelId="0M6j0000000KyjBCAS">
        <sfdc:events>
            <sfdc:event payload="Notification message text"/>
        </sfdc:events>
    </sfdc:push-generic-event>
    <!-- LOG THE RESPONSE FROM THE PUSH OPERATION -->
    <logger level="INFO" message="Push event response: #[payload]"/>
</flow>

You can retrieve the channel ID from the response map of the publish-streaming-channel operation. Alternatively, you can retrieve the channel ID from the Salesforce page:

  1. Log in to your Salesforce Developer Edition organization.

  2. Under All Tabs (+), select Streaming Channels.

If the channel ID field is not visible on the channel list, follow these steps:

  1. Click Create New View.

  2. Type a name for the view in the Name input field.

  3. In the Available Fields list, select Streaming Channel ID and click Add.

    You should see the channel ID for each streaming channel in the list.

  4. Add any other fields.

  5. Click Save.

The JSON received as a response from the push event operation looks something like this:

[
  {
    "userOnlineStatus": {},
    "fanoutCount": 0
  }
]