Contact Us 1-800-596-4880

Integrate Confluent Schema Registry with Kafka Connector - Example

The following example shows how to configure the Confluent Schema Registry using Kafka Connector.

Imagine you are developing a Mule 4 app that processes streaming data from various sources. To ensure the data conforms to defined schemas and to manage schema evolution effectively, you integrate Confluent Schema Registry with your Kafka Connector. This setup is crucial for developers who need to enforce data integrity and compatibility across different versions of data producers and consumers.

The configuration consists of two parts:

  • Configuring the Confluent Schema Registry in the Kakfa Connector connection using the Additional Properties field (either for the Consumer or Producer operation)

  • Configuring the Kafka Avro dependency in the POM file of the Mule app

Configure the Confluent Schema Registry in the Kakfa Connector Connection

Use the Additional Properties field to configure the Confluent Schema Registry.

Configure the Consumer Operation

To configure the Consumer operation in Anypoint Studio:

  1. Open your Mule project in Anypoint Studio and navigate to the Global Elements tab.

  2. Click Create.

  3. In the search box, type kafka.

  4. Select Apache Kafka Consumer configuration.

  5. Click OK.

  6. In the configuration screen under connection, select Advanced.

  7. Fill in all the required connection parameters.

  8. For Additional properties, select Edit inline.

  9. Click the plus sign to add a new additional property.

  10. Set Key to key.deserializer and Value to io.confluent.kafka.serializers.KafkaAvroDeserializer.

  11. Repeat step 9 to add all the required Confluent Schema Registry properties.

In the Configuration XML window, check the complete confluent schema registry configuration under the <kafka:additional-property> tags:

<kafka:consumer-config name="Apache_Kafka_Consumer_configuration” >
<kafka:consumer-plaintext-connection groupId="JenkinsTest">
<kafka:bootstrap-servers >
	<kafka:bootstrap-server value="localhost:9092" />
	</kafka:bootstrap-servers>
<kafka:additional-properties >
  <kafka:additional-property key="key.deserializer" value="’io.confluent.kafka.serializers.KafkaAvroDeserializer” />
    <kafka:additional-property key="value.deserializer value="io.confluent.kafka.serializers.KafkaAvroDeserializer" />
       <kafka:additional-property key="schema.registry.url" value="https://example.us-east-2.aws.confluent.cloud" />
      <kafka:additional-property key="basic.auth.credentials.source" value="USER_INFO" />
      <kafka:additional-property key="basic.auth.user.info" value="<put your credentials here>" />
       <kafka:additional-property key="auto.register.schemas" value="false" />
     <kafka:additional-property key="value.subject.name.strategy" value="io.confluent.kafka.serializers.subject.TopicNameStrategy" />
        </kafka:additional-properties>
        <kafka:topic-patterns >
		<kafka:topic-pattern value="InfraTopic" />
	    </kafka:topic-patterns>
        </kafka:consumer-plaintext-connection>
	    </kafka:consumer-config>

Configure the Producer Operation

For the Producer operation, you replace key.deserializer and value.deserializer with key.serializer and value.serializer. To configure the Producer operation in Anypoint Studio:

  1. Open your Mule project in Anypoint Studio and navigate to the Global Elements tab.

  2. Click Create.

  3. In the search box, type kafka.

  4. Select Apache Kafka Producer configuration.

  5. Click OK.

  6. In the configuration screen under connection, select Advanced.

  7. Fill in all the required connection parameters.

  8. For Additional properties, select Edit inline.

  9. Click the plus sign to add a new additional property.

  10. Set Key to key.serializer and Value to io.confluent.kafka.serializers.KafkaAvroSerializer.

  11. Repeat step 9 to add all the required Confluent Schema Registry properties.

In the Configuration XML window, check the complete Confluent Schema Registry configuration under the <kafka:additional-property> tags:

