Apache Kafka 4.7 の例 - Mule 4

Apache Kafka Connector の Mule フローの例を示します。

トピックのパブリッシュとコンシューム

次の例は、Apache Kafka Connector の 2 つの操作 (​Publish​ と ​Consume​) を使用して、メッセージを Apache Kafka にパブリッシュしてから取得する方法を示しています。この例には次の 2 つのフローが含まれます。

  • 最初のフローは、メッセージを Apache Kafka にパブリッシュするプロデューサーフローです。

  • 2 番目のフローは、Apache Kafka からメッセージをコンシュームするコンシューマーフローです。

Studio の Apache Kafka Connector のプロデューサーフロー
Studio の Apache Kafka Connector のコンシューマーフロー

最初のフローを設定する

メッセージを Apache Kafka にパブリッシュする最初のフローを設定します。最初のフローを作成するには、​Listener​ コンポーネント、​Logger​ コンポーネント、​Publish​ 操作、​Set Payload​ トランスフォーマーを設定する必要があります。

HTTP リスナーを設定する

/pushMessage​ パスへのコールが行われたときに Mule フローを開始するように ​Listener​ コンポーネントを設定します。

  1. Studio で新しい Mule プロジェクトを作成します。

  2. [Mule Palette (Mule パレット)]​ ビューから、​[HTTP]​ を選択して ​[Listener]​ コンポーネントをキャンバスにドラッグします。

  3. プロパティウィンドウで、​[Connector configuration (コネクタ設定)]​ 項目の横にある ​[+]​ をクリックしてグローバル要素を追加します。

  4. デフォルトを受け入れます。

  5. プロパティウィンドウで、​[Path (パス)]​ 項目値を ​/pushMessage​ に設定します。

Logger コンポーネントを追加する

Mule コンソールに応答を表示する ​Logger​ コンポーネントを追加します。

  1. [Mule Palette (Mule パレット)]​ ビューから、​[Core (コア)]​ を選択して ​[Logger]​ コンポーネントを ​[Listener]​ の右側にドラッグします。

  2. プロパティウィンドウで、次の項目を設定します。

    項目

    Display Name (表示名)

    ロガーの名前。

    Message (メッセージ)

    #["Message: '" payload.message "' is going to be published to topic: '" payload.topic "'."]

    Level (レベル)

    INFO (Default) (INFO (デフォルト))

    次の図は、プロパティウィンドウでの ​[Logger]​ 設定の例を示しています。

    [Logger] プロパティウィンドウ設定

[Publish] 操作を追加する

[Publish]​ 操作を追加して、指定した Kafka トピックにメッセージをパブリッシュします:

  1. [Mule Palette (Mule パレット)]​ ビューから、​[Apache Kafka]​ を選択して ​[Publish]​ 操作を ​[Logger]​ の右側にドラッグします。

  2. プロパティウィンドウで、​[Connector configuration (コネクタ設定)]​ 項目の横にある ​[+]​ をクリックしてグローバル要素を追加します。

  3. 次のようにグローバル要素を設定します。

    1. [Bootstrap Server URLs (ブートストラップサーバー URL)]​ 項目で、​[Edit inline (インライン編集)]​ を選択し、緑色のプラス記号をクリックします。

    2. 値 ​${config.basic.bootstrapServers}​ を入力して ​[Finish (完了)]​ をクリックします。

  4. プロパティウィンドウで、次の項目を設定します。

    項目

    Display Name (表示名)

    コネクタ操作の名前。

    Connector configuration (コネクタ設定)

    作成したばかりのグローバル設定。

    Topic (トピック)

    \#[payload.topic]

    Key (キー)

    #[now()]

    Message (メッセージ)

    payload

    次の画像は、プロパティウィンドウでの ​Publish​ 設定の例を示しています。

    Publish プロパティウィンドウ設定

Set Payload トランスフォーマーを追加する

応答ビルダーをプッシュする ​Set Payload​ トランスフォーマーを追加します。

  1. [Mule Palette (Mule パレット)]​ ビューから、​[Core (コア)]​ を選択して ​[Set Payload]​ トランスフォーマーを ​[Publish]​ の右側にドラッグします。

  2. プロパティウィンドウで、次の項目を設定します。

    項目

    Display Name (表示名)

    トランスフォーマーの名前 (​Push response builder​ など)。

    Value (値)

    Message successfully sent to Apache Kafka topic.

    次の画像は、プロパティウィンドウでの ​Set Payload​ 設定の例を示しています。

    Set Payload プロパティウィンドウ設定

2 番目のフローを設定する

Apache Kafka からメッセージをコンシュームする 2 番目のフローを設定します。2 番目のフローを作成するには、​Message listener​ ソースと ​Logger​ コンポーネントを設定する必要があります。

Message Listener ソースを追加する

