Flex Gateway新着情報
Governance新着情報
Monitoring API ManagerApache Kafka Connector の Mule フローの例を示します。
次の例は、Apache Kafka Connector の 2 つの操作 (Publish と Consume) を使用して、メッセージを Apache Kafka にパブリッシュしてから取得する方法を示しています。この例には次の 2 つのフローが含まれます。
最初のフローは、メッセージを Apache Kafka にパブリッシュするプロデューサーフローです。
2 番目のフローは、Apache Kafka からメッセージをコンシュームするコンシューマーフローです。
メッセージを Apache Kafka にパブリッシュする最初のフローを設定します。最初のフローを作成するには、Listener コンポーネント、Logger コンポーネント、Publish 操作、Set Payload トランスフォーマーを設定する必要があります。
/pushMessage
パスへのコールが行われたときに Mule フローを開始するように Listener コンポーネントを設定します。
Studio で新しい Mule プロジェクトを作成します。
[Mule Palette (Mule パレット)] ビューから、[HTTP] を選択して [Listener] コンポーネントをキャンバスにドラッグします。
プロパティウィンドウで、[Connector configuration (コネクタ設定)] 項目の横にある [+] をクリックしてグローバル要素を追加します。
デフォルトを受け入れます。
プロパティウィンドウで、[Path (パス)] 項目値を /pushMessage
に設定します。
Mule コンソールに応答を表示する Logger コンポーネントを追加します。
[Mule Palette (Mule パレット)] ビューから、[Core (コア)] を選択して [Logger] コンポーネントを [Listener] の右側にドラッグします。
プロパティウィンドウで、次の項目を設定します。
項目 | 値 |
---|---|
Display Name (表示名) |
ロガーの名前。 |
Message (メッセージ) |
|
Level (レベル) |
INFO (Default) (INFO (デフォルト)) |
次の図は、プロパティウィンドウでの [Logger] 設定の例を示しています。
[Publish] 操作を追加して、指定した Kafka トピックにメッセージをパブリッシュします:
[Mule Palette (Mule パレット)] ビューから、[Apache Kafka] を選択して [Publish] 操作を [Logger] の右側にドラッグします。
プロパティウィンドウで、[Connector configuration (コネクタ設定)] 項目の横にある [+] をクリックしてグローバル要素を追加します。
次のようにグローバル要素を設定します。
[Bootstrap Server URLs (ブートストラップサーバー URL)] 項目で、[Edit inline (インライン編集)] を選択し、緑色のプラス記号をクリックします。
値 ${config.basic.bootstrapServers}
を入力して [Finish (完了)] をクリックします。
プロパティウィンドウで、次の項目を設定します。
項目 | 値 |
---|---|
Display Name (表示名) |
コネクタ操作の名前。 |
Connector configuration (コネクタ設定) |
作成したばかりのグローバル設定。 |
Topic (トピック) |
|
Key (キー) |
|
Message (メッセージ) |
|
次の画像は、プロパティウィンドウでの Publish 設定の例を示しています。
応答ビルダーをプッシュする Set Payload トランスフォーマーを追加します。
[Mule Palette (Mule パレット)] ビューから、[Core (コア)] を選択して [Set Payload] トランスフォーマーを [Publish] の右側にドラッグします。
プロパティウィンドウで、次の項目を設定します。
項目 | 値 |
---|---|
Display Name (表示名) |
トランスフォーマーの名前 ( |
Value (値) |
|
次の画像は、プロパティウィンドウでの Set Payload 設定の例を示しています。
Apache Kafka からメッセージをコンシュームする 2 番目のフローを設定します。2 番目のフローを作成するには、Message listener ソースと Logger コンポーネントを設定する必要があります。
メッセージエンドポイントをコンシュームする [Message listener] ソースを追加します。
[Mule Palette (Mule パレット)] ビューから、[Apache Kafka] を選択して [Message listener] ソースをキャンバスにドラッグします。
プロパティウィンドウで、[Connector configuration (コネクタ設定)] 項目の横にある [+] をクリックしてグローバル要素を追加します。
次のようにグローバル要素を設定します。
[Bootstrap Server URLs (ブートストラップサーバー URL)] 項目で、[Edit inline (インライン編集)] を選択し、緑色のプラス記号をクリックします。
値 ${config.basic.bootstrapServers}
を入力して [Finish (完了)] をクリックします。
[Group ID (グループ ID)] 項目に ${consumer.groupId}
と入力します。
[Topic Subscription Patterns (トピックサブスクリプションパターン)] 項目で、[Edit inline (インライン編集)] を選択し、緑色のプラス記号をクリックします。
値 ${topic1}
を入力して [Finish (完了)] をクリックします。
プロパティウィンドウで、次の項目を設定します。
項目 | 値 |
---|---|
Display Name (表示名) |
コネクタソースの名前。 |
Connector configuration (コネクタ設定) |
作成したばかりのグローバル設定。 |
次の画像は、プロパティウィンドウでの Message listener 設定の例を示しています。
Mule コンソールに応答を表示する Logger コンポーネントを追加します。
[Mule Palette (Mule パレット)] ビューから、[Core (コア)] を選択して [Logger] コンポーネントを [Message listener] の右側にドラッグします。
プロパティウィンドウで、次の項目を設定します。
項目 | 値 |
---|---|
Display Name (表示名) |
ロガーの名前。 |
Message (メッセージ) |
|
Level (レベル) |
INFO (Default) (INFO (デフォルト)) |
次の図は、プロパティウィンドウでの [Logger] 設定の例を示しています。
この例のフローをすばやく Mule アプリケーションに読み込むには、次のコードを Studio XML エディターに貼り付けます。
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:Kafka="http://www.mulesoft.org/schema/mule/Kafka"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core
http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/kafka
http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd">
<configuration-properties file="mule-app.properties">
</configuration-properties>
<http:listener-config name="HTTP_Listener_config"
doc:name="HTTP Listener config" >
<http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<kafka:consumer-config name="Apache_Kafka_Consumer_configuration"
doc:name="Apache Kafka Consumer configuration" >
<kafka:consumer-plaintext-connection groupId="${consumer.groupId}" >
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
</kafka:bootstrap-servers>
<kafka:topic-patterns >
<kafka:topic-pattern value='${topic1}
<kafka:topic-pattern value="topic-1" />
<kafka:topic-pattern value="topic-2" />
</kafka:topic-patterns>]' />
</kafka:topic-patterns>
</kafka:consumer-plaintext-connection>
</kafka:consumer-config>
<kafka:producer-config name="Apache_Kafka_Producer_configuration"
doc:name="Apache Kafka Producer configuration" >
<kafka:producer-plaintext-connection >
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
</kafka:bootstrap-servers>
</kafka:producer-plaintext-connection>
</kafka:producer-config>
<kafka:consumer-config name="Apache_Kafka_Consumer_configuration1" doc:name="Apache Kafka Consumer configuration" >
<kafka:consumer-plaintext-connection groupId="${consumer.groupId}" >
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
</kafka:bootstrap-servers>
<kafka:topic-patterns >
<kafka:topic-pattern value="${config.topics}" />
</kafka:topic-patterns>
</kafka:consumer-plaintext-connection>
</kafka:consumer-config>
<flow name="Producer-Flow" >
<http:listener doc:name="Push message endpoint" config-ref="HTTP_Listener_config" path="/pushMessage" />
<logger level="INFO" doc:name="Logger" message="&quot;Message: '&quot; ++ payload.message ++ &quot;' is going to be published to topic: '&quot; ++ payload.topic ++ &quot;'.&quot;" />
<kafka:publish doc:name="Producer" topic="#[payload.topic]" key="#[now()]" config-ref="Apache_Kafka_Producer_configuration" />
<set-payload value="Message successfully sent to Apache Kafka topic." doc:name="Push response builder" />
</flow>
<flow name="Consumer-Flow" >
<kafka:message-listener doc:name="Consume message endpoint" config-ref="Apache_Kafka_Consumer_configuration"/>
<logger level="INFO" doc:name="Logger" message="'New message arrived: ' ++ payload ++ &quot;, key:&quot; ++ attributes.key ++ &quot;, partition:&quot; ++ attributes.partition ++ &quot;, offset:&quot; ++ attributes.offset"/>
</flow>
</mule>