@Source(sourceStrategy = SourceStrategy.POLLING, pollingPeriod = 1000)
@ReconnectOn(exceptions = MessageSourceException.class)
public void messageSource(SourceCallback callback) throws Exception {
    callback.process(client.getDateToString());
}
Defining Message Sources
DevKit is compatible only with Studio 6 and Mule 3. To build Mule 4 connectors, see the Mule SDK documentation.
Using DevKit, you can create message sources, which receive or generate new messages for Mule to process.
One use case for message sources is implementing streaming APIs. The @Source annotation marks a method inside a @Module or @Connector annotated class as callable from a Mule flow and capable of generating Mule events. Each marked method generates a message source. The method must receive a SourceCallback as one of its arguments; this argument represents the next message processor in the chain. The position of this parameter doesn't matter as long as it is present in the method signature.
Another use case is passing inbound properties to the message that comes out of the message source. See Pass Inbound Properties Use Case.
Polling Implementation
The @Source annotation includes the sourceStrategy and pollingPeriod attributes. pollingPeriod defines a default polling period value, which is configurable from an application.
The generated code creates a while loop that controls the thread running the source and calls sleep(pollingPeriod) between iterations:
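public void run() throws Exception {
    // Poll until the thread that runs the source is interrupted.
    while (!Thread.currentThread().isInterrupted()) {
        callback.process(client.getDateToString());
        // sleep is a static method of Thread.
        Thread.sleep(pollingPeriod);
    }
}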
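The loop above can be tried outside Mule with a small stand-alone sketch. Everything in it is an assumption made for illustration: the nested Callback interface is a hypothetical stand-in for DevKit's SourceCallback, the Supplier replaces client.getDateToString(), and runFor is a helper invented here to start and stop the thread. It is not the code DevKit generates.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

// Sketch of a polling source: fetch a payload, pass it on, sleep, repeat until interrupted.
class PollingSourceSketch implements Runnable {

    // Hypothetical stand-in for DevKit's SourceCallback (single-argument call only).
    interface Callback {
        Object process(Object payload) throws Exception;
    }

    private final Supplier<Object> fetch;      // stands in for client.getDateToString()
    private final Callback callback;
    private final long pollingPeriodMillis;

    PollingSourceSketch(Supplier<Object> fetch, Callback callback, long pollingPeriodMillis) {
        this.fetch = fetch;
        this.callback = callback;
        this.pollingPeriodMillis = pollingPeriodMillis;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                callback.process(fetch.get());
                Thread.sleep(pollingPeriodMillis);
            } catch (InterruptedException e) {
                // sleep() clears the interrupt flag when it throws; restore it so the loop exits.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // This sketch simply stops on a processing failure.
                return;
            }
        }
    }

    // Runs the source for about runMillis, interrupts it, and returns the payloads the callback saw.
    static List<Object> runFor(long runMillis, long pollingPeriodMillis) {
        List<Object> received = new CopyOnWriteArrayList<>();
        Callback recorder = payload -> { received.add(payload); return payload; };
        Thread worker = new Thread(new PollingSourceSketch(() -> "tick", recorder, pollingPeriodMillis));
        worker.start();
        try {
            Thread.sleep(runMillis);
            worker.interrupt();
            worker.join();
        } catch (InterruptedException e) {
            worker.interrupt();
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(runFor(120, 20).size() + " payloads received");
    }
}
```

The sketch catches InterruptedException inside the loop because an interrupt that arrives during sleep() resets the thread's interrupt flag; without restoring it, the while condition would never see the interruption.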
PollingPeriod Parameter
For SourceStrategy.POLLING, @Source generates a pollingPeriod parameter in the XSD and the Studio editors.
Verifiers
A @Source annotated method with SourceStrategy.POLLING:
- Cannot define a pollingPeriod parameter.
- Must define a default pollingPeriod value.
A @Source annotated method without SourceStrategy.POLLING cannot define a default pollingPeriod value.
Reconnection Strategy
The @Source methods provide support for a reconnection strategy using @ReconnectOn.
@InvalidateConnectionOn is deprecated.
Example
The Salesforce Connector supports the Salesforce Streaming API, in which users subscribe to topics and receive a notification each time a new event related to a subscribed topic occurs.
@Source
public void subscribeTopic(String topic, final SourceCallback callback) {
    getBayeuxClient().subscribe(topic, new ClientSessionChannel.MessageListener() {
        @Override
        public void onMessage(ClientSessionChannel channel, Message message) {
            try {
                callback.process(message.getData());
            } catch (Exception e) {
                LOGGER.error(e);
            }
        }
    });
}
You can invoke this method in Anypoint Studio using:
<flow name="myFlow">
    <sfdc:subscribe-topic topic="/someTopic"/>
    <logger level="INFO" message="#[payload]"/>
    ...
</flow>
This flow subscribes to the topic given in the topic attribute. Each time an update is received, the message is passed to the logger.
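Unlike a polling source, a subscription-style source registers a listener once and returns; messages reach the flow later, whenever the listener fires. The following stand-alone sketch illustrates that shape without Mule or a Bayeux client. The TopicBus class, the nested Callback interface, and the demo helper are all hypothetical names introduced for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Sketch of a push-style source: register a listener once, forward each event as it arrives.
class PushSourceSketch {

    // Hypothetical stand-in for DevKit's SourceCallback (single-argument call only).
    interface Callback {
        Object process(Object payload) throws Exception;
    }

    // Hypothetical in-memory publisher standing in for a streaming client.
    static class TopicBus {
        private final Map<String, List<Consumer<Object>>> listeners = new HashMap<>();

        void subscribe(String topic, Consumer<Object> listener) {
            listeners.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
        }

        void publish(String topic, Object data) {
            for (Consumer<Object> listener : listeners.getOrDefault(topic, Collections.emptyList())) {
                listener.accept(data);
            }
        }
    }

    // Mirrors the shape of subscribeTopic: returns immediately; messages flow later via the listener.
    static void subscribeTopic(TopicBus bus, String topic, Callback callback) {
        bus.subscribe(topic, data -> {
            try {
                callback.process(data);
            } catch (Exception e) {
                System.err.println("Failed to process message: " + e);
            }
        });
    }

    // Publishes two events on the subscribed topic and one on another; returns what the callback saw.
    static List<Object> demo() {
        List<Object> received = new ArrayList<>();
        TopicBus bus = new TopicBus();
        subscribeTopic(bus, "/someTopic", payload -> { received.add(payload); return payload; });
        bus.publish("/someTopic", "first");
        bus.publish("/otherTopic", "ignored");
        bus.publish("/someTopic", "second");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first, second]
    }
}
```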
Pass Inbound Properties Use Case
This use case describes how to pass inbound properties to the message that comes out of the message source.
The earlier examples on this page use the callback.process(Object payload) call. This additional call is also available:
callback.process(Object payload, Map<String, Object> properties)
This call sets inbound properties through the map called properties.
Example:
@Source(sourceStrategy = SourceStrategy.POLLING, pollingPeriod = 5000)
public void getNewMessages(SourceCallback callback) throws Exception {
    Map<String, Object> properties = new HashMap<String, Object>();
    properties.put("originalFilename", "123.txt");
    callback.process("testMessage", properties);
}
When this source is built into a connector, each message that leaves the message source carries an inbound property called originalFilename with the value 123.txt.
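To see the effect without deploying a connector, the two-argument call can be exercised against a recording stub. This is a minimal sketch under stated assumptions: the nested Callback interface is a hypothetical stand-in that models only the two process calls discussed above (the Object return type is an assumption), and RecordingCallback and demo are names invented for the illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of how the two-argument call carries inbound properties alongside the payload.
class InboundPropertiesSketch {

    // Hypothetical stand-in for the two SourceCallback.process calls discussed above.
    interface Callback {
        Object process(Object payload) throws Exception;
        Object process(Object payload, Map<String, Object> properties) throws Exception;
    }

    // Records what the next message processor would see.
    static class RecordingCallback implements Callback {
        Object payload;
        Map<String, Object> inboundProperties = new HashMap<>();

        @Override
        public Object process(Object payload) {
            // No properties supplied: behave like the two-argument call with an empty map.
            return process(payload, new HashMap<>());
        }

        @Override
        public Object process(Object payload, Map<String, Object> properties) {
            this.payload = payload;
            this.inboundProperties = new HashMap<>(properties);
            return payload;
        }
    }

    // Same body as the getNewMessages example, minus the DevKit annotation.
    static void getNewMessages(Callback callback) throws Exception {
        Map<String, Object> properties = new HashMap<>();
        properties.put("originalFilename", "123.txt");
        callback.process("testMessage", properties);
    }

    // Invokes the source method once and returns the stub for inspection.
    static RecordingCallback demo() {
        RecordingCallback callback = new RecordingCallback();
        try {
            getNewMessages(callback);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return callback;
    }

    public static void main(String[] args) {
        RecordingCallback seen = demo();
        System.out.println(seen.payload + " " + seen.inboundProperties); // prints testMessage {originalFilename=123.txt}
    }
}
```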