Confluent Schema Registry と Kafka Connector の統合 - 例

次の例は、Kafka Connector を使用して Confluent Schema Registry を設定する方法を示しています。

さまざまなソースからのストリーミングデータを処理する Mule 4 アプリケーションを開発していると想像してください。データが定義済みスキーマに準拠していることを確認し、スキーマの進化を効果的に管理するには、Confluent Schema Registry を Kafka Connector と統合します。このセットアップは、データの整合性、およびデータのプロデューサーとコンシューマーの異なるバージョン間での互換性を適用する必要がある開発者にとって重要です。

この設定は 2 つの部分で構成されます。

  • Consumer​ 操作の ​[Additional Properties (追加プロパティ)]​ 項目を使用した Kakfa Connector 接続での Confluent Schema Registry の設定。

  • Mule アプリケーションの POM ファイルでの Kafka Avro 連動関係の設定

Kakfa Connector 接続で Confluent Schema Registry を設定する

[Additional Properties (追加プロパティ)]​ 項目を使用して、Confluent Schema Registry を設定します。

Consume 操作を設定する

Anypoint Studio で ​Consumer​ 操作を設定する手順は、次のとおりです。

  1. Anypoint Studio で Mule プロジェクトを開き、​[Global Elements (グローバル要素)]​ タブに移動します。

  2. [Create (作成)]​ をクリックします。

  3. 検索ボックスに​「kafka」​と入力します。

  4. [Apache Kafka Consumer configuration (Apache Kafka コンシューマー設定)]​ を選択します。

  5. [OK]​ をクリックします。

  6. 接続の下の設定画面で、​[Advanced (詳細)]​ を選択します。

  7. 必須接続パラメーターを入力します。

  8. [Additional Properties (追加プロパティ)]​ で、​[Edit Inline (インライン編集)]​ を選択します。

  9. プラス記号をクリックして、新規プロパティを追加します。

  10. [Key (キー)]​ を ​key.deserializer​ に、​[Value (値)]​ を ​io.confluent.kafka.serializers.KafkaAvroDeserializer​ に設定します。

  11. ステップ 9 を繰り返して、必要な Confluent Schema Registry プロパティをすべて追加します。

[Configuration XML (設定 XML)]​ ウィンドウで、​<kafka:additional-property>​ タグの下の完全な Confluent Schema Registry 設定を確認します。

<kafka:consumer-config name="Apache_Kafka_Consumer_configuration” >
<kafka:consumer-plaintext-connection groupId="JenkinsTest">
<kafka:bootstrap-servers >
	<kafka:bootstrap-server value="localhost:9092" />
	</kafka:bootstrap-servers>
<kafka:additional-properties >
  <kafka:additional-property key="key.deserializer" value="’io.confluent.kafka.serializers.KafkaAvroDeserializer” />
    <kafka:additional-property key="value.deserializer value="io.confluent.kafka.serializers.KafkaAvroDeserializer" />
       <kafka:additional-property key="schema.registry.url" value="https://example.us-east-2.aws.confluent.cloud" />
      <kafka:additional-property key="basic.auth.credentials.source" value="USER_INFO" />
      <kafka:additional-property key="basic.auth.user.info" value="<put your credentials here>" />
       <kafka:additional-property key="auto.register.schemas" value="false" />
     <kafka:additional-property key="value.subject.name.strategy" value="io.confluent.kafka.serializers.subject.TopicNameStrategy" />
        </kafka:additional-properties>
        <kafka:topic-patterns >
		<kafka:topic-pattern value="InfraTopic" />
	    </kafka:topic-patterns>
        </kafka:consumer-plaintext-connection>
	    </kafka:consumer-config>
xml

POM ファイルで Kafka Avro 連動関係を設定する

Mule アプリケーションの POM ファイルで、シリアル化および非シリアル化に使用するクラスを含む Kafka Avro 連動関係を設定します。POM ファイルの ​<dependencies>​、​<plugin>​、および ​<repository>​ セクションを更新する必要があります。

<dependencies> セクションを設定する

<dependencies>​ セクションで、​kafka-avro-serializer​ を設定します。

<dependencies>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>7.4.0</version>
        </dependency>
</dependencies>
xml

<plugin> セクションを設定する

<plugin>​ セクションで、​<sharedLibrary>​ として ​kafka-avro-serializer​ を設定します。

<plugin>
	<groupId>org.mule.tools.maven</groupId>
	<artifactId>mule-maven-plugin</artifactId>
	<version>${mule.maven.plugin.version}</version>
		<extensions>true</extensions>
				<configuration>
					<additionalPluginDependencies>
						<plugin>
							<groupId>com.mulesoft.connectors</groupId>
							<artifactId>mule-kafka-connector</artifactId>
							<additionalDependencies>
								<dependency>
									<groupId>io.confluent</groupId>
									<artifactId>kafka-avro-serializer</artifactId>
									<version>7.6.1</version>
								</dependency>
							</additionalDependencies>
						</plugin>
					</additionalPluginDependencies>
					<sharedLibraries>
						<sharedLibrary>
							<groupId>io.confluent</groupId>
							<artifactId>kafka-avro-serializer</artifactId>
						</sharedLibrary>
					</sharedLibraries>
					<classifier>mule-application</classifier>
   </configuration>
</plugin>
xml

次に、​avro-maven-plugin​ プラグインを設定します。​org.apache.avro​ グループのこの連動関係は、Java プロジェクトでの Avro スキーマファイルの処理を容易にする Apache Maven のプラグインです。このプラグインは、Avro スキーマ (.avsc)、プロトコル (.avpr)、および IDL ファイル (.avdl) から Java クラスを自動的に生成します。このプロセスにより、Avro スキーマを表す Java クラスを手動で記述する必要がなくなり、Java コードが常に Avro スキーマと同期されるため、開発が簡略化されます。

<sourceDirectory>​ では、Avro スキーマが存在するフォルダー (​<sourceDirectory>${project.basedir}/PATH_TO_SCHEMAS</sourceDirectory>​ など) を指定します。

<plugin>
<groupId>org.apache.avro</groupId>
	<artifactId>avro-maven-plugin</artifactId>
	<version>1.10.2</version>
	<executions>
		<execution>
			<phase>generate-sources</phase>
			<goals>
				<goal>schema</goal>
			</goals>
			<configuration>
            <sourceDirectory>${project.basedir}/PATH_TO_SCHEMAS</sourceDirectory>
			</configuration>
		</execution>
	</executions>
</plugin>
xml

<repository> セクションを設定する

<repository>​ セクションで、連動関係のリポジトリを設定します。

<repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
 </repository>
xml