<kafka:consumer-config name="Apache_Kafka_Consumer_configuration” >
<kafka:consumer-plaintext-connection groupId="JenkinsTest">
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="localhost:9092" />
</kafka:bootstrap-servers>
<kafka:additional-properties >
<kafka:additional-property key="key.deserializer" value="’io.confluent.kafka.serializers.KafkaAvroDeserializer” />
<kafka:additional-property key="value.deserializer value="io.confluent.kafka.serializers.KafkaAvroDeserializer" />
<kafka:additional-property key="schema.registry.url" value="https://example.us-east-2.aws.confluent.cloud" />
<kafka:additional-property key="basic.auth.credentials.source" value="USER_INFO" />
<kafka:additional-property key="basic.auth.user.info" value="<put your credentials here>" />
<kafka:additional-property key="auto.register.schemas" value="false" />
<kafka:additional-property key="value.subject.name.strategy" value="io.confluent.kafka.serializers.subject.TopicNameStrategy" />
</kafka:additional-properties>
<kafka:topic-patterns >
<kafka:topic-pattern value="InfraTopic" />
</kafka:topic-patterns>
</kafka:consumer-plaintext-connection>
</kafka:consumer-config>
Integrate Confluent Schema Registry with Kafka Connector - Example
The following example shows how to configure the Confluent Schema Registry using Kafka Connector.
Imagine you are developing a Mule 4 app that processes streaming data from various sources. To ensure the data conforms to defined schemas and to manage schema evolution effectively, you integrate Confluent Schema Registry with your Kafka Connector. This setup is crucial for developers who need to enforce data integrity and compatibility across different versions of data producers and consumers.
The configuration consists of two parts:
-
Configuring the Confluent Schema Registry in the Kakfa Connector connection using the Additional Properties field (either for the Consumer or Producer operation)
-
Configuring the Kafka Avro dependency in the POM file of the Mule app
Configure the Confluent Schema Registry in the Kakfa Connector Connection
Use the Additional Properties field to configure the Confluent Schema Registry.
Configure the Consumer Operation
To configure the Consumer operation in Anypoint Studio:
-
Open your Mule project in Anypoint Studio and navigate to the Global Elements tab.
-
Click Create.
-
In the search box, type
kafka
. -
Select Apache Kafka Consumer configuration.
-
Click OK.
-
In the configuration screen under connection, select Advanced.
-
Fill in all the required connection parameters.
-
For Additional properties, select Edit inline.
-
Click the plus sign to add a new additional property.
-
Set Key to
key.deserializer
and Value toio.confluent.kafka.serializers.KafkaAvroDeserializer
. -
Repeat step 9 to add all the required Confluent Schema Registry properties.
In the Configuration XML window, check the complete confluent schema registry configuration under the <kafka:additional-property>
tags:
Configure the Producer Operation
For the Producer operation, you replace key.deserializer
and value.deserializer
with key.serializer
and value.serializer
. To configure the Producer operation in Anypoint Studio:
-
Open your Mule project in Anypoint Studio and navigate to the Global Elements tab.
-
Click Create.
-
In the search box, type
kafka
. -
Select Apache Kafka Producer configuration.
-
Click OK.
-
In the configuration screen under connection, select Advanced.
-
Fill in all the required connection parameters.
-
For Additional properties, select Edit inline.
-
Click the plus sign to add a new additional property.
-
Set Key to
key.serializer
and Value toio.confluent.kafka.serializers.KafkaAvroSerializer
. -
Repeat step 9 to add all the required Confluent Schema Registry properties.
In the Configuration XML window, check the complete Confluent Schema Registry configuration under the <kafka:additional-property>
tags:
<kafka:producer-config name="Apache_Kafka_Consumer_configuration” >
<kafka:producer-plaintext-connection groupId="JenkinsTest">
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="ec2-3-226-204-173.compute-1.amazonaws.com:9092" />
</kafka:bootstrap-servers>
<kafka:additional-properties >
<kafka:additional-property key="key.serializer" value="’io.confluent.kafka.serializers.KafkaAvroSerializer” />
<kafka:additional-property key="value.serializer value="io.confluent.kafka.serializers.KafkaAvroSerializer" />
<kafka:additional-property key="schema.registry.url" value="https://example.us-east-2.aws.confluent.cloud" />
<kafka:additional-property key="basic.auth.credentials.source" value="USER_INFO" />
<kafka:additional-property key="basic.auth.user.info" value="<put your credentials here> />
<kafka:additional-property key="auto.register.schemas" value="false" />
<kafka:additional-property key="value.subject.name.strategy" value="io.confluent.kafka.serializers.subject.TopicNameStrategy" />
</kafka:additional-properties>
<kafka:topic-patterns >
<kafka:topic-pattern value="InfraTopic" />
</kafka:topic-patterns>
</kafka:producer-plaintext-connection>
</kafka:producer-config>
Configure the Kafka Avro dependency in the POM File
In the POM file of the Mule app, configure the Kafka Avro dependency that contains the class to use to serialize and deserialize. You must update the <dependencies>
, <plugin>
, and <repository>
sections of the POM file.
Configure the <dependencies> Section
In the <dependencies>
section, set kafka-avro-serializer
:
<dependencies>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.4.0</version>
</dependency>
</dependencies>
Configure the <plugin> Section
In the <plugin>
section, set kafka-avro-serializer
as a <sharedLibrary>
:
<plugin>
<groupId>org.mule.tools.maven</groupId>
<artifactId>mule-maven-plugin</artifactId>
<version>${mule.maven.plugin.version}</version>
<extensions>true</extensions>
<configuration>
<additionalPluginDependencies>
<plugin>
<groupId>com.mulesoft.connectors</groupId>
<artifactId>mule-kafka-connector</artifactId>
<additionalDependencies>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.6.1</version>
</dependency>
</additionalDependencies>
</plugin>
</additionalPluginDependencies>
<sharedLibraries>
<sharedLibrary>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
</sharedLibrary>
</sharedLibraries>
<classifier>mule-application</classifier>
</configuration>
</plugin>
Then, set the avro-maven-plugin
plugin. This dependency from the org.apache.avro
group is a plugin for Apache Maven that facilitates handling Avro schema files in Java projects. The plugin automatically generates Java classes from Avro schemas (.avsc), protocols (.avpr), and IDL files (.avdl). This process simplifies development by eliminating the need to manually write Java classes that represent Avro schemas, ensuring that the Java code is always synchronized with the Avro schemas.
The <sourceDirectory>
specifices the folder where the Avro schemas live, for example <sourceDirectory>${project.basedir}/PATH_TO_SCHEMAS</sourceDirectory>
.
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.10.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/PATH_TO_SCHEMAS</sourceDirectory>
</configuration>
</execution>
</executions>
</plugin>