Processing Events with Salesforce Connector 10.18 - Mule 4
You can use Anypoint Connector for Salesforce (Salesforce Connector) as an inbound connector to stream data from Salesforce into your application. To use the connector this way, use one of these sources:
- Subscribe topic listener
- Subscribe channel listener
- Replay topic listener
- Replay channel listener
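For example, a flow with one of these sources as its inbound endpoint has the following overall shape. This is a sketch only: the global configuration name Salesforce_Config and the topic name are placeholders, and config-ref simply indicates that the source references a global Salesforce configuration:
<flow name="streamSalesforceEvents">
    <!-- INBOUND ENDPOINT: emits one Mule event per Salesforce event -->
    <sfdc:subscribe-topic-listener config-ref="Salesforce_Config" topic="AccountUpdates"/>
    <!-- REST OF YOUR FLOW -->
    <logger level="INFO" message="Received event: #[payload]"/>
</flow>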
When deploying apps in a distributed environment such as a runtime cluster, the source must run on the primary node. Running the source on multiple nodes causes duplicate event consumption and corrupts the object store data that Salesforce Connector stores.
Each event that travels through your flow contains information about the Salesforce data that changed, including how the data changed and when the change occurred.
Salesforce stores events for 24 hours (or 72 hours for high-volume events). A subscriber to a topic or channel can retrieve events related to that topic or channel during this retention window, after which the subscriber can retrieve only newer events that have not yet expired.
Salesforce assigns each broadcast event a numeric ID that is unique to the organization and channel. IDs are incremented, but not necessarily consecutively; for example, the event following event 999 might be event 1025. Salesforce does not reuse the IDs of deleted events.
Before You Begin
To process events with Salesforce Connector, ensure that you have:
- Access to the Salesforce target resource
- The required Salesforce Streaming API permissions enabled in your organization
To receive events for Salesforce changes associated with a topic, either create a new topic or use an existing one.
Object Store Usage in Events
Both Salesforce Connector and Mule runtime engine (Mule) use an object store to persist data for features such as automatic message replay and message redelivery:
- The object store included in Mule for on-premises deployments of Mule apps has no transaction limits.
- The free version of Object Store for CloudHub deployments of Mule apps has a limit of 10 transactions per second (TPS); however, other Anypoint connectors that use the object store internally have a limit of 100 TPS.
For more information about Object Store versions, see Object Store Notes.
Replay Topic Listener and Replay Channel Listener Sources
The Replay topic listener and Replay channel listener sources can continue from the last replay ID they received before restarting the application.
When a Mule app starts for the first time, the connector creates an object store in which it saves this data. When the app restarts:
- If there are no failed events, the source starts from the event associated with the last successfully processed event ID.
- If there are one or more failed events, the source starts from the event associated with the lowest replay ID that failed.
Saving this information in the object store ensures that no message is processed twice and that, when the application restarts, it can reprocess any failed messages. The IDs are stored in the object store for 72 hours, after which they are automatically removed.
The connector contains an in-memory structure that stores the replay IDs of events, both events that process successfully and events that fail to process. To persist this structure into the object store, the connector builds a key that is specific to the username in the connector configuration. A redundancy mechanism avoids processing duplicate events by storing the same data under a backup key. Each persist operation consists of deleting the existing key and storing the new value; if storing the new value fails, the connector retries, thus consuming two or more TPS per key.
If you deploy your application to multiple workers in CloudHub, the Replay topic listener and Replay channel listener sources process each message on every Mule instance. To avoid this situation, deploy your application on a single worker and then use a shared queue to pass the messages to a different worker for processing.
Resume from the Last Replay ID
To enable the resume from the last replay ID functionality for the Replay topic listener and Replay channel listener sources, the connector must store information about the events that were successfully processed by the flow in which the source is used. The connector uses an object store to store that information.
The way that the object store stores the necessary data for the resume from the last replay ID functionality varies depending on the environment:
- When a Mule app runs locally, the connector creates a persistent store in the file system. If the application stops and restarts, or if a connection issue triggers a reconnection, the data saved in the object store enables the connector to continue processing messages at the point where it stopped.
- When a Mule app runs in a cluster, with the nodes either on the same machine or on different machines, Mule uses a distributed in-memory object store:
  - If the primary node (the one consuming the messages) shuts down, the node that takes over does not reprocess the same messages. Instead, it continues from where the previous node stopped processing.
  - If the cluster stops, the data in the object store is lost. When the cluster restarts, the configuration at the source level determines whether the connector reprocesses the messages. To persist data so that the app continues processing events when restarted, create a persistent object store, as described in Create and Manage a Cluster Manually (see the sketch after this list).
- If you use multiple workers when running the app in CloudHub, all of the workers receive and process the events.
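A persistent object store can be defined with the ObjectStore connector. The following is a minimal sketch only, assuming the ObjectStore connector (os namespace) is part of the app; the store name and TTL are illustrative, and whether a given replay source can reference a user-defined store instead of the one it creates internally depends on your connector version, so verify that wiring before relying on it:
<!-- Persistent object store that survives application restarts -->
<os:object-store name="salesforceReplayStore"
                 persistent="true"
                 entryTtl="72"
                 entryTtlUnit="HOURS"/>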
Replay Sources Field Configurations
The following field configurations determine, in order of priority, how the replay sources choose which events to replay:
- Replay failed events if any or resume from last replay id
  If enabled, this field overrides anything selected in the Replay option field and anything entered in the Replay id field.
  This field checks the object store for failed event IDs and for the last successfully processed event ID. If there are failed events, processing starts with the lowest failed event, so the source receives all events from lowestFailedEventId until now. For example, if FailedEventIdInObjectStore is 10 and LastSuccessfullyProcessedEventIdInObjectStore is 15, processing starts at 10 and the streaming client receives events 10 through 15, which consumes quota; however, these events are not reprocessed by the flow because the connector filters them. If there are no failed event IDs stored, processing starts with the highest known replay ID. If nothing is saved in the object store, the Replay option field is used. The Replay option field accepts these values:
  - ReplayOption.ALL: Subscribes with -2. The user receives all events available in Salesforce.
  - ReplayOption.ONLY_NEW: Subscribes with -1. The user receives only the events created after the connector subscribes.
  - ReplayOption.FROM_REPLAY_ID: Subscribes with the value the user enters in the Replay id field.
  - ReplayOption.FROM_LAST_REPLAY_ID: Checks the object store for the highest received event ID, regardless of whether that event was processed successfully. Compared to the Replay failed events if any or resume from last replay id field, which can start the source from a failed event ID lower than the highest processed event ID, this option avoids quota consumption. (See the sketch after this list.)
- Cache events in memory
  If enabled, when the application starts, the connector subscribes from a replay ID, and the Salesforce API pushes all events starting from that replay ID, which consumes quota. If many events are available and the Mule app processes them slowly, processing all of them can take a while. With this option enabled, the events pushed by the API are stored in memory and consumed sequentially in the order they are received. Without this option, if a connection issue occurs or the token expires and the connector must resubscribe, those events would be lost even though the quota was already consumed.
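For example, the following is a sketch of a Replay channel listener that subscribes with the FROM_LAST_REPLAY_ID option described above; the channel name is a placeholder and the remaining fields keep their defaults:
<flow name="resumeFromLastReplayIdFlow">
    <!-- Subscribes using the highest replay ID recorded in the object store -->
    <sfdc:replay-channel-listener streamingChannel="/u/Notifications" replayOption="FROM_LAST_REPLAY_ID"/>
    <logger level="INFO" message="Replayed events: #[payload]"/>
</flow>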
Multiple Flows That Use the Same Source
If you have two or more flows that use a source that listens to events on the same channel or topic, each event is consumed two or more times against your user quota. To avoid this, consider implementing that logic in a single flow instead.
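As a sketch, one way to keep a single subscription while still separating the processing logic is to route from that one flow into other flows with Flow Reference components; the flow and channel names here are placeholders:
<flow name="singleSubscriptionFlow">
    <!-- Only this flow subscribes, so each event is consumed once from the quota -->
    <sfdc:subscribe-channel-listener streamingChannel="/u/Notifications"/>
    <!-- Hand the same event to the separate pieces of processing logic -->
    <flow-ref name="auditEventFlow"/>
    <flow-ref name="notifyDownstreamFlow"/>
</flow>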
Having multiple sources listening to the same events and having the Resume from the Last Replay Id option enabled can lead to data corruption and the potential loss of events.
Multiple Flows with Different Sources
Having two or more flows that use different sources with the Replay Failed Events If Any or Resume from Last Replay Id option enabled might, under certain conditions, have performance implications.
To avoid performance implications, use different Salesforce Connector configurations with different Salesforce usernames.
Changing the username for a configuration prevents the connector from loading the last processed and failed replay IDs. Processing resumes from the first event available in Salesforce.
Avoid using personal user accounts to access Salesforce.
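The following is a minimal sketch of two configurations with different usernames, assuming the basic username-password connection provider; the element names follow the connector's Mule 4 DSL and can vary by version, and all credentials shown are placeholders:
<sfdc:sfdc-config name="Salesforce_Config_A">
    <sfdc:basic-connection username="integration.user.a@example.com" password="${sf.password.a}" securityToken="${sf.token.a}"/>
</sfdc:sfdc-config>
<sfdc:sfdc-config name="Salesforce_Config_B">
    <sfdc:basic-connection username="integration.user.b@example.com" password="${sf.password.b}" securityToken="${sf.token.b}"/>
</sfdc:sfdc-config>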
Work with Topic Events
Work with topic events by creating a topic, subscribing to a topic, and replaying topic messages.
Create a Topic to Receive Data from Salesforce
When you create a topic, the connector creates a PushTopic, which is a special object in Salesforce that binds a name (in this case, the topic’s name) and a Salesforce Object Query Language (SOQL) query together. After you create a topic, you can subscribe to it by its name.
You can use either the Create (create) operation or the Publish topic (publish-topic) operation to create a topic. The following example uses the publish-topic operation to create a topic:
<sfdc:publish-topic name="AccountUpdates" query="SELECT Id, Name FROM Account"/>
Alternatively, you can create a topic in Salesforce by executing code from an Enter Apex Code window, which is accessible through the system logs, for example:
PushTopic pushTopic = new PushTopic();
pushTopic.ApiVersion = 23.0;
pushTopic.Name = 'AllAccounts';
pushTopic.Description = 'All records for the Account object';
pushTopic.Query = 'SELECT Id, Name FROM Account';
insert pushTopic;
System.debug('Created new PushTopic: '+ pushTopic.Id);
Subscribe to a Topic
To subscribe to a topic, add either the Subscribe topic listener (subscribe-topic-listener) or the Replay topic listener (replay-topic-listener) as a source for your flow. The source acts as an inbound endpoint. Every time the subscription receives an event, the source executes the rest of the flow in your Mule app.
In the following XML example, Mule prints a message to the log at the INFO level when the AccountUpdates topic receives an event:
<flow name="accountUpdatesSubscription">
<!-- INBOUND ENDPOINT -->
<sfdc:subscribe-topic-listener topic="AccountUpdates"/>
<!-- REST OF YOUR FLOW -->
<logger level="INFO" message="Received an event for Salesforce Object ID #[map-payload:Id]"/>
</flow>
You can subscribe to a topic that was not previously published in Salesforce. However, after the topic is published, you won’t receive notifications for that topic unless you resubscribe to it.
Replay Messages from a Topic
A subscriber can specify which events to receive. By default, a subscriber receives only the events that occur after subscribing. Events outside of the 24-hour retention period (or 72-hour retention period for high-volume events) are discarded.
The Replay topic listener source provides these options:
- ALL: Subscriber receives all events, including past events that are within the 24-hour (or 72-hour) retention period and new events sent after the client subscribes.
- ONLY_NEW: Subscriber receives only the new events that are sent after the client subscribes.
- FROM_REPLAY_ID: Subscriber receives all events after the event with the specified replayId.
If you specify either the ALL replay option or the ONLY_NEW replay option, the replayId value is ignored.
The Resume from the Last Replay Id checkbox enables you to specify an automatic replay of stored events based on the Replay ID of the last event processed by the connector. You can use this functionality when the connector stops listening, such as during a server shutdown or a dropped connection. If the stored Replay ID is outside the 24-hour retention period, the replay option determines which events to replay.
To support the resume from last replay ID functionality, the connector uses a persistent object store to keep different details regarding the processed messages. This feature reduces the possibility of message loss and avoids processing duplicate messages. For more information about how the object store is used, see Object Store Usage in Events.
In the following XML example, the Replay topic listener (replay-topic-listener) source acts as an inbound endpoint for the Logger component message:
<flow name="accountUpdatesReplay">
<!-- INBOUND ENDPOINT -->
<sfdc:replay-topic-listener topic="AccountUpdates" replayId="1" replayOption="ALL" autoReplay="true"/>
<!-- REST OF YOUR FLOW -->
<logger level="INFO" message="Replayed events: #[payload]"/>
</flow>
Work with Streaming Channel Events
Work with streaming channel events by creating a streaming channel, subscribing to a streaming channel, replaying streaming channel events, and pushing events to a streaming channel.
Create a Streaming Channel
To create a streaming channel, you must have the proper Salesforce Streaming API permissions enabled in your organization.
Follow these steps to create a streaming channel:
1. Log in to your Salesforce Developer Edition organization.
2. Under All Tabs (+), select Streaming Channels.
3. On the Streaming Channels tab, select New.
4. Enter /u/notifications/ExampleUserChannel in the Streaming Channel Name field.
5. Enter an optional description.
You can also create a streaming channel by using either the connector Create operation or the connector Publish streaming channel (publish-streaming-channel) operation. The following example uses the publish-streaming-channel operation:
<sfdc:publish-streaming-channel
name="/u/Notifications"
description="General notifications"/>
Subscribe to a Streaming Channel
After you create a streaming channel, you can start receiving events by subscribing to the channel. The Subscribe channel listener (subscribe-channel-listener) source acts like an inbound endpoint. In this example, every time a subscription to /u/TestStreaming receives an event, it executes the rest of the flow and logs a message at the INFO level:
<flow name="notificationsChannelSubscription">
<!-- INBOUND ENDPOINT -->
<sfdc:subscribe-channel-listener streamingChannel="/u/TestStreaming"/>
<!-- REST OF YOUR FLOW -->
<logger level="INFO" message="Received an event: #[payload]"/>
</flow>
The Streaming channel field of the Subscribe channel listener operation does not display change events that are available in the Salesforce environment. However, your connector can subscribe to a streaming channel to obtain this information. For example, to subscribe to the All Change Events channel, use /data/ChangeEvents as the channel name to which to subscribe.
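For example, a flow that subscribes to all change events might look like this; the flow name is illustrative:
<flow name="allChangeEventsSubscription">
    <!-- /data/ChangeEvents delivers change events for all supported objects -->
    <sfdc:subscribe-channel-listener streamingChannel="/data/ChangeEvents"/>
    <logger level="INFO" message="Received change event: #[payload]"/>
</flow>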
For more information, see Subscription Channels in the Salesforce Change Data Capture Developer Guide.
Replay Messages from a Streaming Channel
A streaming channel can replay notifications. The Replay channel listener (replay-channel-listener) source acts as an inbound endpoint, for example:
<flow name="flowStreamingChannelReplay">
<!-- INBOUND ENDPOINT -->
<sfdc:replay-channel-listener streamingChannel="/u/Notifications" replayId="1" replayOption="ALL"/>
<!-- REST OF YOUR FLOW -->
<logger level="INFO" message="Replayed events: #[payload]"/>
</flow>
If you specify either the ALL replay option or the ONLY_NEW replay option, the replayId value is ignored.
The Resume from the Last Replay Id checkbox enables you to specify an automatic replay of stored events based on the Replay ID of the last event processed by the connector. You can use this functionality when the connector stops listening, such as during a server shutdown or a dropped connection. If the stored Replay ID is outside the 24-hour retention period, the replay option determines which events to replay.
To support the resume from the last replay ID feature, the connector uses a persistent object store to keep details regarding the processed messages. This feature reduces the possibility of message loss and avoids processing duplicate messages. For more information about how the object store is used, see Object Store Usage in Events.
Push Events to a Streaming Channel with No Listener
Users can push events to a streaming channel, even if the channel does not have a listener for reading published events. After a listener is started for the channel, Salesforce Streaming API pushes as many messages as it can to the listener, based on the maximum daily limit.
For example, the maximum number of delivered event notifications within a 24-hour period for a free Salesforce organization is 10,000. Suppose you publish 15,000 events to that channel. When Salesforce Connector subscribes to that channel, Streaming API attempts to push 10,000 events, thus consuming the daily quota. The API then attempts to push the remaining 5,000 events the next day, before pushing any new events.
In this scenario, the connector streams the events one-by-one into the Mule app. If the app takes too long to process a message, Streaming API might instruct the connector to reconnect. If this happens, Streaming API drops all of the unprocessed messages. You can avoid this situation by implementing a reliability pattern, as described in Reliability Patterns.
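A minimal sketch of one such pattern, assuming the VM connector is available: the subscribing flow only writes each event to a persistent VM queue, and a second flow consumes from that queue at its own pace. Queue and flow names are placeholders:
<vm:config name="VM_Config">
    <vm:queues>
        <vm:queue queueName="salesforceEvents" queueType="PERSISTENT"/>
    </vm:queues>
</vm:config>

<flow name="acceptSalesforceEvents">
    <sfdc:subscribe-channel-listener streamingChannel="/u/Notifications"/>
    <!-- Acknowledge quickly by queueing the event; defer the real work -->
    <vm:publish config-ref="VM_Config" queueName="salesforceEvents"/>
</flow>

<flow name="processSalesforceEvents">
    <vm:listener config-ref="VM_Config" queueName="salesforceEvents"/>
    <logger level="INFO" message="Processing event: #[payload]"/>
</flow>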
Work with Custom Events
Work with custom events by obtaining custom event notifications and pushing custom events to a streaming channel.
Obtain Custom Event Notifications
Salesforce Connector enables you to obtain custom event notifications, which apply to general events that are not tied to Salesforce data changes.
To obtain custom event notifications:
1. Use the Publish streaming channel operation to create a streaming channel.
   StreamingChannel is a special Salesforce object that represents a channel used to notify listeners of generic Streaming API events. You can also create a streaming channel through Salesforce or Workbench.
2. Use the Subscribe channel listener operation to subscribe to the channel.
   Salesforce Connector converts the custom events in your streaming channel to Mule events.
Push Custom Events to a Streaming Channel
Salesforce enables you to push custom events to a specific streaming channel through the REST API. To do this, use Workbench or this connector.
The following example uses the connector’s Push generic event (push-generic-event) operation to push custom events to the channel with the ID 0M6j0000000KyjBCAS:
<flow name="flowPushGenericEvent">
<!-- INBOUND ENDPOINT -->
<sfdc:push-generic-event channelId="0M6j0000000KyjBCAS">
<sfdc:events>
<sfdc:event payload="Notification message text"/>
</sfdc:events>
</sfdc:push-generic-event>
<logger level="INFO" message="Replayed events: #[payload]"/>
</flow>
You can retrieve the channel ID from the response map of the Publish streaming channel operation. Alternatively, you can retrieve the channel ID from the Salesforce page:
1. Log in to your Salesforce Developer Edition organization.
2. Under All Tabs (+), select Streaming Channels.
If the channel ID field is not visible on the channel list, follow these steps:
1. Click Create New View.
2. Type a name for the view in the Name input field.
3. In the Available Fields list, select Streaming Channel ID and click Add.
   You should see the channel ID for each streaming channel in the list.
4. Add any other fields.
5. Click Save.
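As a sketch of the first approach (reading the ID from the Publish streaming channel response), the following flow stores the ID in a variable. The response key name id used below is an assumption, so verify it against the actual response map returned by your connector version:
<flow name="createChannelAndCaptureId">
    <sfdc:publish-streaming-channel name="/u/Notifications" description="General notifications"/>
    <!-- ASSUMPTION: the response map exposes the new channel ID under the "id" key -->
    <set-variable variableName="channelId" value="#[payload.id]"/>
    <logger level="INFO" message="Created channel with ID: #[vars.channelId]"/>
</flow>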
The JSON received as a response from the push event operation looks something like this:
[
{
"userOnlineStatus": {
},
"fanoutCount": 0
}
]