Contact Free trial Login

Kafka Examples - Mule 4

Consume a Topic

You can use the Kafka connector to consume messages from a topic and then feed the flow with those messages, or to produce a message into a topic. This example uses the two Kafka source operations, Message Consumer and Publish Message to push a message to Kafka and then retrieve it.

Construct a flow as follows:

  1. Drag the Kafka Message Consumer operation to the Studio canvas.

  2. Click the green plus icon to the right of Connector Configuration.

  3. Set these fields for the Consumer:

    Field Value Description

    Consumer partitions

    ${consumer.topic.partitions}

    The number of partitions to use for the consumer.

    Group id

    ${consumer.groupId}

    A unique string that identifies the consumer group this consumer belongs to.

    Bootstrap servers

    ${config.basic.bootstrapServers}

    Comma-separated host-port pairs used to establish the initial connection to the Kafka cluster.

  4. Add the values for each field in the mule-app.properties file.

  5. Drag Logger to the canvas and set the message to:

    #['New message arrived: ' ++ payload ++ ", key:" ++ attributes.key ++ ", partition:" ++ attributes.partition ++ ", offset:" ++ attributes.offset ]

    The Consumer flow appears in Studio as:

    kafka consumer studio flow
  6. In the next flow, drag HTTP Listener to the canvas, set the Path to /pushMessage, and use the defaults for the configuration.

  7. Drag Logger to the canvas and set the message to:

    #["Message: '" ++ payload.message ++ "' is going to be published to topic: '" ++ payload.topic ++ "'."]
  8. Drag the Kafka Publish Message operation to the Studio canvas, set Topic to the #[payload.topic] value, and set Key to the #[now()] value.

  9. Drag Set Payload to the canvas and set the Value to Message successfully sent to Kafka topic.

    The Producer flow appears in Studio as:

    kafka producer studio flow

XML for Consuming a Topic

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka"
	xmlns:http="http://www.mulesoft.org/schema/mule/http"
	xmlns="http://www.mulesoft.org/schema/mule/core"
	xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
	xmlns:spring="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans
	http://www.springframework.org/schema/beans/spring-beans-current.xsd
	http://www.mulesoft.org/schema/mule/core
	http://www.mulesoft.org/schema/mule/core/current/mule.xsd
	http://www.mulesoft.org/schema/mule/http
	http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
	http://www.mulesoft.org/schema/mule/ee/core
	http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
	http://www.mulesoft.org/schema/mule/kafka
	http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd">
    <configuration-properties file="mule-app.properties"></configuration-properties>
    <http:listener-config name="HTTP_Listener_config"
      doc:name="HTTP Listener config"  >
        <http:listener-connection host="0.0.0.0" port="8081" />
    </http:listener-config>

    <kafka:kafka-consumer-config name="consumer-basic" doc:name="Consumer Basic" >
    <kafka:basic-kafka-consumer-connection
    consumerPartitions="${consumer.topic.partitions}"
    groupId="${consumer.groupId}"
    bootstrapServers="${config.basic.bootstrapServers}" />
  </kafka:kafka-consumer-config>
  <kafka:kafka-producer-config name="producer-basic" doc:name="Producer Basic" >
    <kafka:basic-kafka-producer-connection
    bootstrapServers="${config.basic.bootstrapServers}" />
  </kafka:kafka-producer-config>

    <flow name="consumer-flow" >
        <kafka:consumer config-ref="consumer-krb-plain"
        topic="${consumer.topic.name}"
        doc:name="Consumer" />
        <logger level="INFO" doc:name="Logger"
        message="#['New message arrived: ' ++ payload ++ &quot;, key:&quot; ++ attributes.key ++ &quot;, partition:&quot; ++ attributes.partition ++ &quot;, offset:&quot; ++ attributes.offset ]"/>
    </flow>
  <flow name="producer-flow" >
        <http:listener config-ref="HTTP_Listener_config"
        path="/pushMessage" doc:name="Push message endpoint" doc:id="DOC_ID" />
        <logger level="INFO" doc:name="Logger" doc:id="DOC_ID"
        message="#[&quot;Message: '&quot; ++ payload.message ++ &quot;' published to topic: '&quot; ++ payload.topic ++ &quot;'.&quot;]" />
        <kafka:producer config-ref="producer-krb-plain" topic="#[payload.topic]"
                        key="#[now()]"
                        doc:name="Producer" >
            <kafka:message ><![CDATA[#[payload.message]]]></kafka:message>
        </kafka:producer>
        <set-payload value="Message successfully sent to Kafka topic."
        doc:name="Push response builder" />
    </flow>
</mule>

See Also

We use cookies to make interactions with our websites and services easy and meaningful, to better understand how they are used and to tailor advertising. You can read more and make your cookie choices here. By continuing to use this site you are giving us your consent to do this.