Flex Gateway新着情報
Governance新着情報
Monitoring API Managerこの例では、Confluent Schema Registry を Kafka Connector と統合することで avro メッセージをパブリッシュおよびコンシュームし、メッセージのサイズを縮小する方法を示します。この例には次の 2 つのフローが含まれます。
最初のフローでは、スキーマが含まれる avro メッセージを生成して Kafka トピックに送信します。[Transform Message] コンポーネントではスキーマが含まれる avro メッセージを作成し、[Replace AVRO schema with id] 操作ではメッセージに埋め込まれたスキーマを Confluent Schema Registry からの ID で置き換えます。
2 番目のフローでは、Kafka トピックから avro メッセージを読み取り、非シリアル化された avro メッセージのコンテンツをコンソールに出力します。[Replace id with AVRO schema] 操作では、受信したメッセージ内の ID を、Confluent Schema Registry から取得した実際のスキーマに置き換えます。
以下が必要です。
Java 8
Anypoint Studio 7.7.0 以降
Mule runtime Engine (Mule) 4.2.x 以降
DataWeave
プロデューサーフローでは、設定済みの Kafka トピックにメッセージをパブリッシュします。次のスクリーンショットは、Anypoint Studio のプロデューサーフローを示しています。
このフローでは、次を設定します。
HTTP の [Listener] コンポーネント
[Transform Message] コンポーネント
[Logger] コンポーネント
[Replace AVRO schema with id] 操作
[Publish] 操作
2 つ目の [Transform Message] コンポーネント
localhost
のポート 8082
で /
アカウントパスへのコールが行われたときに Mule フローを開始するように HTTP Listener を設定します。
この例では一部の項目値に変数を使用しています。次のいずれかを実行できます。
コード内で変数を値に置き換える。
プロパティファイルで各変数の値を指定しておき、コネクタ設定からそのファイルを参照する。
プロパティファイルの使用方法についての詳細は、「プロパティプレースホルダーの設定」を参照してください。
HTTP リスナーを設定するには、次の手順に従います。
Studio で新しい Mule プロジェクトを作成します。
[Mule Palette (Mule パレット)] で「http
」を検索し、[Listener] 操作を選択します。
[Listener] 操作をキャンバスにドラッグします。
[Listener] 設定で、[Connector configuration (コネクタ設定)] 項目の横にある [Add (追加)] アイコンをクリックしてグローバル要素を追加します。
[OK] をクリックして、デフォルトを受け入れます。
[Path (パス)] 項目を /
に設定します。
最初の [Transform Message] コンポーネントでは、スキーマが含まれる avro メッセージを生成します。
[Mule Palette (Mule パレット)] で、「transform message
」を検索します。
[Transform Message] コンポーネントをキャンバスの [Listener] コンポーネントの横にドラッグします。
Transform Message 設定で、[Output (出力)] セクションの括弧を次の XML で上書きします。
%dw 2.0 output application/avro schemaUrl='classpath://schema.json' --- { "f1": "demo_message" }
xml
この例で使用するスキーマのコンテンツを次に示します。
{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}
このスキーマでは、f1
という名前の項目と型 string
が含まれる JSON オブジェクトを記述しています。
[Mule Palette (Mule パレット)] で、「logger
」を検索します。
[Logger] コンポーネントをキャンバスの [Transform Message] コンポーネントの横にドラッグします。
次の項目を設定します。
項目 | 値 |
---|---|
Display Name (表示名) |
ロガーの名前 ( |
Message (メッセージ) |
「 |
Level (レベル) |
INFO (Default) (INFO (デフォルト)) |
[Replace AVRO schema with id] 操作では、avro メッセージ内のスキーマを Confluent Schema Registry からの ID に置き換えて、メッセージのサイズを縮小します。
[Replace AVRO schema with id] 操作をキャンバスの [Listener] コンポーネントの横にドラッグします。
[Replace AVRO schema with id] の設定で、[Connector configuration (コネクタ設定)] ドロップダウンをクリックし、[Confluent_Schema_Registry_Connector_Config] を選択します。
[Replace AVRO schema with id] プロパティウィンドウで、次の項目を設定します。
項目 | 値 |
---|---|
Subject (件名) |
|
Message (メッセージ) |
|
[Publish] 操作では、Kafka トピックにメッセージをパブリッシュします。
[Publish] 操作をキャンバスの [Replace AVRO schema with id] 操作の横にドラッグします。
[Publish] プロパティウィンドウで、次の項目を設定します。
項目 | 値 |
---|---|
Display Name (表示名) |
操作の名前 ( |
Topic (トピック) |
|
Key (キー) |
|
[Publish] 操作の [General (一般)] 設定画面で、[Add (追加)] アイコンをクリックして、グローバル要素設定項目にアクセスします。
[Bootstrap server URLs (ブートストラップサーバー URL)] 項目で、[Edit inline (インライン編集)] を選択し、[Add (追加)] アイコンをクリックします。
値 ${config.basic.bootstrapServers}
を入力して [Finish (完了)] をクリックします。
2 つ目の [Transform Message] コンポーネントでは、[Publish] 操作の応答を出力します。
[Mule Palette (Mule パレット)] で、「transform message
」を検索します。
[Transform Message] コンポーネントをキャンバスの [Publish] 操作の横にドラッグします。
Transform Message 設定で、[Output (出力)] セクションの括弧を次の XML で上書きします。
%dw 2.0 output application/json --- payload
xml
コンシューマーフローでは、Kafka トピックからパブリッシュ済みのメッセージをコンシュームしてコンソールに出力します。次のスクリーンショットは、Anypoint Studio のコンシューマーフローを示しています。
このフローでは、次を設定します。
[Message listener]
[Logger] コンポーネント
[Replace id with AVRO schema] 操作
[Transform Message] コンポーネント
2 つ目の [Logger] コンポーネント
[Message listener] では、Kafka トピックからパブリッシュ済みのメッセージをコンシュームします。
[Message listener] をキャンバスにドラッグします。
[Message listener] プロパティウィンドウで、次の項目を設定します。
項目 | 値 |
---|---|
Display Name (表示名) |
操作の名前 ( |
Topic (トピック) |
|
Key (キー) |
|
[Connector configuration (コネクタ設定)] 項目の横にある [Add (追加)] アイコンをクリックし、グローバル要素設定項目にアクセスします。
次の項目に入力します。
[Bootstrap server URLs (ブートストラップサーバー URL)] 項目で、[Edit inline (インライン編集)] を選択し、[Add (追加)] アイコンをクリックします。
値 ${config.basic.bootstrapServers}
を入力して [Finish (完了)] をクリックします。
[Group ID (グループ ID)] 項目に ${consumer.groupId}
と入力します。
[Topic Subscription Patterns (トピックサブスクリプションパターン)] 項目で、[Edit inline (インライン編集)] を選択し、[Add (追加)] アイコンをクリックします。
値 ${config.topics}
を入力して [Finish (完了)] をクリックします。
[Mule Palette (Mule パレット)] で、「logger
」を検索します。
[Logger] コンポーネントをキャンバスの [Message listener] の横にドラッグします。
次の項目を設定します。
項目 | 値 |
---|---|
Display Name (表示名) |
ロガーの名前 ( |
Message (メッセージ) |
「 |
Level (レベル) |
INFO (Default) (INFO (デフォルト)) |
[Replace id with AVRO schema] 操作では、埋め込まれた ID を avro スキーマに置き換えます。
[Replace id with AVRO schema] 操作をキャンバスの [Logger] コンポーネントの横にドラッグします。
[Replace id with AVRO schema] の設定で、[Connector configuration (コネクタ設定)] ドロップダウンをクリックし、[Confluent_Schema_Registry_Connector_Config] を選択します。
[Replace id with AVRO schema] プロパティウィンドウで、次の項目を設定します。
項目 | 値 |
---|---|
Message (メッセージ) |
|
[Transform Message] コンポーネントでは、ペイロードを JSON に変換します。
[Mule Palette (Mule パレット)] で、「transform message
」を検索します。
[Transform Message] コンポーネントをキャンバスの [Replace id with AVRO schema] 操作の横にドラッグします。
Transform Message 設定で、[Output (出力)] セクションの括弧を次の XML で上書きします。
%dw 2.0 output application/json --- payload
xml
[Mule Palette (Mule パレット)] で、「logger
」を検索します。
[Logger] コンポーネントをキャンバスの [Transform Message] の横にドラッグします。
次の項目を設定します。
項目 | 値 |
---|---|
Display Name (表示名) |
ロガーの名前 ( |
Message (メッセージ) |
「 |
Level (レベル) |
INFO (Default) (INFO (デフォルト)) |
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core" xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka"
xmlns:confluent-schema-registry="http://www.mulesoft.org/schema/mule/confluent-schema-registry" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/confluent-schema-registry http://www.mulesoft.org/schema/mule/confluent-schema-registry/current/mule-confluent-schema-registry.xsd
http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd">
<configuration-properties file="mule-app.properties"/>
<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="ac9811ff-9234-4f44-9dc0-83100cb6c1bd" >
<http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<confluent-schema-registry:config name="Confluent_Schema_Registry_Connector_Config" doc:name="Confluent Schema Registry Connector Config" doc:id="faae820f-cd5a-46d0-9692-f28b53ea3bb6" >
<confluent-schema-registry:basic-auth-connection username="${config.user}" password="${config.pass}" baseUri="${config.schemaUrl}" />
</confluent-schema-registry:config>
<kafka:consumer-config name="Apache_Kafka_Consumer_configuration" doc:name="Apache Kafka Consumer configuration" doc:id="d0443fc9-b2d3-4bbc-939f-126e98255cf3" >
<kafka:consumer-plaintext-connection groupId="${config.consumerGroup}" >
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="${config.bootstrapServer}" />
</kafka:bootstrap-servers>
<kafka:topic-patterns >
<kafka:topic-pattern value="${config.topic}" />
</kafka:topic-patterns>
</kafka:consumer-plaintext-connection>
</kafka:consumer-config>
<kafka:producer-config name="Apache_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="f378b3d4-4486-487d-84e2-8cc80aae7295" >
<kafka:producer-plaintext-connection >
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="${config.bootstrapServer}" />
</kafka:bootstrap-servers>
</kafka:producer-plaintext-connection>
</kafka:producer-config>
<flow name="demoFlow" doc:id="80c4cd43-2c65-4395-8e3e-52e6f7ed882b" >
<http:listener doc:name="Listener" doc:id="a0c9691a-b202-4aaf-9dcf-6b1385709ee6" config-ref="HTTP_Listener_config" path="/test"/>
<ee:transform doc:name="Transform Message" doc:id="4dc43b6d-f7c7-4cba-bbcd-db733cae1de4" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/avro schemaUrl='classpath://schema.json'
---
{
"f1": "demo_message"
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<confluent-schema-registry:replace-avro-schema-with-id doc:name="Replace AVRO schema with id" doc:id="61b1d3b7-eabe-4d53-b5ca-357fda8bbd1e" config-ref="Confluent_Schema_Registry_Connector_Config" subject="${config.subject}"/>
<kafka:publish doc:name="Publish" doc:id="aad92d55-a388-4680-86f1-e99adfcb14f5" config-ref="Apache_Kafka_Producer_configuration" topic="${config.topic}"/>
<ee:transform doc:name="Transform Message" doc:id="f64a1dd7-d091-47bc-a0c7-b533c4c8d492" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
</ee:message>
</ee:transform>
</flow>
<flow name="demoFlow1" doc:id="a244ee60-143f-4320-a34e-6b572de50341" >
<kafka:message-listener doc:name="Message listener" doc:id="056db197-1353-407a-ba58-8d90fc68e766" config-ref="Apache_Kafka_Consumer_configuration"/>
<confluent-schema-registry:replace-id-with-avro-schema doc:name="Replace Id With Avro Schema" doc:id="26a15edf-94e2-4138-8acb-385821fc9710" config-ref="Confluent_Schema_Registry_Connector_Config"/>
<ee:transform doc:name="Transform Message" doc:id="a17bd48c-37e7-449e-84a0-d8cd14e4925f" >
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
</ee:message>
</ee:transform>
</flow>
</mule>
xml