
Using Anypoint Studio to Configure Apache Kafka Connector 4.3 - Mule 4

In Anypoint Studio, add Anypoint Connector for Apache Kafka (Apache Kafka Connector) to a Mule project, configure the connection to the Kafka cluster, and configure an input source for the connector.

To configure a connector in Anypoint Studio:

  1. Add the connector to a Mule project.

  2. Configure the connector.

  3. Configure an input source for the connector.

Add the Connector in Studio

  1. In Studio, create a Mule Project.

  2. In the Mule Palette, click (X) Search in Exchange.

  3. In Add Modules to Project, type the name of the connector in the search field.

  4. Click the connector name in Available modules.

  5. Click Add.

  6. Click Finish.

Add the Connector Using Exchange

  1. In Studio, create a Mule Project.

  2. Click the Exchange (X) icon in the upper-left of the Studio task bar.

  3. In Exchange, click Login and supply your Anypoint Platform username and password.

  4. In Exchange, select All assets and search for Apache Kafka.

  5. Select the connector and click Add to project.

  6. Follow the prompts to add the connector to the Mule project.

Consumer Configuration

You can create a consumer configuration global element to reference from Apache Kafka Connector. This enables you to apply configuration details to multiple local elements in the flow.

  1. On the Studio canvas, click Global Elements.

  2. Click Create and expand Connector Configuration.

  3. Select Apache Kafka Consumer configuration and click OK.

  4. In the Connection field, select a connection type.

  5. Enter values for the following fields:

    Fields by connection type:

    • All connection types

      • Bootstrap Server URLs
        List of host-port pairs that establish the initial connection to the Kafka cluster. Set this value to the bootstrap.servers value you must provide to Kafka consumer clients.

      • Bean reference
        URLs that the consumer can use to connect to the Kafka cluster.

      • Group ID
        Default group ID for all the Kafka consumers that use this configuration.

    • Consumer Kerberos

      • Principal
        Entity for the Apache Kafka system to authenticate.

      • Service name
        Kerberos principal name associated with Kafka Connector.

      • Kerberos configuration file (krb5.conf)
        File that contains Kerberos configuration information.

    • Consumer SASL/SCRAM

      • Username
        Username with which to log in to Kafka.

      • Password
        Password with which to log in to Kafka.

      • Encryption type
        Encryption algorithm used by SCRAM.

    • Consumer SASL/PLAIN

      • Username
        Username with which to log in to Kafka.

      • Password
        Password with which to log in to Kafka.
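The fields above correspond loosely to settings that any Kafka consumer client needs. The following Python sketch is illustrative only: it assembles a dictionary of standard Apache Kafka client property names (bootstrap.servers, group.id, security.protocol, sasl.mechanism, sasl.jaas.config) from values like the ones entered in Studio. The function name consumer_properties and its parameters are hypothetical, and how the connector itself translates Studio fields into client properties is an assumption, not something this page states.

```python
def consumer_properties(bootstrap_servers, group_id, sasl=None, use_tls=False):
    """Build a dict of standard Kafka consumer client properties.

    bootstrap_servers: list of "host:port" strings (Bootstrap Server URLs).
    group_id:          default consumer group (Group ID).
    sasl:              optional dict with "mechanism", "username", "password".
    use_tls:           whether the SASL connection is wrapped in TLS.
    """
    props = {
        # Kafka expects a single comma-separated string of host:port pairs.
        "bootstrap.servers": ",".join(bootstrap_servers),
        "group.id": group_id,
    }
    if sasl is None:
        return props

    mechanism = sasl["mechanism"]
    if mechanism == "PLAIN":
        module = "org.apache.kafka.common.security.plain.PlainLoginModule"
    elif mechanism in ("SCRAM-SHA-256", "SCRAM-SHA-512"):
        module = "org.apache.kafka.common.security.scram.ScramLoginModule"
    else:
        raise ValueError("unsupported SASL mechanism: " + mechanism)

    username = sasl["username"]
    password = sasl["password"]
    props["security.protocol"] = "SASL_SSL" if use_tls else "SASL_PLAINTEXT"
    props["sasl.mechanism"] = mechanism
    props["sasl.jaas.config"] = (
        f'{module} required username="{username}" password="{password}";'
    )
    return props


# Hypothetical broker addresses and credentials, for illustration only.
scram = consumer_properties(
    ["broker1:9092", "broker2:9092"],
    "orders-group",
    sasl={"mechanism": "SCRAM-SHA-512", "username": "app", "password": "secret"},
    use_tls=True,
)
```

In this sketch the SCRAM "Encryption type" field is modeled as the choice between SCRAM-SHA-256 and SCRAM-SHA-512, which are the SCRAM mechanisms that Apache Kafka supports.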

Producer Configuration

Create a producer configuration global element:

  1. On the Studio canvas, click Global Elements.

  2. Click Create and expand Connector Configuration.

  3. Select Apache Kafka Producer configuration and click OK.

  4. In the Connection field, select a connection type.

  5. Enter values for the following fields:

    Fields by connection type:

    • All connection types

      • Bootstrap Server URLs
        List of host-port pairs that establish the initial connection to the Kafka cluster. Set this value to the bootstrap.servers value you must provide to Kafka producer clients.

      • Bean reference
        URLs that the producer can use to connect to the Kafka cluster.

    • Producer Kerberos

      • Principal
        Entity for the Apache Kafka system to authenticate.

      • Service name
        Kerberos principal name associated with Kafka Connector.

      • Kerberos configuration file (krb5.conf)
        File that contains Kerberos configuration information.

    • Producer SASL/SCRAM

      • Username
        Username with which to log in to Kafka.

      • Password
        Password with which to log in to Kafka.

      • Encryption type
        Encryption algorithm used by SCRAM.

    • Producer SASL/PLAIN

      • Username
        Username with which to log in to Kafka.

      • Password
        Password with which to log in to Kafka.

Configure TLS

To enable TLS for your app:

  1. Click the TLS tab to configure the truststore and keystore:

    • Trust Store Configuration

      • Path
        Location of the truststore file.

      • Password
        Password for the truststore file.

      • Type
        File format of the truststore file.

      • Algorithm
        Algorithm the truststore uses.

      • Insecure
        Boolean that determines whether or not to validate the truststore. If set to true, no validation occurs. The default value is false.

    • Key Store Configuration

      • Type
        Optionally specify the file format of the keystore file. The default value is JKS.

      • Path
        Location of the keystore file. This is optional and can be used for two-way authentication for the connector.

      • Alias
        Attribute that indicates the alias of the key to use when the keystore contains many private keys. If not defined, the first key in the file is used by default.

      • Key password
        Key manager password, which is the password for the private key inside the keystore.

      • Password
        Store password for the keystore file. This is optional and needed only if the Key Store Location is configured.

      • Algorithm
        Algorithm used in the keystore.
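As a rough illustration of how the truststore and keystore settings relate to each other, the following Python sketch turns TLS tab values into standard Apache Kafka client SSL property names (ssl.truststore.*, ssl.keystore.*, ssl.key.password). The function tls_properties, its dictionary keys, and the file paths are hypothetical. Whether the connector uses exactly these properties internally is an assumption. The sketch does encode two rules from the list above: the keystore type defaults to JKS, and the keystore password is needed only when a keystore path is configured.

```python
def tls_properties(truststore, keystore=None):
    """Translate TLS tab values into standard Kafka client SSL properties.

    truststore: dict with "path" and "password", optionally "type".
    keystore:   optional dict with "path", "password", "key_password",
                and optionally "type" (JKS when omitted).
    """
    props = {
        "security.protocol": "SSL",
        "ssl.truststore.location": truststore["path"],
        "ssl.truststore.password": truststore["password"],
    }
    if "type" in truststore:
        props["ssl.truststore.type"] = truststore["type"]

    # The keystore is optional; it is used for two-way authentication.
    if keystore is not None and keystore.get("path"):
        # The store password is needed only when a keystore path is set.
        if not keystore.get("password"):
            raise ValueError("keystore password is required when a path is set")
        props["ssl.keystore.location"] = keystore["path"]
        props["ssl.keystore.password"] = keystore["password"]
        props["ssl.keystore.type"] = keystore.get("type", "JKS")
        if keystore.get("key_password"):
            # Password for the private key inside the keystore.
            props["ssl.key.password"] = keystore["key_password"]
    return props


# Hypothetical paths and passwords, for illustration only.
two_way = tls_properties(
    {"path": "/etc/kafka/truststore.jks", "password": "changeit"},
    {"path": "/etc/kafka/keystore.jks", "password": "storepass", "key_password": "keypass"},
)
```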


Configure the Commit Operation

  1. Drag the Commit operation to the Studio canvas.

  2. Configure the Commit operation in the General tab:

    • Configuration
      Type: String. Required.
      The name of the configuration to use.

    • Consumer commit key
      Type: String. Required.
      The commitKey of the last poll. This operation is valid only inside a flow that uses one of the listener sources (Batch message listener or Message listener), which inserts this value as an attribute in the Mule event.

  3. In the Advanced tab, configure the reconnection strategy.

Configure the Consume Operation

  1. Drag the Consume operation to the Studio canvas.

  2. Configure the Consume operation in the General tab:

    • Configuration
      Type: String. Required.
      The name of the configuration to use.

    • Consumption timeout
      Type: Number.
      The number of time units that this operation waits to receive messages.

    • Timeout time unit
      Type: Enumeration, one of NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, or DAYS.
      The unit of time for the timeout property.

  3. Configure the following settings in the Advanced tab:

    • Operation Timeout
      Type: Number.

    • Operation Timeout Time Unit
      Type: Enumeration, one of NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, or DAYS.

    • Streaming Strategy
      One of repeatable-in-memory-stream, repeatable-file-store-stream, or non-repeatable-stream.
      Configure to use repeatable streams.

    • Target Variable
      Type: String.
      The name of a variable that stores the operation’s output.

    • Target Value
      Type: String. Default value: #[payload].
      An expression that evaluates the operation’s output. The outcome of the evaluation is stored in the target variable.

    • Reconnection Strategy
      One of reconnect or reconnect-forever.
      A retry strategy in case of connectivity errors.
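Each timeout in this section is expressed as a pair: a number and a time unit. The following Python sketch shows one way to reason about such a pair by normalizing it to milliseconds. The helper name timeout_in_millis is hypothetical, and the sketch says nothing about how the connector handles the values internally. The unit names are the enumeration values listed above.

```python
# Nanoseconds per unit, keyed by the enumeration values listed above.
_NANOS_PER_UNIT = {
    "NANOSECONDS": 1,
    "MICROSECONDS": 1_000,
    "MILLISECONDS": 1_000_000,
    "SECONDS": 1_000_000_000,
    "MINUTES": 60 * 1_000_000_000,
    "HOURS": 3_600 * 1_000_000_000,
    "DAYS": 86_400 * 1_000_000_000,
}


def timeout_in_millis(amount, unit):
    """Convert a (timeout, time unit) pair to whole milliseconds.

    Sub-millisecond remainders are truncated, so 1500 MICROSECONDS
    becomes 1 millisecond.
    """
    if unit not in _NANOS_PER_UNIT:
        raise ValueError("unknown time unit: " + unit)
    if amount < 0:
        raise ValueError("timeout must not be negative")
    return amount * _NANOS_PER_UNIT[unit] // 1_000_000


# A consumption timeout of 5 with a time unit of SECONDS.
wait_ms = timeout_in_millis(5, "SECONDS")
```

Integer arithmetic is used throughout so that the conversion does not pick up floating-point rounding errors.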

Configure the Publish Operation

  1. Drag the Publish operation to the Studio canvas.

  2. Configure the Publish operation in the General tab:

    • Configuration
      Type: String. Required.
      The name of the configuration to use.

    • Topic
      Type: String.
      The topic to publish to.

    • Partition
      Type: Number.
      (Optional) The topic partition.

    • Key
      Type: Binary.
      (Optional) Key for the published message.

    • Message
      Type: Binary. Default value: #[payload].
      (Optional) Content of the message.

    • Headers
      Type: Object.
      (Optional) Headers for the message.

  3. Configure the following settings in the Advanced tab:

    • Transactional Action
      Type: Enumeration, one of ALWAYS_JOIN, JOIN_IF_POSSIBLE, or NOT_SUPPORTED. Default value: JOIN_IF_POSSIBLE.
      The type of joining action that operations can take regarding transactions.

    • Target Variable
      Type: String.
      The name of a variable to store the operation’s output.

    • Target Value
      Type: String. Default value: #[payload].
      An expression to evaluate against the operation’s output and store the expression outcome in the target variable.

    • Reconnection Strategy
      One of reconnect or reconnect-forever.
      A retry strategy in case of connectivity errors.
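Because the Key and Message fields are binary, text or structured data has to be turned into bytes before it is published. The following Python sketch models the General tab fields as a plain data structure and shows one possible encoding (UTF-8 for the key, JSON for the message). PublishRequest and build_request are hypothetical names used only for illustration. They are not part of the connector, and the encoding is an assumption, since this page does not prescribe one.

```python
import json
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class PublishRequest:
    """Hypothetical model of the Publish operation's General tab fields."""
    topic: str
    message: bytes                     # Message (Binary)
    key: Optional[bytes] = None        # Key (Binary, optional)
    partition: Optional[int] = None    # Partition (Number, optional)
    headers: Dict[str, bytes] = field(default_factory=dict)


def build_request(topic, payload, key=None, partition=None, headers=None):
    """Encode a JSON-serializable payload and a text key as bytes."""
    encoded_headers = {
        name: value.encode("utf-8") for name, value in (headers or {}).items()
    }
    return PublishRequest(
        topic=topic,
        message=json.dumps(payload).encode("utf-8"),
        key=key.encode("utf-8") if key is not None else None,
        partition=partition,
        headers=encoded_headers,
    )


# Hypothetical topic, key, and payload, for illustration only.
request = build_request(
    "orders", {"id": 42}, key="customer-7", headers={"source": "web"}
)
```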

Configure the Seek Operation

  1. Drag the Seek operation to the Studio canvas.

  2. Configure the Seek operation in the General tab:

    • Configuration
      Type: String. Required.
      The name of the configuration to use.

    • Topic
      Type: String. Required.
      The name of the topic on which to perform the seek operation.

    • Partition
      Type: Number. Required.
      The partition number that will have its offset modified.

  3. Configure the following settings in the Advanced tab:

    • Operation Timeout
      Type: Number.

    • Operation Timeout Time Unit
      Type: Enumeration, one of NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, or DAYS.

    • Reconnection Strategy
      One of reconnect or reconnect-forever.
      A retry strategy in case of connectivity errors.

Configure an Input Source

Configure an input source for the connector, such as the Message Consumer operation:

  • Configuration
    The name of the configuration to use.

  • Topic
    Name of the topic from which to consume messages.

  • Primary Node Only
    Whether to execute this source on only the primary node when running in a cluster.

  • Streaming Strategy
    Configure to use repeatable streams. One of repeatable-in-memory-stream, repeatable-file-store-stream, or non-repeatable-stream.

  • Redelivery Policy
    Defines a policy for processing the redelivery of the same message.

  • Reconnection Strategy
    A retry strategy in case of connectivity errors. One of reconnect or reconnect-forever.

You can also use the Batch Message Listener operation as an input source in Apache Kafka Connector:

  • Connector Configuration
    The name of the configuration to use.

  • Poll timeout
    The amount of time to block.

  • Poll timeout time unit
    The time unit for the polling timeout. This combines with poll timeout to define the total timeout for the polling.

  • Acknowledgment mode
    Declares the supported acknowledgment mode type.

  • Amount of parallel consumers
    Declares how many consumers to use in parallel.

  • Primary Node Only
    Whether this source should be executed only on the primary node when running in a cluster.

  • Streaming Strategy
    Define the streaming strategy. One of repeatable-in-memory-stream, repeatable-file-store-stream, or non-repeatable-stream.

  • Redelivery Policy
    Defines a policy for processing the redelivery of the same message.

  • Reconnection Strategy
    A retry strategy in case of connectivity errors. One of reconnect or reconnect-forever.
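The two reconnection strategies differ only in when they give up. The following Python sketch is a conceptual model, not the Mule runtime's implementation: a limited retry count stands in for reconnect, and an unlimited count (count=None) stands in for reconnect-forever. The function connect_with_retries and its parameters count, frequency_seconds, and sleep are hypothetical names chosen for this illustration.

```python
import time


def connect_with_retries(connect, count=None, frequency_seconds=2.0, sleep=time.sleep):
    """Call connect() until it succeeds or the retry budget runs out.

    count=None          retry without limit (models reconnect-forever).
    count=N             retry at most N times after the first failure
                        (models reconnect).
    frequency_seconds   pause between attempts.
    """
    retries = 0
    while True:
        try:
            return connect()
        except ConnectionError:
            if count is not None and retries >= count:
                # Retry budget exhausted: surface the last error.
                raise
            retries += 1
            sleep(frequency_seconds)


# Simulated broker connection that fails twice, then succeeds.
attempts = {"n": 0}


def flaky_connect():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("broker unavailable")
    return "connected"


# Pass a no-op sleep so the example runs instantly.
result = connect_with_retries(flaky_connect, count=2, sleep=lambda seconds: None)
```

With count=2, the simulated connection is attempted three times in total: the initial attempt plus two retries.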
