Kafka Connector

The Anypoint Connector for Apache Kafka lets you interact with the Apache Kafka messaging system from the Mule runtime, enabling seamless integration between your Mule applications and an Apache Kafka cluster.

Read through this user guide to understand how to set up and configure a basic Mule flow with the Apache Kafka connector.

Prerequisites

This document assumes that you are familiar with Mule, Anypoint Connectors, Anypoint Studio, Mule concepts, elements in a Mule flow, and Global Elements.

You need login credentials to test your connection to your target resource.

For hardware and software requirements and compatibility information, see the Connector Release Notes.

To use this connector with Maven, view the pom.xml dependency information in the Dependency Snippets in Anypoint Exchange.
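As a rough sketch, the dependency typically takes the following shape. The coordinates and classifier shown here are assumptions; copy the exact groupId, artifactId, and version from the snippet published in Exchange:

<dependency>
    <!-- assumed coordinates; confirm against the Dependency Snippets in Exchange -->
    <groupId>com.mulesoft.connectors</groupId>
    <artifactId>mule-kafka-connector</artifactId>
    <version>x.y.z</version>
    <classifier>mule-plugin</classifier>
</dependency>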

What’s New in this Connector

  • Connection configuration - You can choose from the following types of connections for both the consumer and producer:

    • Basic - Gives you the flexibility to set any property that Kafka accepts.

    • SSL - Use this configuration when one or more brokers require an SSL connection.

    • Kerberos - Use this configuration when one or more brokers require a Kerberos connection over plaintext.

    • Kerberos SSL - Use this configuration when one or more brokers require a Kerberos connection over SSL.

  • Consumer trigger - Partition offsets are now provided as a list of Offset elements instead of a MEL expression that evaluates to a map.

  • Consumer trigger - Each consumed message has the following attributes associated with it: topic, key, partition, and offset (see the sketch after this list).
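For example, a Logger placed after the Consumer trigger can read these attributes directly. A minimal sketch (the log format is illustrative):

<logger level="INFO"
        message="#['topic: $(attributes.topic), key: $(attributes.key), partition: $(attributes.partition), offset: $(attributes.offset)']"/>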

To Connect in Design Center

  1. In Design Center, click Set Up > Upload, browse for and select the driver for this connector on your file system, and upload it. Alternatively, search for and select a driver that is already uploaded.

  2. Click a trigger. You can create a global element by selecting this connector as the trigger. If a global element is not needed, you can use an HTTP Listener or Scheduler trigger.

  3. To create an optional global element for the connector, set these fields:

    1. Basic:

      • Bootstrap Servers - Comma-separated host:port pairs used for establishing the initial connection to the Kafka cluster. This is the same as the "bootstrap.servers" value you must provide to Kafka clients (producer/consumer).

      • Additional Properties - Additional key-value pairs that you need for your connection. You can set any property that Kafka supports.

        Basic DC Configuration

    2. SSL:

      • All the parameters from Basic configuration.

      • Key Store Type - The file format of the key store file. This is optional; the default value is "JKS".

      • Key Store Password - The store password for the key store file. This is optional and only needed if "keyStoreLocation" is configured.

      • Key Store Location - The location of the key store file. This is optional and can be used for two-way authentication for the connector.

      • Trust Store Type - The file format of the trust store file.

      • Trust Store Password - The password for the trust store file.

      • Trust Store Location - The location of the trust store file.

        SSL DC Configuration

    3. Kerberos:

      • All the parameters from Basic configuration.

      • Principal - Kerberos principal.

      • Keytab - Path to the keytab file associated with "principal".

      • Service Name - The Kerberos principal name that the Kafka broker runs as.

      • Additional JAAS Properties - Additional properties as key-value pairs that you need to set on "sasl.jaas.config" and that you usually include in a JAAS configuration file.

        Kerberos DC Configuration

    4. Kerberos SSL:

      • All the parameters from Basic configuration.

      • All the parameters from SSL configuration.

      • All the parameters from Kerberos configuration.

        Kerberos SSL DC Configuration

        When configuring a Consumer (trigger), for all of the configurations, also fill in:

        Consumer Partitions - The number of partitions to be used for the consumer.
        Group Id - A unique string that identifies the consumer group this consumer belongs to.

  4. Select the plus sign to add a component.

  5. Select the connector as a component.

  6. Configure these fields:

    1. Consumer trigger:

      • Topic - Name of the Kafka topic to consume messages from.

      • Partition Offsets (Optional) - A list of Offset elements representing the partition offsets configuration. For each element in the list, specify the partition index and the offset.

        Consumer on DC

    2. Producer operation:

      • Topic - Topic to send the message to.

      • Key - Key of the message to be sent.

      • Message - Message to be sent.

        Producer on DC
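Under the hood, these Design Center fields map onto the connector's XML configuration. Here is a minimal sketch using the elements shown in the Use Case: XML section at the end of this guide; the host, group id, and topic values are illustrative:

<kafka:kafka-consumer-config name="consumer-basic">
    <kafka:basic-kafka-consumer-connection bootstrapServers="localhost:9092"
                                           consumerPartitions="4"
                                           groupId="my-consumer-group"/>
</kafka:kafka-consumer-config>

<flow name="consume-flow">
    <!-- the Consumer trigger feeds the flow one message at a time -->
    <kafka:consumer config-ref="consumer-basic" topic="my-topic"/>
</flow>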

Connect in Anypoint Studio 7

You can use this connector in Anypoint Studio by first downloading it from Exchange and configuring it as needed.

Install Connector in Studio

  1. In Anypoint Studio, click the Exchange icon in the Studio taskbar.

  2. Click Login in Anypoint Exchange.

  3. Search for this connector and click Install.

  4. Follow the prompts to install this connector.

When Studio has an update, a message displays in the lower right corner, which you can click to install the update.

Configure in Studio

  1. Drag and drop the connector to the Studio Canvas.

  2. To create a global element for the connector, set these fields:

    1. Basic:

      • Bootstrap Servers - Comma-separated host:port pairs used for establishing the initial connection to the Kafka cluster. This is the same as the "bootstrap.servers" value you must provide to Kafka clients (producer/consumer).

      • Additional Properties - Additional key-value pairs that you need for your connection. You can set any property that Kafka supports.

        Basic Configuration

    2. SSL:

      • All the parameters from Basic configuration.

      • Key Store Type - The file format of the key store file. This is optional; the default value is "JKS".

      • Key Store Password - The store password for the key store file. This is optional and only needed if "keyStoreLocation" is configured.

      • Key Store Location - The location of the key store file. This is optional and can be used for two-way authentication for the connector.

      • Trust Store Type - The file format of the trust store file.

      • Trust Store Password - The password for the trust store file.

      • Trust Store Location - The location of the trust store file.

        SSL Configuration

    3. Kerberos:

      • All the parameters from Basic configuration.

      • Principal - Kerberos principal.

      • Keytab - Path to the keytab file associated with "principal".

      • Service Name - The Kerberos principal name that the Kafka broker runs as.

      • Additional JAAS Properties - Additional properties as key-value pairs that you need to set on "sasl.jaas.config" and that you usually include in a JAAS configuration file.

        Kerberos Configuration

    4. Kerberos SSL:

      • All the parameters from Basic configuration.

      • All the parameters from SSL configuration.

      • All the parameters from Kerberos configuration.

        Kerberos SSL Configuration

  3. Based on the operation that you dragged onto the canvas, configure the following fields:

    1. Consumer trigger:

      • Topic - Name of the Kafka topic to consume messages from.

      • Partition Offsets (Optional) - A list of Offset elements representing the partition offsets configuration. For each element in the list, specify the partition index and the offset.

        Consumer Studio Configuration

    2. Producer operation:

      • Topic - Topic to send the message to.

      • Key - Key of the message to be sent.

      • Message - Message to be sent.

        Producer Studio Configuration
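The Basic global element corresponds to the basic-kafka-consumer-connection and basic-kafka-producer-connection elements shown in the Use Case: XML section below. As a minimal sketch, an SSL consumer connection might look like the following; the ssl-kafka-consumer-connection element name and its attribute names are assumptions inferred by analogy with the Basic connection and the field names above, so verify them against the XML that Studio generates:

<kafka:kafka-consumer-config name="consumer-ssl">
    <!-- element and attribute names below are assumed, not confirmed -->
    <kafka:ssl-kafka-consumer-connection bootstrapServers="${config.ssl.bootstrapServers}"
                                         consumerPartitions="${consumer.topic.partitions}"
                                         groupId="${consumer.groupId}"
                                         trustStoreType="JKS"
                                         trustStoreLocation="${ssl.trustStoreLocation}"
                                         trustStorePassword="${ssl.trustStorePassword}"/>
</kafka:kafka-consumer-config>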

Use Case: Studio

The Kafka connector can be used either to consume messages from a particular topic and feed the flow with those messages, or to produce a message to a topic. In Studio, this is how it looks:

Consumer:

Consumer on canvas

Producer:

Producer on canvas

Use Case: XML

<?xml version="1.0" encoding="UTF-8"?>

<mule xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
xmlns:kafka="http://www.mulesoft.org/schema/mule/kafka"
      xmlns:http="http://www.mulesoft.org/schema/mule/http"
      xmlns="http://www.mulesoft.org/schema/mule/core"
      xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
      xmlns:spring="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-current.xsd http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd http://www.mulesoft.org/schema/mule/kafka http://www.mulesoft.org/schema/mule/kafka/current/mule-kafka.xsd">
    <configuration-properties file="mule-app.properties"></configuration-properties>
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config" doc:id="DOC_ID" >
        <http:listener-connection host="0.0.0.0" port="8081" />
    </http:listener-config>

    <kafka:kafka-consumer-config name="consumer-basic" doc:name="Consumer Basic" doc:id="DOC_ID">
        <kafka:basic-kafka-consumer-connection consumerPartitions="${consumer.topic.partitions}"
                                               groupId="${consumer.groupId}"
                                               bootstrapServers="${config.basic.bootstrapServers}"/>
    </kafka:kafka-consumer-config>
    <kafka:kafka-producer-config name="producer-basic" doc:name="Producer Basic" doc:id="DOC_ID">
        <kafka:basic-kafka-producer-connection bootstrapServers="${config.basic.bootstrapServers}"/>
    </kafka:kafka-producer-config>

    <flow name="consumer-flow" doc:id="DOC_ID">
        <kafka:consumer config-ref="consumer-krb-plain" topic="${consumer.topic.name}"
        doc:name="Consumer" doc:id="DOC_ID"/>
        <logger level="INFO" doc:name="Logger" doc:id="DOC_ID"
        message="#['New message arrived: ' ++ payload ++ &quot;, key:&quot; ++ attributes.key ++ &quot;, partition:&quot; ++ attributes.partition ++ &quot;, offset:&quot; ++ attributes.offset ]"/>
    </flow>
  <flow name="producer-flow" doc:id="DOC_ID" >
        <http:listener config-ref="HTTP_Listener_config" path="/pushMessage" doc:name="Push message endpoint" doc:id="DOC_ID" />
        <logger level="INFO" doc:name="Logger" doc:id="DOC_ID"
        message="#[&quot;Message: '&quot; ++ payload.message ++ &quot;' is going to be published to topic: '&quot; ++ payload.topic ++ &quot;'.&quot;]" />
        <kafka:producer config-ref="producer-krb-plain" topic="#[payload.topic]"
                        key="#[now()]"
                        doc:name="Producer" doc:id="DOC_ID" >
            <kafka:message ><![CDATA[#[payload.message]]]></kafka:message>
        </kafka:producer>
        <set-payload value="Message successfully sent to Kafka topic." doc:name="Push response builder"
        doc:id="DOC_ID" />
    </flow>
</mule>
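To try the producer flow, send an HTTP POST to the /pushMessage endpoint on port 8081 with a JSON body that supplies the topic and message fields that the flow reads, for example (the topic name and message text are illustrative):

{ "topic": "test-topic", "message": "Hello from Mule" }

Once the message is published, the consumer flow picks it up and logs the payload together with its key, partition, and offset attributes.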