
Kafka Connector

The Anypoint Connector for Apache Kafka lets your Mule applications interact with the Apache Kafka messaging system, enabling integration between applications running on Mule runtime and an Apache Kafka cluster.

Read this user guide to understand how to set up and configure a basic Mule flow with the Apache Kafka connector.

Note: Track feature additions, compatibility, limitations and API version updates with each release of the connector using the Connector Release Notes. Review the connector operations and functionality using the Technical Reference alongside the Demo Mule Applications Using the Connector.

MuleSoft maintains this connector under the Select support policy.

Prerequisites

This document assumes you are familiar with Mule, Anypoint Connectors, and Anypoint Studio Essentials. To increase your familiarity with Studio, consider completing one or more Anypoint Studio Tutorials. This page also assumes that you have a basic understanding of Mule Flows and Global Elements.

Hardware and Software Requirements

For hardware and software requirements, see the Mule Hardware and Software Requirements page.

Compatibility

Application/Service Version

Mule Runtime

3.7.0 and higher

Apache Kafka

0.9.0.0

Install the Kafka Connector

You can install this connector in Anypoint Studio using the instructions in Installing a Connector from Anypoint Exchange.

Configure the Kafka Connector Global Element

To use the Apache Kafka connector in your Mule application, you must first configure a global element for the Kafka connector. This section shows how to do this in the Anypoint Studio UI.

Notes:

  • This global element can be used by all the Kafka connectors in the application (read more about global elements).

  • Not all connector instances necessarily use the same global element/configuration. It is not uncommon to have multiple connectors in a flow, using different global elements/configurations to connect to one or different instances.
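As a rough sketch of the second note, the fragment below declares two Kafka global elements and has each flow reference a different one through config-ref. The configuration names, flow names, and property keys (orders.* and audit.*) are hypothetical and chosen only for illustration. The element and attribute names are the ones used in the example applications later in this guide.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core
        http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/apachekafka
        http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd">

    <!-- Hypothetical global element for a first cluster -->
    <apachekafka:config name="Kafka_Orders_Cluster"
                        bootstrapServers="${orders.bootstrapServers}"
                        consumerPropertiesFile="${orders.consumerPropertiesFile}"
                        producerPropertiesFile="${orders.producerPropertiesFile}"/>

    <!-- Hypothetical global element for a second cluster -->
    <apachekafka:config name="Kafka_Audit_Cluster"
                        bootstrapServers="${audit.bootstrapServers}"
                        consumerPropertiesFile="${audit.consumerPropertiesFile}"
                        producerPropertiesFile="${audit.producerPropertiesFile}"/>

    <!-- Each connector instance picks its configuration through config-ref -->
    <flow name="orders-consumer-flow">
        <apachekafka:consumer config-ref="Kafka_Orders_Cluster"
                              topic="${orders.topic}"
                              partitions="${orders.topic.partitions}"/>
        <logger message="Order event: #[payload]" level="INFO"/>
    </flow>

    <flow name="audit-consumer-flow">
        <apachekafka:consumer config-ref="Kafka_Audit_Cluster"
                              topic="${audit.topic}"
                              partitions="${audit.topic.partitions}"/>
        <logger message="Audit event: #[payload]" level="INFO"/>
    </flow>
</mule>
```

Both flows could equally point at the same global element; separate configurations only become necessary when the connection details differ.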

You must provide connection and other details in the Global Element Properties window. These settings are saved in a global element and referenced by the applicable connector instances:

Example uses property placeholders - actual values are stored in .properties file

Configuration

In the image above, the placeholder values refer to values saved in a .properties file, which exists by default in the src/main/resources folder of your project (See Configuring Properties).

For easy maintenance and re-usability of your connector properties, we recommend that you use a .properties file. Avoid hardcoding these in the global element if you need to deploy the app to different environments, such as production, development, and QA, where your access credentials might differ. For more, see Deploying to Multiple Environments.
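One possible way to wire this up in a Mule 3 application is a Spring property placeholder that picks a different file per environment. The sketch below assumes a hypothetical system property named env and hypothetical file names such as kafka-dev.properties and kafka-prod.properties on the classpath; neither comes from this guide, so adjust them to your project.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"
      xmlns:context="http://www.springframework.org/schema/context"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core
        http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-current.xsd
        http://www.mulesoft.org/schema/mule/apachekafka
        http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd">

    <!-- Assumption: "env" is supplied at startup (for example dev, qa, or prod),
         so this resolves to kafka-dev.properties, kafka-qa.properties, and so on -->
    <context:property-placeholder location="kafka-${env}.properties"/>

    <!-- The global element itself contains only placeholders, no hardcoded values -->
    <apachekafka:config name="Apache_Kafka__Configuration"
                        bootstrapServers="${config.bootstrapServers}"
                        consumerPropertiesFile="${config.consumerPropertiesFile}"
                        producerPropertiesFile="${config.producerPropertiesFile}"/>
</mule>
```

With this layout the same application archive can be deployed unchanged to each environment, and only the properties file that gets loaded differs.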

Global Element Properties for Kafka Connector

Field Description

Name

Enter a name for this connector configuration to be able to reference it later.

Bootstrap Servers

Comma-separated host:port pairs used to establish the initial connection to the Kafka cluster. This is the same value as the bootstrap.servers property you provide to Kafka clients (producer/consumer). If a Producer or Consumer Properties File is provided and sets bootstrap.servers, this value is ignored and the one from the properties file is used.

Consumer Properties File

Path to a properties file where you can set the consumer properties, similar to what you provide to the Kafka command line tools. If you do not specify a value for bootstrap.servers in the properties file, the value provided in Bootstrap Servers is used. If you do not specify values for key.deserializer and value.deserializer, they are set to org.apache.kafka.common.serialization.StringDeserializer.

Producer Properties File

Path to a properties file where you can set the producer properties, similar to what you provide to the Kafka command line tools. If you do not specify a value for bootstrap.servers in the properties file, the value provided in Bootstrap Servers is used. If you do not specify values for key.serializer and value.serializer, they are set to org.apache.kafka.common.serialization.StringSerializer.

Using the Kafka Connector

The Kafka connector is based on the concept of an endpoint. The connector can be configured as either:

  • Inbound endpoint for consuming messages from a topic, or

  • Outbound connector for pushing new key/message pairs to a topic.
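To make the two roles concrete, here is a minimal sketch of a single flow that uses the connector in both positions: a consumer as the message source and a producer as a message processor that forwards each message to another topic. The flow name and the relay.* property keys are hypothetical, and passing #[payload] straight through as the message assumes the consumed payload is a plain string, as the logger example later in this guide suggests.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="
        http://www.mulesoft.org/schema/mule/core
        http://www.mulesoft.org/schema/mule/core/current/mule.xsd
        http://www.mulesoft.org/schema/mule/apachekafka
        http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd">

    <apachekafka:config name="Apache_Kafka__Configuration"
                        bootstrapServers="${config.bootstrapServers}"
                        consumerPropertiesFile="${config.consumerPropertiesFile}"
                        producerPropertiesFile="${config.producerPropertiesFile}"/>

    <!-- Hypothetical relay flow: names and relay.* keys are for illustration only -->
    <flow name="relay-flow">
        <!-- Inbound: the consumer starts the flow for each message on the source topic -->
        <apachekafka:consumer config-ref="Apache_Kafka__Configuration"
                              topic="${relay.source.topic}"
                              partitions="${relay.source.topic.partitions}"/>

        <!-- Outbound: the producer pushes a key/message pair to the target topic -->
        <apachekafka:producer config-ref="Apache_Kafka__Configuration"
                              topic="${relay.target.topic}"
                              key="#[server.dateTime.getMilliSeconds()]"
                              message="#[payload]"/>
    </flow>
</mule>
```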

Connector Namespace and Schema

When you design your application in Studio, dragging the connector from the palette onto the Anypoint Studio canvas should automatically populate the Mule application’s XML code with the connector namespace and schema location.

Namespace:

xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"

Schema Location:

xsi:schemaLocation="http://www.mulesoft.org/schema/mule/apachekafka http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd"

If you are manually coding the Mule application in Studio’s XML editor or other text editor, define the namespace and schema location in the header of your Configuration XML, inside the <mule> tag.


          
       
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"
      xsi:schemaLocation="
               http://www.mulesoft.org/schema/mule/core
               http://www.mulesoft.org/schema/mule/core/current/mule.xsd
               http://www.mulesoft.org/schema/mule/apachekafka http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd">

      <!-- put your global configuration elements and flows here -->

</mule>

Using the Connector in a Mavenized Mule App

If you are coding a Mavenized Mule application, this XML snippet must be included in your pom.xml file.


          
       
<dependency>
  <groupId>org.mule.modules</groupId>
  <artifactId>mule-module-kafka</artifactId>
  <version>1.0.0</version>
</dependency>

Inside the <version> tags, put the desired version number, the word RELEASE for the latest release, or SNAPSHOT for the latest available version. The available versions to date are:

  • 1.0.0

  • 1.0.1

  • 1.0.2
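If you prefer to pin an explicit version rather than rely on RELEASE or SNAPSHOT, one common Maven pattern is to hold the version in a property. The sketch below is illustrative only: the project coordinates (com.example, kafka-demo-app) and the property name kafka.connector.version are hypothetical, and the rest of a real Mule POM (packaging, plugins, repositories) is omitted.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Hypothetical coordinates for the Mule application itself -->
    <groupId>com.example</groupId>
    <artifactId>kafka-demo-app</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <properties>
        <!-- Hypothetical property name; 1.0.2 is one of the versions listed above -->
        <kafka.connector.version>1.0.2</kafka.connector.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.mule.modules</groupId>
            <artifactId>mule-module-kafka</artifactId>
            <version>${kafka.connector.version}</version>
        </dependency>
    </dependencies>
</project>
```

Pinning a fixed version keeps builds reproducible, whereas RELEASE and SNAPSHOT can resolve to different artifacts over time.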

Kafka Connector Example Use Cases

The example use case walkthroughs are geared toward Anypoint Studio users. For those writing and configuring the application in XML, jump straight to the example Mule application XML code to Consume Messages or Publish Messages to see how the Kafka global element and the connector are configured in the XML in each use case.

Consume Messages from Kafka Topic

See how to use the connector to consume messages from a topic and log each consumed message to console in the following format: "New message arrived: <message>".

  1. Create a new Mule Project by clicking on File > New > Mule Project.

  2. With your project open, search the Studio palette for the Kafka connector, which you should have already installed. Drag and drop a new Apache Kafka connector onto the canvas.

    In this case, the Kafka connector is going to be configured to consume messages from a topic.

  3. Drag and drop a Logger after the Apache Kafka element to log incoming messages in the console.

    Unconfigured consumer flow

  4. Double click on the flow’s header and rename it consumer-flow.

    Consumer flow configuration

  5. Double click on the Apache Kafka connector element, and configure its properties as below.

    Field Value

    Display Name

    Kafka consumer

    Consumer Configuration

    "Apache_Kafka__Configuration" (default name of a configuration, or any other configuration that you configured as explained in the Configure the Kafka Connector Global Element section)

    Operation

    Consumer

    Topic

    ${consumer.topic}

    Partitions

    ${consumer.topic.partitions}

    Kafka consumer configuration

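  6. Select the logger and set its fields as follows: Display Name to Consumed message logger, Message to New message arrived: #[payload], and Level to INFO.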

    Consumer logger configuration

  7. Enter your valid Apache Kafka properties in /src/main/app/mule-app.properties and identify them there using property placeholders:

    1. If you configured the Kafka global element as explained in the Configure the Kafka Connector Global Element section, provide values for config.bootstrapServers, config.consumerPropertiesFile and config.producerPropertiesFile.

    2. Set consumer.topic to the name of an existing topic that you want to consume messages from.

    3. Set consumer.topic.partitions to the number of partitions that you have set at topic creation for the topic that you want to consume messages from.

  8. Now you should be ready to deploy the app on Studio’s embedded Mule runtime (Run As > Mule Application). When a new message is pushed into the topic you set consumer.topic to, you should see it logged in the console.

Consume Messages from Kafka Topic - XML

Run this Mule application featuring the connector as a consumer using the full XML code that would be generated by the Studio work you did in the previous section:


          
       
<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/apachekafka
http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd">
    <apachekafka:config name="Apache_Kafka__Configuration" bootstrapServers="${config.bootstrapServers}" consumerPropertiesFile="${config.consumerPropertiesFile}" producerPropertiesFile="${config.producerPropertiesFile}" doc:name="Apache Kafka: Configuration"/>
    <flow name="consumer-flow">
        <apachekafka:consumer config-ref="Apache_Kafka__Configuration" topic="${consumer.topic}" partitions="${consumer.topic.partitions}" doc:name="Kafka consumer"/>
        <logger message="New message arrived: #[payload]" level="INFO" doc:name="Consumed message logger"/>
    </flow>
</mule>

Publish Messages to Kafka Topic

Use the connector to publish messages to a topic.

  1. Create a new Mule Project by clicking on File > New > Mule Project.

  2. In the project’s structure, double-click src/main/app/project-name.xml, then follow the steps below.

  3. Drag and drop a new HTTP element onto the canvas. This element is going to be the entry point for the flow and will provide data to be sent to the topic.

  4. Drag and drop a new Apache Kafka element after the HTTP listener.

  5. Drag and drop a new Set Payload element after Apache Kafka. This Set Payload element is going to set the response to the HTTP request.

    Unconfigured producer flow

  6. Double click on the flow’s header (blue line) and change the name of the flow to producer-flow.

    Producer flow configuration

  7. Select the HTTP element.

  8. Click on the plus sign next to the "Connector Configuration" dropdown.

  9. In the pop-up that appears, accept the default configuration and click OK.

  10. Set Path to push.

  11. Set Display Name to Push http endpoint.

    Push http configuration

  12. Select the Apache Kafka connector and set its properties as below:

    Field Value

    Display Name

    Kafka producer

    Consumer Configuration

    "Apache_Kafka__Configuration" (default name of a configuration, or any other configuration that you configured as explained in the Configure the Kafka Connector Global Element section)

    Operation

    Producer

    Topic

    #[payload.topic]

    Key

    #[server.dateTime.getMilliSeconds()]

    Message

    #[payload.message]

  13. For the Set Payload element:

    1. Set Display Name to Set push response

    2. Set Value to Message successfully sent.

      Producer response configuration

  14. Now provide values for the placeholders.

  15. Open /src/main/app/mule-app.properties and provide values for the following properties:

    1. If you configured the Kafka global element as explained in the Configure the Kafka Connector Global Element section, provide values for config.bootstrapServers, config.consumerPropertiesFile and config.producerPropertiesFile.

  16. Now you can deploy the app (Run As > Mule Application).

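  17. To trigger the flow and push a message to a topic, use an HTTP client app and send a POST request with content-type "application/x-www-form-urlencoded" and a body in urlencoded format to localhost:8081/push. The request should contain values for topic and message. The HTTP listener parses a urlencoded body into a map, which is why the producer can read them as #[payload.topic] and #[payload.message].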

You can use the following curl command:

curl -X POST -d "topic=<topic-name-to-send-to>" -d "message=<message to push>" localhost:8081/push

To test that everything works, you can use the example app from the Consume Messages from Kafka Topic section to consume the messages you are producing.

Publish Messages to Kafka Topic - XML

Run this application featuring the connector as a message publisher using the full XML code that would be generated by the Studio work you did in the previous section:


          
       
<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/apachekafka
http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
    <apachekafka:config name="Apache_Kafka__Configuration" bootstrapServers="${config.bootstrapServers}" consumerPropertiesFile="${config.consumerPropertiesFile}" producerPropertiesFile="${config.producerPropertiesFile}" doc:name="Apache Kafka: Configuration"/>
    <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>
    <flow name="producer-flow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/push" doc:name="Push http endpoint"/>
        <apachekafka:producer config-ref="Apache_Kafka__Configuration" topic="#[payload.topic]" key="#[server.dateTime.getMilliSeconds()]" message="#[payload.message]" doc:name="Kafka producer"/>
        <set-payload value="Message successfully sent." doc:name="Set push response"/>
    </flow>
</mule>