Apache Kafka Connector 4.9 - 追加設定情報 - Mule 4

Apache Kafka 用 Anypoint Connector (Apache Kafka Connector) の追加設定を定義します。

カスタムパーティショナーを設定する

Mule アプリケーションで Apache Kafka Connector のカスタムパーティショナーを設定できます。

  1. Maven を使用して Java プロジェクトを作成し、それにカスタムパーティショナーを追加します。

    public class CustomPartitioner implements Partitioner {
    // Implementation
    }
    java
  2. Mule アプリケーションに次の連動関係を追加します。

    <dependencies>
     <dependency>
       <groupId>org.example</groupId>
       <artifactId>custom-partitioner</artifactId>
       <version>1.0-SNAPSHOT</version>
     </dependency>
    </dependencies>
    java
  3. この連動関係を ​additionalPluginDependencies​ に追加します。

    <plugin>
     <groupId>org.mule.tools.maven</groupId>
     <artifactId>mule-maven-plugin</artifactId>
     <version>${mule.maven.plugin.version}</version>
     <extensions>true</extensions>
     <configuration>
      <additionalPluginDependencies>
      <plugin>
       <groupId>com.mulesoft.connectors</groupId>
       <artifactId>mule-kafka-connector</artifactId>
        <additionalDependencies>
         <dependency>
          <groupId>io.confluent</groupId>
          <artifactId>kafka-avro-serializer</artifactId>
          <version>7.6.1</version>
         </dependency>
         <dependency>
          <groupId>org.example</groupId>
          <artifactId>custom-partitioner</artifactId>
          <version>1.0-SNAPSHOT</version>
         </dependency>
        </additionalDependencies>
       </plugin>
       </additionalPluginDependencies>
       <classifier>mule-application</classifier>
     </configuration>
    </plugin>
    java
  4. カスタムパーティショナーをプロデューサー設定に追加します。

    <kafka:producer-config name="Apache_Kafka_Producer_configuration"
        doc:name="Apache Kafka Producer configuration"
        doc:id="cb9de3c5-d6d2-4f9b-a255-75a9985bffb7">
        <kafka:producer-plaintext-connection>
            <kafka:bootstrap-servers>
                <kafka:bootstrap-server value="<kafka-url>" />
            </kafka:bootstrap-servers>
            <kafka:additional-properties>
                <kafka:additional-property key="key.serializer"
                    value="org.apache.kafka.common.serialization.StringSerializer" />
                <kafka:additional-property key="value.serializer"
                    value="com.mulesoft.connectors.kafka.internal.model.serializer.MuleKafkaAvroSerializer" />
                <kafka:additional-property key="schema.registry.url"
                    value="<schema-registry-url>" />
                <kafka:additional-property key="basic.auth.credentials.source"
                    value="USER_INFO" />
                <kafka:additional-property key="basic.auth.user.info"
                    value="user:pass" />
                <kafka:additional-property key="auto.register.schemas"
                    value="true" />
                <kafka:additional-property key="value.subject.name.strategy"
                    value="io.confluent.kafka.serializers.subject.TopicNameStrategy" />
                <kafka:additional-property key="partitioner.class"
                    value="org.example.CustomPartitioner" />
            </kafka:additional-properties>
        </kafka:producer-plaintext-connection>
    </kafka:producer-config>
    java

次のステップ

コネクタの設定が完了したら、​『例』​を試すことができます。