Confluent Schema Registry 1.0 Examples - Mule 4

This example shows how to publish and consume avro messages by integrating Confluent Schema Registry with Kafka Connector to reduce the size of the messages. The example contains two flows:

  1. The first flow generates an avro message containing the schema and sends it to the Kafka topic. The Transform Message component creates an avro message with the schema and the Replace AVRO schema with id operation replaces the schema embedded in the message with an ID from Confluent Schema Registry.

  2. The second flow reads the avro message from the Kafka topic and prints out the content of the deserialized avro message to the console. The Replace id with AVRO schema operation replaces the ID in the received message with the actual schema retrieved from Confluent Schema Registry.

Prerequisites

You must have:

  • Java 8

  • Anypoint Studio 7.7.0 or later

  • Mule runtime engine (Mule) 4.2.x or later

  • DataWeave

Create the Producer Flow

The Producer flow publishes a message to the configured Kafka topic. The following screenshot shows the Producer flow in Anypoint Studio:

Confluent Schema Registry Connector Producer Flow in Studio

In this flow, you configure:

  • An HTTP Listener component

  • Transform Message component

  • Logger component

  • Replace AVRO schema with id operation

  • Publish operation

  • A second Transform Message component

Configure HTTP Listener

Configure HTTP Listener to initiate a Mule flow when a call is made to the / account path on localhost, port 8082.

This example uses variables for some field values. You can either:

  • Replace the variables with their values in the code.

  • Provide the values for each variable in a properties file and then refer to that file from the connector configuration.

If you don’t know how to use a properties file, see Configuring Property Placeholders.

To configure HTTP Listener:

  1. Create a new Mule project in Studio.

  2. In Mule Palette, search for http and select the Listener operation:

  3. Drag the Listener operation onto the canvas.

  4. In the Listener configuration, click the Add icon next to the Connector configuration field to add a global element.

  5. Click OK to accept the defaults.

  6. Set the Path field to /:

Add the First Transform Message Component

The first Transform Message component generates an avro message containing the schema:

  1. In Mule Palette, search for transform message.

  2. Drag the Transform Message component onto the canvas, next to the Listener component.

  3. In the Transform Message configuration, overlay the brackets in the Output section with this XML:

    %dw 2.0
    output application/avro schemaUrl='classpath://schema.json'
    ---
    {
    	"f1": "demo_message"
    }

The following shows the content of the schema used in this example:

{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}

This schema describes a JSON object that has a field named f1 and has the type string.

Add a Logger Component

  1. In Mule Palette, search for logger.

  2. Drag the Logger component onto the canvas, next to the Transform Message component.

  3. Configure the following fields:

    Field Value

    Display Name

    Name for the logger, such as Log Response

    Message

    Enter #[payload].

    Level

    INFO (Default)

Add the Replace AVRO Schema with ID Operation

The Replace AVRO schema with id operation replaces the schema in an avro message with an ID from Confluent Schema Registry to reduce the size of the message.

  1. Drag the Replace AVRO schema with id operation onto the canvas, next to the Logger component.

  2. In the Replace AVRO schema with id configuration, click the Connector configuration dropdown and select Confluent_Schema_Registry_Connector_Config.

  3. Configure the following fields in the Replace AVRO schema with id properties window:

    Field Value

    Subject

    ${config.subject}

    Message

    payload

Add the Publish Operation

The Publish operation publishes a message to the Kafka topic.

  1. Drag the Publish operation onto the canvas, next to the Replace AVRO schema with id operation.

  2. Configure the following fields in the Publish properties window:

    Field Value

    Display Name

    Name for the operation, such as Producer

    Topic

    #[payload.topic]

    Key

    #[now()]

  3. In the General configuration screen for the Publish operation, click the Add icon to access the global element configuration fields.

    1. In the Bootstrap server URLs field, select Edit inline and then click the Add icon.

    2. Enter the value ${config.basic.bootstrapServers} and click Finish.

Add the Second Transform Message Component

The second Transform Message component prints the response of the Publish operation.

  1. In Mule Palette, search for transform message.

  2. Drag the Transform Message component onto the canvas, next to the Publish operation.

  3. In the Transform Message configuration, overlay the brackets in the Output section with this XML:

    %dw 2.0
    output application/json
    ---
    payload

Create the Consumer Flow

The Consumer flow consumes the published message from the Kafka topic and prints it out to the console. The following screenshot shows the Consumer flow in Anypoint Studio:

Confluent Schema Registry Connector Consumer Flow in Studio

In this flow, you configure:

  • Message listener

  • Logger component

  • Replace id with AVRO schema operation

  • Transform Message component

  • A second Logger component

Add a Message Listener

The Message listener consumes the published message from the Kafka topic.

  1. Drag the Message listener onto the canvas.

  2. Configure the following fields in the Message listener properties window:

    Field Value

    Display Name

    Name for the operation, such as Producer

    Topic

    #[payload.topic]

    Key

    #[now()]

  3. Click the Add icon next to the Connector configuration field to access the global element configuration fields.

  4. Complete these fields:

    1. In the Bootstrap server URLs field, select Edit inline and then click the Add icon.

    2. Enter the value ${config.basic.bootstrapServers} and click Finish.

    3. In the Group ID field, enter ${consumer.groupId}.

    4. In the Topic Subscription Patterns field, select Edit inline and then click the Add icon.

    5. Enter the value ${config.topics} and click Finish.

