Apache Kafka 4.5 Examples - Mule 4
Publish and then Consume a Topic
This example shows how to use two Anypoint Connector for Apache Kafka (Apache Kafka Connector) operations, Publish and Consume, to publish a message to Apache Kafka and then retrieve it.
Create the Producer flow
The Producer flow publishes a message to Apache Kafka. The following screenshot shows the Producer flow in Anypoint Studio:
To create the Producer flow:
-
Drag HTTP Listener to the canvas.
-
Change the flow name to
Producer-Flow
. -
Set the Display Name of Listener to
Push message endpoint
and the Path to/pushMessage
. -
Drag a Logger component to the right of Push message endpoint on the Studio canvas and set the message to:
#["Message: '" ++ payload.message ++ "' is going to be published to topic: '" ++ payload.topic ++ "'."]
-
Drag the Kafka Publish operation to the right of Logger on the Studio canvas.
-
Set the Display Name field to
Producer
, the Topic field to#[payload.topic]
, and the Key field to#[now()]
. -
Click the green plus icon to the right of the Connector configuration field to access the global element configuration fields.
-
In the Bootstrap server URLs field, select Edit inline and then click the green plus sign.
-
Enter the value
${config.basic.bootstrapServers}
and click Finish.
-
-
Declare and complete the variables previously described in the corresponding YAML file (for example:
src/main/resources/properties/kafka-producer.yaml
), and load that properties file as a global element.For more information about using a properties file, see Configuring Property Placeholders
-
Drag the Set Payload component to the right of Producer on the Studio canvas.
-
Change the Display Name field to
Push response builder
. -
Set the Value to
Message successfully sent to Kafka topic.
Create the Consumer Flow
The Consumer flow consumes a message from Apache Kafka. The following screenshot shows the Consumer flow in Studio:
To create the Consumer flow:
-
Drag the Kafka Consume Message input source to the Anypoint Studio canvas.
-
Change the flow name to
Consumer-Flow
. -
Set the Display Name field of Message Listener to
Consume message endpoint
. -
Click the green plus icon to the right of the Connector configuration field to access the global element configuration fields.
-
Complete these fields:
-
In the Bootstrap server URLs field, select Edit inline and then click the green plus sign.
-
Enter the value
${config.basic.bootstrapServers}
and click Finish. -
In the Group ID field, enter
${consumer.groupId}
. -
In the Topic Subscription Patterns field, select Edit inline and then click the green plus sign.
-
Enter the value
${config.topics}
and click Finish.
-
-
Declare and complete the variables previously described in the corresponding YAML file (for example:
src/main/resources/properties/Kafka-consumer.yaml
), and load that properties file as a global element.For more information about using a properties file, see Configuring Property Placeholders
-
Drag a Logger component to the right of Consume message endpoint on the Studio canvas, and set the message to:
'New message arrived: ' ++ payload ++ ", key:" ++ attributes.key ++ ", partition:" ++ attributes.partition ++ ", offset:" ++ attributes.offset
XML for Consuming and Publishing a Topic
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:Kafka="http://www.mulesoft.org/schema/mule/Kafka"
xmlns:http="http://www.mulesoft.org/schema/mule/http"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/ee/core
http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/kafka
http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd">
<configuration-properties file="mule-app.properties">
</configuration-properties>
<http:listener-config name="HTTP_Listener_config"
doc:name="HTTP Listener config" >
<http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>
<kafka:consumer-config name="Apache_Kafka_Consumer_configuration"
doc:name="Apache Kafka Consumer configuration" >
<kafka:consumer-plaintext-connection groupId="${consumer.groupId}" >
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
</kafka:bootstrap-servers>
<kafka:topic-patterns >
<kafka:topic-pattern value='${topic1}
<kafka:topic-pattern value="topic-1" />
<kafka:topic-pattern value="topic-2" />
</kafka:topic-patterns>]' />
</kafka:topic-patterns>
</kafka:consumer-plaintext-connection>
</kafka:consumer-config>
<kafka:producer-config name="Apache_Kafka_Producer_configuration"
doc:name="Apache Kafka Producer configuration" >
<kafka:producer-plaintext-connection >
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
</kafka:bootstrap-servers>
</kafka:producer-plaintext-connection>
</kafka:producer-config>
<kafka:consumer-config name="Apache_Kafka_Consumer_configuration1" doc:name="Apache Kafka Consumer configuration" >
<kafka:consumer-plaintext-connection groupId="${consumer.groupId}" >
<kafka:bootstrap-servers >
<kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
</kafka:bootstrap-servers>
<kafka:topic-patterns >
<kafka:topic-pattern value="${config.topics}" />
</kafka:topic-patterns>
</kafka:consumer-plaintext-connection>
</kafka:consumer-config>
<flow name="Producer-Flow" >
<http:listener doc:name="Push message endpoint" config-ref="HTTP_Listener_config" path="/pushMessage" />
<logger level="INFO" doc:name="Logger" message="&quot;Message: '&quot; ++ payload.message ++ &quot;' is going to be published to topic: '&quot; ++ payload.topic ++ &quot;'.&quot;" />
<kafka:publish doc:name="Producer" topic="#[payload.topic]" key="#[now()]" config-ref="Apache_Kafka_Producer_configuration" />
<set-payload value="Message successfully sent to Apache Kafka topic." doc:name="Push response builder" />
</flow>
<flow name="Consumer-Flow" >
<kafka:message-listener doc:name="Consume message endpoint" config-ref="Apache_Kafka_Consumer_configuration"/>
<logger level="INFO" doc:name="Logger" message="'New message arrived: ' ++ payload ++ &quot;, key:&quot; ++ attributes.key ++ &quot;, partition:&quot; ++ attributes.partition ++ &quot;, offset:&quot; ++ attributes.offset"/>
</flow>
</mule>