Using Anypoint Studio to Configure Apache Kafka 4.3 - Mule 4
In Anypoint Studio, add Anypoint Connector for Apache Kafka (Apache Kafka Connector) to a Mule project, configure the connection to the Kafka cluster, and configure an input source for the connector.
To configure a connector in Anypoint Studio:
-
Add the connector to a Mule project.
-
Configure the connector.
-
Configure an input source for the connector.
Add the Connector in Studio
-
In Studio, create a Mule Project.
-
In the Mule Palette, click (X) Search in Exchange.
-
In Add Modules to Project, type the name of the connector in the search field.
-
Click the connector name in Available modules.
-
Click Add.
-
Click Finish.
Add the Connector Using Exchange
-
In Studio, create a Mule Project.
-
Click the Exchange (X) icon in the upper-left of the Studio task bar.
-
In Exchange, click Login and supply your Anypoint Platform username and password.
-
In Exchange, select All assets and search for
Apache Kafka
. -
Select the connector and click Add to project.
-
Follow the prompts to add the connector to the Mule project.
Consumer Configuration
You can create a consumer configuration global element to reference from Apache Kafka Connector. This enables you to apply configuration details to multiple local elements in the flow.
-
On the Studio canvas, click Global Elements.
-
Click Create and expand Connector Configuration.
-
Select Apache Kafka Consumer configuration and click OK.
-
In the Connection field, select a connection type.
-
Enter values for the following fields:
Connection Type Field Name Description All connection types
Bootstrap Server URLs
List of host-port pairs that establish the initial connection to the Kafka cluster. Set this value to the
bootstrap.servers
value you must provide to Kafka consumer clients.Bean reference
URLs that the consumer can use to connect to the Kafka cluster
Group ID
Default group ID for all the Kafka consumers that use this configuration
Consumer Kerberos
Principal
Entity for the Apache Kafka system to authenticate
Service name
Kerberos principal name associated with Kafka Connector
Kafka configuration file (krb5.conf)
File that contains Kerberos configuration information
Consumer SASL/SCRAM
Username
Username with which to log in to Kafka
Password
Password with which to log in to Kafka
Encryption type
Encryption algorithm used by SCRAM
Consumer SASL/PLAIN
Username
Username with which to log in to Kafka
Password
Password with which to log in to Kafka
Producer Configuration
Create a producer configuration global element:
-
On the Studio canvas, click Global Elements.
-
Click Create and expand Connector Configuration.
-
Select Apache Kafka Producer configuration and click OK.
-
In the Connection field, select a connection type.
-
Enter values for the following fields:
Connection Type |
Field Name |
Description |
All connection types |
Bootstrap Server URLs |
List of host-port pairs that establish the initial connection to the Kafka cluster. Set this value to the |
Bean reference |
URLs that the producer can use to connect to the Kafka cluster |
|
Producer Kerberos |
Principal |
Entity for the Apache Kafka system to authenticate |
Service name |
Kerberos principal name associated with Kafka Connector |
|
Kerberos configuration file (krb5.conf) |
File that contains Kerberos configuration information |
|
Consumer SASL/SCRAM |
Username |
Username with which to log in to Kafka |
Password |
Password with which to log in to Kafka |
|
Encryption type |
Encryption algorithm used by SCRAM |
|
Consumer SASL/PLAIN |
Username |
Username with which to log in to Kafa |
Password |
Password with which to log in to Kafka |
Configure TLS
To enable TLS for your app:
-
Click the TLS tab to configure the truststore and keystore:
-
Trust Store Configuration
-
Path
Location of the truststore file. -
Password
Password for the truststore file. -
Type
File format of the truststore file. -
Algorithm
Algorithm the truststore uses. -
Insecure
Boolean that determines whether or not to validate the truststore. If set totrue
, no validation occurs. The default value isfalse
.
-
-
Key Store Configuration
-
Type
Optionally specify the file format of the keystore file. The default value isJKS
. -
Path
Location of the keystore file. This is optional and can be used for two-way authentication for the connector. -
Alias
Attribute that indicates the alias of the key to use when the keystore contains many private keys. If not defined, the first key in the file is used by default. -
Key password
Key manager password, which is the password for the private key inside the keystore. -
Password
Store password for the keystore file. This is optional and needed only if the Key Store Location is configured. -
Algorithm
Algorithm used in the keystore.
-
-
Configure the Commit Operation
-
Drag the Commit operation to the Studio canvas.
-
Configure the Commit operation in the General tab:
Name
Type
Description
Default Value
Required
Configuration
String
The name of the configuration to use.
x
Consumer commit key
String
The commitKey of the last poll. This operation is valid only when used inside a flow that is using one of the listener sources (Batch message listener or Message listener) which inserts this value as an attribute in the Mule event.
x
-
In the Advanced tab, configure the reconnection strategy.
Configure the Consume Operation
-
Drag the Consume operation to the Studio canvas.
-
Configure the Consume operation in the General tab:
Name Type Description Default Value Required Configuration
String
The name of the configuration to use.
x
Consumption timeout
Number
The number of time units that this operation waits for receiving messages.
Timeout time unit
Enumeration, one of:
-
NANOSECONDS
-
MICROSECONDS
-
MILLISECONDS
-
SECONDS
-
MINUTES
-
HOURS
-
DAYS
The unit of time for the timeout property.
-
-
Configure the following settings in the Advanced tab:
Name Type Description Default Value Required Operation Timeout
Number
Operation Timeout Time Unit
Enumeration, one of:
-
NANOSECONDS
-
MICROSECONDS
-
MILLISECONDS
-
SECONDS
-
MINUTES
-
HOURS
-
DAYS
Streaming Strategy
-
repeatable-in-memory-stream
-
repeatable-file-store-stream
-
non-repeatable-stream
Configure to use repeatable streams.
Target Variable
String
The name of a variable that stores the operation’s output.
Target Value
String
An expression that evaluates the operation’s output. The outcome of the evaluation is stored in the target variable.
#[payload]
Reconnection Strategy
-
reconnect
-
reconnect-forever
A retry strategy in case of connectivity errors.
-
Configure the Publish Operation
-
Drag the Publish operation to the Studio canvas.
-
Configure the Publish operation in the General tab:
Name Type Description Default Value Required Configuration
String
The name of the configuration to use.
x
Topic
String
The topic to publish to.
Partition
Number
(Optional) The topic partition.
Key
Binary
(Optional) Key for the published message.
Message
Binary
(Optional) Message content of the message.
#[payload]
Headers
Object
(Optional) Headers for the message.
-
Configure the following settings in the Advanced tab:
Name Type Description Default Value Required Transactional Action
Enumeration, one of:
-
ALWAYS_JOIN
-
JOIN_IF_POSSIBLE
-
NOT_SUPPORTED
The type of joining action that operations can take regarding transactions.
JOIN_IF_POSSIBLE
Target Variable
String
The name of a variable to store the operation’s output.
Target Value
String
An expression to evaluate against the operation’s output and store the expression outcome in the target variable.
#[payload]
Reconnection Strategy
-
reconnect
-
reconnect-forever
A retry strategy in case of connectivity errors.
-
Configure the Seek Operation
-
Drag the Seek operation to the Studio canvas.
-
Configure the Seek operation in the General tab:
Name Type Description Default Value Required Configuration
String
The name of the configuration to use.
x
Topic
String
The name of the topic on which to perform the seek operation.
x
Partition
Number
The partition number that will have its offset modified.
x
-
Configure the following settings in the Advanced tab:
Name Type Description Default Value Required Operation Timeout
Number
Operation Timeout Time Unit
Enumeration, one of:
-
NANOSECONDS
-
MICROSECONDS
-
MILLISECONDS
-
SECONDS
-
MINUTES
-
HOURS
-
DAYS
Reconnection Strategy
-
reconnect
-
reconnect-forever
A retry strategy in case of connectivity errors.
-
Configure an Input Source
Configure an input source for the connector, such as the Message Consumer operation:
Name | Description |
---|---|
Configuration |
The name of the configuration to use. |
Topic |
Name of the topic from which to consume messages. |
Primary Node Only |
Whether to execute this source on only the primary node when running in a cluster. |
Streaming Strategy |
Configure to use repeatable streams. |
Redelivery Policy |
Defines a policy for processing the redelivery of the same message. |
Reconnection Strategy |
A retry strategy in case of connectivity errors.
|
You can also use the Batch Message Listener operation as an input source in Apache Kafka Connector:
Name | Description |
---|---|
Connector Configuration |
The name of the configuration to use. |
Poll timeout |
The amount of time to block. |
Poll timeout time unit |
The time unit for the polling timeout. This combines with poll timeout to define the total timeout for the polling. |
Acknowledgment mode |
Declares the supported acknowledgment mode type. |
Amount of parallel consumers |
Declares how many consumers to use in parallel. |
Primary Node Only |
Whether this source should be executed only on the primary node when running in a cluster. |
Streaming Strategy |
Define the streaming strategy.
|
Redelivery Policy |
Defines a policy for processing the redelivery of the same message. |
Reconnection Strategy |
A retry strategy in case of connectivity errors.
|