Add the First Logger Component

  1. In Mule Palette, search for logger.

  2. Drag the Logger component onto the canvas, next to Message listener.

  3. Configure the following fields:

    Field Value

    Display Name

    Name for the logger, such as Log Response

    Message

    Enter #[payload].

    Level

    INFO (Default)

Add the Replace ID with AVRO Schema Operation

The Replace id with AVRO schema operation replaces the embedded ID with the avro schema.

  1. Drag the Replace id with AVRO schema operation onto the canvas, next to the Logger component.

  2. In the Replace id with AVRO schema configuration, click the Connector configuration dropdown and select Confluent_Schema_Registry_Connector_Config.

  3. Configure the following fields in the Replace id with AVRO schema properties window:

    Field Value

    Message

    payload

Add the Transform Message Component

The Transform Message component transforms the payload into JSON.

  1. In Mule Palette, search for transform message.

  2. Drag the Transform Message component onto the canvas, next to the Replace id with AVRO schema operation.

  3. In the Transform Message configuration, overlay the brackets in the Output section with this XML:

    %dw 2.0
    output application/json
    ---
    payload

Add the Second Logger Component

  1. In Mule Palette, search for logger.

  2. Drag the Logger component onto the canvas, next to Transform Message.

  3. Configure the following fields:

    Field Value

    Display Name

    Name for the logger, such as Log Response

    Message

    Enter #[payload[0]].

    Level

    INFO (Default)

XML for Publishing and Consuming Avro Messages

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core" xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka"
	xmlns:confluent-schema-registry="http://www.mulesoft.org/schema/mule/confluent-schema-registry" xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/confluent-schema-registry http://www.mulesoft.org/schema/mule/confluent-schema-registry/current/mule-confluent-schema-registry.xsd
http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd">
	<configuration-properties file="mule-app.properties"/>

	<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="ac9811ff-9234-4f44-9dc0-83100cb6c1bd" >
		<http:listener-connection host="0.0.0.0" port="8081" />
	</http:listener-config>
	<confluent-schema-registry:config name="Confluent_Schema_Registry_Connector_Config" doc:name="Confluent Schema Registry Connector Config" doc:id="faae820f-cd5a-46d0-9692-f28b53ea3bb6" >
		<confluent-schema-registry:basic-auth-connection username="${config.user}" password="${config.pass}" baseUri="${config.schemaUrl}" />
	</confluent-schema-registry:config>
	<kafka:consumer-config name="Apache_Kafka_Consumer_configuration" doc:name="Apache Kafka Consumer configuration" doc:id="d0443fc9-b2d3-4bbc-939f-126e98255cf3" >
		<kafka:consumer-plaintext-connection groupId="${config.consumerGroup}" >
			<kafka:bootstrap-servers >
				<kafka:bootstrap-server value="${config.bootstrapServer}" />
			</kafka:bootstrap-servers>
			<kafka:topic-patterns >
				<kafka:topic-pattern value="${config.topic}" />
			</kafka:topic-patterns>
		</kafka:consumer-plaintext-connection>
	</kafka:consumer-config>

	<kafka:producer-config name="Apache_Kafka_Producer_configuration" doc:name="Apache Kafka Producer configuration" doc:id="f378b3d4-4486-487d-84e2-8cc80aae7295" >
		<kafka:producer-plaintext-connection >
			<kafka:bootstrap-servers >
				<kafka:bootstrap-server value="${config.bootstrapServer}" />
			</kafka:bootstrap-servers>
		</kafka:producer-plaintext-connection>
	</kafka:producer-config>
	<flow name="demoFlow" doc:id="80c4cd43-2c65-4395-8e3e-52e6f7ed882b" >
		<http:listener doc:name="Listener" doc:id="a0c9691a-b202-4aaf-9dcf-6b1385709ee6" config-ref="HTTP_Listener_config" path="/test"/>
		<ee:transform doc:name="Transform Message" doc:id="4dc43b6d-f7c7-4cba-bbcd-db733cae1de4" >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/avro schemaUrl='classpath://schema.json'
---
{
	"f1": "demo_message"
}]]></ee:set-payload>
			</ee:message>
		</ee:transform>
		<confluent-schema-registry:replace-avro-schema-with-id doc:name="Replace AVRO schema with id" doc:id="61b1d3b7-eabe-4d53-b5ca-357fda8bbd1e" config-ref="Confluent_Schema_Registry_Connector_Config" subject="${config.subject}"/>
		<kafka:publish doc:name="Publish" doc:id="aad92d55-a388-4680-86f1-e99adfcb14f5" config-ref="Apache_Kafka_Producer_configuration" topic="${config.topic}"/>
		<ee:transform doc:name="Transform Message" doc:id="f64a1dd7-d091-47bc-a0c7-b533c4c8d492" >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
			</ee:message>
		</ee:transform>
	</flow>
	<flow name="demoFlow1" doc:id="a244ee60-143f-4320-a34e-6b572de50341" >
		<kafka:message-listener doc:name="Message listener" doc:id="056db197-1353-407a-ba58-8d90fc68e766" config-ref="Apache_Kafka_Consumer_configuration"/>
		<confluent-schema-registry:replace-id-with-avro-schema doc:name="Replace Id With Avro Schema" doc:id="26a15edf-94e2-4138-8acb-385821fc9710" config-ref="Confluent_Schema_Registry_Connector_Config"/>
		<ee:transform doc:name="Transform Message" doc:id="a17bd48c-37e7-449e-84a0-d8cd14e4925f" >
			<ee:message >
				<ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
			</ee:message>
		</ee:transform>
	</flow>
</mule>