
Kafka Connector

The Anypoint Connector for Apache Kafka lets you interact with the Apache Kafka messaging system from the Mule runtime, enabling seamless integration between your Mule applications and an Apache Kafka cluster.

Read this user guide to understand how to set up and configure a basic Mule flow with the Apache Kafka connector.

Note: Track feature additions, compatibility, limitations, and API version updates with each release of the connector using the Connector Release Notes. Review the connector's operations and functionality in the Technical Reference and in Anypoint Exchange.

MuleSoft maintains this connector under the Select support policy.

Before You Begin

This document assumes you are familiar with Mule, Anypoint Connectors, and Anypoint Studio Essentials. To increase your familiarity with Studio, consider completing one or more Anypoint Studio Tutorials. This page also assumes a basic understanding of Mule Flows and Global Elements.

Hardware and Software Requirements

For hardware and software requirements, see the Mule Hardware and Software Requirements page.

Compatibility

Application/Service Version

Mule Runtime

3.7.0 and later

Apache Kafka

0.10.2.0

Install the Kafka Connector

You can install this connector in Anypoint Studio using the instructions in To Import an Asset from Exchange.

Configure the Kafka Connector Global Element

To use the Apache Kafka connector in your Mule application, you must first configure a global element for the Kafka connector. We show you how this is done using the Anypoint Studio UI.

Notes:

  • A single global element can be shared by all the Kafka connectors in the application (read more about global elements).

  • Connector instances do not all have to use the same global element/configuration. It is common for multiple connectors in a flow to use different global elements/configurations to connect to the same or to different Kafka clusters, as sketched below.
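For example, an application that talks to two clusters might declare two global configurations and have each connector instance select one through its config-ref attribute. This is a sketch; the configuration names and property placeholders are illustrative, not defaults:

<apachekafka:config name="Kafka_Cluster_A" bootstrapServers="${clusterA.bootstrapServers}" doc:name="Apache Kafka: Configuration"/>
<apachekafka:config name="Kafka_Cluster_B" bootstrapServers="${clusterB.bootstrapServers}" doc:name="Apache Kafka: Configuration"/>

<!-- each connector instance references the configuration it needs -->
<apachekafka:producer config-ref="Kafka_Cluster_A" topic="orders" key="#[server.dateTime.getMilliSeconds()]" message="#[payload]" doc:name="Publish to cluster A"/>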

You must provide connection and other details in the Global Element Properties window. These settings are saved in a global element and referenced by the connector instances that use it:

Figure 1. Example global element configuration using property placeholders; the actual values are stored in a .properties file

In the image above, the placeholder values refer to values saved in a .properties file, which exists by default in the src/main/resources folder of your project (see Configuring Properties).

For easy maintenance and reusability of your connector properties, we recommend using a .properties file. Avoid hardcoding these values in the global element if you need to deploy the app to different environments, such as production, development, and QA, where your access credentials might differ. For more information, see Deploying to Multiple Environments.
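The configuration shown in Figure 1 corresponds to XML along these lines; this is the same global element used in the examples later in this guide:

<apachekafka:config name="Apache_Kafka__Configuration"
    bootstrapServers="${config.bootstrapServers}"
    consumerPropertiesFile="${config.consumerPropertiesFile}"
    producerPropertiesFile="${config.producerPropertiesFile}"
    doc:name="Apache Kafka: Configuration"/>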

Global Element Properties for Kafka Connector

Field Description

Name

Enter a name for this connector configuration to be able to reference it later.

Bootstrap Servers

Comma-separated host:port pairs used to establish the initial connection to the Kafka cluster; this is the same bootstrap.servers value that you provide to Kafka clients (producer/consumer). If a Producer or Consumer Properties file is provided and sets bootstrap.servers, this value is ignored and the one from the properties file is used.

Consumer Properties File

Path to a properties file where you can set the consumer properties, similar to what you provide to the Kafka command line tools. If you do not specify a value for bootstrap.servers within the properties file, the value provided in Bootstrap Servers is used. Also, if you do not specify values for key.deserializer and value.deserializer, they are set to org.apache.kafka.common.serialization.StringDeserializer.

Producer Properties File

Path to a properties file where you can set the producer properties, similar to what you provide to the Kafka command line tools. If you do not specify a value for bootstrap.servers within the properties file, the value provided in Bootstrap Servers is used. Also, if you do not specify values for key.serializer and value.serializer, they are set to org.apache.kafka.common.serialization.StringSerializer.
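For example, a minimal consumer.properties file might contain only the following entries (a sketch: the group.id value is illustrative, and bootstrap.servers can be omitted if you set Bootstrap Servers in the global element):

bootstrap.servers=localhost:9092
group.id=mule-kafka-consumer
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer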

Using the Kafka Connector

A Kafka connector is based on the concept of the endpoint. A connector instance can be configured as either of the following (skeletal XML for both appears after this list):

  • Inbound endpoint for consuming messages from a topic, or

  • Outbound connector for pushing new key/message pairs to a topic.
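In the application XML, these correspond to the apachekafka:consumer message source and the apachekafka:producer message processor, shown here in skeletal form (attribute values are placeholders; full working flows appear later on this page):

<!-- inbound: consumes messages from a topic, acting as the flow's message source -->
<apachekafka:consumer config-ref="Apache_Kafka__Configuration" topic="${consumer.topic}" numberOfPartitions="${consumer.topic.partitions}" doc:name="Kafka consumer"/>

<!-- outbound: publishes a key/message pair to a topic from within a flow -->
<apachekafka:producer config-ref="Apache_Kafka__Configuration" topic="#[payload.topic]" key="#[server.dateTime.getMilliSeconds()]" message="#[payload.message]" doc:name="Kafka producer"/>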

Connector Namespace and Schema

When designing your application in Studio, the act of dragging the connector from the palette onto the Anypoint Studio canvas should automatically populate the Mule application’s XML code with the connector namespace and schema location.

Namespace:

xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"

Schema Location:

xsi:schemaLocation="http://www.mulesoft.org/schema/mule/apachekafka http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd"

If you are manually coding the Mule application in Studio’s XML editor or other text editor, define the namespace and schema location in the header of your Configuration XML, inside the <mule> tag.

<mule xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"
      xsi:schemaLocation="
               http://www.mulesoft.org/schema/mule/core
               http://www.mulesoft.org/schema/mule/core/current/mule.xsd
               http://www.mulesoft.org/schema/mule/apachekafka http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd">

      <!-- put your global configuration elements and flows here -->

</mule>

Using the Connector in a Mavenized Mule App

If you are coding a Mavenized Mule application, this XML snippet must be included in your pom.xml file.

<dependency>
  <groupId>org.mule.modules</groupId>
  <artifactId>mule-module-kafka</artifactId>
  <version>2.0.1</version>
</dependency>

Inside the <version> tags, put the desired version number. The available versions to date are:

  • 2.0.1

  • 2.0.0

  • 1.0.2

  • 1.0.1

  • 1.0.0

Kafka Connector Example Use Cases

The example use case walkthroughs are geared toward Anypoint Studio users. For those writing and configuring the application in XML, jump straight to the example Mule application XML code to Consume Messages or Publish Messages to see how the Kafka global element and the connector are configured in the XML in each use case.

Consume Messages from Kafka Topic

See how to use the connector to consume messages from a topic and log each consumed message to the console in the following format: "New message arrived: <message>".

  1. Create a new Mule Project by clicking on File > New > Mule Project.

  2. With your project open, search the Studio palette for the Kafka connector you should have already installed. Drag and drop a new Apache Kafka connector onto the canvas.

    In this use case, the Kafka connector is configured to consume messages from a topic.
  3. Drag and drop a Logger after the Apache Kafka element to log incoming messages in the console.

    Unconfigured consumer flow
  4. Double click the flow’s header and rename it consumer-flow.

    Consumer flow configuration
  5. Double click the Apache Kafka connector element, and configure its properties as below.

    Field Value

    Display Name

    Kafka consumer

    Connector Configuration

    "Apache_Kafka__Configuration" (default name of a configuration, or any other configuration that you configured as explained in Configuration section

    Operation

    Consumer

    Topic

    ${consumer.topic}

    Number of Partitions

    ${consumer.topic.partitions}

    Partition Offsets MEL

    #[["0":"1","1":"2"]]

    • You can use this MEL expression to pass an offset or offsets to Kafka to re-retrieve messages from the specified offset or offsets. For example, #[["0":"1","1":"2"]] resets the offset of partition 0 to 1 and the offset of partition 1 to 2.

      Kafka consumer configuration
  6. Select the logger and set its fields like so:

    Consumer logger configuration
  7. Enter your Apache Kafka properties in /src/main/app/mule-app.properties and reference them using property placeholders (an illustrative example follows these steps):

    1. If you configured the Kafka global element as explained in the Configure the Kafka Connector Global Element section, provide values for config.bootstrapServers, config.consumerPropertiesFile, and config.producerPropertiesFile.

    2. Set consumer.topic to the name of an existing topic that you want to consume messages from.

    3. Set consumer.topic.partitions to the number of partitions that you set at topic creation for the topic that you want to consume messages from.

  8. Now you should be ready to deploy the app on Studio’s embedded Mule runtime (Run As > Mule Application). When a new message is pushed into the topic you set consumer.topic to, you should see it logged in the console.
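With illustrative values, mule-app.properties might look like the following (the server address, file paths, topic name, and partition count are assumptions for a local test cluster, not defaults):

config.bootstrapServers=localhost:9092
config.consumerPropertiesFile=consumer.properties
config.producerPropertiesFile=producer.properties
consumer.topic=test-topic
consumer.topic.partitions=1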

Consume Messages from Kafka Topic - XML

Run this Mule application featuring the connector as a consumer using the full XML code that would be generated by the Studio work you did in the previous section:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/apachekafka
http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd">
    <apachekafka:config name="Apache_Kafka__Configuration" bootstrapServers="${config.bootstrapServers}" consumerPropertiesFile="${config.consumerPropertiesFile}" producerPropertiesFile="${config.producerPropertiesFile}" doc:name="Apache Kafka: Configuration"/>
    <flow name="new-projectFlow">
        <apachekafka:consumer config-ref="Apache_Kafka__Configuration" topic="${consumer.topic}" numberOfPartitions="${consumer.topic.partitions}" partitionOffsetsMEL="#[[&quot;0&quot;:&quot;1&quot;,&quot;1&quot;:&quot;2&quot;]]" doc:name="Kafka consumer"/>
        <logger message="New message arrived: #[payload]" level="INFO" doc:name="Consumed message logger"/>
    </flow>
</mule>

Publish Messages to Kafka Topic

Use the connector to publish messages to a topic.

  1. Create a new Mule Project by clicking on File > New > Mule Project.

  2. Navigate through the project's structure, double-click src/main/app/project-name.xml, and follow the steps below:

  3. Drag and drop a new HTTP element onto the canvas. This element is going to be the entry point for the flow and will provide data to be sent to the topic.

  4. Drag and drop a new Apache Kafka element after the HTTP listener.

  5. Drag and drop a new Set Payload element after Apache Kafka. This Set Payload element is going to set the response to the HTTP request.

    Unconfigured producer flow
  6. Double click the flow header (blue line) and change the name of the flow to "producer-flow".

    Producer flow configuration
  7. Select the HTTP element.

  8. Click the plus sign next to the "Connector Configuration" dropdown.

  9. In the pop-up that appears, accept the default configuration and click OK.

  10. Set Path to push.

  11. Set Display Name to Push http endpoint.

    Push http configuration
  12. Select the Apache Kafka connector and set its properties as below:

    Field Value

    Display Name

    Kafka producer

    Connector Configuration

    "Apache_Kafka__Configuration" (the default configuration name, or any other configuration that you created as explained in the Configure the Kafka Connector Global Element section)

    Operation

    Producer

    Topic

    #[payload.topic]

    Key

    #[server.dateTime.getMilliSeconds()]

    Message

    #[payload.message]

  13. For the Set Payload element:

    1. Set Display Name to Set push response

    2. Set Value to Message successfully sent.

      Producer response configuration
  14. Now provide values for the property placeholders.

  15. Open /src/main/app/mule-app.properties and provide values for following properties:

    1. If you configured the Kafka global element as explained in the Configure the Kafka Connector Global Element section, provide values for config.bootstrapServers, config.consumerPropertiesFile, and config.producerPropertiesFile.

  16. Now you can deploy the app. (Run As > Mule Application)

  17. To trigger the flow and push a message to a topic, use an HTTP client app to send a POST request with content type application/x-www-form-urlencoded and a URL-encoded body to localhost:8081/push. The request should contain values for topic and message.

You can use the following curl command:

curl -X POST -d "topic=<topic-name-to-send-to>" -d "message=<message to push>" localhost:8081/push
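For example, assuming a topic named test-topic already exists on the cluster:

curl -X POST -d "topic=test-topic" -d "message=Hello Kafka" localhost:8081/push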

You can use the app from the Consume Messages from Kafka Topic example to consume the messages you are producing and verify that everything works.

Publish Messages to Kafka Topic - XML

Run this application featuring the connector as a message publisher using the full XML code that would be generated by the Studio work you did in the previous section:

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/apachekafka
http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
    <apachekafka:config name="Apache_Kafka__Configuration" bootstrapServers="${config.bootstrapServers}" consumerPropertiesFile="${config.consumerPropertiesFile}" producerPropertiesFile="${config.producerPropertiesFile}" doc:name="Apache Kafka: Configuration"/>
    <http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>
    <flow name="producer-flow">
        <http:listener config-ref="HTTP_Listener_Configuration" path="/push" doc:name="Push http endpoint"/>
        <apachekafka:producer config-ref="Apache_Kafka__Configuration" topic="#[payload.topic]" key="#[server.dateTime.getMilliSeconds()]" message="#[payload.message]" doc:name="Apache Kafka"/>
        <set-payload value="Message successfully sent." doc:name="Set push response"/>
    </flow>
</mule>

To Configure Kafka to Use Kerberos

  1. Download and install a Kerberos KDC and Zookeeper.

    After installing, ensure you have the principals zookeeper/localhost@LOCALHOST and kafka/localhost@LOCALHOST. This example uses the host localhost and the realm LOCALHOST; depending on your KDC, those parts might differ. You must also save the associated keytab files where they can be accessed by the processes that are started for Zookeeper and Kafka; one way to create the principals and keytabs is sketched below.
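    On an MIT Kerberos KDC, for example, you might create the principals and export their keytabs with commands like the following (the keytab directory is illustrative; use any path and reference it from the JAAS files in the later steps):

    kadmin.local -q "addprinc -randkey zookeeper/localhost@LOCALHOST"
    kadmin.local -q "addprinc -randkey kafka/localhost@LOCALHOST"
    kadmin.local -q "ktadd -k /etc/security/keytabs/zookeeper.keytab zookeeper/localhost@LOCALHOST"
    kadmin.local -q "ktadd -k /etc/security/keytabs/kafka.keytab kafka/localhost@LOCALHOST"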

  2. Set up the Kafka server. The following steps assume that you have downloaded the Kafka server and that KAFKA_HOME represents that server's home directory.

  3. Create the zookeeper_server_jaas.conf file under KAFKA_HOME/config with the following content:

    Server {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      useTicketCache=true
      storeKey=true
      debug=true
      keyTab="PATH_TO_ZOOKEEPER_KEYTAB/zookeeper.keytab"
      principal="zookeeper/localhost@LOCALHOST";
    };

    Replace PATH_TO_ZOOKEEPER_KEYTAB with the correct folder path above and in the code blocks that follow.

    In the default configuration it is very important to use Server as an identifier for your configuration.

    Important: If you use the Kafka producer with the Reuse Producer option, configure useTicketCache=false. Otherwise the connector needs a kinit module installed on the machine your application runs on to refresh the Kerberos tickets when they expire.

  4. Create the kafka_server_jaas.conf file under KAFKA_HOME/config with the following content:

    KafkaServer {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      debug=true
      keyTab="PATH_TO_ZOOKEEPER_KEYTAB/kafka.keytab"
      principal="kafka/localhost@LOCALHOST";
    };
    
    // Zookeeper client authentication
    Client {
        com.sun.security.auth.module.Krb5LoginModule required
        useKeyTab=true
        storeKey=true
        debug=true
        keyTab="PATH_TO_ZOOKEEPER_KEYTAB/kafka.keytab"
        principal="kafka/localhost@LOCALHOST";
    };

    In the default configuration it is very important to use KafkaServer and Client as the identifiers for your configurations. KafkaServer is used to authenticate Kafka clients, and Client is used by the broker to authenticate itself against Zookeeper.

  5. Add these two properties to zookeeper.properties under KAFKA_HOME/config:

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl

    These enable Kerberos authentication of the Kafka broker against the Zookeeper server.

  6. Add the following properties to server.properties under KAFKA_HOME/config:

    listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://localhost:9093
    sasl.enabled.mechanisms=GSSAPI
    sasl.kerberos.service.name=kafka

    These settings tell the Kafka broker to create an additional channel on port 9093 that requires Kerberos authentication.

  7. Open a new terminal and change directory to KAFKA_HOME/bin.

  8. To start Zookeeper, you first have to set the KAFKA_OPTS environment variable with the following value (Zookeeper reads the Server section from the zookeeper_server_jaas.conf file created in step 3):

    -Djava.security.krb5.conf=<path_to_krb_config>/krb5.conf -Djava.security.auth.login.config=../config/zookeeper_server_jaas.conf

    For example:

    export KAFKA_OPTS="-Djava.security.krb5.conf=../config/krb5.conf -Djava.security.auth.login.config=../config/zookeeper_server_jaas.conf"

    The krb5.conf file contains Kerberos configuration information, including the locations of KDCs and admin servers for the Kerberos realms of interest. Under Linux you can usually find it under /etc/krb5.conf.
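    A minimal krb5.conf for this localhost example might look like the following (illustrative only; your realm name and KDC hosts will differ):

    [libdefaults]
      default_realm = LOCALHOST

    [realms]
      LOCALHOST = {
        kdc = localhost
        admin_server = localhost
      }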

  9. Start Zookeeper by running ./zookeeper-server-start(.sh/bat) ../config/zookeeper.properties.

  10. Open a new terminal and change directory to KAFKA_HOME/bin.

  11. Set KAFKA_OPTS in this terminal as well, this time pointing java.security.auth.login.config at ../config/kafka_server_jaas.conf, and start the Kafka broker by running:

    ./kafka-server-start(.sh/bat) ../config/server.properties

    You should see no errors in the console.

  12. Configure the connector. To connect to Kafka from the connector, set Bootstrap Servers to localhost:9093 and put the following properties in consumer.properties and producer.properties, along with the other properties that you usually put in those files.

    security.protocol=SASL_PLAINTEXT
    sasl.mechanism=GSSAPI
    sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
          useKeyTab=true \
          storeKey=true  \
          debug=true \
          keyTab="PATH_TO_ZOOKEEPER_KEYTAB/kafka.keytab" \
          principal="kafka/localhost@LOCALHOST";
    sasl.kerberos.service.name=kafka