Contact Free trial Login

Apache Kafka Examples v4.2 - Mule 4

Publish and then Consume a Topic

This example shows how to use two Anypoint Connector for Apache Kafka (Apache Kafka Connector) operations, Publish and Consume, to publish a message to Apache Kafka and then retrieve it.

Create the Producer flow

The Producer flow publishes a message to Apache Kafka. The following screenshot shows the Producer flow in Anypoint Studio:

Apache Kafka Connector Producer flow in Studio
Figure 1. Create a flow like this one to publish a message to Apache Kafka.

To create the Producer flow:

  1. Drag HTTP Listener to the canvas.

  2. Change the flow name to Producer-Flow.

  3. Set the Display Name of Listener to Push message endpoint and the Path to /pushMessage.

  4. Drag a Logger component to the right of Push message endpoint on the Studio canvas and set the message to:

    #["Message: '" ++ payload.message ++ "' is going to be published to topic: '" ++ payload.topic ++ "'."]

  5. Drag the Kafka Publish operation to the right of Logger on the Studio canvas.

  6. Set the Display Name field to Producer, the Topic field to #[payload.topic], and the Key field to #[now()].

  7. Click the green plus icon to the right of the Connector configuration field to access the global element configuration fields.

    1. In the Bootstrap server URLs field, select Edit inline and then click the green plus sign.

    2. Enter the value ${config.basic.bootstrapServers} and click Finish.

  8. Define the properties referenced in the previous steps (for example, config.basic.bootstrapServers) in a YAML properties file (for example: src/main/resources/properties/kafka-producer.yaml), and load that properties file as a global element.

    For more information about using a properties file, see Configuring Property Placeholders.

  9. Drag the Set Payload component to the right of Producer on the Studio canvas.

  10. Change the Display Name field to Push response builder.

  11. Set the Value to Message successfully sent to Kafka topic.

Create the Consumer Flow

The Consumer flow consumes a message from Apache Kafka. The following screenshot shows the Consumer flow in Studio:

Apache Kafka Connector Consumer flow in Studio
Figure 2. Create a flow like this one to retrieve a message from Apache Kafka.

To create the Consumer flow:

  1. Drag the Kafka Message Listener source to the Anypoint Studio canvas.

  2. Change the flow name to Consumer-Flow.

  3. Set the Display Name field of Message Listener to Consume message endpoint.

  4. Click the green plus icon to the right of the Connector configuration field to access the global element configuration fields.

  5. Complete these fields:

    1. In the Bootstrap server URLs field, select Edit inline and then click the green plus sign.

    2. Enter the value ${config.basic.bootstrapServers} and click Finish.

    3. In the Group ID field, enter ${consumer.groupId}.

    4. In the Topic Subscription Patterns field, select Edit inline and then click the green plus sign.

    5. Enter the value ${config.topics} and click Finish.

  6. Define the properties referenced in the previous steps (for example, consumer.groupId and config.topics) in a YAML properties file (for example: src/main/resources/properties/kafka-consumer.yaml), and load that properties file as a global element.

    For more information about using a properties file, see Configuring Property Placeholders.

  7. Drag a Logger component to the right of Consume message endpoint on the Studio canvas, and set the message to:

    #['New message arrived: ' ++ payload ++ ", key:" ++ attributes.key ++ ", partition:" ++ attributes.partition ++ ", offset:" ++ attributes.offset]

XML for Consuming and Publishing a Topic

<?xml version="1.0" encoding="UTF-8"?>

<!--
	Mule 4 application: Apache Kafka publish/consume example.

	Namespace prefixes are case-sensitive: the elements in this file use the
	lowercase "kafka:" prefix, so it must be declared as xmlns:kafka, and its
	URI must match the kafka entry in xsi:schemaLocation exactly.
	Each namespace URI is paired with its schema once in xsi:schemaLocation.
