Defining Message Sources

Using DevKit, you can create message sources, which receive or generate new messages for Mule to process.

One use case for message sources is implementing streaming APIs. The @Source annotation marks a method inside a @Module- or @Connector-annotated class as callable from a Mule flow and capable of generating Mule events. Each marked method generates a message source. The method must accept a SourceCallback as one of its arguments; the callback represents the next message processor in the chain. The position of this parameter doesn't matter as long as it is present in the method signature.

Another use case is passing inbound properties to the message that comes out of the message source. See Pass Inbound Properties Use Case.

Polling Implementation

The @Source annotation includes the sourceStrategy and pollingPeriod attributes. The pollingPeriod attribute defines a default polling period, which is configurable from an application.


         
      
@Source(sourceStrategy = SourceStrategy.POLLING, pollingPeriod = 1000)
@ReconnectOn(exceptions = MessageSourceException.class)
public void messageSource(SourceCallback callback) throws Exception {
  callback.process(client.getDateToString());
}

The generated code runs the source in its own thread. A while loop in that thread invokes the source and then calls sleep(pollingPeriod):


         
      
public void run() throws Exception {
  while(!Thread.currentThread().isInterrupted()) {
    callback.process(client.getDateToString());
    Thread.sleep(pollingPeriod);
  }
}
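
The loop above can be tried outside Mule with a small self-contained sketch. It is not the code DevKit generates: SketchCallback is a hypothetical stand-in for SourceCallback, the Supplier stands in for client.getDateToString(), and the demo helper exists only for illustration. The sketch also shows one detail the simplified loop leaves out: sleep() clears the interrupt flag when it throws, so the flag has to be restored for the while condition to end the loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical stand-in for DevKit's SourceCallback; not the real interface. */
interface SketchCallback {
    void process(Object payload) throws Exception;
}

/** Illustrative polling loop; the code DevKit actually generates may differ. */
class PollingSourceSketch implements Runnable {
    private final Supplier<Object> poll;       // stands in for client.getDateToString()
    private final SketchCallback callback;     // next step in the chain
    private final long pollingPeriodMillis;

    PollingSourceSketch(Supplier<Object> poll, SketchCallback callback, long pollingPeriodMillis) {
        this.poll = poll;
        this.callback = callback;
        this.pollingPeriodMillis = pollingPeriodMillis;
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                callback.process(poll.get());
                Thread.sleep(pollingPeriodMillis);
            } catch (InterruptedException e) {
                // sleep() cleared the interrupt flag; restore it so the loop exits.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // A real source might log or reconnect here; the sketch just stops.
                return;
            }
        }
    }

    /** Polls on a worker thread for about runMillis, stops it, and returns the payloads seen. */
    static List<Object> demo(long pollingPeriodMillis, long runMillis) {
        List<Object> received = Collections.synchronizedList(new ArrayList<>());
        AtomicInteger counter = new AtomicInteger();
        Thread worker = new Thread(new PollingSourceSketch(
                () -> "tick-" + counter.getAndIncrement(), received::add, pollingPeriodMillis));
        worker.start();
        try {
            Thread.sleep(runMillis);
            worker.interrupt();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("demo interrupted", e);
        }
        return received;
    }

    public static void main(String[] args) {
        // The number of payloads depends on timing, so only the count is printed.
        System.out.println(demo(100, 350).size() + " payloads received");
    }
}
```

Interrupting the worker thread is what ends the loop here, which mirrors the isInterrupted() check in the snippet above.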

PollingPeriod Parameter

For SourceStrategy.POLLING, @Source generates a pollingPeriod parameter in both the XSD and the Studio editors.

Verifiers

A @Source-annotated method with SourceStrategy.POLLING:

  • Cannot define its own pollingPeriod parameter, because DevKit generates one.

  • Must define a default pollingPeriod value.

A @Source-annotated method without SourceStrategy.POLLING cannot define a default pollingPeriod value.

Reconnection Strategy

Methods annotated with @Source support a reconnection strategy through @ReconnectOn.

@InvalidateConnectionOn is deprecated.

Example

The Salesforce Connector supports the Salesforce Streaming API, in which users subscribe to topics and receive a notification whenever a new event related to a subscribed topic occurs.


         
      
@Source
public void subscribeTopic(String topic, final SourceCallback callback) {
  getBayeuxClient().subscribe(topic, new ClientSessionChannel.MessageListener() {
    @Override
    public void onMessage(ClientSessionChannel channel, Message message) {
      try {
        callback.process(message.getData());
      } catch (Exception e) {
        LOGGER.error(e);
      }
    }
  });
}

You can use this message source in a flow in Anypoint Studio as follows:


         
      
<flow name="myFlow">
   <sfdc:subscribe-topic topic="/someTopic"/>
   <logger level="INFO" message="#[payload]"/>
   ...
</flow>

The flow subscribes to the topic named in the topic attribute. Each time an update arrives, the source passes it to the next message processor, which here is the logger.
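
The push-style pattern in this example, where a listener forwards each incoming event to the callback, can be exercised without Salesforce or Mule. The following is a minimal sketch under stated assumptions: TopicBrokerSketch is a hypothetical in-memory stand-in for the Bayeux client, and StreamCallback is a hypothetical stand-in for SourceCallback. Neither is part of DevKit or the Salesforce Connector.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for a streaming client such as the Bayeux client. */
class TopicBrokerSketch {
    private final Map<String, List<Consumer<Object>>> listeners = new HashMap<>();

    void subscribe(String topic, Consumer<Object> listener) {
        listeners.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
    }

    void publish(String topic, Object data) {
        // Deliver only to listeners registered for this exact topic.
        for (Consumer<Object> listener
                : listeners.getOrDefault(topic, Collections.<Consumer<Object>>emptyList())) {
            listener.accept(data);
        }
    }
}

class StreamingSourceSketch {
    /** Hypothetical stand-in for DevKit's SourceCallback; not the real interface. */
    interface StreamCallback {
        void process(Object payload) throws Exception;
    }

    /** Same shape as subscribeTopic above: register a listener that forwards to the callback. */
    static void subscribeTopic(TopicBrokerSketch broker, String topic, StreamCallback callback) {
        broker.subscribe(topic, data -> {
            try {
                callback.process(data);
            } catch (Exception e) {
                // The original example logs the error; the sketch prints it instead.
                System.err.println("callback failed: " + e);
            }
        });
    }

    /** Publishes three events, two of them on the subscribed topic. */
    static List<Object> demo() {
        TopicBrokerSketch broker = new TopicBrokerSketch();
        List<Object> received = new ArrayList<>();
        subscribeTopic(broker, "/someTopic", received::add);
        broker.publish("/someTopic", "update-1");
        broker.publish("/otherTopic", "ignored");
        broker.publish("/someTopic", "update-2");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [update-1, update-2]
    }
}
```

Unlike the polling strategy, nothing here loops or sleeps: the source method returns right after registering the listener, and events reach the callback whenever the client delivers them.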

Pass Inbound Properties Use Case

This use case describes how to pass inbound properties to the message that comes out of the message source.

The earlier examples on this page use the callback.process(Object payload) call. A second overload is also available:

callback.process(Object payload, Map<String, Object> properties)

This call sets inbound properties through the map called properties.

Example:


         
      
@Source(sourceStrategy = SourceStrategy.POLLING, pollingPeriod = 5000)
public void getNewMessages(SourceCallback callback) throws Exception {
  Map<String, Object> properties = new HashMap<String, Object>();
  properties.put("originalFilename", "123.txt");
  callback.process("testMessage", properties);
}

After this is built into a connector, each message that leaves the message source carries an inbound property called originalFilename with the value 123.txt.
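
To make the payload and properties hand-off concrete, here is a minimal sketch that runs without Mule. The Message class and the PropertiesCallback interface are hypothetical stand-ins invented for illustration; they are not the Mule message type or DevKit's SourceCallback, and the way Mule attaches inbound properties internally may differ.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class InboundPropertiesSketch {
    /** Hypothetical message type holding a payload and read-only inbound properties. */
    static final class Message {
        final Object payload;
        final Map<String, Object> inboundProperties;

        Message(Object payload, Map<String, Object> properties) {
            this.payload = payload;
            // Copy the map so later changes by the caller do not leak into the message.
            this.inboundProperties =
                    Collections.unmodifiableMap(new HashMap<String, Object>(properties));
        }
    }

    /** Hypothetical stand-in for the two-argument callback.process call. */
    interface PropertiesCallback {
        Message process(Object payload, Map<String, Object> properties);
    }

    /** Mirrors the getNewMessages example: one payload plus one inbound property. */
    static Message getNewMessages(PropertiesCallback callback) {
        Map<String, Object> properties = new HashMap<String, Object>();
        properties.put("originalFilename", "123.txt");
        return callback.process("testMessage", properties);
    }

    static Message demo() {
        return getNewMessages((payload, properties) -> new Message(payload, properties));
    }

    public static void main(String[] args) {
        Message message = demo();
        System.out.println(message.payload);                                   // prints testMessage
        System.out.println(message.inboundProperties.get("originalFilename")); // prints 123.txt
    }
}
```

In a Mule 3 flow, a downstream processor would typically read such a property with a MEL expression like #[message.inboundProperties.originalFilename]; treat that expression as an assumption to verify against your Mule version.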