Kafka コネクタ

Select

Apache Kafka 用 Anypoint コネクタを使用すると、Apache Kafka メッセージングシステムとやりとりでき、Mule Runtime を使用した Mule アプリケーションと Apache Kafka クラスタ間のシームレスなインテグレーションが可能になります。

このユーザガイドに目を通し、Apache Kafka コネクタを使用した基本的な Mule フローのセットアップおよび設定方法を理解してください。

前提条件

このドキュメントは、読者が Mule、Anypoint コネクタ、Anypoint Studio、Mule の概念、Mule フローの要素、グローバル要素に精通していることを前提としています。

対象リソースへの接続をテストするには、ログイン情報が必要です。

ハードウェアとソフトウェアの要件および互換性に関する情報は、「コネクタリリースノート」を参照してください。

Maven でこのコネクタを使用するには、Anypoint Exchange の [Dependency Snippets (連動関係スニペット)] で pom.xml の連動関係情報を確認してください。

このコネクタの新機能

  • 接続設定 - コンシューマとプロデューサの両方の接続種別を次の中から選択できます。

    • 基本 - Kafka が受け入れるプロパティの中から、必要なものを柔軟に設定できる。

    • SSL - 1 つ以上のブローカーで SSL 接続が必要な場合はこの設定を使用する。

    • Kerberos - 1 つ以上のブローカーでテキスト形式接続の Kerberos が必要な場合はこの設定を使用する。

    • Kerberos SS - 1 つ以上のブローカーで SSL 接続の Kerberos が必要な場合はこの設定を使用する。

  • コンシューマトリガ - マップに評価される MEL の代わりに、オフセットのリストを指定する必要がある、パーティションオフセットに使用する。

  • コンシューマトリガ - 生成されるメッセージに、トピック、キー、パーティション、オフセットの属性が関連付けられる。

Design Center での接続方法

  1. Design Center で、[Set Up (セットアップ)] > [Upload (アップロード)] をクリックし、ファイルシステムにあるこのコネクタのドライバを参照して選択し、アップロードします。または、すでにアップロードされているドライバを検索して選択します。

  2. トリガをクリックします。トリガ時にこのコネクタを選択することで、グローバル要素を作成できます。 グローバル要素が不要な場合は、HTTP リスナまたはスケジューラトリガを使用できます。

  3. このコネクタのグローバル要素 (省略可能) を作成するには、次の項目を設定します。

    1. 基本:

      • Bootstrap Servers (ブートストラップサーバ) - Kafka クラスタへの初期接続の確立に使用するコンマ区切りのホストとポートのペア。Kafka クライアント (プロデューサ/コンシューマ) に指定する必要のある bootstrap.servers 値と同じです。

      • Additional properties (追加プロパティ) - 接続に必要なキーと値の追加プロパティ。Kafka がサポートする任意のプロパティを入力できます。

        基本 DC 設定
    2. SSL:

      • 基本設定のすべてのパラメータ。

      • Key Store Type (キーストア種別) - キーストアファイルのファイル形式。省略可能で、デフォルト値は「JKS」です。

      • Key Store Password (キーストアのパスワード) - キーストアファイルのストアのパスワード。省略可能で、「keyStoreLocation」が設定されている場合にのみ必要です。

      • Key Store Location (キーストアの場所) - キーストアファイルの場所。省略可能で、コネクタの双方向認証に使用できます。

      • Trust Store Type (トラストストア種別) - トラストストアファイルのファイル形式。

      • Trust Store Password (トラストストアのパスワード) - トラストストアファイルのパスワード。

      • Trust Store Location (トラストストアの場所) - トラストストアファイルの場所。

        SSL DC 設定
    3. Kerberos:

      • 基本設定のすべてのパラメータ。

      • Principal (プリンシパル) - Kerberos プリンシパル。

      • Keytab (キータブ) - 「principal」に関連付けられている keytab ファイルへのパス。

      • Service Name (サービス名) - Kafka ブローカーが実行時に使用する Kerberos プリンシパル名。

      • Additional JAAS Properties (JAAS の追加プロパティ) - キー→値の追加プロパティ。「sasl.jaas.config」で設定する必要があり、通常は JAAS 設定ファイルに格納します。

        Kerberos DC 設定
    4. Kerberos SSL:

      • 基本設定のすべてのパラメータ。

      • SSL 設定のすべてのパラメータ。

      • Kerberos 設定のすべてのパラメータ。

        Kerberos SSL DC 設定
        どの設定でもコンシューマ (トリガ) を設定するときは、次の項目に入力します

        Consumer Partitions (コンシューマパーティション) - このコンシューマに使用するパーティションの数。
        Group Id (グループ ID) - このコンシューマが属するコンシューマグループを識別する一意の文字列。

  4. プラス記号を選択して、コンポーネントを追加します。

  5. コンポーネントとしてコネクタを選択します。

  6. 次の項目を設定します。

    1. コンシューマトリガ:

      • Topic (トピック) - メッセージのコンシューム元の Kafka トピックの名前。

      • Partition offsets (パーティションオフセット) (省略可能) - パーティションオフセット設定を表すオフセットのリスト。リストの要素ごとに、パーティションのインデックスとオフセットを指定する必要があります。

        DC のコンシューマ
    2. Producer 操作:

      • Topic (トピック) - メッセージの送信先のトピック。

      • Key (キー) - 送信するメッセージに属するキー。

      • Message (メッセージ) - 送信するメッセージ。

        DC のプロデューサ

Anypoint Studio 7 での接続

このコネクタを Anypoint Studio で使用するには、まず Exchange からダウンロードして必要に応じて設定します。

