メッセージソースの定義

DevKit を使用して、Mule で処理する新しいメッセージを受信または生成するメッセージソースを作成できます。

メッセージソースのユースケースの 1 つとして、ストリーミング API の実装があります。@Source アノテーションでは、@Module または @Connector アノテーション付きクラス内のメソッドが、Mule フローからのコールが可能、および Mule イベントの生成が可能としてマークされます。マークされた各メソッドでメッセージソースが生成されます。メソッドは、その引数の 1 つとして、チェーン内の次のメッセージプロセッサを表す SourceCallback を受け取る必要があります。このパラメータがメソッドの署名に存在する限り、このパラメータが表示される順序は重要ではありません。

もう 1 つのユースケースとして、メッセージソースから出力されたメッセージにインバウンドプロパティを渡す方法があります。 [Pass Inbound Properties Use Case]を参照してください。

ポーリングの実装

@Source アノテーションには SourceStrategy および pollingPeriod 属性が含まれます。PollingPeriod では、アプリケーションから設定可能なデフォルトのポーリング期間の値を定義します。

@Source(sourceStrategy = SourceStrategy.POLLING, pollingPeriod = 1000)
@ReconnectOn(exceptions = MessageSourceException.class)
public void messageSource(SourceCallback callback) throws Exception {
  callback.process(client.getDateToString());
}

生成されたコードでは、ソースの実行と sleep(pollPeriod) のコールを行うスレッドを制御する while ループが作成されます。

public void run() throws Exception {
  while(!Thread.currentThread().isInterrupted()) {
    callback.process(client.getDateToString());
    thread.sleep(pollingPeriod);
  }
}

PollingPeriod パラメータ

SourceStrategy.POLLING, の場合、@Source は XSD および Studio エディタの pollingPeriod パラメータを生成します。

ベリファイア

SourceStrategy.POLLING: を使用する @Source アノテーション付きメソッド

  • pollingPeriod パラメータを定義できません。

  • デフォルトの pollingPeriod 値を定義する必要があります。

@Source使用しない SourceStrategy.POLLING アノテーション付きメソッドはデフォルトの pollingPeriod 値を定義できません。

再接続戦略

@Source メソッドは、@ReconnectOn を使用する再接続戦略をサポートします。

@InvalidateConnectionOn は非推奨になります。

SalesForce コネクタは SalesForce ストリーミング API をサポートします。この API で、ユーザはトピックをサブスクライブし、そのトピックに関連する新しいイベントの発生時に通知を受け取ることができます。

@Source
public void subscribeTopic(String topic, final SourceCallback callback) {
  getBayeuxClient().subscribe(topic, new ClientSessionChannel.MessageListener() {
    @Override
    public void onMessage(ClientSessionChannel channel, Message message) {
      try {
          callback.process(message.getData());
      } catch (Exception e) {
          LOGGER.error(e);
      }
    }
  });
}

Anypoint Studio で次を使用してこのメソッドを呼び出すことができます。

<flow name="myFlow">
   <sfdc:subscribe-topic topic="/someTopic"/>
   <logger level="INFO" message="#[payload]"/>
   ...
</flow>

このメソッドでは、指定されたパラメータ名を持つトピックをサブスクライブし、更新を受信するとロガーを呼び出します。

インバウンドプロパティを渡すユースケース

このユースケースでは、メッセージソースから出力されたメッセージにインバウンドプロパティを渡す方法を説明します。

以前は、このページでは callback.process(Object payload) コールについて説明していましたが、次の追加のコールも使用可能になりました。

callback.process(Object payload, Map<String, Object> properties)

このコールでは、properties と呼ばれるマップを介してインバウンドプロパティを設定します。

例:

@Source(sourceStrategy = SourceStrategy.POLLING,pollingPeriod=5000)
public void getNewMessages(SourceCallback callback) throws Exception {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put("originalFilename", "123.txt");
callback.process("testMessage", properties);
}

これをコネクタに組み込むと、メッセージソースの後に値 123.txt を持つ originalFiname というインバウンドプロパティが含まれます。

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub