<flow name="SampleFlow" doc:id="d25ff96a-aec9-45ee-89f2-74080fb83b45" >
<salesforce:subscribe-channel doc:name="Subscribe channel" doc:id="cb21f452-9280-41f8-ba52-93c49a03ea38" config-ref="Salesforce_Config"
streamingChannel="/event/Cloud_News__e"/>
<logger level="INFO" doc:name="Logger" doc:id="5ebd77bf-87de-4f55-ab81-2af3abbe2bee" message="#[payload]"/>
</flow>
Processing Events with Salesforce Connector 10.22 - Mule 4
You can use Anypoint Connector for Salesforce (Salesforce Connector) as an inbound connector to stream data from Salesforce into your application. To use the connector this way, you can use these sources for any type of events (platform events, change data capture events, and the legacy PushTopic and generic events):
-
Subscribe topic listener
-
Subscribe channel listener
-
Replay topic listener
-
Replay channel listener
If you’re adding a new connector for platform events and change data capture events, you can use Salesforce Pub/Sub Connector, which is based on the Pub/Sub API.
When deploying apps in a distributed environment such as a runtime cluster, the source must run on the primary node. Running the source on multiple nodes causes duplicate event consumption and object store data corruption for the data stored by Salesforce Connector.
Each event that travels through your flow contains information about the Salesforce data that changed, including how the data changed and when the change occurred.
Salesforce stores events for 72 hours (or 24 hours for legacy PushTopic and generic events). A subscriber to a topic or channel can retrieve events related to that topic or channel during this retention window, after which the subscriber can retrieve only newer events that haven’t yet expired.
Salesforce assigns a replay ID to each broadcast event, which you can use to replay stored event messages. For more information, refer to Message Durability in the Streaming API Developer Guide.
Migrating a Salesforce instance to Hyperforce can result in an invalid replay ID error. This occurs if the Mule app tries to access a replay ID from an object store that is no longer valid on the new Hyperforce instance. To avoid this error, see this knowledge article.
Before You Begin
To process events with Salesforce Connector, ensure that you have:
-
Access to the Salesforce target resource
-
The required Salesforce Streaming API permissions enabled in your organization
To receive events for Salesforce changes associated with a topic, either create a new topic or use an existing one.
Object Store Usage in Events
Both Salesforce Connector and Mule runtime engine (Mule) use an object store to persist data for features such as automatic message replay and message redelivery:
-
The object store included in Mule for on-premises deployments of Mule apps has no transaction limits.
-
The free version of Object Store for CloudHub and CloudHub 2.0 deployments of Mule apps has a limit of 10 transactions per second (TPS); however, other Anypoint Connectors that use the object store internally have a limit of 100 TPS.
For more information about Object Store versions, see Object Store Notes.
Replay Topic Listener and Replay Channel Listener Sources
The Replay topic listener and Replay channel listener sources can continue from the last replay ID they received before restarting the application.
When a Mule app starts for the first time, the connector creates an object store that saves data:
-
If there are no failed events, the source starts from the event associated with the last successfully processed event ID.
-
If there are one or more failed events, the source starts with the event associated with the lowest replay ID that failed.
Saving this data in an object store ensures that no message is processed twice and that, when the application restarts, it can reprocess any failed messages. The IDs are stored in the object store for 72 hours, after which they are automatically removed.
The connector contains an in-memory structure that stores the replay IDs of events, both events that process successfully and events that fail to process. To persist this structure, the connector builds an object store key that is specific to the username in the connector configuration. A redundancy mechanism avoids processing duplicate events by storing the same data under a backup key. Each persist operation deletes the existing key and then stores the new value; if storing the new value fails, the connector retries, thus consuming 2 or more TPS per key.
If you have multiple workers deployed in CloudHub, the Replay topic listener and Replay channel listener sources process each message on every Mule instance. To avoid this situation, deploy your application on a single worker and use a shared queue to pass the messages to other workers for processing.
Resume from the Last Replay ID
To enable the resume from the last replay ID functionality for the Replay topic listener and Replay channel listener sources, the connector must store information about the events that were successfully processed by the flow in which the source is used. The connector uses an object store to store that information.
How the object store saves the data needed for the resume from the last replay ID functionality varies depending on the environment (a configuration sketch follows this list):
-
When a Mule app runs locally, the connector creates a persistent store in the file system.
If the application stops and restarts, or if a connection issue triggers a reconnection, the data saved in the object store enables the connector to continue processing messages at the point where it stopped.
-
When a Mule app runs in a cluster with the nodes either on the same machine or on different machines, Mule uses a distributed in-memory object store:
-
If the primary node (the one consuming the messages) shuts down, the node that takes over for the primary node does not reprocess the same messages. Instead, the node continues from where the previous node stopped processing the messages.
-
If the cluster stops, the data from the object store is lost. When the cluster restarts, the configuration at the source level determines whether the connector reprocesses the messages.
To persist data so that the app continues processing events when restarted, create a persistent object store, as described in Create and Manage a Cluster Manually.
-
If you use multiple workers when running the app in CloudHub, all of the workers receive and process the events.
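For reference, the following minimal sketch shows a flow that resumes from the last stored replay ID. It is based on the Replay channel example shown later in this document; the flow name, channel, and configuration name are placeholders, and the assumption is that the autoReplay attribute corresponds to the resume-from-last-replay-ID behavior described here:
<flow name="resumeFromLastReplayIdFlow">
    <!-- Assumption: autoReplay="true" enables resuming from the last stored replay ID -->
    <salesforce:replay-channel streamingChannel="/data/AccountChangeEvent" replayOption="ONLY_NEW" replayId="-1" autoReplay="true" config-ref="Salesforce_Config" doc:name="Replay channel"/>
    <logger level="INFO" doc:name="Logger" message="#[payload]"/>
</flow>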
Replay Sources Field Configurations
These field configurations determine which events the replay sources receive and which setting takes priority (a configuration sketch follows this list):
-
Replay failed events if any or resume from last replay id
If enabled, this field overwrites anything selected in the Replay option field and anything entered in the Replay id field.
The Replay failed events if any or resume from last replay id field checks the object store for failed event IDs and for the last successfully processed event ID. If there are failed events, processing starts with the lowest failed event, thus receiving all events from lowestFailedEventId until now. For example, if FailedEventIdInObjectStore is 10 and LastSuccessfullyProcessedEventIdInObjectStore is 15, processing starts with 10 and the streaming client receives events 10 through 15, consuming quota; however, events that were already processed successfully are not reprocessed by the flow because the connector filters them. If there are no failed event IDs stored, processing starts with the highest known replay ID. If there is nothing saved in the object store, the Replay option field is used.
-
ReplayOption.ALL
Subscribes with -2. The user receives all events available in Salesforce.
-
ReplayOption.ONLY_NEW
Subscribes with -1. The user receives only the events created after the connector subscribes.
-
ReplayOption.FROM_REPLAY_ID
Subscribes with the value that the user enters in the Replay id field.
-
ReplayOption.FROM_LAST_REPLAY_ID
Checks the object store for the highest received event ID, regardless of whether that event was processed successfully. This option consumes less quota than the Replay failed events if any or resume from last replay id field, which can start the source from a failed event ID that is lower than the highest processed event ID.
-
Cache events in memory
When the application starts, the connector subscribes from a replay ID, and the Salesforce API pushes all events starting with that replay ID, thus consuming quota.
If there are many available events and the Mule app processes them slowly, it can take a while to process all of them.
If this field is enabled, the events pushed by the API are stored in memory and consumed sequentially in the order they are received.
Without this configuration, if there is a connection issue or the token expires and the connector must resubscribe, the events that were pushed but not yet processed are lost and the quota is still consumed.
The connector interacts with the object store to store and retrieve data only when the Replay option field is set to FROM_LAST_REPLAY_ID or when the Replay failed events if any or resume from last replay id field is enabled. In CloudHub and CloudHub 2.0, this object store usage counts toward the Object Store transaction limits described earlier.
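As a reference for the field-to-XML mapping, the following sketch shows how the Replay option and Replay id fields appear as attributes on the Replay channel source. The flow name, channel, and replay ID value are illustrative, and the attribute values are assumed to mirror the ReplayOption names listed above:
<flow name="replayOptionSketchFlow">
    <!-- FROM_REPLAY_ID: subscribe starting from a specific stored event (the ID here is illustrative) -->
    <salesforce:replay-channel streamingChannel="/event/Cloud_News__e" replayOption="FROM_REPLAY_ID" replayId="49544589" config-ref="Salesforce_Config" doc:name="Replay channel"/>
    <!-- With replayOption="ALL" (subscribes with -2) or "ONLY_NEW" (subscribes with -1), the replayId attribute is ignored -->
    <logger level="INFO" doc:name="Logger" message="#[payload]"/>
</flow>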
Multiple Flows That Use the Same Source
If you have two or more flows that use a source that listens to events on the same channel or topic, the events are consumed two or more times from your user quota. To avoid this, consider implementing that logic in a single flow instead, as sketched after the following note.
Having multiple sources listening to the same events and having the Resume from the Last Replay Id option enabled can lead to data corruption and the potential loss of events.
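One way to consolidate is to subscribe once and branch inside a single flow with a core Choice router, as sketched below. The flow name, the routing condition, and the payload structure (taken from the sample log output later in this document) are assumptions:
<flow name="cloudNewsSingleSubscriberFlow">
    <!-- A single subscription per channel consumes the event quota only once -->
    <salesforce:subscribe-channel streamingChannel="/event/Cloud_News__e" config-ref="Salesforce_Config" doc:name="Subscribe channel"/>
    <choice>
        <!-- Assumption: the event body exposes the platform event fields under data.payload -->
        <when expression="#[payload.data.payload.Urgent__c == true]">
            <logger level="WARN" message="Urgent news: #[payload]"/>
        </when>
        <otherwise>
            <logger level="INFO" message="#[payload]"/>
        </otherwise>
    </choice>
</flow>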
Multiple Flows with Different Sources
Under certain conditions, having two or more flows that use different sources with the Replay Failed Events If Any or Resume from Last Replay Id option enabled might have performance implications.
To avoid performance implications, use different Salesforce Connector configurations with different Salesforce usernames, as sketched after the following notes.
Changing the username for a configuration prevents the connector from loading the last processed and failed replay IDs. Processing resumes from the first event available in Salesforce.
Avoid using personal user accounts to access Salesforce.
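For example, the following sketch shows two connector configurations that use different (hypothetical) integration usernames. The configuration names, credentials, and property placeholders are illustrative, and the basic username-password connection shown here is only one of the available connection types:
<!-- Hypothetical configurations with different Salesforce usernames -->
<salesforce:sfdc-config name="Salesforce_Config_Replay_A">
    <salesforce:basic-connection username="integration.user.a@example.com" password="${sf.password.a}" securityToken="${sf.token.a}"/>
</salesforce:sfdc-config>
<salesforce:sfdc-config name="Salesforce_Config_Replay_B">
    <salesforce:basic-connection username="integration.user.b@example.com" password="${sf.password.b}" securityToken="${sf.token.b}"/>
</salesforce:sfdc-config>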
Working with Platform Events
Subscribe to platform events by using Salesforce Connector. The following example is based on the platform events in the Define and Publish Platform Events Trailhead.
To receive platform events from Salesforce by using Salesforce Connector:
-
Log in to Salesforce and create a Salesforce platform event, such as Cloud_News__e.
-
Go to Anypoint Studio and create a Mule app.
-
Drag the Subscribe Channel Listener source to the canvas and in the Streaming Channel field, specify /event/Cloud_News__e.
-
Drag a Logger component to the canvas to show the payload so you can see the received message on the console, for example:
<flow name="SampleFlow" doc:id="d25ff96a-aec9-45ee-89f2-74080fb83b45" >
    <salesforce:subscribe-channel doc:name="Subscribe channel" doc:id="cb21f452-9280-41f8-ba52-93c49a03ea38" config-ref="Salesforce_Config" streamingChannel="/event/Cloud_News__e"/>
    <logger level="INFO" doc:name="Logger" doc:id="5ebd77bf-87de-4f55-ab81-2af3abbe2bee" message="#[payload]"/>
</flow>
-
Publish a platform event message to Cloud_News__e. You can use Apex code, a process, Salesforce Flow, or Salesforce APIs. For example, you can send a POST request with this body:
{ "Location__c" : "Mountain City", "Urgent__c" : true, "News_Content__c" : "Lake Road is closed due to mudslides." }
The console appears as follows:
INFO 2019-10-26 16:11:50,483 [[MuleRuntime].cpuLight.05: [test].SampleFlow.CPU_LITE @2b42bef0] [event: e00096e0-f7bf-11e9-b534-8c85907d741e] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: {data={schema=eGRz2Sfoy-YO9mVvH8J4fg, payload= {News_Content__c=Lake Road is closed due to mudslides., CreatedById=0050o00000U3Q8vAAF, CreatedDate=2019-10-26T07:12:01.026Z, Location__c=Mountain City, Urgent__c=true}, event={replayId=49544589}}, channel=/event/Cloud_News__e}
Working with Change Data Capture Events
Subscribe to change data capture events, such as Salesforce object creations, updates, or deletions, by using Salesforce Connector. This example uses the Replay Channel Listener source, which enables you to replay events. You can use the Subscribe Channel Listener source instead if you don’t want to replay any events.
-
Log in to Salesforce.
-
Go to Setup > Integrations > Change Data Capture and select a Salesforce object, such as Account.
-
Go to Anypoint Studio and create a Mule app.
-
Drag the Replay Channel Listener source to the canvas and in the Streaming Channel field, specify /data/AccountChangeEvent. For a list of channel names, refer to Subscription Channels.
-
Drag a Logger component to the canvas to show the payload so you can see the received message on the console, for example:
<flow name="ytaoka-salesforce-replaychannelFlow" doc:id="01d0fd5c-f777-4eda-a167-a931ef240f65" >
    <salesforce:replay-channel streamingChannel="/data/AccountChangeEvent" replayOption="ONLY_NEW" doc:name="Replay channel" doc:id="c036e7c5-86ed-4904-ae34-185ea42319e9" config-ref="Salesforce_Config" replayId="-1" autoReplay="true"/>
    <logger level="INFO" doc:name="Logger" doc:id="978f0aad-ab09-4910-bc54-a7c3dcc5935c" message="#[payload]"/>
</flow>
-
Log in to Salesforce and update any Account record; for example, update the name of the account to TestName2. The console appears as follows:
INFO 2019-11-12 08:17:27,496 [[MuleRuntime].cpuLight.05: [sample].sampleFlow.CPU_LITE @14741f50] [event: xxx] org.mule.runtime.core.internal.processor.LoggerMessageProcessor: {data={schema=CEjkFTwpfASSecY9UGNoOg, payload={LastModifiedDate=2019-11-11T23:17:30.000Z, ChangeEventHeader={commitNumber=10743571519745, commitUser=0050o00000XTesxAAD, sequenceNumber=1, entityName=Account, changeType=UPDATE, changedFields=[Ljava.lang.Object;@4f738b9d, changeOrigin=com/salesforce/api/soap/47.0;client=SfdcInternalAPI/, transactionKey=0002463d-1e88-1d80-5638-15c821f06b79, commitTimestamp=1573514251000, recordIds=[Ljava.lang.Object;@6e812151}, Name=TestName2}, event={replayId=1065378}}, channel=/data/AccountChangeEvent}
Working with PushTopic Events (Legacy)
Work with PushTopic events by creating a PushTopic, subscribing to a PushTopic, and replaying PushTopic messages.
PushTopic events are a legacy product. Salesforce no longer enhances PushTopic events with new features and provides limited support for this product. Instead of PushTopic events, use Change Data Capture events, as described in the Change Data Capture Developer Guide and the Change Data Capture Basics Trailhead module.
Creating a PushTopic to Receive Data from Salesforce
When you create a topic, the connector creates a PushTopic, which is a special object in Salesforce that binds a name (in this case, the topic’s name) and a Salesforce Object Query Language (SOQL) query together. After creating a PushTopic, you can subscribe to it by name.
You can use either the Create (create) operation or the Publish topic (publish-topic) operation to create a PushTopic. The following example uses the publish-topic operation to create a PushTopic:
<sfdc:publish-topic name="AccountUpdates" query="SELECT Id, Name FROM Account"/>
Alternatively, you can create a topic in Salesforce by executing code from an Enter Apex Code window, which is accessible through the system logs, for example:
PushTopic pushTopic = new PushTopic();
pushTopic.ApiVersion = 50.0;
pushTopic.Name = 'AllAccounts';
pushTopic.Description = 'All records for the Account object';
pushTopic.Query = 'SELECT Id, Name FROM Account';
insert pushTopic;
System.debug('Created new PushTopic: '+ pushTopic.Id);
Subscribing to a PushTopic
To subscribe to a PushTopic, add either the Subscribe topic listener (subscribe-topic-listener) or the Replay topic listener (replay-topic-listener) as a source for your flow. The source acts as an inbound endpoint. Each time the subscription receives an event, the source executes the rest of the flow in your Mule app.
In the following XML example, Mule prints a message to the log at the INFO level when the AccountUpdates PushTopic receives an event:
<flow name="accountUpdatesSubscription">
<!-- INBOUND ENDPOINT -->
<sfdc:subscribe-topic-listener topic="AccountUpdates"/>
<!-- REST OF YOUR FLOW -->
<logger level="INFO" message="Received an event for Salesforce Object ID #[map-payload:Id]"/>
</flow>
You can subscribe to a PushTopic that was not previously published in Salesforce. However, after the PushTopic is published, you won’t receive notifications for that PushTopic unless you resubscribe to it.
Replaying Messages from a PushTopic
A subscriber can specify which events to receive. By default, a subscriber receives only the events that occur after subscribing. Events outside of the retention period are discarded.
The Replay topic listener source provides these options:
-
ALL
Subscriber receives all events, including past events that are within the retention period and new events that are sent after the client subscribes.
-
ONLY_NEW
Subscriber receives only the new events that are sent after the client subscribes.
-
FROM_REPLAY_ID
Subscriber receives all events after the event with the specified replayId.
-
FROM_LAST_REPLAY_ID
Subscriber resumes from the highest replay ID stored in the object store, regardless of whether that event was processed successfully.
If you specify either the ALL or the ONLY_NEW replay option, the replayId value is ignored.
The Resume from the Last Replay Id checkbox enables you to specify an automatic replay of stored events based on the Replay ID of the last event processed by the connector. You can use this functionality when the connector stops listening, such as during a server shutdown or a dropped connection. If the stored Replay ID is outside the retention period, the replay option determines which events to replay.
To support the resume from last replay ID functionality, the connector uses a persistent object store to keep different details regarding the processed messages. This feature reduces the possibility of message loss and avoids processing duplicate messages. For more information about how the object store is used, see Object Store Usage in Events.
In the following XML example, the Replay topic listener source (replay-topic-listener) acts as an inbound endpoint for the Logger component message:
<flow name="accountUpdatesReplay">
<!-- INBOUND ENDPOINT -->
<sfdc:replay-topic-listener topic="AccountUpdates" replayId="1" replayOption="ALL" autoReplay="true"/>
<!-- REST OF YOUR FLOW -->
<logger level="INFO" message="Replayed events: #[payload]"/>
</flow>
Working with Generic Events (Legacy)
Work with generic events by creating a streaming channel, subscribing to a streaming channel, replaying generic events, and pushing events to a streaming channel.
Generic Events are a legacy product. Salesforce no longer enhances Generic Events with new features and provides limited support for this product. Instead of Generic Events, you can use Platform Events, as described in the Platform Events Developer Guide and the Platform Events Basics Trailhead module.
Creating a Streaming Channel
To create a streaming channel, you must have the proper Salesforce Streaming API permissions enabled in your organization.
Follow these steps to create a streaming channel:
-
Log in to your Salesforce Developer Edition organization.
-
Under All Tabs (+), select Streaming Channels.
-
On the Streaming Channels tab, select New.
-
Enter /u/notifications/ExampleUserChannel in the Streaming Channel Name field.
-
Enter an optional description.
You can also create a streaming channel by using either the connector Create operation or the connector Publish streaming channel (publish-streaming-channel) operation. The following example uses the publish-streaming-channel operation:
<sfdc:publish-streaming-channel
name="/u/Notifications"
description="General notifications"/>
Subscribing to a Streaming Channel
After you create a streaming channel, you can start receiving events by subscribing to the channel. The Subscribe channel listener (subscribe-channel-listener) source acts like an inbound endpoint. In this example, every time a subscription to /u/TestStreaming receives an event, it executes the rest of the flow and logs a message at the INFO level:
<flow name="notificationsChannelSubscription">
<!-- INBOUND ENDPOINT -->
<sfdc:subscribe-channel-listener streamingChannel="/u/TestStreaming"/>
<!-- REST OF YOUR FLOW -->
<logger level="INFO" message="Received an event: #[payload]"/>
</flow>
Replaying Messages from a Streaming Channel
A streaming channel can replay notifications. The Replay channel listener (replay-channel-listener) source acts as an inbound endpoint, for example:
<flow name="flowStreamingChannelReplay">
<!-- INBOUND ENDPOINT -->
<sfdc:replay-channel-listener streamingChannel="/u/Notifications" replayId="1" replayOption="ALL"/>
<!-- REST OF YOUR FLOW -->
<logger level="INFO" message="Replayed events: #[payload]"/>
</flow>
If you specify either the ALL or the ONLY_NEW replay option, the replayId value is ignored.
The Resume from the Last Replay Id checkbox enables you to specify an automatic replay of stored events based on the Replay ID of the last event processed by the connector. You can use this functionality when the connector stops listening, such as during a server shutdown or a dropped connection. If the stored Replay ID is outside the retention period, the replay option determines which events to replay.
To support the resume from the last replay ID feature, the connector uses a persistent object store to keep details regarding the processed messages. This feature reduces the possibility of message loss and avoids processing duplicate messages. For more information about how the object store is used, see Object Store Usage in Events.
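As a sketch, the following flow combines the Replay channel listener example above with the autoReplay attribute shown earlier on the Replay topic listener; applying autoReplay to the channel listener in the same way is an assumption:
<flow name="notificationsChannelAutoReplay">
<!-- INBOUND ENDPOINT -->
<sfdc:replay-channel-listener streamingChannel="/u/Notifications" replayId="1" replayOption="ALL" autoReplay="true"/>
<!-- REST OF YOUR FLOW -->
<logger level="INFO" message="Replayed events: #[payload]"/>
</flow>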
Pushing Events to a Streaming Channel with No Listener
Users can push events to a streaming channel, even if the channel does not have a listener for reading published events. After a listener is started for the channel, Salesforce Streaming API pushes as many messages as it can to the listener, based on the maximum daily limit.
For example, the maximum number of delivered event notifications within a 24-hour period for a free Salesforce organization is 10,000. Suppose you publish 15,000 events to that channel. When Salesforce Connector subscribes to that channel, Streaming API attempts to push 10,000 events, thus consuming the daily quota. The API then attempts to push the remaining 5,000 events the next day, before pushing any new events.
In this scenario, the connector streams the events one-by-one into the Mule app. If the app takes too long to process a message, Streaming API might instruct the connector to reconnect. If this happens, Streaming API drops all of the unprocessed messages. You can avoid this situation by implementing a reliability pattern, as described in Reliability Patterns.
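The following is a minimal sketch of one such reliability pattern, assuming the VM connector is added to the project: the first flow only accepts events and hands them to a persistent VM queue, and a second flow processes them at its own pace. The queue, flow, and configuration names are illustrative:
<vm:config name="VM_Config">
    <vm:queues>
        <vm:queue queueName="salesforceEvents" queueType="PERSISTENT"/>
    </vm:queues>
</vm:config>
<flow name="receiveAndStoreFlow">
    <!-- Accept the event quickly and hand it off to a persistent queue -->
    <sfdc:subscribe-channel-listener streamingChannel="/u/Notifications"/>
    <vm:publish queueName="salesforceEvents" config-ref="VM_Config"/>
</flow>
<flow name="processStoredEventsFlow">
    <!-- Consume from the queue and do the actual processing here -->
    <vm:listener queueName="salesforceEvents" config-ref="VM_Config"/>
    <logger level="INFO" message="Processing stored event: #[payload]"/>
</flow>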
Obtaining Generic Event Notifications
Salesforce Connector enables you to obtain custom event notifications, which apply to general events that are not tied to Salesforce data changes.
To obtain generic event notifications:
-
Use the Publish streaming channel operation to create a streaming channel.
StreamingChannel is a special Salesforce object that represents a channel used to notify listeners of generic Streaming API events. You can also create a streaming channel through Salesforce.
-
Use the Subscribe channel listener operation to subscribe to the channel.
Salesforce Connector converts the custom events in your streaming channel to Mule events.
Pushing Generic Events to a Streaming Channel
Salesforce enables you to push custom events to a specific streaming channel through the REST API. You can do this by using Salesforce Connector.
The following example uses the connector’s Push generic event (push-generic-event) operation to push custom events to the channel with the ID 0M6j0000000KyjBCAS:
<flow name="flowPushGenericEvent">
<!-- INBOUND ENDPOINT -->
<sfdc:push-generic-event channelId="0M6j0000000KyjBCAS">
<sfdc:events>
<sfdc:event payload="Notification message text"/>
</sfdc:events>
</sfdc:push-generic-event>
<logger level="INFO" message="Replayed events: #[payload]"/>
</flow>
You can retrieve the channel ID from the response map of the Publish streaming channel operation. Alternatively, you can retrieve the channel ID from the Salesforce page:
-
Log in to your Salesforce Developer Edition organization.
-
Under All Tabs (+), select Streaming Channels.
If the channel ID field is not visible on the channel list, follow these steps:
-
Click Create New View.
-
Type a name for the view in the Name input field.
-
In the Available Fields list, select Streaming Channel ID and click Add.
-
Add any other fields.
-
Click Save.
You should see the channel ID for each streaming channel in the list.
The JSON received as a response from the push event operation looks something like this:
[
{
"userOnlineStatus": {
},
"fanoutCount": 0
}
]