メッセージソースの定義

DevKit は、Studio 6 および Mule 3 とのみ互換性があります。Mule 4 Connector を作成するには、 「Mule SDK」ドキュメント​を参照してください。

DevKit を使用して、Mule で処理する新しいメッセージを受信または生成するメッセージソースを作成できます。

メッセージソースのユースケースの 1 つとして、ストリーミング API の実装があります。​@Source​ アノテーションでは、​@Module​ または ​@Connector​ アノテーション付きクラス内のメソッドが、Mule フローからのコールが可能、および Mule イベントの生成が可能としてマークされます。マークされた各メソッドでメッセージソースが生成されます。メソッドは、その引数の 1 つとして、チェーン内の次のメッセージプロセッサを表す ​SourceCallback​ を受け取る必要があります。このパラメータがメソッドの署名に存在する限り、このパラメータが表示される順序は重要ではありません。

もう 1 つのユースケースとして、メッセージソースから出力されたメッセージにインバウンドプロパティを渡す方法があります。 「​インバウンドプロパティを渡すユースケース​」を参照してください。

ポーリングの実装

@Source​ アノテーションには ​SourceStrategy​ および ​pollingPeriod​ 属性が含まれます。​PollingPeriod​ では、アプリケーションから設定可能なデフォルトのポーリング期間の値を定義します。

@Source(sourceStrategy = SourceStrategy.POLLING, pollingPeriod = 1000)
@ReconnectOn(exceptions = MessageSourceException.class)
public void messageSource(SourceCallback callback) throws Exception {
  callback.process(client.getDateToString());
}

生成されたコードでは、ソースの実行と ​sleep(pollPeriod)​ のコールを行うスレッドを制御する ​while​ ループが作成されます。

public void run() throws Exception {
  while(!Thread.currentThread().isInterrupted()) {
    callback.process(client.getDateToString());
    thread.sleep(pollingPeriod);
  }
}

PollingPeriod パラメータ

SourceStrategy.POLLING,​ の場合、​@Source​ は XSD および Studio エディタの ​pollingPeriod​ パラメータを生成します。

ベリファイア

SourceStrategy.POLLING:​ を使用する ​@Source​ アノテーション付きメソッド

  • pollingPeriod​ パラメータを定義できません。

  • デフォルトの ​pollingPeriod​ 値を定義する必要があります。

@Source​ を​使用しない​ ​SourceStrategy.POLLING​ アノテーション付きメソッドはデフォルトの ​pollingPeriod​ 値を定義できません。

再接続戦略

@Source​ メソッドは、​@ReconnectOn​ を使用する再接続戦略をサポートします。

@InvalidateConnectionOn​ は非推奨になります。

Salesforce Connector​ は SalesForce ストリーミング API をサポートします。この API で、ユーザはトピックをサブスクライブし、そのトピックに関連する新しいイベントの発生時に通知を受け取ることができます。

@Source
public void subscribeTopic(String topic, final SourceCallback callback) {
  getBayeuxClient().subscribe(topic, new ClientSessionChannel.MessageListener() {
    @Override
    public void onMessage(ClientSessionChannel channel, Message message) {
      try {
          callback.process(message.getData());
      } catch (Exception e) {
          LOGGER.error(e);
      }
    }
  });
}

Anypoint Studio で次を使用してこのメソッドを呼び出すことができます。

<flow name="myFlow">
   <sfdc:subscribe-topic topic="/someTopic"/>
   <logger level="INFO" message="#[payload]"/>
   ...
</flow>

このメソッドでは、指定されたパラメータ名を持つトピックをサブスクライブし、更新を受信するとロガーを呼び出します。

インバウンドプロパティを渡すユースケース

このユースケースでは、メッセージソースから出力されたメッセージにインバウンドプロパティを渡す方法を説明します。

以前は、このページでは ​callback.process(Object payload)​ コールについて説明していましたが、次の追加のコールも使用可能になりました。

callback.process(Object payload, Map<String, Object> properties)

このコールでは、​properties​ と呼ばれるマップを介してインバウンドプロパティを設定します。

例:

@Source(sourceStrategy = SourceStrategy.POLLING,pollingPeriod=5000)
public void getNewMessages(SourceCallback callback) throws Exception {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put("originalFilename", "123.txt");
callback.process("testMessage", properties);
}

これをコネクタに組み込むと、メッセージソースの後に値 ​123.txt​ を持つ originalFilename というインバウンドプロパティが含まれます。