Use Flow Designer to Configure Apache Kafka Connector v4.2 - Mule 4
Design Center enables you to create apps visually. To use Design Center, work with your Anypoint Platform administrator to ensure that you have a Design environment. For more information, see the Flow Designer Tour.
To create an app in Design Center:
-
Configure the input source (trigger) for your app.
-
Add the connector as a component to process the input for the app.
For information on field values, see the Apache Kafka Connector Reference.
Configure the Trigger
-
In Design Center, click Create new.
-
Click Create new application.
-
Enter a Project name, and select the Target Environment.
-
Click Create.
-
Click Go straight to canvas to exit from Let’s get started.
-
Click the name of the trigger card.
-
If you are using the Anypoint Connector for Apache Kafka (Apache Kafka Connector) as a trigger, search for the connector; otherwise, search for HTTP Listener or Scheduler.
-
Select the source.
Apache Kafka Connector provides Batch message listener and Message listener as sources.
-
To configure Consumer, set the field values:
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Name | | The name for this configuration. Connectors reference the configuration with this name. | | x |
Default Acknowledgment Mode | Enumeration | Defines the way that the Kafka broker instance is notified of message consumption. | | |
Default Listener Poll Timeout | Number | The amount of time, in the configured time unit, to wait for a poll to return if no data is available in the buffer. If no value is set, the poll returns immediately with any records currently available in the buffer, or returns empty if there is no data. Must not be negative. | | |
Default Listener Poll Timeout Time Unit | Enumeration | The time unit for the polling timeout. This combines with pollTimeout to define the total timeout for the polling. | | |
Default Operation Timeout | Number | The amount of time, in the configured time unit, to wait for an operation to finish. If no value is set or a negative value is set, the operation waits forever. | | |
Default Operation Timeout Time Unit | Enumeration | The time unit for the operation timeout. This combines with operationTimeout to define the total default timeout for the operations that use this configuration. | SECONDS | |
Zone ID | String | Converts the provided timestamps into |
Design Center automatically saves changes you make in a session.
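The timeout rules in the table above can be sketched in a few lines of Python. This is only an illustration of how a timeout scalar and its time unit combine, and of the unset and negative cases the table describes; the function names and the choice to model "waits forever" as `None` are assumptions made for the example, not part of the connector.

```python
from typing import Optional

# Nanoseconds per unit, keyed by the time-unit names listed on this page.
NANOS_PER_UNIT = {
    "NANOSECONDS": 1,
    "MICROSECONDS": 1_000,
    "MILLISECONDS": 1_000_000,
    "SECONDS": 1_000_000_000,
    "MINUTES": 60 * 1_000_000_000,
    "HOURS": 3_600 * 1_000_000_000,
    "DAYS": 86_400 * 1_000_000_000,
}


def poll_timeout_nanos(value: Optional[int], unit: str) -> int:
    """Listener poll timeout: unset means return immediately, negative is invalid."""
    if value is None:
        return 0
    if value < 0:
        raise ValueError("poll timeout must not be negative")
    return value * NANOS_PER_UNIT[unit]


def operation_timeout_nanos(value: Optional[int], unit: str = "SECONDS") -> Optional[int]:
    """Operation timeout: unset or negative means wait forever (modeled as None)."""
    if value is None or value < 0:
        return None
    return value * NANOS_PER_UNIT[unit]


print(poll_timeout_nanos(100, "MILLISECONDS"))  # 100000000
print(operation_timeout_nanos(-1))              # None
```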
Kafka Consumer Plaintext Connection
Add a connection configuration:
-
Click Add Connection.
-
Enter a unique Connection Name.
-
Optionally, select Share this connection with my business group.
-
Select the Connection Type, and enter the values for the connection:
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Bootstrap Server URLs | Array of String | The list of servers to use to bootstrap the connection with the Apache Kafka cluster. This can be a partial list of the available servers. | | x |
Group ID | String | Default group ID for all the Kafka consumers that use this configuration. | | |
Consumer Amount | Number | Determines the number of consumers the connection initially creates. | 1 | |
Maximum Polling Interval | Number | The maximum delay between invocations of poll() when using consumer group management. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances to reassign its partitions to another member. This parameter can be overridden at the source level. | 300 | |
Maximum Polling Interval Time Unit | Enumeration, one of: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS | Determines the time unit for the maximum polling interval scalar. This parameter can be overridden at the source level. | SECONDS | |
Isolation Level | Enumeration, one of: READ_UNCOMMITTED, READ_COMMITTED | Controls how to read messages that are written transactionally. If set to read_committed, consumer.poll() returns only transactional messages that have been committed. If set to read_uncommitted (default), consumer.poll() returns all messages, even transactional messages that were aborted. Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in read_committed mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction. In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, read_committed consumers are not able to read up to the high watermark when there are in-flight transactions. Further, when in read_committed mode, the seekToEnd method returns the LSO. | READ_UNCOMMITTED | |
Exclude Internal Topics | Boolean | Whether internal topics matching a subscribed pattern should be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic. | true | |
Auto Offset Reset | Enumeration, one of: EARLIEST, LATEST, ERROR | Determines what to do when there is no initial offset in Apache Kafka or if the current offset no longer exists on the server (for example, because the data was deleted). EARLIEST: Automatically reset the offset to the earliest offset. LATEST: Automatically reset the offset to the latest offset. ERROR: Throw an error if no previous offset is found for the consumer’s group. | LATEST | |
Retry Backoff Timeout | Number | The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. | 100 | |
Retry Backoff Timeout Time Unit | Enumeration, one of: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS | Determines the time unit for the retry backoff timeout scalar. | MILLISECONDS | |
Check CRC | Boolean | Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so you can disable it in cases that require extreme performance. | true | |
Default Receive Buffer Size | Number | The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. This parameter can be overridden at the source level. | 64 | |
Default Receive Buffer Size Unit | Enumeration, one of: BYTE, KB, MB, GB | The unit of measure for the receive buffer size scalar. This parameter can be overridden at the source level. | KB | |
Default Send Buffer Size | Number | The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. This parameter can be overridden at the source level. | 128 | |
Default Send Buffer Size Unit | Enumeration, one of: BYTE, KB, MB, GB | The unit of measure for the send buffer size scalar. This parameter can be overridden at the source level. | KB | |
Request Timeout | Number | Controls the maximum amount of time the client waits for the response of a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. This parameter can be overridden at the source level. | 30 | |
Request Timeout Time Unit | Enumeration, one of: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS | Determines the time unit for the request timeout scalar. This parameter can be overridden at the source level. | SECONDS | |
Default Record Limit | Number | The maximum number of records returned on a poll call to the Apache Kafka cluster. You can override this parameter at the source level. | 500 | |
DNS Lookups | Enumeration, one of: DEFAULT, USE_ALL_DNS_IPS, RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY | Controls how the client uses DNS lookups. If set to use_all_dns_ips, then when the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers. If the value is resolve_canonical_bootstrap_servers_only, each entry is resolved and expanded into a list of canonical names. | DEFAULT | |
Heartbeat Interval | Number | The expected time between heartbeats to the consumer coordinator when using Apache Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, and typically no higher than 1/3 of that value. You can adjust it to be even lower to control the expected time for normal rebalances. | 3 | |
Heartbeat Interval Time Unit | Enumeration, one of: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS | Determines the time unit for the heartbeat interval scalar. | SECONDS | |
Session Timeout | Number | The timeout used to detect consumer failures when using Apache Kafka’s group management facility. The consumer sends periodic heartbeats to confirm to the broker that it is still alive. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms. | 10 | |
Session Timeout Time Unit | Enumeration, one of: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS | Determines the time unit for the session timeout scalar. | SECONDS | |
Connection Maximum Idle Time | Number | Close idle connections after the amount of time specified by this configuration. | 540 | |
Connection Maximum Idle Time Time Unit | Enumeration, one of: NANOSECONDS, MICROSECONDS, MILLISECONDS, SECONDS, MINUTES, HOURS, DAYS | Determines the time unit for the connection maximum idle time scalar. | SECONDS | |
-
Click Save.
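To make the Isolation Level description in the connection table more concrete, the following Python sketch simulates which offsets a consumer would see in each mode. It is a deliberately simplified model, not connector or Kafka client code: the `Record` type, the transaction IDs, and the `visible_offsets` function are hypothetical names introduced for this example, and each transaction is assumed to start at its first record in the log.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Record:
    offset: int
    txn: Optional[str]  # hypothetical transaction ID; None means non-transactional


def visible_offsets(log: List[Record], txn_state: Dict[str, str],
                    isolation_level: str) -> List[int]:
    """Return the offsets a poll would yield; txn_state values are
    'open', 'committed', or 'aborted'. The log must be in offset order."""
    if isolation_level == "READ_UNCOMMITTED":
        # Everything is returned, including aborted transactional messages.
        return [r.offset for r in log]

    # READ_COMMITTED: nothing at or after the first open transaction is returned,
    # so the last stable offset is one less than that transaction's first offset.
    open_offsets = [r.offset for r in log
                    if r.txn is not None and txn_state[r.txn] == "open"]
    first_open = min(open_offsets) if open_offsets else None

    result = []
    for r in log:
        if first_open is not None and r.offset >= first_open:
            break  # withheld until the open transaction completes
        if r.txn is None or txn_state[r.txn] == "committed":
            result.append(r.offset)  # aborted messages are skipped
    return result


log = [Record(0, None), Record(1, "t1"), Record(2, "t2"),
       Record(3, None), Record(4, "t3"), Record(5, None)]
state = {"t1": "committed", "t2": "aborted", "t3": "open"}

print(visible_offsets(log, state, "READ_UNCOMMITTED"))  # [0, 1, 2, 3, 4, 5]
print(visible_offsets(log, state, "READ_COMMITTED"))    # [0, 1, 3]
```

In this model, offset 5 is a non-transactional message, yet a READ_COMMITTED consumer does not see it until transaction t3 completes, which is the withholding behavior the table describes.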
Add Topics and Assignments
-
In Topics, next to Topic Subscription Patterns, click Add.
-
In the Add Item dialog, enter the Topic Subscription Pattern.
-
To add topic-partition pairs to assign, next to Assignments, click Add.
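As a rough illustration of how a topic subscription pattern selects topics, the sketch below filters a list of topic names with a regular expression and applies the Exclude Internal Topics setting. It uses Python's `re` module and assumes the pattern must match the whole topic name; the Kafka client itself uses Java regular expressions, so syntax details can differ. The topic names and the `matched_topics` function are hypothetical.

```python
import re
from typing import List

# Internal topics that Kafka maintains itself.
INTERNAL_TOPICS = {"__consumer_offsets", "__transaction_state"}


def matched_topics(pattern: str, available: List[str],
                   exclude_internal: bool = True) -> List[str]:
    """Return the topics whose full name matches the subscription pattern."""
    regex = re.compile(pattern)
    return [
        topic for topic in available
        if regex.fullmatch(topic)
        and not (exclude_internal and topic in INTERNAL_TOPICS)
    ]


topics = ["orders.eu", "orders.us", "payments", "__consumer_offsets"]

print(matched_topics(r"orders\..*", topics))
# ['orders.eu', 'orders.us']
print(matched_topics(r".*", topics, exclude_internal=False))
# ['orders.eu', 'orders.us', 'payments', '__consumer_offsets']
```

A broad pattern such as `.*` is where the Exclude Internal Topics setting matters most, because it would otherwise match the internal topics as well.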
Add a Component
-
Click + next to the trigger card.
-
In Select a component, search for the connector name.
-
Select Apache Kafka Connector as the component.
-
Select the Publish operation and configure the following values:
-
Topic
Enter the topic to which to send a message.
-
Partition
Optionally, enter the number of the partition to which to send the message.
-
Key
Enter the key of the message to send.
-
Message
Enter the content for the message to send.
-
Headers
Optionally, click Add to add a header for the message to send.
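The Publish fields listed above can be pictured as a single record. The following Python sketch models that record with a small validation step; it is not the connector's API. The `PublishRequest` class, its field types, and the assumption that Key and Partition are optional alongside Headers are all illustrative choices made for this example.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class PublishRequest:
    topic: str                        # Topic: where the message is sent
    message: bytes                    # Message: the content to send
    key: Optional[bytes] = None       # Key: assumed optional in this sketch
    partition: Optional[int] = None   # Partition: assumed optional in this sketch
    headers: Dict[str, bytes] = field(default_factory=dict)  # Headers: optional

    def validate(self) -> None:
        """Reject values that could not describe a publishable message."""
        if not self.topic:
            raise ValueError("topic must not be empty")
        if self.partition is not None and self.partition < 0:
            raise ValueError("partition must not be negative")


# Hypothetical values for illustration only.
request = PublishRequest(
    topic="orders.eu",
    message=b'{"orderId": 42}',
    key=b"customer-7",
    headers={"source": b"flow-designer"},
)
request.validate()
```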
Next
Now that you have completed configuring Design Center, see the Apache Kafka Connector topic for more information.