Studio でコネクタをインストールする

  1. Anypoint Studio で、Studio タスクバーの Exchange アイコンをクリックします。

  2. Anypoint Exchange で [Login (ログイン)] をクリックします。

  3. このコネクタを検索して [Install (インストール)] をクリックします。

  4. 画面の指示に従ってこのコネクタをインストールします。

Studio の更新がある場合、右下隅にメッセージが表示されます。メッセージをクリックすると、更新をインストールできます。

Studio で設定する

  1. コネクタをドラッグして Studio キャンバスにドロップします。

  2. コネクタのグローバル要素を作成するには、次の項目を設定します。

    1. 基本:

      • Bootstrap Servers (ブートストラップサーバ) - Kafka クラスタへの初期接続の確立に使用するコンマ区切りのホストとポートのペア。Kafka クライアント (プロデューサ/コンシューマ) に指定する必要のある「bootstrap.servers」値と同じです。

      • Additional properties (追加プロパティ) - 接続に必要なキーと値の追加プロパティ。Kafka がサポートする任意のプロパティを入力できます。

        基本設定
    2. SSL:

      • 基本設定のすべてのパラメータ。

      • Key Store Type (キーストア種別) - キーストアファイルのファイル形式。省略可能で、デフォルト値は「JKS」です。

      • Key Store Password (キーストアのパスワード) - キーストアファイルのストアのパスワード。省略可能で、「keyStoreLocation」が設定されている場合にのみ必要です。

      • Key Store Location (キーストアの場所) - キーストアファイルの場所。省略可能で、コネクタの双方向認証に使用できます。

      • Trust Store Type (トラストストア種別) - トラストストアファイルのファイル形式。

      • Trust Store Password (トラストストアのパスワード) - トラストストアファイルのパスワード。

      • Trust Store Location (トラストストアの場所) - トラストストアファイルの場所。

        SSL 設定
    3. Kerberos:

      • 基本設定のすべてのパラメータ。

      • Principal (プリンシパル) - Kerberos プリンシパル。

      • Keytab (キータブ) - 「principal」に関連付けられている keytab ファイルへのパス。

      • Service Name (サービス名) - Kafka ブローカーが実行時に使用する Kerberos プリンシパル名。

      • Additional JAAS Properties (JAAS の追加プロパティ) - キー→値の追加プロパティ。「sasl.jaas.config」で設定する必要があり、通常は JAAS 設定ファイルに格納します。

        Kerberos 設定
    4. Kerberos SSL:

      • 基本設定のすべてのパラメータ。

      • SSL 設定のすべてのパラメータ。

      • Kerberos 設定のすべてのパラメータ。

        Kerberos SSL 設定
  3. キャンバスにドラッグした操作に基づいて、以下の項目を設定します。

    1. コンシューマトリガ:

      • Topic (トピック) - メッセージのコンシューム元の Kafka トピックの名前。

      • Partition offsets (パーティションオフセット) (省略可能) - パーティションオフセット設定を表すオフセットのリスト。リストの要素ごとに、パーティションのインデックスとオフセットを指定する必要があります。

        コンシューマの Studio 設定
    2. Producer 操作:

      • Topic (トピック) - メッセージの送信先のトピック。

      • Key (キー) - 送信するメッセージに属するキー。

      • Message (メッセージ) - 送信するメッセージ。

        プロデューサの Studio 設定

ユースケース: トピックのコンシューム

Kafka コネクタの用途は、特定のトピックからメッセージをコンシュームしてそのメッセージのフローをフィードすることや、メッセージをトピックにプロジュースすることです。

Studio には次のように表示されます。

コンシューマ:

キャンバスのコンシューマ

プロデューサ:

キャンバスのプロデューサ

ユースケース: XML

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans
      http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core
http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/kafka
http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd">
    <configuration-properties file="mule-app.properties"></configuration-properties>
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config"  >
        <http:listener-connection host="0.0.0.0" port="8081" />
    </http:listener-config>

    <kafka:kafka-consumer-config name="consumer-basic" doc:name="Consumer Basic" >
    <kafka:basic-kafka-consumer-connection consumerPartitions="${consumer.topic.partitions}"
    groupId="${consumer.groupId}" bootstrapServers="${config.basic.bootstrapServers}" />
  </kafka:kafka-consumer-config>
  <kafka:kafka-producer-config name="producer-basic" doc:name="Producer Basic" >
    <kafka:basic-kafka-producer-connection bootstrapServers="${config.basic.bootstrapServers}" />
  </kafka:kafka-producer-config>

    <flow name="consumer-flow" >
        <kafka:consumer config-ref="consumer-krb-plain" topic="${consumer.topic.name}"
        doc:name="Consumer" />
        <logger level="INFO" doc:name="Logger"
        message="#['New message arrived: ' ++ payload ++ &quot;, key:&quot; ++ attributes.key ++ &quot;, partition:&quot; ++ attributes.partition ++ &quot;, offset:&quot; ++ attributes.offset ]"/>
    </flow>
  <flow name="producer-flow" >
        <http:listener config-ref="HTTP_Listener_config" path="/pushMessage" doc:name="Push message endpoint" doc:id="DOC_ID" />
        <logger level="INFO" doc:name="Logger" doc:id="DOC_ID"
        message="#[&quot;Message: '&quot; ++ payload.message ++ &quot;' is going to be published to topic: '&quot; ++ payload.topic ++ &quot;'.&quot;]" />
        <kafka:producer config-ref="producer-krb-plain" topic="#[payload.topic]"
                        key="#[now()]"
                        doc:name="Producer" >
            <kafka:message ><![CDATA[#[payload.message]]]></kafka:message>
        </kafka:producer>
        <set-payload value="Message successfully sent to Kafka topic." doc:name="Push response builder" />
    </flow>
</mule>

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub