
Defining Message Sources

Using DevKit, you can create Message Sources, which receive or generate new messages for Mule to process.

One use case for Message Sources is implementing streaming APIs. The @Source annotation marks a method inside a @Module- or @Connector-annotated class as callable from a Mule flow and capable of generating Mule events. Each marked method generates a Message Source. The method must take a SourceCallback as one of its arguments; this callback represents the next message processor in the chain. The position of this parameter in the method signature doesn't matter, as long as it is present.

Polling Implementation

The @Source annotation includes the sourceStrategy and pollingPeriod attributes. The pollingPeriod attribute defines a default polling period, which is configurable from an application.


         
      
@Source(sourceStrategy = SourceStrategy.POLLING, pollingPeriod = 1000)
@ReconnectOn(exceptions = MessageSourceException.class)
public void messageSource(SourceCallback callback) throws Exception {
  callback.process(client.getDateToString());
}

The generated code runs the source in its own thread. A while loop in that thread invokes the source logic and then calls sleep(pollingPeriod) between iterations:


         
      
public void run() throws Exception {
  while(!Thread.currentThread().isInterrupted()) {
    callback.process(client.getDateToString());
    Thread.sleep(pollingPeriod);
  }
}
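
The loop above can be tried outside Mule with a minimal plain-Java sketch. Nothing here is DevKit API: the Callback interface is a hypothetical stand-in for SourceCallback, and the maxPolls argument is an assumption added only so the example terminates. The sketch also restores the interrupt flag when sleep is interrupted, which the loop condition relies on in order to exit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Plain-Java sketch of a polling loop; none of these names come from DevKit. */
class PollingLoopSketch {

    /** Hypothetical stand-in for DevKit's SourceCallback. */
    interface Callback {
        void process(Object payload) throws Exception;
    }

    /**
     * Polls the supplier and hands each value to the callback until the
     * current thread is interrupted or maxPolls is reached.
     * maxPolls is an assumption that exists only so the example terminates.
     * Returns the number of polls performed.
     */
    static int poll(Supplier<Object> source, Callback callback,
                    long pollingPeriodMillis, int maxPolls) throws Exception {
        int polls = 0;
        while (!Thread.currentThread().isInterrupted() && polls < maxPolls) {
            callback.process(source.get());
            polls++;
            try {
                Thread.sleep(pollingPeriodMillis);
            } catch (InterruptedException e) {
                // sleep() clears the interrupt flag; restore it so the loop condition sees it
                Thread.currentThread().interrupt();
            }
        }
        return polls;
    }

    public static void main(String[] args) throws Exception {
        List<Object> received = new ArrayList<>();
        int polls = poll(() -> "tick", received::add, 10, 3);
        System.out.println(polls + " polls, payloads: " + received);
    }
}
```

Interrupting the polling thread is the only way to stop the loop in the generated form, so any code that catches InterruptedException inside the loop needs to re-set the flag as shown.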

PollingPeriod Parameter

For SourceStrategy.POLLING, @Source generates a pollingPeriod parameter in both the XSD and the Studio editors.

Verifiers

A @Source-annotated method with SourceStrategy.POLLING:

  • Cannot define a pollingPeriod parameter.

  • Must define a default pollingPeriod value.

A @Source-annotated method without SourceStrategy.POLLING cannot define a default pollingPeriod value.
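
The three rules above can be read as a small decision function. The following is a hedged plain-Java sketch of that logic, not DevKit's actual verifier: the Strategy enum, the verify method, its boolean inputs, and the error messages are all hypothetical names chosen for illustration. It also assumes that "define a pollingPeriod parameter" refers to the method declaring its own parameter with that name.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative restatement of the verifier rules; not DevKit code. */
class SourceVerifierSketch {

    /** Hypothetical mirror of the source strategies discussed in the text. */
    enum Strategy { NONE, POLLING }

    /**
     * Returns one message per violated rule; an empty list means the
     * combination is allowed.
     */
    static List<String> verify(Strategy strategy,
                               boolean declaresPollingPeriodParameter,
                               boolean hasDefaultPollingPeriod) {
        List<String> errors = new ArrayList<>();
        if (strategy == Strategy.POLLING) {
            if (declaresPollingPeriodParameter) {
                errors.add("POLLING source cannot define a pollingPeriod parameter");
            }
            if (!hasDefaultPollingPeriod) {
                errors.add("POLLING source must define a default pollingPeriod value");
            }
        } else if (hasDefaultPollingPeriod) {
            errors.add("non-POLLING source cannot define a default pollingPeriod value");
        }
        return errors;
    }

    public static void main(String[] args) {
        // Valid: polling source with a default period and no clashing parameter.
        System.out.println(verify(Strategy.POLLING, false, true));
        // Invalid: a default period on a source that does not poll.
        System.out.println(verify(Strategy.NONE, false, true));
    }
}
```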

Reconnection Strategy

The @Source methods provide support for a reconnection strategy using @ReconnectOn.

@InvalidateConnectionOn is deprecated.

Example

The Salesforce Connector supports the Salesforce Streaming API, in which users subscribe to topics and receive a notification whenever a new event related to a topic occurs.


         
      
@Source
public void subscribeTopic(String topic, final SourceCallback callback) {
  getBayeuxClient().subscribe(topic, new ClientSessionChannel.MessageListener() {
    @Override
    public void onMessage(ClientSessionChannel channel, Message message) {
      try {
        callback.process(message.getData());
      } catch (Exception e) {
        LOGGER.error(e);
      }
    }
  });
}

You can use this source in a flow in Anypoint Studio as follows:


         
      
<flow name="myFlow">
   <sfdc:subscribe-topic topic="/someTopic"/>
   <logger level="INFO" message="#[payload]"/>
   ...
</flow>

This flow subscribes to the topic named in the topic attribute. Each time an update is received, the source passes the message to the next processor, which here is the logger.
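
To see the push-style pattern in isolation, here is a minimal plain-Java sketch that runs without Mule or a Bayeux client. Everything in it is an assumption made for illustration: Broker is a hypothetical in-memory stand-in for the streaming client, and Callback and Listener are stand-ins for SourceCallback and the message listener. As in the connector example, the listener forwards each message to the callback and logs any failure instead of propagating it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Push-style source sketch; none of these types are DevKit or Bayeux API. */
class StreamingSourceSketch {

    /** Hypothetical stand-in for DevKit's SourceCallback. */
    interface Callback {
        void process(Object payload) throws Exception;
    }

    /** Hypothetical stand-in for a streaming client's message listener. */
    interface Listener {
        void onMessage(String topic, Object data);
    }

    /** Hypothetical in-memory broker that delivers messages synchronously. */
    static class Broker {
        private final Map<String, List<Listener>> listeners = new HashMap<>();

        void subscribe(String topic, Listener listener) {
            listeners.computeIfAbsent(topic, t -> new ArrayList<>()).add(listener);
        }

        void publish(String topic, Object data) {
            for (Listener listener : listeners.getOrDefault(topic, new ArrayList<>())) {
                listener.onMessage(topic, data);
            }
        }
    }

    /** Registers a listener that forwards every message on the topic to the callback. */
    static void subscribeTopic(Broker broker, String topic, Callback callback) {
        broker.subscribe(topic, (t, data) -> {
            try {
                callback.process(data);
            } catch (Exception e) {
                // Log and continue so one bad message does not end the subscription
                System.err.println("Failed to process message on " + t + ": " + e);
            }
        });
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        subscribeTopic(broker, "/someTopic", payload -> System.out.println("received: " + payload));
        broker.publish("/someTopic", "update 1");
        broker.publish("/otherTopic", "ignored, no subscriber");
    }
}
```

Unlike the polling strategy, no loop or sleep is involved here: the source method returns after registering the listener, and messages reach the flow only when the remote side publishes them.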

See Also