-->
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-current.xsd
	http://www.mulesoft.org/schema/mule/core
	http://www.mulesoft.org/schema/mule/core/current/mule.xsd
	http://www.mulesoft.org/schema/mule/http
	http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
	http://www.mulesoft.org/schema/mule/ee/core
	http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
	http://www.mulesoft.org/schema/mule/kafka
	http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd">
  <!-- Global element that loads the application's property placeholders from mule-app.properties. -->
  <configuration-properties file="mule-app.properties" />
  <!-- HTTP listener configuration used by Producer-Flow: host 0.0.0.0, port 8081. -->
  <http:listener-config
    name="HTTP_Listener_config"
    doc:name="HTTP Listener config">
    <http:listener-connection
      host="0.0.0.0"
      port="8081" />
  </http:listener-config>

  <!--
    Kafka consumer configuration referenced by the message listener in Consumer-Flow.
    The consumer group, bootstrap server list, and topic subscription pattern are
    all supplied through property placeholders.
  -->
  <kafka:consumer-config name="Apache_Kafka_Consumer_configuration"
    doc:name="Apache Kafka Consumer configuration">
    <kafka:consumer-plaintext-connection groupId="${consumer.groupId}">
      <kafka:bootstrap-servers>
        <kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
      </kafka:bootstrap-servers>
      <kafka:topic-patterns>
        <!-- One pattern per topic-pattern element; add sibling elements to subscribe to more. -->
        <kafka:topic-pattern value="${config.topics}" />
      </kafka:topic-patterns>
    </kafka:consumer-plaintext-connection>
  </kafka:consumer-config>
	<!-- Kafka producer configuration referenced by the kafka:publish operation in Producer-Flow. -->
	<kafka:producer-config
		name="Apache_Kafka_Producer_configuration"
		doc:name="Apache Kafka Producer configuration">
		<kafka:producer-plaintext-connection>
			<kafka:bootstrap-servers>
				<kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
			</kafka:bootstrap-servers>
		</kafka:producer-plaintext-connection>
	</kafka:producer-config>
	<!--
		Second Kafka consumer configuration. No flow in this file references it
		(Consumer-Flow uses Apache_Kafka_Consumer_configuration).
		NOTE(review): confirm whether it is used elsewhere before removing it.
	-->
	<kafka:consumer-config
		name="Apache_Kafka_Consumer_configuration1"
		doc:name="Apache Kafka Consumer configuration">
		<kafka:consumer-plaintext-connection groupId="${consumer.groupId}">
			<kafka:bootstrap-servers>
				<kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
			</kafka:bootstrap-servers>
			<kafka:topic-patterns>
				<kafka:topic-pattern value="${config.topics}" />
			</kafka:topic-patterns>
		</kafka:consumer-plaintext-connection>
	</kafka:consumer-config>
	<!--
		Producer-Flow: receives an HTTP request on /pushMessage, logs the message and
		target topic read from the payload, publishes to Kafka, and then replaces the
		payload with a fixed confirmation string.
	-->
	<flow name="Producer-Flow">
		<http:listener doc:name="Push message endpoint" config-ref="HTTP_Listener_config" path="/pushMessage" />
		<!--
			The message is a DataWeave expression, so it must be wrapped in #[...];
			double quotes inside the attribute are escaped once, as &quot;.
		-->
		<logger level="INFO" doc:name="Logger" message="#[&quot;Message: '&quot; ++ payload.message ++ &quot;' is going to be published to topic: '&quot; ++ payload.topic ++ &quot;'.&quot;]" />
		<!-- The topic comes from the request payload; the record key is the current timestamp. -->
		<kafka:publish doc:name="Producer" topic="#[payload.topic]" key="#[now()]" config-ref="Apache_Kafka_Producer_configuration" />
		<set-payload value="Message successfully sent to Apache Kafka topic." doc:name="Push response builder" />
	</flow>
	<!--
		Consumer-Flow: triggered by the Kafka message listener source and logs the
		payload together with the key, partition, and offset read from the attributes.
	-->
	<flow name="Consumer-Flow">
		<kafka:message-listener doc:name="Consume message endpoint" config-ref="Apache_Kafka_Consumer_configuration" />
		<!--
			The message is a DataWeave expression, so it must be wrapped in #[...];
			double quotes inside the attribute are escaped once, as &quot;.
		-->
		<logger level="INFO" doc:name="Logger" message="#['New message arrived: ' ++ payload ++ &quot;, key:&quot; ++ attributes.key ++ &quot;, partition:&quot; ++ attributes.partition ++ &quot;, offset:&quot; ++ attributes.offset]" />
	</flow>
</mule>