
Publish and Then Consume a Topic with Kafka Connector

This example shows how to use two Apache Kafka Connector operations, Publish and Consume, to publish a message to Apache Kafka and then retrieve it. The example contains two flows:

  • The first flow is the Producer flow, which publishes a message to Apache Kafka.

  • The second flow is the Consumer flow, which consumes a message from Apache Kafka.

Apache Kafka Connector Producer flow in Studio
Apache Kafka Connector Consumer flow in Studio

Configure the First Flow

Configure the first flow that publishes a message to Apache Kafka. Creating the first flow involves configuring a Listener component, a Logger component, a Publish operation, and a Set Payload transformer.

Configure the HTTP Listener

Configure the Listener component to initiate a Mule flow when a call is made to the /pushMessage path:

  1. Create a new Mule project in Studio.

  2. From the Mule Palette view, select HTTP and drag the Listener component to the canvas.

  3. In the properties window, click + next to the Connector configuration field to add a global element.

  4. Accept the defaults.

  5. In the properties window, set the Path field value to /pushMessage.
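For reference, the steps above correspond roughly to the following XML fragment, assembled from the full listing at the end of this example. Treat it as a sketch rather than exactly what Studio generates: the names HTTP_Listener_config and Producer-Flow and the default host and port come from that listing, and the namespace declarations on the enclosing mule element are assumed.

```xml
<!-- Global element: HTTP listener configuration using the default host and port -->
<http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config">
  <http:listener-connection host="0.0.0.0" port="8081" />
</http:listener-config>

<flow name="Producer-Flow">
  <!-- Starts the flow when a request arrives at /pushMessage -->
  <http:listener doc:name="Push message endpoint"
                 config-ref="HTTP_Listener_config" path="/pushMessage" />
</flow>
```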

Add the Logger Component

Add the Logger component to display the response in the Mule console:

  1. From the Mule Palette view, select Core and drag a Logger component to the right of Listener.

  2. In the properties window, configure the following fields:

    Field           Value
    Display Name    Name for the logger.
    Message         #["Message: '" ++ payload.message ++ "' is going to be published to topic: '" ++ payload.topic ++ "'."]
    Level           INFO (Default)

    The following image shows an example of the Logger configuration in the properties window:

    Logger properties window configuration
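In XML, this Logger step might look like the fragment below. It is a sketch that assumes the incoming request body exposes message and topic fields, as the expression implies. Because the attribute value is delimited by double quotes, the double quotes inside the DataWeave expression are written as &quot;.

```xml
<!-- Placed inside the producer flow, after the HTTP listener -->
<logger level="INFO" doc:name="Logger"
        message="#[&quot;Message: '&quot; ++ payload.message ++ &quot;' is going to be published to topic: '&quot; ++ payload.topic ++ &quot;'.&quot;]" />
```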

Add the Publish Operation

Add the Publish operation to publish a message to the specified Kafka topic:

  1. From the Mule Palette view, select Apache Kafka and drag the Publish operation to the right of Logger.

  2. In the properties window, click + next to the Connector configuration field to add a global element.

  3. Configure the global element as follows:

    1. In the Bootstrap Server URLs field, select Edit inline and then click the green plus sign.

    2. Enter the value ${config.basic.bootstrapServers} and click Finish.

  4. In the properties window, configure the following fields:

    Field                      Value
    Display Name               Name for the connector operation.
    Connector configuration    Global configuration you just created.
    Topic                      #[payload.topic]
    Key                        #[now()]
    Message                    payload

    The following image shows an example of the Publish configuration in the properties window:

    Publish properties window configuration
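The producer configuration and Publish operation described above can be sketched in XML as follows. The fragment mirrors the full listing at the end of this example, which sets only the topic and key on the operation. The ${config.basic.bootstrapServers} placeholder is assumed to be defined in the app's properties file.

```xml
<!-- Global element: producer configuration over a plaintext connection -->
<kafka:producer-config name="Apache_Kafka_Producer_configuration"
                       doc:name="Apache Kafka Producer configuration">
  <kafka:producer-plaintext-connection>
    <kafka:bootstrap-servers>
      <!-- Resolved from the properties file at deployment time -->
      <kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
    </kafka:bootstrap-servers>
  </kafka:producer-plaintext-connection>
</kafka:producer-config>

<!-- Inside the producer flow: the topic comes from the request, the key is the current timestamp -->
<kafka:publish doc:name="Producer" topic="#[payload.topic]" key="#[now()]"
               config-ref="Apache_Kafka_Producer_configuration" />
```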

Add the Set Payload Transformer

Add the Set Payload transformer to build the response returned to the caller:

  1. From the Mule Palette view, select Core and drag the Set Payload transformer to the right of Publish.

  2. In the properties window, configure the following fields:

    Field           Value
    Display Name    Name for the transformer, such as Push response builder.
    Value           Message successfully sent to Apache Kafka topic.

    The following image shows an example of the Set Payload configuration in the properties window:

    Set Payload properties window configuration

Configure the Second Flow

Configure the second flow that consumes a message from Apache Kafka. Creating the second flow involves configuring a Message listener source and a Logger component.

Add the Message Listener Source

Add the Message listener source to consume messages from the subscribed Kafka topic:

  1. From the Mule Palette view, select Apache Kafka and drag the Message listener source to the canvas.

  2. In the properties window, click + next to the Connector configuration field to add a global element.

  3. Configure the global element as follows:

    1. In the Bootstrap Server URLs field, select Edit inline and then click the green plus sign.

    2. Enter the value ${config.basic.bootstrapServers} and click Finish.

    3. In the Group ID field, enter ${consumer.groupId}.

    4. In the Topic Subscription Patterns field, select Edit inline and then click the green plus sign.

    5. Enter the value ${topic1} and click Finish.

  4. In the properties window, configure the following fields:

    Field                      Value
    Display Name               Name for the connector source.
    Connector configuration    Global configuration you just created.

    The following image shows an example of the Message listener configuration in the properties window:

    Message listener properties window configuration
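As a sketch, the consumer configuration and Message listener source described above could be written in XML like this. Element and attribute names follow the full listing at the end of this example. The ${consumer.groupId}, ${config.basic.bootstrapServers}, and ${topic1} placeholders are assumed to be defined in the app's properties file.

```xml
<!-- Global element: consumer configuration over a plaintext connection -->
<kafka:consumer-config name="Apache_Kafka_Consumer_configuration"
                       doc:name="Apache Kafka Consumer configuration">
  <kafka:consumer-plaintext-connection groupId="${consumer.groupId}">
    <kafka:bootstrap-servers>
      <kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
    </kafka:bootstrap-servers>
    <kafka:topic-patterns>
      <!-- One entry per topic subscription pattern -->
      <kafka:topic-pattern value="${topic1}" />
    </kafka:topic-patterns>
  </kafka:consumer-plaintext-connection>
</kafka:consumer-config>

<flow name="Consumer-Flow">
  <!-- Triggers the flow for each message received from the subscribed topics -->
  <kafka:message-listener doc:name="Consume message endpoint"
                          config-ref="Apache_Kafka_Consumer_configuration" />
</flow>
```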

Add the Logger Component

Add the Logger component to display the response in the Mule console:

  1. From the Mule Palette view, select Core and drag a Logger component to the right of Message listener.

  2. In the properties window, configure the following fields:

    Field           Value
    Display Name    Name for the logger.
    Message         #['New message arrived: ' ++ payload ++ ", key:" ++ attributes.key ++ ", partition:" ++ attributes.partition ++ ", offset:" ++ attributes.offset]
    Level           INFO (Default)

    The following image shows an example of the Logger configuration in the properties window:

    Logger properties window configuration
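The consumer-side Logger might be expressed in XML as shown below. This is a sketch: it concatenates the payload with the key, partition, and offset attributes that the expression above reads from the Message listener, and it escapes the inner double quotes as &quot; because the attribute value itself is double-quoted.

```xml
<!-- Placed inside the consumer flow, after the Message listener source -->
<logger level="INFO" doc:name="Logger"
        message="#['New message arrived: ' ++ payload ++ &quot;, key:&quot; ++ attributes.key ++ &quot;, partition:&quot; ++ attributes.partition ++ &quot;, offset:&quot; ++ attributes.offset]" />
```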

XML for This Example

Paste this code into the Studio XML editor to quickly load both flows for this example into your Mule app:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
	xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
	http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-current.xsd
	http://www.mulesoft.org/schema/mule/core
	http://www.mulesoft.org/schema/mule/core/current/mule.xsd
	http://www.mulesoft.org/schema/mule/http
	http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
	http://www.mulesoft.org/schema/mule/ee/core
	http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
	http://www.mulesoft.org/schema/mule/kafka
	http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd">
  <configuration-properties file="mule-app.properties">
	</configuration-properties>
  <http:listener-config name="HTTP_Listener_config"
    doc:name="HTTP Listener config"  >
  <http:listener-connection host="0.0.0.0" port="8081" />
  </http:listener-config>

  <kafka:consumer-config name="Apache_Kafka_Consumer_configuration"
 	  doc:name="Apache Kafka Consumer configuration" >
	  <kafka:consumer-plaintext-connection groupId="${consumer.groupId}" >
	  <kafka:bootstrap-servers >
	    <kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
	  </kafka:bootstrap-servers>
	  <kafka:topic-patterns >
	    <kafka:topic-pattern value="${topic1}" />
	  </kafka:topic-patterns>
	  </kafka:consumer-plaintext-connection>
	</kafka:consumer-config>
	<kafka:producer-config name="Apache_Kafka_Producer_configuration"
	  doc:name="Apache Kafka Producer configuration" >
	  <kafka:producer-plaintext-connection >
	  <kafka:bootstrap-servers >
	    <kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
	  </kafka:bootstrap-servers>
	  </kafka:producer-plaintext-connection>
	  </kafka:producer-config>
	<kafka:consumer-config name="Apache_Kafka_Consumer_configuration1" doc:name="Apache Kafka Consumer configuration" >
		<kafka:consumer-plaintext-connection groupId="${consumer.groupId}" >
			<kafka:bootstrap-servers >
				<kafka:bootstrap-server value="${config.basic.bootstrapServers}" />
			</kafka:bootstrap-servers>
			<kafka:topic-patterns >
				<kafka:topic-pattern value="${config.topics}" />
			</kafka:topic-patterns>
		</kafka:consumer-plaintext-connection>
	</kafka:consumer-config>
	<flow name="Producer-Flow" >
		<http:listener doc:name="Push message endpoint" config-ref="HTTP_Listener_config" path="/pushMessage" />
		<logger level="INFO" doc:name="Logger" message="#[&quot;Message: '&quot; ++ payload.message ++ &quot;' is going to be published to topic: '&quot; ++ payload.topic ++ &quot;'.&quot;]" />
		<kafka:publish doc:name="Producer" topic="#[payload.topic]" key="#[now()]" config-ref="Apache_Kafka_Producer_configuration" />
		<set-payload value="Message successfully sent to Apache Kafka topic." doc:name="Push response builder" />
	</flow>
	<flow name="Consumer-Flow" >
		<kafka:message-listener doc:name="Consume message endpoint" config-ref="Apache_Kafka_Consumer_configuration"/>
		<logger level="INFO" doc:name="Logger" message="#['New message arrived: ' ++ payload ++ &quot;, key:&quot; ++ attributes.key ++ &quot;, partition:&quot; ++ attributes.partition ++ &quot;, offset:&quot; ++ attributes.offset]"/>
	</flow>
</mule>
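
The listing resolves several ${...} placeholders from mule-app.properties. If you prefer to keep everything in the XML while experimenting, one option is to declare them as global properties instead. The fragment below is a sketch with hypothetical values: localhost:9092, test-consumer-group, and topic-1 are assumptions to replace with your own broker address, group ID, and topic name.

```xml
<!-- Hypothetical values for the placeholders used in this example -->
<global-property name="config.basic.bootstrapServers" value="localhost:9092" />
<global-property name="consumer.groupId" value="test-consumer-group" />
<global-property name="topic1" value="topic-1" />
<!-- Used only by Apache_Kafka_Consumer_configuration1 -->
<global-property name="config.topics" value="topic-1" />
```

These elements go directly under the mule element, alongside the other global elements.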