<kafka:producer-config name="Apache_Kafka_Consumer_configuration” >
<kafka:producer-plaintext-connection groupId="JenkinsTest">
<kafka:bootstrap-servers >
		<kafka:bootstrap-server value="ec2-3-226-204-173.compute-1.amazonaws.com:9092" />
	</kafka:bootstrap-servers>
<kafka:additional-properties >
  <kafka:additional-property key="key.serializer" value="’io.confluent.kafka.serializers.KafkaAvroSerializer” />
       <kafka:additional-property key="value.serializer value="io.confluent.kafka.serializers.KafkaAvroSerializer" />
       <kafka:additional-property key="schema.registry.url" value="https://example.us-east-2.aws.confluent.cloud" />
      <kafka:additional-property key="basic.auth.credentials.source" value="USER_INFO" />
      <kafka:additional-property key="basic.auth.user.info" value="<put your credentials here> />
       <kafka:additional-property key="auto.register.schemas" value="false" />
     <kafka:additional-property key="value.subject.name.strategy" value="io.confluent.kafka.serializers.subject.TopicNameStrategy" />

</kafka:additional-properties>
<kafka:topic-patterns >
		<kafka:topic-pattern value="InfraTopic" />
	</kafka:topic-patterns>
</kafka:producer-plaintext-connection>
	</kafka:producer-config>

Configure the Kafka Avro dependency in the POM File

In the POM file of the Mule app, configure the Kafka Avro dependency that contains the class to use to serialize and deserialize. You must update the <dependencies>, <plugin>, and <repository> sections of the POM file.

Configure the <dependencies> Section

In the <dependencies> section, set kafka-avro-serializer:

<dependencies>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>7.4.0</version>
        </dependency>
</dependencies>

Configure the <plugin> Section

In the <plugin> section, set kafka-avro-serializer as a <sharedLibrary>:

<plugin>
	<groupId>org.mule.tools.maven</groupId>
	<artifactId>mule-maven-plugin</artifactId>
	<version>${mule.maven.plugin.version}</version>
		<extensions>true</extensions>
				<configuration>
					<additionalPluginDependencies>
						<plugin>
							<groupId>com.mulesoft.connectors</groupId>
							<artifactId>mule-kafka-connector</artifactId>
							<additionalDependencies>
								<dependency>
									<groupId>io.confluent</groupId>
									<artifactId>kafka-avro-serializer</artifactId>
									<version>7.6.1</version>
								</dependency>
							</additionalDependencies>
						</plugin>
					</additionalPluginDependencies>
					<sharedLibraries>
						<sharedLibrary>
							<groupId>io.confluent</groupId>
							<artifactId>kafka-avro-serializer</artifactId>
						</sharedLibrary>
					</sharedLibraries>
					<classifier>mule-application</classifier>
   </configuration>
</plugin>

Then, set the avro-maven-plugin plugin. This dependency from the org.apache.avro group is a plugin for Apache Maven that facilitates handling Avro schema files in Java projects. The plugin automatically generates Java classes from Avro schemas (.avsc), protocols (.avpr), and IDL files (.avdl). This process simplifies development by eliminating the need to manually write Java classes that represent Avro schemas, ensuring that the Java code is always synchronized with the Avro schemas.

The <sourceDirectory> specifices the folder where the Avro schemas live, for example <sourceDirectory>${project.basedir}/PATH_TO_SCHEMAS</sourceDirectory>.

<plugin>
<groupId>org.apache.avro</groupId>
	<artifactId>avro-maven-plugin</artifactId>
	<version>1.10.2</version>
	<executions>
		<execution>
			<phase>generate-sources</phase>
			<goals>
				<goal>schema</goal>
			</goals>
			<configuration>
            <sourceDirectory>${project.basedir}/PATH_TO_SCHEMAS</sourceDirectory>
			</configuration>
		</execution>
	</executions>
</plugin>

Configure the <repository> Section

In the <repository> section, configure the repository of the dependency:

<repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
 </repository>
View on GitHub