メッセージエンドポイントをコンシュームする ​[Message listener]​ ソースを追加します。

  1. [Mule Palette (Mule パレット)]​ ビューから、​[Apache Kafka]​ を選択して ​[Message listener]​ ソースをキャンバスにドラッグします。

  2. プロパティウィンドウで、​[Connector configuration (コネクタ設定)]​ 項目の横にある ​[+]​ をクリックしてグローバル要素を追加します。

  3. 次のようにグローバル要素を設定します。

    1. [Bootstrap Server URLs (ブートストラップサーバー URL)]​ 項目で、​[Edit inline (インライン編集)]​ を選択し、緑色のプラス記号をクリックします。

    2. 値 ​${config.basic.bootstrapServers}​ を入力して ​[Finish (完了)]​ をクリックします。

    3. [Group ID (グループ ID)]​ 項目に ​${consumer.groupId}​ と入力します。

    4. [Topic Subscription Patterns (トピックサブスクリプションパターン)]​ 項目で、​[Edit inline (インライン編集)]​ を選択し、緑色のプラス記号をクリックします。

    5. 値 ​${topic1}​ を入力して ​[Finish (完了)]​ をクリックします。

  4. プロパティウィンドウで、次の項目を設定します。

    項目

    Display Name (表示名)

    コネクタソースの名前。

    Connector configuration (コネクタ設定)

    作成したばかりのグローバル設定。

    次の画像は、プロパティウィンドウでの ​Message listener​ 設定の例を示しています。

    Message listener プロパティウィンドウ設定

Logger コンポーネントを追加する

Mule コンソールに応答を表示する ​Logger​ コンポーネントを追加します。

  1. [Mule Palette (Mule パレット)]​ ビューから、​[Core (コア)]​ を選択して ​[Logger]​ コンポーネントを ​[Message listener]​ の右側にドラッグします。

  2. プロパティウィンドウで、次の項目を設定します。

    項目

    Display Name (表示名)

    ロガーの名前。

    Message (メッセージ)

    'New message arrived: ' payload ", key:" attributes.key ", partition:" attributes.partition ", offset:" ++ attributes.offset

    Level (レベル)

    INFO (Default) (INFO (デフォルト))

    次の図は、プロパティウィンドウでの ​[Logger]​ 設定の例を示しています。

    [Logger] プロパティウィンドウ設定

この例の XML

この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:Kafka="http://www.mulesoft.org/schema/mule/Kafka"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/http
  http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
  http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-current.xsd
	http://www.mulesoft.org/schema/mule/core
	http://www.mulesoft.org/schema/mule/core/current/mule.xsd
	http://www.mulesoft.org/schema/mule/http
	http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
	http://www.mulesoft.org/schema/mule/ee/core
	http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
	http://www.mulesoft.org/schema/mule/kafka
	http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd">
  <configuration-properties file="mule-app.properties">
	</configuration-properties>
  <http:listener-config name="HTTP_Listener_config"
    doc:name="HTTP Listener config"  >
  <http:listener-connection host="0.0.0.0" port="8081" />
  </http:listener-config>

  <kafka:consumer-config name="Apache_Kafka_Consumer_configuration"
 	  doc:name="Apache Kafka Consumer configuration" >
	  <kafka:consumer-plaintext-connection groupId="${consumer.groupId}" >
	  <kafka:bootstrap-servers >
	    <kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
	  </kafka:bootstrap-servers>
	  <kafka:topic-patterns >
	    <kafka:topic-pattern value='${topic1}
	    &lt;kafka:topic-pattern value="topic-1" /&gt;
	    &lt;kafka:topic-pattern value="topic-2" /&gt;
	    &lt;/kafka:topic-patterns&gt;]' />
	  </kafka:topic-patterns>
	  </kafka:consumer-plaintext-connection>
	</kafka:consumer-config>
	<kafka:producer-config name="Apache_Kafka_Producer_configuration"
	  doc:name="Apache Kafka Producer configuration" >
	  <kafka:producer-plaintext-connection >
	  <kafka:bootstrap-servers >
	    <kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
	  </kafka:bootstrap-servers>
	  </kafka:producer-plaintext-connection>
	  </kafka:producer-config>
	<kafka:consumer-config name="Apache_Kafka_Consumer_configuration1" doc:name="Apache Kafka Consumer configuration" >
		<kafka:consumer-plaintext-connection groupId="${consumer.groupId}" >
			<kafka:bootstrap-servers >
				<kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
			</kafka:bootstrap-servers>
			<kafka:topic-patterns >
				<kafka:topic-pattern value="${config.topics}" />
			</kafka:topic-patterns>
		</kafka:consumer-plaintext-connection>
	</kafka:consumer-config>
	<flow name="Producer-Flow" >
		<http:listener doc:name="Push message endpoint" config-ref="HTTP_Listener_config" path="/pushMessage" />
		<logger level="INFO" doc:name="Logger" message="&amp;quot;Message: '&amp;quot; ++ payload.message ++ &amp;quot;' is going to be published to topic: '&amp;quot; ++ payload.topic ++ &amp;quot;'.&amp;quot;" />
		<kafka:publish doc:name="Producer" topic="#[payload.topic]" key="#[now()]" config-ref="Apache_Kafka_Producer_configuration" />
		<set-payload value="Message successfully sent to Apache Kafka topic." doc:name="Push response builder" />
	</flow>
	<flow name="Consumer-Flow" >
		<kafka:message-listener doc:name="Consume message endpoint" config-ref="Apache_Kafka_Consumer_configuration"/>
		<logger level="INFO" doc:name="Logger" message="'New message arrived: ' ++ payload ++ &amp;quot;, key:&amp;quot; ++ attributes.key ++ &amp;quot;, partition:&amp;quot; ++ attributes.partition ++ &amp;quot;, offset:&amp;quot; ++ attributes.offset"/>
	</flow>
</mule>