Confluent Schema Registry 1.0 の例 - Mule4

この例では、Confluent Schema Registry を Kafka Connector と統合することで avro メッセージをパブリッシュおよびコンシュームし、メッセージのサイズを縮小する方法を示します。この例には次の 2 つのフローが含まれます。

  1. 最初のフローでは、スキーマが含まれる avro メッセージを生成して Kafka トピックに送信します。​[Transform Message]​ コンポーネントではスキーマが含まれる avro メッセージを作成し、​[Replace AVRO schema with id]​ 操作ではメッセージに埋め込まれたスキーマを Confluent Schema Registry からの ID で置き換えます。

  2. 2 番目のフローでは、Kafka トピックから avro メッセージを読み取り、非シリアル化された avro メッセージのコンテンツをコンソールに出力します。​[Replace id with AVRO schema]​ 操作では、受信したメッセージ内の ID を、Confluent Schema Registry から取得した実際のスキーマに置き換えます。

始める前に

以下が必要です。

  • Java 8

  • Anypoint Studio 7.7.0 以降

  • Mule runtime Engine (Mule) 4.2.x 以降

  • DataWeave

プロデューサーフローを作成する

プロデューサーフローでは、設定済みの Kafka トピックにメッセージをパブリッシュします。次のスクリーンショットは、Anypoint Studio のプロデューサーフローを示しています。

Studio での Confluent Schema Registry Connector のプロデューサーフロー

このフローでは、次を設定します。

  • HTTP の ​[Listener]​ コンポーネント

  • [Transform Message]​ コンポーネント

  • [Logger]​ コンポーネント

  • [Replace AVRO schema with id]​ 操作

  • [Publish]​ 操作

  • 2 つ目の ​[Transform Message]​ コンポーネント

HTTP リスナーを設定する

localhost​ のポート ​8082​ で ​/​ アカウントパスへのコールが行われたときに Mule フローを開始するように ​HTTP Listener​ を設定します。

この例では一部の項目値に変数を使用しています。次のいずれかを実行できます。

  • コード内で変数を値に置き換える。

  • プロパティファイルで各変数の値を指定しておき、コネクタ設定からそのファイルを参照する。

プロパティファイルの使用方法についての詳細は、​「プロパティプレースホルダーの設定」​を参照してください。

HTTP リスナー​を設定するには、次の手順に従います。

  1. Studio で新しい Mule プロジェクトを作成します。

  2. [Mule Palette (Mule パレット)]​ で「​http​」を検索し、​[Listener]​ 操作を選択します。

  3. [Listener]​ 操作をキャンバスにドラッグします。

  4. [Listener]​ 設定で、​[Connector configuration (コネクタ設定)]​ 項目の横にある ​[Add (追加)]​ アイコンをクリックしてグローバル要素を追加します。

  5. [OK]​ をクリックして、デフォルトを受け入れます。

  6. [Path (パス)]​ 項目を ​/​ に設定します。

最初の [Transform Message] コンポーネントを追加する

最初の ​[Transform Message]​ コンポーネントでは、スキーマが含まれる avro メッセージを生成します。

  1. [Mule Palette (Mule パレット)]​ で、「​transform message​」を検索します。

  2. [Transform Message]​ コンポーネントをキャンバスの ​[Listener]​ コンポーネントの横にドラッグします。

  3. Transform Message​ 設定で、​[Output (出力)]​ セクションの括弧を次の XML で上書きします。

    %dw 2.0
    output application/avro schemaUrl='classpath://schema.json'
    ---
    {
    	"f1": "demo_message"
    }
    xml

この例で使用するスキーマのコンテンツを次に示します。

{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}

このスキーマでは、​f1​ という名前の項目と型 ​string​ が含まれる JSON オブジェクトを記述しています。

Logger コンポーネントを追加する

  1. [Mule Palette (Mule パレット)]​ で、「​logger​」を検索します。

  2. [Logger]​ コンポーネントをキャンバスの ​[Transform Message]​ コンポーネントの横にドラッグします。

  3. 次の項目を設定します。

    項目

    Display Name (表示名)

    ロガーの名前 (​Log Response​ など)

    Message (メッセージ)

    「​#[payload]​」と入力します。

    Level (レベル)

    INFO (Default) (INFO (デフォルト))

[Replace AVRO Schema with ID] 操作を追加する

[Replace AVRO schema with id]​ 操作では、avro メッセージ内のスキーマを Confluent Schema Registry からの ID に置き換えて、メッセージのサイズを縮小します。

  1. [Replace AVRO schema with id]​ 操作をキャンバスの ​[Listener]​ コンポーネントの横にドラッグします。

  2. [Replace AVRO schema with id]​ の設定で、​[Connector configuration (コネクタ設定)]​ ドロップダウンをクリックし、​[Confluent_Schema_Registry_Connector_Config]​ を選択します。

  3. [Replace AVRO schema with id]​ プロパティウィンドウで、次の項目を設定します。

    項目

    Subject (件名)

    ${config.subject}

    Message (メッセージ)

    payload

[Publish] 操作を追加する

