Kafka Connector
The Anypoint Connector for Apache Kafka allows you to interact with the Apache Kafka messaging system, enabling seamless integration between your Mule applications and an Apache Kafka cluster, using Mule runtime.
Read this user guide to understand how to set up and configure a basic Mule flow with the Apache Kafka connector.
Note: Track feature additions, compatibility, limitations and API version updates with each release of the connector using the Connector Release Notes. Review the connector operations and functionality using the Technical Reference and in Anypoint Exchange.
MuleSoft maintains this connector under the Select support policy.
Before You Begin
This document assumes you are familiar with Mule, Anypoint Connectors, and Anypoint Studio Essentials To increase your familiarity with Studio, consider completing one or more Anypoint Studio Tutorials. Further, this page assumes that you have a basic understanding of Mule Flows and Global Elements.
Hardware and Software Requirements
For hardware and software requirements, see Mule Hardware and Software Requirements page.
Install the Kafka Connector
You can install this connector in Anypoint Studio using the instructions in To Import an Asset from Exchange.
Configure the Kafka Connector Global Element
To use the Apache Kafka connector in your Mule application, you must first configure a global element for the Kafka connector. We show you how this is done using the Anypoint Studio UI.
Notes:
-
This global element can be used by all the Kafka connectors in the application (read more about global elements).
-
Not all connector instances necessarily use the same global element/configuration. It is not uncommon to have multiple connectors in a flow, using different global elements/configurations to connect to one or different instances.
You must provide connection and other details in the Global Element Properties window - these settings are saved in a global element and referenced by applicable connector instances:
.properties
fileIn the image above, the placeholder values refer to values saved in a .properties
file, which exists by default in the src/main/resources
folder of your project (See Configuring Properties).
For easy maintenance and re-usability of your connector properties, we recommend that you use a .properties
file. Avoid hardcoding these in the global element if you need to deploy the app to different environments, such as production, development, and QA, where your access credentials might differ. For more, see Deploying to Multiple Environments.
Global Element Properties for Kafka Connector
Field | Description |
---|---|
Name |
Enter a name for this connector configuration to be able to reference it later. |
Bootstrap Servers |
Comma-separated host-port pairs for establishing the initial connection to the Kafka cluster — same as |
Consumer Properties File |
Path to properties file where you can set the Consumer — similar to what you provide to Kafka command line tools. If you do not specify a value for |
Producer Properties File |
Path to properties file where you can set the producer — similar to what you provide to Kafka command line tools. If you do not specify a value for |
Using the Kafka Connector
A Kafka connector is based on the concept of the endpoint. The global element can be configured either as an:
-
Inbound endpoint for consuming messages from a topic, or
-
Outbound connector for pushing new key/message pairs to a topic.
Connector Namespace and Schema
When designing your application in Studio, the act of dragging the connector from the palette onto the Anypoint Studio canvas should automatically populate the Mule application’s XML code with the connector namespace and schema location.
Namespace:
xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"
Schema Location:
xsi:schemaLocation="http://www.mulesoft.org/schema/mule/apachekafka http://www.mulesoft.org/schema/mule/sfdc-composite/current/mule-apachekafka.xsd"
If you are manually coding the Mule application in Studio’s XML editor or other text editor, define the namespace and schema location in the header of your Configuration XML, inside the <mule>
tag.
<mule xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"
xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/apachekafka http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd">
<!-- put your global configuration elements and flows here -->
</mule>
Using the Connector in a Mavenized Mule App
If you are coding a Mavenized Mule application, this XML snippet must be included in your pom.xml
file.
<dependency>
<groupId>org.mule.modules</groupId>
<artifactId>mule-module-kafka</artifactId>
<version>2.0.1</version>
</dependency>
Inside the <version>
tags, put the desired version number. The available versions to date are:
-
2.0.1
-
2.0.0
-
1.0.2
-
1.0.1
-
1.0.0
Kafka Connector Example Use Cases
The example use case walkthroughs are geared toward Anypoint Studio users. For those writing and configuring the application in XML, jump straight to the example Mule application XML code to Consume Messages or Publish Messages to see how the Kafka global element and the connector are configured in the XML in each use case.
Consume Messages from Kafka Topic
See how to use the connector to consume messages from a topic and log each consumed message to console in the following format: "New message arrived: <message>".
-
Create a new Mule Project by clicking on File > New > Mule Project.
-
With your project open, search the Studio palette for the Kafka connector you should have already installed. Drag and drop a new Apache Kafka connector onto the canvas.
The Kafka Connector is going to be configured to consume messages from a topic in this case. -
Drag and drop a Logger after the Apache Kafka element to log incoming messages in the console.
-
Double click the flow’s header and rename it
consumer-flow
. -
Double click the Apache Kafka connector element, and configure its properties as below.
Field Value Display Name
Kafka consumer
Consumer Configuration
"Apache_Kafka__Configuration" (default name of a configuration, or any other configuration that you configured as explained in Configuration section
Operation
Consumer
Topic
${consumer.topic}
Number of Partitions
${consumer.topic.partitions}
Partition Offsets MEL
#[["0":"1","1":"2"]]
-
You can use the MEL expression to pass an offset or offsets to Kafka to re-retrieve messages from the specified offset or offsets. For example, #[["0":"1","1":"2"]] means resetting the offset of partition 0 to 1 and the offset of partition 1 to 2
-
-
Select the logger and set its fields like so:
-
Enter your valid Apache Kafka properties in
/src/main/app/mule-app.properties
and identify them there using property placeholders:-
If you configured Kafka global element as explained within the Configure the Kafka Connector Global Element section then provide values for
config.bootstrapServers
,config.consumerPropertiesFile
andconfig.producerPropertiesFile
. -
Set
consumer.topic
to the name of an existing topic that you want to consume messages from. -
Set
consumer.topic.partitions
to the number of partitions that you have set at topic creation for the topic that you want to consume messages from.
-
-
Now you should be ready to deploy the app on Studio’s embedded Mule runtime (Run As > Mule Application). When a new message is pushed into the topic you set
consumer.topic
to, you should see it logged in the console.
Consume Messages from Kafka Topic - XML
Run this Mule application featuring the connector as a consumer using the full XML code that would be generated by the Studio work you did in the previous section:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/apachekafka
http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd">
<apachekafka:config name="Apache_Kafka__Configuration" bootstrapServers="${config.bootstrapServers}" consumerPropertiesFile="${config.consumerPropertiesFile}" producerPropertiesFile="${config.producerPropertiesFile}" doc:name="Apache Kafka: Configuration"/>
<flow name="new-projectFlow">
<apachekafka:consumer config-ref="Apache_Kafka__Configuration" topic="${consumer.topic}" numberOfPartitions="${consumer.topic.partitions}" partitionOffsetsMEL="#[["0":"1","1":"2"]]" doc:name="Kafka consumer"/>
<logger message="New message arrived: #[payload]" level="INFO" doc:name="Consumed message logger"/>
</flow>
</mule>
Publish Messages to Kafka Topic
Use the connector to publish messages to a topic.
-
Create a new Mule Project by clicking on File > New > Mule Project.
-
Navigate through the project’s structure and double-click on
src/main/app/project-name.xml
and follow the steps below: -
Drag and drop a new HTTP element onto the canvas. This element is going to be the entry point for the flow and will provide data to be sent to the topic.
-
Drag and drop a new Apache Kafka element after the HTTP listener.
-
Drag and drop a new Set Payload element after Apache Kafka. This Set Payload element is going to set the response to the HTTP request.
-
Double click the flow header (blue line) and change the name of the flow to "producer-flow".
-
Select the HTTP element.
-
Click the plus sign next to the "Connector Configuration" dropdown.
-
A pop-up appears, accept the default configurations and click OK.
-
Set Path to
push
. -
Set Display Name to
Push http endpoint
. -
Select the Apache Kafka connector and set its properties as below:
Display Name Kafka producer Consumer Configuration
"Apache_Kafka__Configuration" (default name of a configuration, or any other configuration that you configured as explained in Configuring the Kafka Connector Global Element section)
Operation
Producer
Topic
#[payload.topic]
Key
#[server.dateTime.getMilliSeconds()]
Message
#[payload.message]
-
For the Set Payload element:
-
Set Display Name to
Set push response
-
Set Value to
Message successfully sent.
-
-
Now we have to provide values for placeholders.
-
Open
/src/main/app/mule-app.properties
and provide values for following properties:-
If you configured the Kafka global element as explained within the Configuration section then provide values for
config.bootstrapServers
,config.consumerPropertiesFile
andconfig.producerPropertiesFile
-
-
Now you can deploy the app. (Run As > Mule Application)
-
To trigger the flow and push a message to a topic, use an HTTP client app and send a POST request with content-type "application/x-www-form-urlencoded" and body in urlencoded format to
localhost:8081/push
. The request should contain values for topic and message.
You can use the following CURL command:
curl -X POST -d "topic=<topic-name-to-send-to>" -d "message=<message to push>" localhost:8081/push
You can use the other example app defined in Consume Messages from Kafka Topic example to consume the messages you are producing, and test that everything works.
Publish Messages to Kafka Topic - XML
Run this application featuring the connector as a message publisher using the full XML code that would be generated by the Studio work you did in the previous section:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http" xmlns:apachekafka="http://www.mulesoft.org/schema/mule/apachekafka"
xmlns="http://www.mulesoft.org/schema/mule/core"
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
xmlns:spring="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-current.xsd
http://www.mulesoft.org/schema/mule/core
http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/apachekafka
http://www.mulesoft.org/schema/mule/apachekafka/current/mule-apachekafka.xsd
http://www.mulesoft.org/schema/mule/http
http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd">
<apachekafka:config name="Apache_Kafka__Configuration" bootstrapServers="${config.bootstrapServers}" consumerPropertiesFile="${config.consumerPropertiesFile}" producerPropertiesFile="${config.producerPropertiesFile}" doc:name="Apache Kafka: Configuration"/>
<http:listener-config name="HTTP_Listener_Configuration" host="0.0.0.0" port="8081" doc:name="HTTP Listener Configuration"/>
<flow name="producer-flow">
<http:listener config-ref="HTTP_Listener_Configuration" path="/push" doc:name="Push http endpoint"/>
<apachekafka:producer config-ref="Apache_Kafka__Configuration" topic="#[payload.topic]" key="#[server.dateTime.getMilliSeconds()]" message="#[payload.message]" doc:name="Apache Kafka"/>
<set-payload value="Message successfully sent." doc:name="Set push response"/>
</flow>
</mule>
To Configure Kafka to Use Kerberos
-
Download and install Kerberos KDC and Zookeeper.
After installing, ensure you have the following principals
zookeeper/localhost@LOCALHOST
andkafka/localhost@LOCALHOST
. This is an example for localhost and realm LOCALHOST depending on your KDC it might differ in the last part forlocalhost@LOCALHOST
. You need to have the associated keytab files saved so that you can they can be accessed by the processes that are started for Zookeeper and Kafka. -
Start Kafka server. This assumes you have downloaded Kafka server and KAFKA_HOME represents home directory for that server.
-
Create the zookeeper_server_jaas.conf file under KAFKA_HOME/config with the following content:
Server { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true useTicketCache=true storeKey=true debug=true keyTab=PATH_TO_ZOOKEEPER_KEYTAB/zookeeper.keytab" principal="zookeeper/localhost@LOCALHOST"; };
Replace PATH_TO_ZOOKEEPER_KEYTAB with the correct folder path above and in the code blocks that follow.
In the default configuration it is very important to use
Server
as an identifier for your configuration.Important: Keep in mind that if you are going to use the Kafka Producer with the
Reuse Producer
option, you should configure theuseTicketCache=false
. Otherwise the connector needs akinit
module installed on the machine your application runs on to refresh the Kerberos tickets when they expire. -
Create the kafka_server_jaas.conf file under KAFKA_HOME/config with the following content:
KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true debug=true keyTab="PATH_TO_ZOOKEEPER_KEYTAB/kafka.keytab" principal="kafka/localhost@LOCALHOST"; }; // Zookeeper client authentication Client { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true debug=true keyTab=”PATH_TO_ZOOKEEPER_KEYTAB/kafka.keytab" principal="kafka/localhost@LOCALHOST"; };
In the default configuration it is very important to use
KafkaServer
andClient
as identifiers for your configurations.KafkaServer
is used to authenticate Kafka clients andClient
is used to self authenticate against Zookeeper. -
Add these two properties to
zookeeper.properties
underKAFKA_HOME/config
:authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider requireClientAuthScheme=sasl
These are enabled in Kerberos authentication of the Kafka broker against the Zookeeper server.
-
Add the following properties to
server.properties
underKAFKA_HOME/config
:listeners=PLAINTEXT://:9092,SASL_PLAINTEXT://localhost:9093 sasl.enabled.mechanisms=GSSAPI sasl.kerberos.service.name=kafka
These tell the kafka broker to create one channel on port 9093 that requires Kerberos authentication.
-
Open a new terminal and change directory to
KAFKA_HOME/bin
. -
To start Zookeeper you have to set an environment variable KAFKA_OPTS with the following value:
-Djava.security.krb5.conf=<path_to_krb_config>/krb5.conf -Djava.security.auth.login.config=../config/kafka_server_jaas.conf
For example:
export KAFKA_OPTS="-Djava.security.krb5.conf=../config/krb5.conf -Djava.security.auth.login.config=../config/kafka_server_jaas.conf”
The krb5.conf file contains Kerberos configuration information, including the locations of KDCs and admin servers for the Kerberos realms of interest. Under Linux you can usually find it under /etc/krb5.conf.
-
Start zookeeper by running
./zookeeper-server-start(.sh/bat) ../config/zookeeper.properties
. -
Open a new terminal and change directory to KAFKA_HOME/bin.
-
Start Kafka broker by running:
./kafka-server-start(.sh/bat) ../config/server.properties
You should see no errors in the console.
-
Configure the connector. To connect to Kafka from within the connector, set the bootstrap servers to point to localhost:9093 and put the following properties in consumer.properties and producer.properties along with other properties that you usually put in those files.
security.protocol=SASL_PLAINTEXT sasl.mechanism=GSSAPI sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \ useKeyTab=true \ storeKey=true \ debug=true \ keyTab="PATH_TO_ZOOKEEPER_KEYTAB/kafka.keytab" \ principal="kafka/localhost@LOCALHOST"; sasl.kerberos.service.name=kafka