[Publish]​ 操作では、Kafka トピックにメッセージをパブリッシュします。

  1. [Publish]​ 操作をキャンバスの ​[Replace AVRO schema with id]​ 操作の横にドラッグします。

  2. [Publish]​ プロパティウィンドウで、次の項目を設定します。

    項目

    Display Name (表示名)

    操作の名前 (​Producer​ など)

    Topic (トピック)

    #[payload.topic]

    Key (キー)

    #[now()]

  3. [Publish]​ 操作の ​[General (一般)]​ 設定画面で、​[Add (追加)]​ アイコンをクリックして、グローバル要素設定項目にアクセスします。

    1. [Bootstrap server URLs (ブートストラップサーバー URL)]​ 項目で、​[Edit inline (インライン編集)]​ を選択し、​[Add (追加)]​ アイコンをクリックします。

    2. 値 ​${config.basic.bootstrapServers}​ を入力して ​[Finish (完了)]​ をクリックします。

2 つ目の [Transform Message] コンポーネントを追加する

2 つ目の ​[Transform Message]​ コンポーネントでは、​[Publish]​ 操作の応答を出力します。

  1. [Mule Palette (Mule パレット)]​ で、「​transform message​」を検索します。

  2. [Transform Message]​ コンポーネントをキャンバスの ​[Publish]​ 操作の横にドラッグします。

  3. Transform Message​ 設定で、​[Output (出力)]​ セクションの括弧を次の XML で上書きします。

    %dw 2.0
    output application/json
    ---
    payload
    xml

コンシューマーフローを作成する

コンシューマーフローでは、Kafka トピックからパブリッシュ済みのメッセージをコンシュームしてコンソールに出力します。次のスクリーンショットは、Anypoint Studio のコンシューマーフローを示しています。

Studio での Confluent Schema Registry Connector のコンシューマーフロー

このフローでは、次を設定します。

  • [Message listener]

  • [Logger]​ コンポーネント

  • [Replace id with AVRO schema]​ 操作

  • [Transform Message]​ コンポーネント

  • 2 つ目の ​[Logger]​ コンポーネント

[Message Listener] を追加する

[Message listener]​ では、Kafka トピックからパブリッシュ済みのメッセージをコンシュームします。

  1. [Message listener]​ をキャンバスにドラッグします。

  2. [Message listener]​ プロパティウィンドウで、次の項目を設定します。

    項目

    Display Name (表示名)

    操作の名前 (​Producer​ など)

    Topic (トピック)

    #[payload.topic]

    Key (キー)

    #[now()]

  3. [Connector configuration (コネクタ設定)]​ 項目の横にある ​[Add (追加)]​ アイコンをクリックし、グローバル要素設定項目にアクセスします。

  4. 次の項目に入力します。

    1. [Bootstrap server URLs (ブートストラップサーバー URL)]​ 項目で、​[Edit inline (インライン編集)]​ を選択し、​[Add (追加)]​ アイコンをクリックします。

    2. 値 ​${config.basic.bootstrapServers}​ を入力して ​[Finish (完了)]​ をクリックします。

    3. [Group ID (グループ ID)]​ 項目に ​${consumer.groupId}​ と入力します。

    4. [Topic Subscription Patterns (トピックサブスクリプションパターン)]​ 項目で、​[Edit inline (インライン編集)]​ を選択し、​[Add (追加)]​ アイコンをクリックします。

    5. 値 ​${config.topics}​ を入力して ​[Finish (完了)]​ をクリックします。

最初の [Logger] コンポーネントを追加する

  1. [Mule Palette (Mule パレット)]​ で、「​logger​」を検索します。

  2. [Logger]​ コンポーネントをキャンバスの ​[Message listener]​ の横にドラッグします。

  3. 次の項目を設定します。

    項目

    Display Name (表示名)

    ロガーの名前 (​Log Response​ など)

    Message (メッセージ)

    「​#[payload]​」と入力します。

    Level (レベル)

    INFO (Default) (INFO (デフォルト))

[Replace ID with AVRO Schema] 操作を追加する

[Replace id with AVRO schema]​ 操作では、埋め込まれた ID を avro スキーマに置き換えます。

  1. [Replace id with AVRO schema]​ 操作をキャンバスの ​[Logger]​ コンポーネントの横にドラッグします。

  2. [Replace id with AVRO schema]​ の設定で、​[Connector configuration (コネクタ設定)]​ ドロップダウンをクリックし、​[Confluent_Schema_Registry_Connector_Config]​ を選択します。

  3. [Replace id with AVRO schema]​ プロパティウィンドウで、次の項目を設定します。

    項目

    Message (メッセージ)

    payload

[Transform Message] コンポーネントを追加する

[Transform Message]​ コンポーネントでは、ペイロードを JSON に変換します。

  1. [Mule Palette (Mule パレット)]​ で、「​transform message​」を検索します。

  2. [Transform Message]​ コンポーネントをキャンバスの ​[Replace id with AVRO schema]​ 操作の横にドラッグします。

  3. Transform Message​ 設定で、​[Output (出力)]​ セクションの括弧を次の XML で上書きします。

    %dw 2.0
    output application/json
    ---
    payload
    xml

2 つ目の [Logger] コンポーネントを追加する

  1. [Mule Palette (Mule パレット)]​ で、「​logger​」を検索します。

  2. [Logger]​ コンポーネントをキャンバスの ​[Transform Message]​ の横にドラッグします。

  3. 次の項目を設定します。

    項目

    Display Name (表示名)

    ロガーの名前 (​Log Response​ など)

    Message (メッセージ)

    「​#[payload[0]]​」と入力します。

    Level (レベル)

    INFO (Default) (INFO (デフォルト))

Avro メッセージをパブリッシュおよびコンシュームするための XML