Apache Kafka Connector 4.4 Reference - Mule 4

Anypoint Connector for Apache Kafka (Apache Kafka Connector) enables you to interact with the Apache Kafka messaging system. It provides seamless integration between your Mule app and an Apache Kafka cluster, using Mule runtime engine (Mule).

When a Message Listener or a Batch Message Listener uses MANUAL acknowledgment mode, you must use a Commit operation at the end of the flow when the flow finishes successfully (either because there were no errors or because there is an On Error Continue component in the flow).
Mule server notifications are not supported because the Kafka library used in the connector manages network disruptions internally.

Configurations


Consumer Configuration

Parameters

Name Type Description Default Value Required

Name

String

The name for this configuration. Connectors reference the configuration with this name.

x

Connection

The connection types to provide to this configuration.

x

Default acknowledgement mode

Enumeration, one of

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka Broker instance is notified of the consumption of messages.

  • AUTO: Commits messages automatically if the flow finishes successfully.

  • MANUAL: The user must commit messages manually upon receipt by using the Commit operation.

  • IMMEDIATE: Mule automatically commits the messages upon receipt before triggering the flow.

  • DUPS_OK: Works in the same way as MANUAL mode, but the commits are asynchronous, which can lead to duplicate records.

AUTO
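To make the difference between the modes concrete, the following Python sketch models when the offset of one consumed record gets committed. It is a simplified model, not the connector's implementation. `AckMode` and `committed_offset` are hypothetical names, the returned value follows the Kafka convention of committing the offset of the next record to read, and the asynchronous timing of DUPS_OK commits is not modeled.

```python
from enum import Enum
from typing import Optional


class AckMode(Enum):
    AUTO = "AUTO"
    MANUAL = "MANUAL"
    IMMEDIATE = "IMMEDIATE"
    DUPS_OK = "DUPS_OK"


def committed_offset(mode: AckMode, next_offset: int, flow_succeeded: bool,
                     commit_called: bool = False) -> Optional[int]:
    """Return the offset committed after one record is processed.

    None means nothing is committed, so the record can be redelivered
    after a restart or a rebalance.
    """
    if mode is AckMode.IMMEDIATE:
        # Committed on receipt, before the flow runs, whatever the outcome.
        return next_offset
    if mode is AckMode.AUTO:
        # Committed only if the flow finishes successfully.
        return next_offset if flow_succeeded else None
    # MANUAL and DUPS_OK: only an explicit Commit operation commits.
    # DUPS_OK commits asynchronously, which this model does not capture.
    return next_offset if commit_called else None
```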

Default listener poll timeout

Number

The time, in time units, to wait in a poll if data is not available in the buffer (fetched). If set to 0, the poll returns immediately with any records that are currently available in the buffer, or returns empty if there is no data. Must not be negative.

100

Default listener poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the polling timeout. This combines with pollTimeout to define the total timeout for the polling.

MILLISECONDS

Default operation timeout

Number

The time, in time units, to wait for an operation to finish. If no value is set or a negative value is set, the operation waits forever.

-1

Default operation timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the operation timeout. This combines with operationTimeout to define the total default timeout for the operations that use this configuration.

SECONDS
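Each timeout is expressed as a scalar plus a separate time unit. The following Python sketch, with hypothetical helper names, shows how such a pair could be normalized to milliseconds, including the "wait forever" meaning of a missing or negative operation timeout and the non-negative rule for the poll timeout.

```python
from typing import Optional

# Nanoseconds per unit for each time-unit enumeration value.
NANOS_PER_UNIT = {
    "NANOSECONDS": 1,
    "MICROSECONDS": 1_000,
    "MILLISECONDS": 1_000_000,
    "SECONDS": 1_000_000_000,
    "MINUTES": 60 * 1_000_000_000,
    "HOURS": 3_600 * 1_000_000_000,
    "DAYS": 86_400 * 1_000_000_000,
}


def to_millis(value: int, unit: str) -> int:
    """Convert a non-negative scalar plus time unit to whole milliseconds (rounded down)."""
    return value * NANOS_PER_UNIT[unit] // 1_000_000


def poll_timeout_millis(value: int, unit: str) -> int:
    """The poll timeout must not be negative."""
    if value < 0:
        raise ValueError("poll timeout must not be negative")
    return to_millis(value, unit)


def operation_timeout_millis(value: Optional[int], unit: str) -> Optional[int]:
    """Return None to mean 'wait forever' (no value, or a negative value)."""
    if value is None or value < 0:
        return None
    return to_millis(value, unit)
```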

Zone ID

String

The time zone ID used to convert the provided timestamps into ZonedLocalDateTimes in the results. If no value is set, the system default time zone is used.

Connection Types

Consumer Plaintext Connection
Parameters
Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all the Kafka consumers that use this configuration.

Consumer Amount

Number

Determines the number of consumers the connection will initially create.

1

Maximum polling interval

Number

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances to reassign the partitions to another member. For consumers that use a non-null group.instance.id and reach this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats and partitions are reassigned after session.timeout.ms expires.

300

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the maximum polling interval scalar. You can override this parameter at the source level.

SECONDS
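Because the consumer must call poll() again within the maximum polling interval, the time needed to process one poll's worth of records (at most Default record limit, 500 by default) has to fit inside it. The following Python check is a rough, hypothetical heuristic: `per_record_ms` is an estimate you supply, and the 0.8 safety factor is an arbitrary choice for this sketch.

```python
def fits_poll_interval(record_limit: int, per_record_ms: float,
                       max_poll_interval_ms: int,
                       safety_factor: float = 0.8) -> bool:
    """Estimate whether a full poll's records are processed before the
    maximum polling interval expires, leaving some headroom."""
    worst_case_ms = record_limit * per_record_ms
    return worst_case_ms <= max_poll_interval_ms * safety_factor
```

If the check fails, lowering the record limit or raising the maximum polling interval are the usual levers.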

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally.

If set to READ_COMMITTED, consumer.poll() returns only transactional messages that have been committed. If set to READ_UNCOMMITTED (default), consumer.poll() returns all messages, even transactional messages that were aborted. Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in READ_COMMITTED mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction.

In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. Furthermore, in READ_COMMITTED mode, the seekToEnd method returns the LSO.

READ_UNCOMMITTED
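The following Python sketch illustrates, for a single partition, which offsets each isolation level exposes. It is a simplified model that assumes records are already sorted by offset and ignores transaction control markers; `visible_offsets` and the `txn_state` mapping are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

Record = Tuple[int, Optional[str]]  # (offset, transaction ID or None)


def visible_offsets(records: List[Record], txn_state: Dict[str, str],
                    isolation: str) -> List[int]:
    """Return the offsets that poll() could return for one partition.

    txn_state maps a transaction ID to "committed", "aborted", or "open".
    """
    if isolation == "READ_UNCOMMITTED":
        # Everything is returned, including records of aborted transactions.
        return [offset for offset, _ in records]

    # READ_COMMITTED: nothing at or beyond the first open transaction is
    # returned, because reads stop at the last stable offset.
    open_offsets = [offset for offset, txn in records
                    if txn is not None and txn_state[txn] == "open"]
    first_open = min(open_offsets, default=None)

    visible = []
    for offset, txn in records:
        if first_open is not None and offset >= first_open:
            break  # withheld until the open transaction completes
        if txn is None or txn_state[txn] == "committed":
            visible.append(offset)  # records of aborted transactions are skipped
    return visible
```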

Exclude internal topics

Boolean

Whether internal topics matching a subscribed pattern should be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server (for example, because the data was deleted):

  • EARLIEST: Automatically reset the offset to the earliest offset.

  • LATEST: Automatically reset the offset to the latest offset.

  • ERROR: Throw an error if no previous offset is found for the consumer’s group.

LATEST
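The following Python sketch shows how the three policies could decide where a consumer group starts reading in one partition. The function name, the exception type, and the assumption that a committed offset is valid when it lies between the log start and log end offsets are simplifications for illustration.

```python
from typing import Optional


class NoOffsetError(Exception):
    """Hypothetical error raised for the ERROR policy."""


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    policy: str) -> int:
    """Pick the starting offset for one partition.

    log_start is the earliest offset still on the broker; log_end is the
    offset that the next produced record receives.
    """
    if committed is not None and log_start <= committed <= log_end:
        return committed  # a valid committed offset always wins
    if policy == "EARLIEST":
        return log_start
    if policy == "LATEST":
        return log_end
    if policy == "ERROR":
        raise NoOffsetError("no valid committed offset for this consumer group")
    raise ValueError(f"unknown policy: {policy}")
```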

Retry Backoff Timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the retry backoff timeout scalar.

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled.

true

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. You can override this parameter at the source level.

KB

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. You can override this parameter at the source level.

KB
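Buffer sizes are also a scalar plus a unit, with -1 meaning the OS default. The following Python sketch converts them to bytes. It assumes binary units (1 KB = 1024 bytes), which is consistent with the Kafka client defaults of 65536 bytes for receive.buffer.bytes and 131072 bytes for send.buffer.bytes; the helper name is hypothetical.

```python
from typing import Optional

BYTES_PER_UNIT = {"BYTE": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def buffer_bytes(value: int, unit: str) -> Optional[int]:
    """Convert a buffer size plus unit to bytes; None means 'use the OS default'."""
    if value == -1:
        return None
    if value < 0:
        # Rejecting other negative values is a choice made for this sketch.
        raise ValueError("buffer size must be -1 or non-negative")
    return value * BYTES_PER_UNIT[unit]
```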

Request Timeout

Number

The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the request timeout scalar. You can override this parameter at the source level.

SECONDS

Default record limit

Number

The maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups.

  • If set to USE_ALL_DNS_IPS, when the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. This value applies to both bootstrap and advertised servers.

  • If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

DEFAULT
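The USE_ALL_DNS_IPS behavior can be sketched in Python as follows: resolve every address for a host, then try each one until a connection succeeds. This is an illustration, not the Kafka client's code; `try_connect` is a hypothetical callback that stands in for a real TCP connection attempt so the example runs without a broker.

```python
import socket
from typing import Callable, List


def resolve_all(host: str, port: int) -> List[str]:
    """Return every distinct IP address that resolution yields for host, in order."""
    addresses: List[str] = []
    for *_, sockaddr in socket.getaddrinfo(host, port, type=socket.SOCK_STREAM):
        ip = sockaddr[0]
        if ip not in addresses:
            addresses.append(ip)
    return addresses


def connect_any(addresses: List[str], try_connect: Callable[[str], bool]) -> str:
    """Try each address in turn and fail only when every attempt fails."""
    for ip in addresses:
        if try_connect(ip):
            return ip
    raise ConnectionError("could not connect to any resolved address")
```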

Heartbeat interval

Number

The expected time between heartbeats to the consumer coordinator when using Apache Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than the Session Timeout parameter, but typically should be set no higher than 1/3 of that value. You can set it even lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the heartbeat interval scalar.

SECONDS

Session Timeout

Number

The timeout used to detect consumer failures when using Apache Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the session timeout scalar.

SECONDS
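The constraints between Heartbeat interval and Session Timeout can be checked up front. The following Python sketch takes both values in milliseconds and receives the broker's group.min.session.timeout.ms and group.max.session.timeout.ms values as arguments rather than assuming them; the function name and messages are hypothetical.

```python
from typing import List


def check_group_timeouts(heartbeat_ms: int, session_ms: int,
                         broker_min_session_ms: int,
                         broker_max_session_ms: int) -> List[str]:
    """Return a list of problems with the heartbeat and session timeout settings."""
    problems = []
    if heartbeat_ms >= session_ms:
        problems.append("heartbeat interval must be lower than session timeout")
    elif heartbeat_ms * 3 > session_ms:
        problems.append("heartbeat interval is higher than 1/3 of session timeout")
    if not broker_min_session_ms <= session_ms <= broker_max_session_ms:
        problems.append("session timeout is outside the broker's allowed range")
    return problems
```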

Connection maximum idle time

Number

Closes idle connections after the amount of time specified by this value, expressed in the Connection maximum idle time time unit.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the connection’s maximum idle time scalar

SECONDS

TLS Configuration

TLS

Defines a TLS configuration used by both the client and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol used for the communication. Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. The default value is PLAINTEXT, or SASL_PLAINTEXT for SASL authentication (Kerberos, SCRAM, or PLAIN). If the broker uses SSL as the protocol, configure at least the keystore in the tls:context child element of the configuration; the connector then automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.
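The protocol selection described for this parameter reduces to two questions: is a TLS context configured, and is SASL authentication used? The following Python function is a hypothetical restatement of that rule, not connector code.

```python
def security_protocol(tls_context_configured: bool, uses_sasl: bool) -> str:
    """Return the security.protocol value implied by the two settings."""
    if tls_context_configured:
        return "SASL_SSL" if uses_sasl else "SSL"
    return "SASL_PLAINTEXT" if uses_sasl else "PLAINTEXT"
```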

Endpoint identification algorithm

String

The endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which disables endpoint identification. When it is enabled, clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate.
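Python's standard `ssl` module offers a rough analogy: certificate verification and host name checking are separate switches, much as an empty endpoint identification algorithm turns off only the host name check. The sketch below is an illustration under that assumption. `make_client_context` is a hypothetical name, and only the empty string and `https` (the value commonly used with Kafka) are handled; other values are rejected in this sketch.

```python
import ssl


def make_client_context(endpoint_identification_algorithm: str) -> ssl.SSLContext:
    """Build a client TLS context that mirrors the endpoint identification setting."""
    context = ssl.create_default_context()  # verifies the certificate chain
    algorithm = endpoint_identification_algorithm.lower()
    if algorithm == "":
        # The certificate chain is still verified, but the host name is not
        # compared with the certificate.
        context.check_hostname = False
    elif algorithm != "https":
        raise ValueError(f"unsupported algorithm: {endpoint_identification_algorithm}")
    return context
```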

Topic Subscription Patterns

Array of String

The list of regular expressions that select the topics to subscribe to. Partitions of the matching topics are automatically rebalanced between the consumers.

Assignments

Array of Topic Partition

The list of topic-partition pairs to assign. Assigned partitions are not automatically rebalanced between consumers.
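The following Python sketch shows how subscription patterns could select topics. It assumes, as in the Kafka Java client, that a pattern must match the whole topic name, and it treats __consumer_offsets and __transaction_state as the internal topics skipped when Exclude internal topics is true; the function name is hypothetical.

```python
import re
from typing import Iterable, List

# Assumption: Kafka's built-in internal topics.
INTERNAL_TOPICS = {"__consumer_offsets", "__transaction_state"}


def subscribed_topics(all_topics: Iterable[str], patterns: List[str],
                      exclude_internal: bool = True) -> List[str]:
    """Return, sorted, the topics that fully match at least one pattern."""
    compiled = [re.compile(p) for p in patterns]
    matches = []
    for topic in sorted(all_topics):
        if exclude_internal and topic in INTERNAL_TOPICS:
            continue  # internal topics never match a pattern subscription here
        if any(p.fullmatch(topic) for p in compiled):
            matches.append(topic)
    return matches
```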

Default fetch minimum size

Number

The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the default fetch minimum size scalar. You can override this parameter at the source level.

BYTE

Default fetch maximum size

Number

The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the default fetch maximum size scalar. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

The maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). To limit the size of the whole consumer request, use Default fetch maximum size (fetch.max.bytes). You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the maximum partition fetch size scalar. You can override this parameter at the source level.

MB
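The following Python sketch illustrates why the fetch maximum is not an absolute limit: batches are added in order until the next one would exceed the limit, but the first batch is always included. It is a simplified, single-level model with a hypothetical function name; the same rule applies per partition with Default maximum partition fetch size.

```python
from typing import List


def fetch_response_size(batch_sizes: List[int], fetch_max_bytes: int) -> int:
    """Bytes returned for one fetch, given the available batch sizes in order.

    The first batch is always returned, even if it is larger than the limit,
    so that the consumer can make progress.
    """
    total = 0
    for index, size in enumerate(batch_sizes):
        if index > 0 and total + size > fetch_max_bytes:
            break
        total += size
    return total
```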

Fetch Maximum Wait Timeout

Number

The maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the fetch maximum wait timeout scalar

MILLISECONDS
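Default fetch minimum size and Fetch Maximum Wait Timeout work together: the broker answers a fetch as soon as either condition is met. A minimal Python sketch of that rule, with a hypothetical function name and both values already converted to bytes and milliseconds:

```python
def broker_should_respond(accumulated_bytes: int, waited_ms: int,
                          fetch_min_bytes: int, fetch_max_wait_ms: int) -> bool:
    """A fetch is answered once enough data has accumulated or the wait expires."""
    return accumulated_bytes >= fetch_min_bytes or waited_ms >= fetch_max_wait_ms
```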

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If failsDeployment is set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Consumer Kerberos Connection
Parameters
Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for the Kafka consumers that use this configuration

Consumer Amount

Number

The number of consumers the connection initially creates

1

Maximum polling interval

Number

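The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances to reassign the partitions to another member. For consumers that use a non-null group.instance.id and reach this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats and partitions are reassigned after session.timeout.ms expires.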

300

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the maximum polling interval scalar. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally:

  • If set to READ_COMMITTED, consumer.poll() returns only transactional messages that have been committed.

  • If set to READ_UNCOMMITTED (default), consumer.poll() returns all messages, including transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode.

Messages are always returned in offset order. In READ_COMMITTED mode, consumer.poll() returns messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction. When in READ_COMMITTED mode, the seekToEnd method returns the LSO.

Messages that appear after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers cannot read up to the high watermark when there are in-flight transactions.

READ_UNCOMMITTED

Exclude internal topics

Boolean

Whether to exclude from a subscription internal topics that match a subscribed pattern. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do if there is no initial offset in Kafka or if the current offset no longer exists on the server (for example, because the data was deleted):

  • EARLIEST: Automatically reset the offset to the earliest offset

  • LATEST: Automatically reset the offset to the latest offset

  • ERROR: Throw an error if no previous offset is found for the consumer’s group

LATEST

Retry Backoff Timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the retry backoff timeout scalar.

MILLISECONDS

Check CRC

Boolean

Automatically checks the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead. In situations that require extremely high performance, the check can be disabled.

true

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. You can override this parameter at the source level.

KB

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. You can override this parameter at the source level.

KB

Request Timeout

Number

The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the request timeout scalar. You can override this parameter at the source level.

SECONDS

Default record limit

Number

The maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups:

  • If set to USE_ALL_DNS_IPS, when the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. This value applies to both bootstrap and advertised servers.

  • If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

DEFAULT

Heartbeat interval

Number

The expected time between heartbeats to the consumer coordinator when using Apache Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than the Session timeout parameter, but typically should be set no higher than 1/3 of that value. You can set this value lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the heartbeat interval scalar.

SECONDS

Session Timeout

Number

The timeout used to detect consumer failures when using Apache Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the session timeout scalar.

SECONDS

Connection maximum idle time

Number

Closes idle connections after the amount of time specified by this value, expressed in the Connection maximum idle time time unit.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the connection’s maximum idle time scalar.

SECONDS

TLS Configuration

TLS

Defines a TLS configuration used by both the client and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol used for the communication. Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. The default value is PLAINTEXT, or SASL_PLAINTEXT for SASL authentication (Kerberos, SCRAM, or PLAIN). If the broker uses SSL as the protocol, configure at least the keystore in the tls:context child element of the configuration; the connector then automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

The endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which disables endpoint identification. When it is enabled, clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate.

Topic Subscription Patterns

Array of String

The list of regular expressions that select the topics to subscribe to. Partitions of the matching topics are automatically rebalanced between the consumers.

Assignments

Array of Topic Partition

The list of topic-partition pairs to assign. Assigned partitions are not automatically rebalanced between consumers.

Default fetch minimum size

Number

The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the default fetch minimum size scalar. You can override this parameter at the source level.

BYTE

Default fetch maximum size

Number

The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the default fetch maximum size scalar. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

The maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). To limit the size of the whole consumer request, use Default fetch maximum size (fetch.max.bytes). You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the maximum partition fetch size scalar. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

The maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the fetch maximum wait timeout scalar

MILLISECONDS

Principal

String

The entity that is authenticated by a computer system or network. Principals can be individual people, computers, services, or computational entities such as processes and threads.

x

Service name

String

The Kerberos principal name that Kafka runs as

x

Kerberos configuration file (krb5.conf)

String

The path to the krb5.conf file, which contains Kerberos configuration information. This information includes the locations of KDCs and admin servers for the Kerberos realms of interest, defaults for the current realm, defaults for Kerberos applications, and the mappings of hostnames to Kerberos realms.

Use ticket cache

Boolean

Set this option to true to obtain the ticket-granting ticket (TGT) from the ticket cache. Set this option to false if you do not want to use the ticket cache. The connector searches for the ticket cache as follows:

  • On Solaris and Linux, the connector looks in /tmp/krb5cc_uid, where the uid is the numeric user identifier.

  • If the ticket cache is not available in /tmp/krb5cc_uid or the app is on a Windows platform, the connector looks in {user.home}{file.separator}krb5cc_{user.name}. You can override the ticket cache location by setting a value for the Ticket cache parameter.

    In a Windows environment, if a ticket cannot be retrieved from the file ticket cache, Windows uses the Local Security Authority (LSA) API to get the ticket-granting ticket (TGT).

false
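The search order above can be written out as a Python function. This is a sketch of the documented order only; the platform strings, the function name, and the `"LSA"` marker (standing in for the Windows Local Security Authority fallback) are hypothetical.

```python
from typing import List, Optional


def ticket_cache_candidates(platform: str, uid: int, user_home: str,
                            user_name: str, file_separator: str,
                            ticket_cache: Optional[str] = None) -> List[str]:
    """Return the ticket-cache locations to try, in order.

    platform is one of the hypothetical values "linux", "solaris", "windows".
    """
    if ticket_cache:
        # The Ticket cache parameter replaces the default file locations.
        candidates = [ticket_cache]
    else:
        candidates = []
        if platform in ("linux", "solaris"):
            candidates.append(f"/tmp/krb5cc_{uid}")
        candidates.append(f"{user_home}{file_separator}krb5cc_{user_name}")
    if platform == "windows":
        # Last resort on Windows: ask the Local Security Authority (LSA).
        candidates.append("LSA")
    return candidates
```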

Ticket cache

String

The name of the ticket cache that contains the user’s ticket-granting ticket (TGT). If this value is set, Use ticket cache must also be set to true. Otherwise, a configuration error is returned.

Use keytab

Boolean

Set this option to true if you want the connector to obtain the principal’s key from the keytab. If you don’t set the Keytab parameter, the connector locates the keytab by using the Kerberos configuration file. If the keytab is not specified in the Kerberos configuration file, the connector looks for the {user.home}{file.separator}krb5.keytab file.

false

Keytab

String

Set this option to the file name of the keytab to obtain the principal’s secret key.

Store key

Boolean

Set this option to true to store the keytab or the principal’s key in the subject’s private credentials.

false
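These Kerberos parameters resemble the options of the JDK's `com.sun.security.auth.module.Krb5LoginModule` (`principal`, `useTicketCache`, `ticketCache`, `useKeyTab`, `keyTab`, `storeKey`). Assuming the connector passes them through to that login module, the following Python sketch builds the equivalent plain Kafka client properties (`sasl.mechanism=GSSAPI`, `sasl.kerberos.service.name`, and `sasl.jaas.config`). The function is hypothetical. The krb5.conf path is not included; in a plain Java Kafka client it is usually supplied through the `java.security.krb5.conf` system property.

```python
from typing import Dict, Optional


def kerberos_client_properties(principal: str, service_name: str,
                               use_ticket_cache: bool = False,
                               ticket_cache: Optional[str] = None,
                               use_keytab: bool = False,
                               keytab: Optional[str] = None,
                               store_key: bool = False) -> Dict[str, str]:
    """Build hypothetical Kafka client properties for Kerberos (GSSAPI)."""
    if ticket_cache and not use_ticket_cache:
        # Mirrors the documented rule: Ticket cache requires Use ticket cache.
        raise ValueError("Ticket cache requires Use ticket cache to be true")
    options = [
        f"useTicketCache={str(use_ticket_cache).lower()}",
        f"useKeyTab={str(use_keytab).lower()}",
        f"storeKey={str(store_key).lower()}",
    ]
    if ticket_cache:
        options.append(f'ticketCache="{ticket_cache}"')
    if keytab:
        options.append(f'keyTab="{keytab}"')
    options.append(f'principal="{principal}"')
    jaas = ("com.sun.security.auth.module.Krb5LoginModule required "
            + " ".join(options) + ";")
    return {
        "sasl.mechanism": "GSSAPI",
        "sasl.kerberos.service.name": service_name,
        "sasl.jaas.config": jaas,
    }
```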

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If failsDeployment is set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Consumer SASL/SCRAM Connection
Parameters
Name Type Description Default Value Required

username

String

The username with which to log in

x

password

String

The password with which to log in

x

encryptionType

Enumeration, one of:

  • SHA256

  • SHA512

The encryption algorithm used by SCRAM

x
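The encryptionType values correspond to the Kafka SASL mechanisms SCRAM-SHA-256 and SCRAM-SHA-512. Assuming the connector maps them that way, the following Python sketch shows the equivalent plain Kafka client properties, using Kafka's `ScramLoginModule` in `sasl.jaas.config`. The function name is hypothetical, and the sketch does not escape quotes in the credentials.

```python
from typing import Dict

SCRAM_MECHANISMS = {"SHA256": "SCRAM-SHA-256", "SHA512": "SCRAM-SHA-512"}


def scram_client_properties(username: str, password: str,
                            encryption_type: str) -> Dict[str, str]:
    """Build hypothetical Kafka client properties for SASL/SCRAM."""
    mechanism = SCRAM_MECHANISMS[encryption_type]  # KeyError for unknown values
    jaas = ("org.apache.kafka.common.security.scram.ScramLoginModule required "
            f'username="{username}" password="{password}";')
    return {"sasl.mechanism": mechanism, "sasl.jaas.config": jaas}
```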

Bootstrap Server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all Kafka consumers that use this configuration

Consumer Amount

Number

The number of consumers the connection initially creates

1

Maximum polling interval

Number

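The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances to reassign the partitions to another member. For consumers that use a non-null group.instance.id and reach this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats and partitions are reassigned after session.timeout.ms expires.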

300

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the maximum polling interval scalar. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally:

  • If set to READ_COMMITTED, consumer.poll() returns committed transactional messages only.

  • If set to READ_UNCOMMITTED (default), consumer.poll() returns all messages, including transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Therefore, in READ_COMMITTED mode, consumer.poll() returns messages up to the last stable offset (LSO) only, which is one less than the offset of the first open transaction.

Messages that appear after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. When the isolation level is set to READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED

Exclude internal topics

Boolean

Whether internal topics that match a subscribed pattern should be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do if there is no initial offset in Kafka or if the current offset no longer exists on the server (for example, because the data was deleted):

  • EARLIEST: Automatically reset the offset to the earliest offset

  • LATEST: Automatically reset the offset to the latest offset

  • ERROR: Throw an error if no previous offset is found for the consumer’s group

LATEST
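The following sketch shows the decision that Auto offset reset controls when a consumer group starts reading a partition. It assumes that a committed offset is usable only while it still falls inside the retained log. The starting_offset function and the NoValidOffsetError type are hypothetical.

```python
from typing import Optional


class NoValidOffsetError(Exception):
    """Hypothetical error type standing in for the ERROR policy's failure."""


def starting_offset(committed: Optional[int], log_start: int, log_end: int, policy: str) -> int:
    """Pick the offset a consumer group starts reading from on one partition.

    committed: the group's committed offset, or None if it never committed.
    log_start / log_end: the earliest retained offset and the next offset to be written.
    """
    # A committed offset is only usable if its data still exists on the broker.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    if policy == "EARLIEST":
        return log_start
    if policy == "LATEST":
        return log_end
    if policy == "ERROR":
        raise NoValidOffsetError("no valid committed offset for this consumer group")
    raise ValueError(f"unknown policy: {policy}")


print(starting_offset(None, 100, 250, "EARLIEST"))  # 100
print(starting_offset(None, 100, 250, "LATEST"))    # 250
print(starting_offset(50, 100, 250, "EARLIEST"))    # 100 (offset 50 was deleted)
```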

Retry Backoff Timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the retry backoff timeout scalar

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, it can be disabled.

true

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. You can override this parameter at the source level.

KB
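The buffer sizes are also expressed as a scalar plus a unit. The following sketch assumes binary multiples (1 KB = 1024 bytes). Under that assumption, the defaults of 64 KB and 128 KB line up with the Kafka client's receive.buffer.bytes default of 65536 and send.buffer.bytes default of 131072. The to_bytes helper is hypothetical.

```python
# Bytes per unit, assuming binary multiples (1 KB = 1024 bytes).
BYTES_PER_UNIT = {"BYTE": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def to_bytes(value: int, unit: str) -> int:
    """Hypothetical helper: convert a size scalar and its unit to bytes."""
    if value == -1:
        # -1 means "use the OS default"; scaling it would produce a meaningless value.
        return -1
    if value < 0:
        raise ValueError("sizes must be non-negative, or -1 for the OS default")
    if unit not in BYTES_PER_UNIT:
        raise ValueError(f"unknown size unit: {unit}")
    return value * BYTES_PER_UNIT[unit]


print(to_bytes(64, "KB"))   # 65536
print(to_bytes(128, "KB"))  # 131072
print(to_bytes(-1, "KB"))   # -1
```

The -1 sentinel is passed through unchanged. Multiplying it by the unit factor would turn "use the OS default" into an invalid negative size.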

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. You can override this parameter at the source level.

KB

Request Timeout

Number

The maximum amount of time that the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or it fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the request timeout scalar. You can override this parameter at the source level.

SECONDS

Default record limit

Number

The maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups:

  • If set to USE_ALL_DNS_IPS, when the lookup returns multiple IP addresses for a hostname, the client attempts to connect to each of the IP addresses in turn before the connection fails. This value applies to both bootstrap and advertised servers.

  • If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

DEFAULT
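The USE_ALL_DNS_IPS behavior amounts to resolving the hostname and trying every returned address before giving up. The following sketch shows that idea with Python's standard socket module. It is an analogy to the Kafka client's behavior, not its implementation, and the connect_any function is hypothetical.

```python
import socket


def connect_any(host: str, port: int, timeout: float = 5.0) -> socket.socket:
    """Try every address the resolver returns, in order; return the first connected socket."""
    last_error = None
    for family, socktype, proto, _canonname, sockaddr in socket.getaddrinfo(
        host, port, type=socket.SOCK_STREAM
    ):
        sock = socket.socket(family, socktype, proto)
        sock.settimeout(timeout)
        try:
            sock.connect(sockaddr)
            return sock
        except OSError as exc:
            # This address failed; remember why and move on to the next one.
            last_error = exc
            sock.close()
    # Only fail after every resolved address has been tried.
    raise last_error or OSError(f"no addresses resolved for {host}")
```

Python's socket.create_connection follows the same try-each-address pattern. The explicit loop is shown here only to make the behavior visible.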

Heartbeat interval

Number

The expected time between heartbeats to the consumer coordinator when using Apache Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than the Session Timeout parameter (session.timeout.ms), but typically should be set no higher than 1/3 of that value. You can set this value lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the heartbeat interval scalar

SECONDS

Session Timeout

Number

The timeout used to detect consumer failures when using Apache Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the session timeout scalar

SECONDS
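The Heartbeat interval description sets a hard rule (lower than the session timeout) and a guideline (no higher than 1/3 of it). The following sketch checks both rules. It assumes that both values have already been converted to milliseconds. The heartbeat_warnings function is hypothetical.

```python
from typing import List


def heartbeat_warnings(heartbeat_ms: int, session_timeout_ms: int) -> List[str]:
    """Check a heartbeat interval against the session timeout (both in milliseconds)."""
    if heartbeat_ms >= session_timeout_ms:
        return ["heartbeat interval must be lower than the session timeout"]
    if heartbeat_ms * 3 > session_timeout_ms:
        # Allowed, but fewer heartbeats fit in one session timeout, so fewer
        # missed heartbeats are tolerated before the broker removes the consumer.
        return ["heartbeat interval is higher than 1/3 of the session timeout"]
    return []


# Defaults: Heartbeat interval 3 SECONDS, Session Timeout 10 SECONDS.
print(heartbeat_warnings(3_000, 10_000))  # []
```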

Connection maximum idle time

Number

Closes idle connections after the time specified by this parameter, expressed in the Connection maximum idle time time unit

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the connection’s maximum idle time scalar

SECONDS

TLS Configuration

TLS

Defines a TLS configuration, which can be used from both the client and server sides to secure communication for the Mule app. When using the HTTPS protocol, the HTTP communication is secured using TLS or SSL. If HTTPS is configured as the protocol, you must configure at least the keystore in the tls:context child element of the listener-config.

Endpoint identification algorithm

String

The endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker hostname matches the hostname in the broker’s certificate.

Topic Subscription Patterns

Array of String

The list of regular expressions that select the topics to subscribe to. Partitions of the matching topics are automatically rebalanced among the consumers in the group.

Assignments

Array of Topic Partition

The list of topic-partition pairs to assign to a consumer. Consumers are not automatically rebalanced.
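The following sketch shows which topics a pattern subscription selects, and how Exclude internal topics affects that selection. It assumes Java's Pattern.matcher(topic).matches() semantics, which require the whole topic name to match, so the sketch uses re.fullmatch. It also assumes that __consumer_offsets and __transaction_state are the internal topics. Python and Java regular expression syntax differ in some details. The matching_topics function is hypothetical.

```python
import re
from typing import Iterable, List

# Kafka's internal topics (assumed set for this sketch).
INTERNAL_TOPICS = {"__consumer_offsets", "__transaction_state"}


def matching_topics(patterns: Iterable[str], topics: Iterable[str], exclude_internal: bool = True) -> List[str]:
    """Return the topics, sorted, that at least one subscription pattern fully matches."""
    compiled = [re.compile(p) for p in patterns]
    selected = []
    for topic in sorted(topics):
        if exclude_internal and topic in INTERNAL_TOPICS:
            continue
        # The whole topic name must match, not just a prefix or substring.
        if any(p.fullmatch(topic) for p in compiled):
            selected.append(topic)
    return selected


topics = ["payments", "orders.eu", "__consumer_offsets", "orders.us", "orders-archive"]
print(matching_topics([r"orders\..*"], topics))  # ['orders.eu', 'orders.us']
```

Assignments skip this matching step entirely. The consumer reads exactly the listed topic-partition pairs, and no group rebalancing takes place.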

Default fetch minimum size

Number

The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request.

The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available, or when the fetch request times out waiting for data to arrive.

Setting this value to a number greater than 1 causes the server to wait for larger amounts of data to accumulate. This can improve server throughput slightly, at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the fetch minimum size scalar.

BYTE

Default fetch maximum size

Number

The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum.

The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the fetch maximum size scalar. You can override this parameter at the source level.

MB
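The Default fetch maximum size description notes that the limit is not absolute. The following sketch shows why: the first batch is always returned, even when that batch alone exceeds the limit. The model is simplified. It treats the response as a flat sequence of batches and ignores per-partition limits. The select_batches function is hypothetical.

```python
from typing import List, Tuple


def select_batches(batch_sizes: List[int], fetch_max_bytes: int) -> Tuple[int, int]:
    """Return (batches_returned, bytes_returned) for one fetch response."""
    taken, total = 0, 0
    for size in batch_sizes:
        # Stop once the next batch would exceed the limit, except for the very
        # first batch, which is always returned so the consumer can make progress.
        if taken > 0 and total + size > fetch_max_bytes:
            break
        taken += 1
        total += size
    return taken, total


one_mb = 1024 * 1024
print(select_batches([3_000_000, 10], one_mb))  # (1, 3000000)
print(select_batches([400_000] * 3, one_mb))    # (2, 800000)
```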

Default maximum partition fetch size

Number

The maximum amount of data per partition the server can return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress.

The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). See Default fetch maximum size (fetch.max.bytes) for limiting the consumer request size. You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the maximum partition fetch size scalar. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

The maximum amount of time the server waits before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the fetch maximum wait timeout scalar.

MILLISECONDS
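Default fetch minimum size and Fetch Maximum Wait Timeout work together. The broker answers as soon as enough data has accumulated, or when the wait timeout expires, whichever happens first. The following sketch models that rule for a single fetch request. It assumes that arrivals are measured in milliseconds after the request reaches the broker. The fetch_response_time function is hypothetical.

```python
from typing import List, Tuple


def fetch_response_time(arrivals: List[Tuple[int, int]], fetch_min_bytes: int, fetch_max_wait_ms: int) -> Tuple[int, int]:
    """arrivals: (ms_after_request, bytes) pairs. Returns (response_ms, bytes_available)."""
    available = 0
    for t, n in sorted(arrivals):
        if t > fetch_max_wait_ms:
            break  # data arriving after the deadline misses this response
        available += n
        if available >= fetch_min_bytes:
            return t, available  # minimum reached: answer immediately
    # Not enough data before the deadline: answer with whatever is available.
    return fetch_max_wait_ms, available


# Defaults: 1 BYTE minimum, 500 MILLISECONDS maximum wait.
print(fetch_response_time([(20, 100)], 1, 500))               # (20, 100)
print(fetch_response_time([(20, 100), (300, 200)], 1000, 500))  # (500, 300)
```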

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Consumer SASL/PLAIN Connection

Parameters
Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Endpoint identification algorithm

String

The endpoint identification algorithm used by clients to validate the server host name. The default value is an empty string, which means the algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate.

Group ID

String

Default group ID for the Kafka consumers that use this configuration

Consumer Amount

Number

The number of consumers the connection initially creates

1

Maximum polling interval

Number

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances to reassign its partitions to another member. You can override this parameter at the source level.

300

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the maximum polling interval scalar. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally:

  • If set to READ_COMMITTED, consumer.poll() returns only transactional messages that have been committed.

  • If set to READ_UNCOMMITTED, consumer.poll() returns all messages, including transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode.

Messages are always returned in offset order. In READ_COMMITTED mode, consumer.poll() returns messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction. When in READ_COMMITTED mode, the seekToEnd method returns the LSO.

Messages that appear after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers cannot read up to the high watermark when there are in-flight transactions.

READ_UNCOMMITTED

Exclude internal topics

Boolean

Whether to exclude from a subscription internal topics that match a subscribed pattern. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do if there is no initial offset in Kafka or if the current offset no longer exists on the server (for example, because the data was deleted):

  • EARLIEST: Automatically reset the offset to the earliest offset

  • LATEST: Automatically reset the offset to the latest offset

  • ERROR: Throw an error if no previous offset is found for the consumer’s group

LATEST

Retry Backoff Timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the retry backoff timeout scalar

MILLISECONDS

Check CRC

Boolean

Automatically checks the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead. In situations that require extremely high performance, the check can be disabled.

true

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. You can override this parameter at the source level.

KB

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. You can override this parameter at the source level.

KB

Request Timeout

Number

The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the request timeout scalar. You can override this parameter at the source level.

SECONDS

Default record limit

Number

The maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups:

  • If set to USE_ALL_DNS_IPS, when the lookup returns multiple IP addresses for a hostname, the client attempts to connect to each of the IP addresses in turn before the connection fails. This value applies to both bootstrap and advertised servers.

  • If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

DEFAULT

Heartbeat interval

Number

The expected time between heartbeats to the consumer coordinator when using Apache Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than the Session timeout parameter, but typically should be set no higher than 1/3 of that value. You can set this value lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the heartbeat interval scalar.

SECONDS

Session Timeout

Number

The timeout used to detect consumer failures when using Apache Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the session timeout scalar

SECONDS

Connection maximum idle time

Number

Closes idle connections after the time specified by this parameter, expressed in the Connection maximum idle time time unit

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the connection’s maximum idle time scalar.

SECONDS

TLS Configuration

TLS

Defines a TLS configuration, which can be used from both the client and server sides to secure communication for the Mule app. When using the HTTPS protocol, the HTTP communication is secured using TLS or SSL. If HTTPS is configured as the protocol, you must configure at least the keystore in the tls:context child element of the listener-config.

Topic Subscription Patterns

Array of String

The list of regular expressions that select the topics to subscribe to. Partitions of the matching topics are automatically rebalanced among the consumers in the group.

Assignments

Array of Topic Partition

The list of topic-partition pairs to assign. Consumers are not automatically rebalanced.

Default fetch minimum size

Number

The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the fetch minimum size scalar. You can override this parameter at the source level.

BYTE

Default fetch maximum size

Number

The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the fetch maximum size scalar. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

The maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). See fetch.max.bytes for limiting the consumer request size. You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the maximum partition fetch size scalar. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

The maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the fetch maximum wait timeout scalar

MILLISECONDS

Username

String

The username the client uses to connect to the Kafka broker

x

Password

String

The password used by the client to connect to the Kafka broker

x
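With SASL/PLAIN, the Kafka client receives the username and password through a JAAS configuration string that uses PlainLoginModule. The following sketch assumes that the connector builds an equivalent set of client properties from the Username and Password parameters. It also assumes that the security.protocol value depends on whether TLS is configured. The sasl_plain_properties helper and its use_tls flag are hypothetical. The quote escaping assumes the backslash escapes that Kafka's JAAS parser accepts inside quoted values.

```python
from typing import Dict


def _jaas_quote(value: str) -> str:
    # Escape backslashes and double quotes so the value stays a single quoted token.
    return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'


def sasl_plain_properties(username: str, password: str, use_tls: bool = True) -> Dict[str, str]:
    """Hypothetical helper: Kafka client properties for SASL/PLAIN authentication."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f"username={_jaas_quote(username)} password={_jaas_quote(password)};"
    )
    return {
        # PLAIN sends the password as-is, so prefer SASL_SSL outside of test setups.
        "security.protocol": "SASL_SSL" if use_tls else "SASL_PLAINTEXT",
        "sasl.mechanism": "PLAIN",
        "sasl.jaas.config": jaas,
    }


props = sasl_plain_properties("alice", "s3cret")
print(props["sasl.jaas.config"])
```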

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.


Producer configuration

Parameters

Name Type Description Default Value Required

Name

String

The name for this configuration. Connectors reference the configuration with this name.

x

Connection

The connection types to provide to this configuration.

x

Default topic

String

The default topic name for producer operations. You can override it in each operation’s configuration.

defaultTopicName

Zone ID

String

Converts the provided timestamps into ZonedLocalDateTimes in the results. The default value is provided by the system.

Expiration Policy

Configures the minimum amount of time that a dynamic configuration instance can remain idle before Mule runtime engine (Mule) considers it eligible for expiration. This does not mean that the instance expires at the exact moment that it becomes eligible. Mule purges the instances when appropriate.

Connection Types

Producer Plaintext Connection
Parameters
Name Type Description Default Value Required

Bootstrap server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

The producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This parameter controls the default batch size, in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers contain multiple batches, one for each partition with data available to send. A small batch size makes batching less common and can reduce throughput (a batch size of zero disables batching entirely). A very large batch size can waste memory, because a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the batch size scalar.

KB

Buffer size

Number

The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server, the producer blocks for up to max.block.ms (Maximum block time), after which it throws an exception. This setting should roughly correspond to the total memory the producer uses, but it is not exact because not all of the producer’s memory is used for buffering. Some additional memory is used for compression (if compression is enabled) and for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32 MB), but this value should be capped to align with the expected memory of Mule instances in CloudHub (v0.1 core).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the buffer size scalar.

KB

DNS lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS, when the lookup returns multiple IP addresses for a hostname, the client attempts to connect to each of the IP addresses in turn before failing the connection. This value applies to both bootstrap and advertised servers. If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

DEFAULT

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

The compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE
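The Compression type description notes that more batching means better compression. The following sketch demonstrates the effect with Python's gzip module on hypothetical, repetitive JSON records. It compares compressing the records as one batch against compressing each record on its own. It illustrates the general principle only and does not reproduce Kafka's record-batch framing or codec settings.

```python
import gzip
import json


def gzip_size(payloads):
    """Size in bytes of the gzip-compressed concatenation of the given payloads."""
    return len(gzip.compress(b"".join(payloads)))


# Hypothetical, fairly repetitive JSON records, similar to typical event payloads.
records = [
    json.dumps({"orderId": i, "status": "CREATED", "region": "eu-west-1"}).encode()
    for i in range(200)
]

batched = gzip_size(records)                          # one compressed batch of 200 records
individually = sum(gzip_size([r]) for r in records)  # 200 single-record "batches"
print(batched < individually)  # True
```

A larger batch gives the compressor more repeated content to exploit. It also spreads each compressed block's fixed overhead across more records.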

Connections maximum idle time

Number

Closes idle connections after the specified time.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the connection’s maximum idle time scalar.

SECONDS

Delivery Timeout

Number

An upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record is delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration should be greater than or equal to the sum of request.timeout.ms (Default request timeout) and linger.ms (Linger time).

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the delivery timeout scalar.

SECONDS
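The following sketch checks the constraint from the Delivery Timeout description. It assumes that all three values have already been converted to milliseconds. The delivery_timeout_ok function is hypothetical.

```python
def delivery_timeout_ok(delivery_timeout_ms: int, request_timeout_ms: int, linger_ms: int) -> bool:
    """True if delivery.timeout.ms >= request.timeout.ms + linger.ms."""
    return delivery_timeout_ms >= request_timeout_ms + linger_ms


# Connector defaults: Delivery Timeout 120 SECONDS, Default request timeout 30 SECONDS, Linger time 0.
print(delivery_timeout_ok(120_000, 30_000, 0))  # True
print(delivery_timeout_ok(20_000, 30_000, 0))   # False
```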

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written in the stream. If false, producer retries caused by broker failures and similar issues might write duplicates of the retried message in the stream. Enabling idempotence requires max.in.flight.requests.per.connection (Maximum in flight requests) to be less than or equal to 5, retries (Retries amount) to be greater than 0, and acks (Producer Acknowledge Mode) to be all. If these values are not explicitly set by the user, suitable values are chosen. If incompatible values are set, a Connection Exception is thrown.

false
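The following sketch lists the settings that conflict with enabling idempotence, using the three requirements from the description above. It uses the Kafka client's acks values ("0", "1", "all"). It assumes that a Producer Acknowledge Mode of ALL corresponds to acks=all. The idempotence_conflicts function is hypothetical.

```python
from typing import List


def idempotence_conflicts(max_in_flight: int, retries: int, acks: str) -> List[str]:
    """List the settings that conflict with enable.idempotence=true.

    acks uses the Kafka client values "0", "1", or "all".
    """
    conflicts = []
    if max_in_flight > 5:
        conflicts.append("max.in.flight.requests.per.connection must be <= 5")
    if retries <= 0:
        conflicts.append("retries must be greater than 0")
    if acks != "all":
        conflicts.append("acks must be all")
    return conflicts


print(idempotence_conflicts(5, 1, "all"))  # []
print(idempotence_conflicts(5, 1, "1"))    # ['acks must be all']
```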

Linger time

Number

The producer groups together any records that arrive between request transmissions into a single batched request. Normally, this occurs only under load, when records arrive faster than they can be sent out. However, in some circumstances, the client might want to reduce the number of requests even under moderate load.

This setting accomplishes this by adding a small amount of artificial delay: rather than sending out a record immediately, the producer waits for up to the given delay so that other records can be sent and batched together. This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

When a partition accumulates a full batch.size worth of records, the batch is sent immediately regardless of this setting. If fewer bytes have accumulated for the partition, the producer "lingers" for the specified time, waiting for more records to arrive.

This setting defaults to 0 (no delay). Setting linger.ms=5, for example, reduces the number of requests sent but adds up to 5 ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the linger time scalar

SECONDS
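The following sketch models how Batch size and Linger time interact on a single partition. A batch is sent as soon as it is full, or when the linger time has passed since its first record arrived. This is a simplified model. It ignores the sender thread, in-flight limits, and the exact rule Kafka uses to decide that a batch is full. The batch_send_times function is hypothetical.

```python
from typing import List, Tuple


def batch_send_times(arrivals: List[Tuple[int, int]], batch_size_bytes: int, linger_ms: int) -> List[Tuple[int, int]]:
    """Simplified single-partition batching model.

    arrivals: (arrival_ms, record_bytes) pairs, sorted by arrival time.
    Returns (send_ms, record_count) for each batch.
    """
    sends: List[Tuple[int, int]] = []
    start, size, count = 0, 0, 0
    for t, record_bytes in arrivals:
        # The open batch's linger deadline passed before this record arrived.
        if count and t > start + linger_ms:
            sends.append((start + linger_ms, count))
            size, count = 0, 0
        if count == 0:
            start = t
        size += record_bytes
        count += 1
        if size >= batch_size_bytes:  # batch is full: send immediately
            sends.append((t, count))
            size, count = 0, 0
    if count:  # flush whatever is still lingering
        sends.append((start + linger_ms, count))
    return sends


arrivals = [(0, 100), (2, 100), (4, 100), (20, 100)]
print(batch_send_times(arrivals, 16_384, 0))  # [(0, 1), (2, 1), (4, 1), (20, 1)]
print(batch_send_times(arrivals, 16_384, 5))  # [(5, 3), (25, 1)]
```

With a linger time of 0, every record goes out in its own request. With 5 ms, the first three records share one request, at the cost of up to 5 ms of added latency.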

Maximum block time

Number

Controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() can block. These methods can block either because the buffer is full or because metadata is unavailable. Blocking in user-supplied serializers or partitioners is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the maximum block time scalar

SECONDS

Maximum in flight requests

Number

The maximum number of unacknowledged requests the client will send on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (if retries are enabled).

5

Maximum request size

Number

The maximum size of a request in bytes. This setting limits the number of record batches the producer sends in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which might differ from this value.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the max request size scalar.

MB

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

The number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than zero causes the client to resend any record whose send fails with a potentially transient error. This retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection (Maximum in flight requests) to 1 can change the ordering of records: if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, the records in the second batch might appear first. Additionally, produce requests fail before the number of retries is exhausted if the timeout configured by delivery.timeout.ms (Delivery Timeout) expires before successful acknowledgment. It is recommended that you leave this configuration unset and instead use delivery.timeout.ms to control retry behavior.

1
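The following sketch reproduces the reordering scenario from the Retries amount description on a single partition. It models a producer without idempotence. Kafka's idempotent producer uses sequence numbers to preserve ordering with up to five in-flight requests. The model is simplified: each round sends up to max_in_flight requests, and the batches listed in fail_first_attempt fail once with a transient error. The partition_log_order function is hypothetical.

```python
from typing import Iterable, List, Set


def partition_log_order(batches: Iterable[str], fail_first_attempt: Set[str], max_in_flight: int) -> List[str]:
    """Order in which batches land in one partition's log (simplified, no idempotence).

    Each round sends up to max_in_flight requests (retries first, then new batches).
    A batch in fail_first_attempt fails once and is retried in a later round.
    """
    if max_in_flight < 1:
        raise ValueError("max_in_flight must be at least 1")
    pending = list(batches)
    retries: List[str] = []
    will_fail = set(fail_first_attempt)  # copy so the caller's set is not modified
    log: List[str] = []
    while pending or retries:
        in_flight = []
        while len(in_flight) < max_in_flight and (retries or pending):
            in_flight.append(retries.pop(0) if retries else pending.pop(0))
        for batch in in_flight:
            if batch in will_fail:
                will_fail.discard(batch)
                retries.append(batch)  # transient failure: resend later
            else:
                log.append(batch)
    return log


print(partition_log_order(["A", "B"], {"A"}, max_in_flight=5))  # ['B', 'A']
print(partition_log_order(["A", "B"], {"A"}, max_in_flight=1))  # ['A', 'B']
```

With one request in flight, B is not sent until A succeeds, so the original order survives the retry.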

Retry Backoff Timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

1000

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the retry backoff timeout scalar.

MILLISECONDS

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. You can override this parameter at the source level.

KB

Default request timeout

Number

The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if retries are exhausted. This value should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The request timeout time unit.

SECONDS

TLS Configuration

TLS

Defines a configuration for TLS, which can be used from both the client and server sides to secure communication for the Mule app. When using the HTTPS protocol, the HTTP communication is secured using TLS or SSL. If HTTPS is configured as the protocol, you must configure at least the keystore in the tls:context child element of the listener-config.

Endpoint identification algorithm

String

The endpoint identification algorithm used by clients to validate the server host name. The default value is an empty string, which means the algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate.

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Producer Kerberos Connection
Parameters
Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

The producer attempts to batch records together into fewer requests when multiple records are sent to the same partition. This helps performance on both the client and the server. This parameter controls the default batch size, in bytes. No attempt is made to batch records larger than this size.

Requests sent to brokers contain one batch for each partition with data available to send. A small batch size makes batching less common and can reduce throughput. A batch size of zero disables batching. A very large batch size might use memory less efficiently because the connector allocates a buffer of the specified batch size in anticipation of additional records.

16

The batch size unit of measure

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the batch size scalar.

KB

Buffer size

Number

The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered, the producer blocks for max.block.ms and then throws an exception. This setting should correspond roughly to the total memory the producer will use, but not all memory the producer uses is used for buffering. The producer uses additional memory for compression, if enabled, and for maintaining in-flight requests.

The default value in Kafka is 33554432 (32 MB), but you should cap the value of Buffer size to align with expected values for Mule instances in CloudHub (v0.1 core).

1000

The buffer memory size unit of measure

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the buffer size scalar.

KB
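To compare a configured buffer size with the Kafka default of 33554432 bytes, convert the (size, unit) pair to bytes. This Python sketch assumes binary multiples (1 KB = 1024 bytes). The function names are hypothetical.

```python
# Assumed binary multiples for the connector's size units.
UNIT_BYTES = {"BYTE": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}

# The Kafka client's default buffer.memory: 33554432 bytes (32 MB).
KAFKA_DEFAULT_BUFFER_MEMORY = 33_554_432


def to_bytes(size: int, unit: str) -> int:
    """Convert a (size, unit) pair to bytes."""
    return size * UNIT_BYTES[unit]


def batches_that_fit(buffer_bytes: int, batch_bytes: int) -> int:
    """How many full batches of batch_bytes the buffer can hold at once."""
    return buffer_bytes // batch_bytes
```

With the values listed above (1000 KB of buffer and 16 KB batches), the buffer holds 1,024,000 bytes. That is roughly 3% of the Kafka default, or 62 full batches.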

DNS lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups:

  • DEFAULT: Uses the Kafka default

  • USE_ALL_DNS_IPS: When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. This functionality applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY: Each entry is resolved and expanded into a list of canonical names.

DEFAULT
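The USE_ALL_DNS_IPS behavior amounts to resolving a host name to every address it has and trying each one before giving up. This Python sketch illustrates the idea. Both helpers are hypothetical, and the connect callback stands in for a real connection attempt:

  • resolve_all collects every address the resolver returns.

  • connect_any tries each address in turn.

```python
import socket
from typing import Callable, List


def resolve_all(host: str, port: int) -> List[str]:
    """Return every IP address the resolver reports for host, without duplicates."""
    addresses: List[str] = []
    for _family, _type, _proto, _canon, sockaddr in socket.getaddrinfo(
            host, port, type=socket.SOCK_STREAM):
        if sockaddr[0] not in addresses:
            addresses.append(sockaddr[0])
    return addresses


def connect_any(addresses: List[str], connect: Callable[[str], bool]) -> str:
    """Try each address in turn and fail only if every attempt fails.

    `connect` is a stand-in for a real connection attempt; it returns True
    on success.
    """
    for address in addresses:
        if connect(address):
            return address
    raise ConnectionError(f"could not connect to any of {addresses}")
```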

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

The compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE
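Any general-purpose codec shows the effect of batching on compression. This Python sketch uses the standard gzip module, not the Kafka client's codec implementation. It compares compressing JSON records one at a time against compressing them in groups. The record fields are made up for illustration.

```python
import gzip
import json
from typing import Dict, List, Tuple


def compressed_sizes(records: List[Dict], batch_size: int) -> Tuple[int, int]:
    """Return (gzip bytes when records are compressed one by one,
    gzip bytes when records are compressed in groups of batch_size)."""
    encoded = [json.dumps(r).encode("utf-8") for r in records]
    one_by_one = sum(len(gzip.compress(e)) for e in encoded)
    batched = 0
    for start in range(0, len(encoded), batch_size):
        batch = b"\n".join(encoded[start:start + batch_size])
        batched += len(gzip.compress(batch))
    return one_by_one, batched


# Hypothetical sample payloads: similar records compress well together.
sample_records = [{"orderId": i, "status": "CREATED", "region": "us-east-1"}
                  for i in range(200)]
```

Grouping 100 records per batch produces far fewer compressed bytes than compressing each record separately. Each separate compression pays its own header overhead and cannot reuse repeated field names across records.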

Connections maximum idle time

Number

Closes idle connections after the time specified by this parameter.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the connections maximum idle time scalar.

SECONDS
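The Kafka client handles idle-connection expiry internally, but the policy is easy to picture. This Python sketch uses a hypothetical IdleConnectionTracker class. It records when each connection was last used and reports the connections that have been idle longer than the configured limit (540 seconds by default in this table).

```python
import time
from typing import Dict, List, Optional


class IdleConnectionTracker:
    """Tracks the last use of each connection and expires idle ones."""

    def __init__(self, max_idle_seconds: float):
        self.max_idle_seconds = max_idle_seconds
        self.last_used: Dict[str, float] = {}

    def touch(self, conn_id: str, now: Optional[float] = None) -> None:
        """Record activity on a connection (now defaults to a monotonic clock)."""
        self.last_used[conn_id] = time.monotonic() if now is None else now

    def expire(self, now: Optional[float] = None) -> List[str]:
        """Remove and return the connections idle for longer than the limit."""
        now = time.monotonic() if now is None else now
        idle = [c for c, t in self.last_used.items()
                if now - t > self.max_idle_seconds]
        for c in idle:
            del self.last_used[c]
        return idle
```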

Delivery timeout

Number

An upper bound on the time to report success or failure after a call to send() returns. This limits the total time that a record is delayed prior to sending, the time to wait for an acknowledgment from the broker (if expected), and the time allowed for retriable send failures. The producer can report a failure to send a record earlier than this value if an unrecoverable error is encountered, the retries are exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value should be greater than or equal to the sum of request.timeout.ms and linger.ms.

120
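Each timeout in this table is a scalar plus a separate time unit. To check that the delivery timeout is at least the request timeout plus the linger time, first convert everything to one unit. This Python sketch does that. The dictionary and function names are hypothetical, and truncation toward zero mirrors Java's TimeUnit.toMillis for non-negative values.

```python
from typing import Tuple

Duration = Tuple[int, str]  # (value, time unit), as configured in the connector

# Nanoseconds per unit of the connector's time-unit enumeration.
NANOS_PER_UNIT = {
    "NANOSECONDS": 1,
    "MICROSECONDS": 1_000,
    "MILLISECONDS": 1_000_000,
    "SECONDS": 1_000_000_000,
    "MINUTES": 60 * 1_000_000_000,
    "HOURS": 3_600 * 1_000_000_000,
    "DAYS": 86_400 * 1_000_000_000,
}


def to_millis(value: int, unit: str) -> int:
    """Convert a (value, unit) pair to whole milliseconds, truncating."""
    return value * NANOS_PER_UNIT[unit] // 1_000_000


def delivery_timeout_is_valid(delivery: Duration, request: Duration,
                              linger: Duration) -> bool:
    """Check that delivery >= request + linger."""
    return to_millis(*delivery) >= to_millis(*request) + to_millis(*linger)
```

With the defaults in this table (120 s delivery timeout, 30 s request timeout, 0 linger time), the rule holds.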

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the delivery timeout scalar

SECONDS

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written in the stream. If set to false, producer retries caused by broker failures and other conditions can write duplicates of the retried message in the stream. Enabling idempotence requires the value of max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0, and acks to be all. If you do not explicitly set these values, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false
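The idempotence rules above can be expressed as a small validation step. In this Python sketch, ProducerSettings, resolve_idempotence, and ConfigError are hypothetical names. The values filled in for unset fields (5 in-flight requests, 2**31 - 1 retries, acks=all) are assumptions modeled on the Kafka client defaults, not values confirmed for this connector.

```python
from dataclasses import dataclass
from typing import Optional


class ConfigError(Exception):
    """Stands in for the ConnectionException described above."""


@dataclass
class ProducerSettings:
    enable_idempotence: bool = False
    max_in_flight: Optional[int] = None  # None means "not set explicitly"
    retries: Optional[int] = None
    acks: Optional[str] = None           # "0", "1", or "all"


def resolve_idempotence(s: ProducerSettings) -> ProducerSettings:
    """Fill in unset values and reject incompatible ones when idempotence is on."""
    if not s.enable_idempotence:
        return s
    # Assumed defaults for values the user did not set.
    max_in_flight = 5 if s.max_in_flight is None else s.max_in_flight
    retries = 2**31 - 1 if s.retries is None else s.retries
    acks = "all" if s.acks is None else s.acks
    if max_in_flight > 5 or retries <= 0 or acks != "all":
        raise ConfigError(
            "idempotence needs max_in_flight <= 5, retries > 0, acks=all; "
            f"got {max_in_flight}, {retries}, {acks!r}")
    return ProducerSettings(True, max_in_flight, retries, acks)
```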

Linger time

Number

The producer groups records that arrive in between request transmissions into a single batched request. Normally, this occurs under load, when records arrive faster than they can be sent out. However, in some circumstances, the client might want to reduce the number of requests, even under a moderate load.

This setting adds a small amount of artificial delay to the sending of records. Rather than immediately sending out a record, the producer waits for up to the specified delay to allow other records to be sent so it can batch the records together. This is analogous to Nagle’s algorithm in TCP. The setting gives the upper bound on the delay for batching. Once the connector receives batch.size worth of records for a partition, it sends the batch immediately, regardless of this setting.

This setting defaults to 0, which means no delay. Setting linger.ms=5, for example, reduces the number of requests sent but adds up to 5 ms of latency to records sent in the absence of load.

0
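The batching rule described above can be written as a single decision function: send when batch.size is reached, otherwise wait at most the linger time. This Python sketch is a simplification. should_send is a hypothetical name, and the real producer has other send triggers that the sketch ignores.

```python
def should_send(buffered_bytes: int, batch_size_bytes: int,
                waited_ms: float, linger_ms: float) -> bool:
    """Decide whether a partition's pending batch is sent now.

    Sends as soon as a full batch.size worth of bytes is buffered, or once
    the oldest buffered record has waited linger_ms. With linger_ms == 0,
    any non-empty batch is sent immediately.
    """
    if buffered_bytes == 0:
        return False  # nothing to send
    return buffered_bytes >= batch_size_bytes or waited_ms >= linger_ms
```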

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the linger time scalar

SECONDS

Maximum block time

Number

Specifies how long KafkaProducer.send() and KafkaProducer.partitionsFor() can block. These methods can block either because the buffer is full or because metadata is unavailable. Time spent blocking in user-supplied serializers or partitioners is not counted against this timeout.

60
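A full buffer combined with the maximum block time behaves like a bounded queue with a timed put. This Python sketch uses queue.Queue purely as an analogy, because the Kafka producer's buffer is a memory pool, not a queue. enqueue_record is a hypothetical name. The call waits up to the limit for space and then raises instead of blocking indefinitely.

```python
import queue


def enqueue_record(buffer: "queue.Queue[bytes]", record: bytes,
                   max_block_seconds: float) -> None:
    """Add a record to a bounded buffer, blocking at most max_block_seconds."""
    try:
        buffer.put(record, timeout=max_block_seconds)
    except queue.Full:
        # Analogous to the producer throwing once the maximum block time passes.
        raise TimeoutError(
            f"buffer still full after {max_block_seconds} s") from None
```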

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the maximum block time scalar

SECONDS

Maximum in flight requests

Number

The maximum number of unacknowledged requests the client will send on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (that is, if retries are enabled).

5

Maximum request size

Number

The maximum size of a request, in bytes. This setting limits the number of record batches the producer sends in a single request. This effectively provides a cap on the maximum record batch size. The server has its own cap on record batch size, which can be different from this value.

1

Maximum request size unit.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the max request size scalar

MB
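The following Python sketch shows how a request size cap limits how many record batches travel together. It greedily groups batch sizes into requests that stay under the limit and rejects any single batch that exceeds it. pack_batches is a hypothetical helper. It ignores per-request protocol overhead and assumes 1 MB = 1024 * 1024 bytes.

```python
from typing import List

# Assumed binary multiples for the connector's size units.
UNIT_BYTES = {"BYTE": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def pack_batches(batch_sizes: List[int], max_request: int,
                 unit: str) -> List[List[int]]:
    """Group record batch sizes (bytes) into requests no larger than the limit."""
    limit = max_request * UNIT_BYTES[unit]
    requests: List[List[int]] = []
    current: List[int] = []
    current_total = 0
    for size in batch_sizes:
        if size > limit:
            # A batch this large can never fit in any request.
            raise ValueError(f"batch of {size} bytes exceeds the {limit}-byte limit")
        if current and current_total + size > limit:
            requests.append(current)  # close the current request
            current, current_total = [], 0
        current.append(size)
        current_total += size
    if current:
        requests.append(current)
    return requests
```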

Producer acknowledge mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

The number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE
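The three enumeration values correspond to the Kafka producer's acks setting. The mapping in this Python sketch (NONE to 0, LEADER_ONLY to 1, ALL to all) is an assumption based on the value names, so confirm it against the connector version you use. Enabling idempotence requires the ALL value.

```python
# Assumed mapping from Producer acknowledge mode values to the Kafka
# producer's `acks` property; verify against your connector version.
ACKS_BY_MODE = {
    "NONE": "0",         # the producer does not wait for any acknowledgment
    "LEADER_ONLY": "1",  # the partition leader acknowledges after writing locally
    "ALL": "all",        # the leader waits for the full set of in-sync replicas
}


def acks_property(mode: str) -> dict:
    """Return the Kafka producer property for an acknowledge mode value."""
    return {"acks": ACKS_BY_MODE[mode]}
```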

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than zero causes the client to resend any record whose send failed with a potentially transient error. This retry is the same as having the client resend the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 can change the ordering of records when all of the following are true:

  • Two batches are sent to a single partition

  • The first batch fails and is retried

  • The second batch succeeds

In this case, the records in the second batch can appear first. Produce requests fail before the number of retries is exhausted if the timeout configured by delivery.timeout.ms expires before a successful acknowledgment. It is best to leave this value unset and use delivery.timeout.ms to control retry behavior.

1
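The reordering scenario above can be reproduced with a toy model of one partition's log. This Python sketch is a simplified model, and partition_log is a hypothetical name:

  • Batches are sent in order, up to max_in_flight at a time.

  • A batch whose first attempt succeeds is appended immediately.

  • A batch whose first attempt fails is appended when its single retry succeeds.

The idempotent producer, which can preserve ordering, is not modeled.

```python
from typing import List, Set


def partition_log(batches: List[List[str]], fails_first_attempt: Set[int],
                  max_in_flight: int) -> List[str]:
    """Return the order in which records land in one partition's log."""
    log: List[str] = []
    for start in range(0, len(batches), max_in_flight):
        window = range(start, min(start + max_in_flight, len(batches)))
        retried = [i for i in window if i in fails_first_attempt]
        for i in window:
            if i not in fails_first_attempt:
                log.extend(batches[i])
        for i in retried:  # retries complete after the in-flight successes
            log.extend(batches[i])
    return log
```

With two batches and a failing first attempt on batch 0, allowing 5 in-flight requests puts the second batch's records first. Limiting in-flight requests to 1 keeps the original order.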

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the retry backoff timeout scalar.

MILLISECONDS

Retry backoff timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. You can override this parameter at the source level.

KB

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the request timeout scalar.

SECONDS

Default request timeout

Number

The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

TLS Configuration

TLS

Defines a configuration for TLS, which can be used from both the client and server sides to secure communication for the Mule app. When using the HTTPS protocol, the HTTP communication is secured using TLS or SSL. If HTTPS is configured as the protocol, you must configure at least the keystore in the tls:context child element of the listener-config.

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Producer SASL/SCRAM Connection
Parameters
Name Type Description Default Value Required

Username

String

The username with which to log in.

x

Password

String

The password with which to log in.

x

EncryptionType

Enumeration, one of:

  • SHA256

  • SHA512

The hashing algorithm that SCRAM uses. The only acceptable values are SHA256 (SCRAM-SHA-256) and SHA512 (SCRAM-SHA-512).

x
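SHA256 and SHA512 select the hash function used in the SCRAM key derivation defined by RFC 5802; RFC 7677 defines SCRAM-SHA-256. This Python sketch derives the ClientKey and StoredKey for a given password, salt, and iteration count. The function name and sample values are hypothetical, and SASLprep normalization of the password is skipped.

```python
import hashlib
import hmac
from typing import Tuple


def scram_keys(password: str, salt: bytes, iterations: int,
               mechanism: str) -> Tuple[bytes, bytes]:
    """Return (ClientKey, StoredKey) as defined in RFC 5802.

    mechanism is "SHA256" or "SHA512", matching the EncryptionType values.
    """
    hash_name = {"SHA256": "sha256", "SHA512": "sha512"}[mechanism]
    # SaltedPassword := Hi(password, salt, i), which is PBKDF2 with HMAC-<hash>.
    salted = hashlib.pbkdf2_hmac(hash_name, password.encode("utf-8"),
                                 salt, iterations)
    # ClientKey := HMAC(SaltedPassword, "Client Key")
    client_key = hmac.new(salted, b"Client Key", hash_name).digest()
    # StoredKey := H(ClientKey); the server keeps this instead of the password.
    stored_key = hashlib.new(hash_name, client_key).digest()
    return client_key, stored_key
```

The only difference between the two settings is the digest size: 32-byte keys for SHA256 and 64-byte keys for SHA512.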

Bootstrap server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

The producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of zero disables batching entirely). A very large batch size can result in more wasteful use of memory as a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the batch size scalar

KB

Buffer size

Number

The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting should generally correspond to the total memory the producer will use, but is not exact because the memory used by the producer is not all used for buffering. Some additional memory is used for compression (if compression is enabled), as well as for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32MB), but this should be capped to align with expected values for Mule instances in CloudHub (v0.1 core).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the buffer size scalar.

KB

DNS lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups:

  • USE_ALL_DNS_IPS: When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. This functionality applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY: Each entry is resolved and expanded into a list of canonical names.

DEFAULT

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

The compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE

Connections maximum idle time

Number

Close idle connections after the specified time is reached.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the connections maximum idle time scalar.

SECONDS

Delivery Timeout

Number

An upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record will be delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration should be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the delivery timeout scalar.

SECONDS

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written in the stream. If set to false, producer retries caused by broker failures and other conditions can write duplicates of the retried message in the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0, and acks to be all. If you do not explicitly set these values, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false

Linger time

Number

The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However, in some circumstances the client may want to reduce the number of requests, even under moderate load.

This setting accomplishes this by adding a small amount of artificial delay (rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together). This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

After batch.size worth of records is received for a partition, the batch is sent immediately regardless of this setting. However, if fewer than batch.size bytes have accumulated for the partition, the producer "lingers" for the specified time, waiting for more records to arrive. This setting defaults to 0 (no delay). Setting linger.ms=5, for example, reduces the number of requests sent but adds up to 5 ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the linger time scalar.

SECONDS

Maximum block time

Number

Controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() can block. These methods can block either because the buffer is full or because metadata is unavailable. Time spent blocking in user-supplied serializers or partitioners is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the maximum block time scalar.

SECONDS

Maximum in flight requests

Number

The maximum number of unacknowledged requests the client will send on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (if retries are enabled).

5

Maximum request size

Number

The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which may be different from this.

1

Maximum request size unit.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the max request size scalar.

MB

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

The number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than zero causes the client to resend any record whose send fails with a potentially transient error. This retry is no different from the client resending the record after receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 can change the ordering of records: if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, the records in the second batch can appear first. Additionally, produce requests fail before the number of retries is exhausted if the timeout configured by delivery.timeout.ms expires before a successful acknowledgment. It is best to leave this value unset and use delivery.timeout.ms to control retry behavior.

1

Retry Backoff Timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

1000

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the retry backoff timeout scalar.

MILLISECONDS

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. You can override this parameter at the source level.

KB

Default request timeout

Number

The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The request timeout time unit.

SECONDS

TLS Configuration

TLS

Defines a configuration for TLS, which can be used from both the client and server sides to secure communication for the Mule app. When using the HTTPS protocol, the HTTP communication is secured using TLS or SSL. If HTTPS is configured as the protocol, you must configure at least the keystore in the tls:context child element of the listener-config.

Endpoint identification algorithm

String

The endpoint identification algorithm used by clients to validate the server host name. The default value is an empty string, which means the algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate.

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Producer SASL/PLAIN Connection
Parameters
Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Endpoint identification algorithm

String

The endpoint identification algorithm used by clients to validate the server host name. The default value is an empty string, which means the algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate.
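In the Kafka client, this setting corresponds to ssl.endpoint.identification.algorithm, where https enables host name verification. Python's ssl module separates the same two checks, which makes the distinction easy to see. This sketch builds a client context that always validates the certificate chain but matches the host name only when the algorithm is HTTPS. client_tls_context is a hypothetical helper, not part of the connector.

```python
import ssl


def client_tls_context(endpoint_identification_algorithm: str) -> ssl.SSLContext:
    """Build a client TLS context for the two states of the setting.

    Assumption: "HTTPS" (any case) turns host name verification on and an
    empty string turns it off. The certificate chain is verified either way.
    """
    context = ssl.create_default_context()  # CERT_REQUIRED, check_hostname=True
    algorithm = endpoint_identification_algorithm.strip().upper()
    if algorithm == "":
        context.check_hostname = False  # skip only the host name match
    elif algorithm == "HTTPS":
        context.check_hostname = True
    else:
        raise ValueError(
            f"unsupported algorithm: {endpoint_identification_algorithm!r}")
    return context
```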

Batch size

Number

The producer attempts to batch records together into fewer requests when multiple records are sent to the same partition. This helps performance on both the client and the server. This parameter controls the default batch size, in bytes. No attempt is made to batch records larger than this size.

Requests sent to brokers contain one batch for each partition with data available to send. A small batch size makes batching less common and can reduce throughput. A batch size of zero disables batching. A very large batch size might use memory less efficiently because the connector allocates a buffer of the specified batch size in anticipation of additional records.

16

The batch size unit of measure

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the batch size scalar

KB

Buffer size

Number

The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered, the producer blocks for max.block.ms and then throws an exception. This setting should correspond roughly to the total memory the producer will use, but not all memory the producer uses is used for buffering. The producer uses additional memory for compression, if enabled, and for maintaining in-flight requests.

The default value in Kafka is 33554432 (32 MB), but you should cap the value of Buffer size to align with expected values for Mule instances in CloudHub (v0.1 core).

1000

The buffer memory size unit of measure

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the buffer size scalar.

KB

DNS lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups:

  • DEFAULT: Uses the Kafka default

  • USE_ALL_DNS_IPS: When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. This functionality applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY: Each entry is resolved and expanded into a list of canonical names.

DEFAULT

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

The compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE

Connections maximum idle time

Number

Closes idle connections after the time specified by this parameter.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the connections maximum idle time scalar.

SECONDS

Delivery timeout

Number

An upper bound on the time to report success or failure after a call to send() returns. This limits the total time that a record is delayed prior to sending, the time to wait for an acknowledgment from the broker (if expected), and the time allowed for retriable send failures. The producer can report a failure to send a record earlier than this value if an unrecoverable error is encountered, the retries are exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value should be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the delivery timeout scalar

SECONDS

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written in the stream. If set to false, producer retries caused by broker failures and other conditions can write duplicates of the retried message in the stream. Enabling idempotence requires the value of max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0, and acks to be all. If you do not explicitly set these values, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false

Linger time

Number

The producer groups records that arrive in between request transmissions into a single batched request. Normally, this occurs under load, when records arrive faster than they can be sent out. However, in some circumstances, the client might want to reduce the number of requests, even under a moderate load.

This setting adds a small amount of artificial delay to the sending of records. Rather than immediately sending out a record, the producer waits for up to the specified delay to allow other records to be sent so it can batch the records together. This is analogous to Nagle’s algorithm in TCP. The setting gives the upper bound on the delay for batching. Once the connector receives batch.size worth of records for a partition, it sends the batch immediately, regardless of this setting.

This setting defaults to 0, which means no delay. Setting linger.ms=5, for example, reduces the number of requests sent but adds up to 5 ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the linger time scalar

SECONDS

Maximum block time

Number

Specifies how long KafkaProducer.send() and KafkaProducer.partitionsFor() can block. These methods can block either because the buffer is full or because metadata is unavailable. Time spent blocking in user-supplied serializers or partitioners is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the maximum block time scalar

SECONDS

Maximum in flight requests

Number

The maximum number of unacknowledged requests the client will send on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (that is, if retries are enabled).

5

Maximum request size

Number

The maximum size of a request, in bytes. This setting limits the number of record batches the producer sends in a single request. This effectively provides a cap on the maximum record batch size. The server has its own cap on record batch size, which can be different from this value.

1

Maximum request size unit.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the max request size scalar

MB

Producer acknowledge mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

The number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than zero causes the client to resend any record whose send failed with a potentially transient error. This retry is the same as having the client resend the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 can change the ordering of records when all of the following are true:

  • Two batches are sent to a single partition

  • The first batch fails and is retried

  • The second batch succeeds

In this case, the records in the second batch can appear first. Produce requests fail before the number of retries is exhausted if the timeout configured by delivery.timeout.ms expires before a successful acknowledgment. It is best to leave this value unset and use delivery.timeout.ms to control retry behavior.

1
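The ordering problem described above can be reproduced with a toy, single-threaded Java model. The class RetryReorderingDemo is hypothetical and is not the Kafka client's sender logic: it treats each round as one window of up to maxInFlight requests, puts a failed batch back at the front of the queue, and deliberately ignores the idempotent-producer sequencing that the Kafka client uses to prevent this kind of reordering.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of retries plus multiple in-flight requests to one partition. */
final class RetryReorderingDemo {

    private RetryReorderingDemo() {
    }

    /**
     * @param batches          batches in send order, all for the same partition
     * @param failFirstAttempt batches whose first send fails with a transient error
     * @param maxInFlight      how many requests may be outstanding at once
     * @return the order in which the broker appends batches to the partition log
     */
    static List<String> simulate(List<String> batches, Set<String> failFirstAttempt, int maxInFlight) {
        if (maxInFlight < 1) {
            throw new IllegalArgumentException("maxInFlight must be at least 1");
        }
        Deque<String> queue = new ArrayDeque<>(batches); // batches waiting to be sent
        Set<String> failedOnce = new HashSet<>();        // batches that already used their failure
        List<String> partitionLog = new ArrayList<>();   // broker-side append order
        while (!queue.isEmpty()) {
            // Fill the in-flight window for this round.
            List<String> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !queue.isEmpty()) {
                inFlight.add(queue.pollFirst());
            }
            List<String> toRetry = new ArrayList<>();
            for (String batch : inFlight) {
                if (failFirstAttempt.contains(batch) && failedOnce.add(batch)) {
                    toRetry.add(batch);        // transient failure: send again later
                } else {
                    partitionLog.add(batch);   // broker accepted the batch
                }
            }
            // Retried batches return to the front, keeping their relative order.
            for (int i = toRetry.size() - 1; i >= 0; i--) {
                queue.addFirst(toRetry.get(i));
            }
        }
        return partitionLog;
    }
}
```

With batches A and B, A failing once, and two requests in flight, the model appends B before A; with a single request in flight, the order stays A, B.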

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the retry backoff timeout scalar.

MILLISECONDS

Retry backoff timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. You can override this parameter at the source level.

KB
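The buffer sizes correspond to the Apache Kafka client properties receive.buffer.bytes and send.buffer.bytes, which take a byte count and treat -1 as "use the OS default". The hypothetical Java helper below assumes binary units (1 KB = 1024 bytes) and passes -1 through without scaling it by the unit; how the connector itself handles -1 combined with a unit is not stated in this reference.

```java
/**
 * Hypothetical translation of a buffer-size scalar and unit into the byte value
 * used by Kafka's receive.buffer.bytes and send.buffer.bytes properties.
 */
final class SocketBufferSizes {

    /** Mirrors the BYTE, KB, MB, and GB options; binary multiples are assumed. */
    enum DataUnit {
        BYTE(1L), KB(1024L), MB(1024L * 1024L), GB(1024L * 1024L * 1024L);

        final long bytes;

        DataUnit(long bytes) {
            this.bytes = bytes;
        }
    }

    /** Sentinel meaning "let the operating system choose the buffer size". */
    static final int OS_DEFAULT = -1;

    private SocketBufferSizes() {
    }

    static int toBytes(int size, DataUnit unit) {
        if (size == OS_DEFAULT) {
            return OS_DEFAULT; // -1 is passed through; it is not multiplied by the unit
        }
        if (size < 0) {
            throw new IllegalArgumentException("buffer size must be -1 or non-negative: " + size);
        }
        // Both Kafka properties are ints, so reject results that do not fit.
        return Math.toIntExact(Math.multiplyExact((long) size, unit.bytes));
    }
}
```

With the defaults shown, 64 KB becomes 65536 bytes and 128 KB becomes 131072 bytes.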

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the request timeout scalar.

SECONDS

Default request timeout

Number

The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30
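The guidance above compares two durations that are configured in different places. A hypothetical Java helper can convert the connector's scalar and unit with java.util.concurrent.TimeUnit and compare the result with the broker's replica.lag.time.max.ms, which you would read from the broker configuration. The class name RequestTimeoutCheck is an assumption, and the broker values in the checks are illustrative only.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical check: the request timeout should exceed replica.lag.time.max.ms. */
final class RequestTimeoutCheck {

    private RequestTimeoutCheck() {
    }

    /** Converts the scalar and unit to milliseconds, the unit of Kafka's request.timeout.ms. */
    static long requestTimeoutMs(long timeout, TimeUnit unit) {
        return unit.toMillis(timeout); // truncates sub-millisecond values
    }

    /** True when the request timeout is strictly larger than the broker's replica lag limit. */
    static boolean exceedsReplicaLag(long timeout, TimeUnit unit, long replicaLagTimeMaxMs) {
        return requestTimeoutMs(timeout, unit) > replicaLagTimeMaxMs;
    }
}
```

With the default of 30 SECONDS, requestTimeoutMs returns 30000. Because TimeUnit.toMillis truncates, a value such as 500 MICROSECONDS becomes 0 ms.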

TLS Configuration

TLS

Defines the TLS configuration used to secure communication between the connector and the Kafka broker. Configure it in the tls:context child element of this configuration.

Username

String

The username the client uses to connect to the Kafka broker.

x

Password

String

The password the client uses to connect to the Kafka broker.

x

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Consumer Sources

Batch Message Listener

<kafka:batch-message-listener>

This source supports the consumption of messages from a Kafka cluster, producing a list of messages to the flow.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Poll timeout

Number

The amount of time to block. Defines the total timeout for polling.

Poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the polling timeout. Used with poll timeout to define the total timeout for polling.

Acknowledgment mode

Enumeration, one of:

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka broker instance is notified of the consumption of messages.

  • AUTO: Commits messages automatically if the flow finishes successfully.

  • MANUAL: The user must commit messages manually upon receipt by using the Commit operation.

  • IMMEDIATE: Mule automatically commits the messages upon receipt before triggering the flow.

  • DUPS_OK: Works in the same way as MANUAL mode, but the commits are asynchronous, which can lead to duplicate records.
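The four modes differ in who triggers the commit and when. The hypothetical Java enum below encodes only what the list above states, which can help when reasoning about a flow or writing tests; ConsumerAckMode and its methods are not part of the connector's API.

```java
/**
 * Hypothetical model of the consumer acknowledgment modes described above.
 * It encodes the documented behavior only; it is not the connector's implementation.
 */
enum ConsumerAckMode {
    AUTO, MANUAL, IMMEDIATE, DUPS_OK;

    /** Whether the flow must call the Commit operation itself. */
    boolean requiresCommitOperation() {
        return this == MANUAL || this == DUPS_OK;
    }

    /** Whether offsets are committed on receipt, before the flow is triggered. */
    boolean commitsBeforeFlow() {
        return this == IMMEDIATE;
    }

    /** Whether offsets are committed automatically when the flow finishes successfully. */
    boolean commitsOnFlowSuccess() {
        return this == AUTO;
    }

    /** Whether commits are asynchronous, which can lead to duplicate records. */
    boolean asyncCommit() {
        return this == DUPS_OK;
    }
}
```

For example, with MANUAL or DUPS_OK, a flow that never reaches a Commit operation leaves its offsets uncommitted, while with IMMEDIATE the offsets are already committed before the first processor runs.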

Number of parallel consumers

Number

1

Primary Node Only

Boolean

Whether this source should be executed only on the primary node when running in a cluster.

Redelivery Policy

Defines a policy for processing the redelivery of the same message.

Reconnection Strategy

A retry strategy in case of connectivity errors.

Output

Type

Array of Record

Attributes Type

For Configurations

The Batch Message Listener source does not support configurable streaming strategies because it receives each batch of records as an input stream. The source uses a non-repeatable stream by default.

Message Listener

<kafka:message-listener>

This source supports the consumption of messages from a Kafka cluster, producing a single message to the flow.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Poll timeout

Number

The amount of time to block. Defines the total timeout for polling.

Poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the polling timeout. Used with poll timeout to define the total timeout for polling.

Acknowledgment mode

Enumeration, one of:

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka broker instance is notified of the consumption of messages.

  • AUTO: Commits messages automatically if the flow finishes successfully.

  • MANUAL: The user must commit messages manually upon receipt by using the Commit operation.

  • IMMEDIATE: Mule automatically commits the messages upon receipt before triggering the flow.

  • DUPS_OK: Works in the same way as MANUAL mode, but the commit is made asynchronously, which can lead to duplicate records.

Number of parallel consumers

Number

1

Primary Node Only

Boolean

Whether this source should be executed only on the primary node when running in a cluster.

Streaming Strategy

Configure to use repeatable streams.

Redelivery Policy

Defines a policy for processing the redelivery of the same message.

Reconnection Strategy

A retry strategy in case of connectivity errors.

Output

Type

Binary

Attributes Type

For Configurations

Consumer Operations

Commit

<kafka:commit>

Commits the offsets associated with a message or batch of messages consumed by a Message Listener or Batch Message Listener source.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Consumer commit key

String

The commit key of the last poll. This operation is valid only inside a flow that uses one of the message listener sources (Message Listener or Batch Message Listener), which insert this value as an attribute in the Mule event.

x

Reconnection Strategy

A retry strategy in case of connectivity errors.

For Configurations

Throws

  • KAFKA:INVALID_ACK_MODE

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:ALREADY_COMMITTED

  • KAFKA:TIMEOUT

  • KAFKA:SESSION_NOT_FOUND

  • KAFKA:NOT_FOUND

  • KAFKA:CONNECTIVITY

Consume

<kafka:consume>

Receives messages from one or more Kafka topics. This operation works much like the Message Listener source, so everything that applies to that source also applies to this operation.

Note: The Consume operation does not return the consumerCommitKey.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Consumption timeout

Number

The amount of time, expressed in the Timeout time unit, that this operation waits to receive messages.

Timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The unit of time for the timeout property.

Operation Timeout

Number

Operation Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Acknowledgement mode

Enumeration, one of:

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka broker instance is notified of the consumption of messages:

  • AUTO: Commits messages automatically if the flow finishes successfully.

  • MANUAL: The user must commit messages manually upon receipt by using the Commit operation.

  • IMMEDIATE: Mule automatically commits the messages upon receipt before triggering the flow.

  • DUPS_OK: Works in the same way as MANUAL mode, but the commits are asynchronous, which can lead to duplicate records.

IMMEDIATE

Streaming Strategy

Configure to use repeatable streams.

Target Variable

String

Name of the variable in which to store the operation’s output.

Target Value

String

Expression that evaluates the operation’s output. The expression outcome is stored in the target variable.

#[payload]

Reconnection Strategy

A retry strategy in case of connectivity errors.

Output

Type

Binary

Attributes Type

For Configurations

Throws

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:ILLEGAL_STATE

  • KAFKA:TIMEOUT

  • KAFKA:INVALID_OFFSET

  • KAFKA:INVALID_INPUT

  • KAFKA:NOT_FOUND

  • KAFKA:CONNECTIVITY

Seek

<kafka:seek>

Sets the current offset of the consumer for the given topic and partition to the provided offset value.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Topic

String

The name of the topic on which the seek operation is performed.

x

Partition

Number

The number of the partition whose offset is modified.

x

Offset

Number

The offset value to set for the configured partition.

x

Operation Timeout

Number

Operation Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Reconnection Strategy

A retry strategy in case of connectivity errors.

For Configurations

Throws

  • KAFKA:INVALID_TOPIC

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:TIMEOUT

  • KAFKA:INVALID_OFFSET

  • KAFKA:INVALID_INPUT

  • KAFKA:NOT_FOUND

  • KAFKA:CONNECTIVITY

Producer Operation

Publish

<kafka:publish>

Publishes a message to the specified Kafka topic, optionally specifying the partition, key, and content of the message. The Publish operation supports transactions.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Topic

String

The topic to publish to.

Partition

Number

(Optional) The topic partition.

Key

Binary

(Optional) Key for the published message.

Message

Binary

(Optional) The content of the message.

#[payload]

Headers

Object

(Optional) Headers for the message.

Transactional Action

Enumeration, one of:

  • ALWAYS_JOIN

  • JOIN_IF_POSSIBLE

  • NOT_SUPPORTED

The type of joining action that operations can take for transactions.

JOIN_IF_POSSIBLE

Target Variable

String

Name of the variable in which to store the operation’s output.

Target Value

String

Expression that evaluates the operation’s output. The expression outcome is stored in the target variable.

#[payload]

Reconnection Strategy

A retry strategy in case of connectivity errors.

For Configurations

Throws

  • KAFKA:INVALID_TOPIC_PARTITION

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:TIMEOUT

  • KAFKA:INPUT_TOO_LARGE

  • KAFKA:CONNECTIVITY

Types

TLS

Field Type Description Default Value Required

Enabled Protocols

String

A comma-separated list of protocols enabled for this context.

Enabled Cipher Suites

String

A comma-separated list of cipher suites enabled for this context.

Trust Store

Key Store

Revocation Check

Truststore

Field Type Description Default Value Required

Path

String

The location of the truststore, resolved relative to the current classpath and file system, if possible.

Password

String

The password used to protect the truststore.

Type

String

The type of store used.

Algorithm

String

The algorithm used by the truststore.

Insecure

Boolean

If true, no certificate validations will be performed, rendering connections vulnerable to attacks. Use at your own risk.

Key Store

Field Type Description Default Value Required

Path

String

The location of the keystore, resolved relative to the current classpath and file system, if possible.

Type

String

The type of store used.

Alias

String

When the keystore contains many private keys, this attribute indicates the alias of the key that should be used. If not defined, the first key in the file will be used by default.

Key Password

String

The password used to protect the private key.

Password

String

The password used to protect the keystore.

Algorithm

String

The algorithm used by the keystore.
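If you need to compare these fields with a plain Apache Kafka client configuration, the following Java sketch shows one plausible correspondence with the standard Kafka SSL properties. The mapping and the class name KafkaTlsProperties are assumptions, not the connector's documented behavior. Alias, Insecure, and the revocation-check settings have no direct equivalent in this sketch and are left out, and a real client also needs security.protocol set to SSL or SASL_SSL.

```java
import java.util.Properties;

/**
 * Hypothetical mapping of the TLS, Truststore, and Key Store fields onto
 * standard Apache Kafka client SSL properties. Not the connector's actual code.
 */
final class KafkaTlsProperties {

    private KafkaTlsProperties() {
    }

    /** Sets a property only when a non-empty value is supplied. */
    private static void putIfPresent(Properties props, String key, String value) {
        if (value != null && !value.isEmpty()) {
            props.setProperty(key, value);
        }
    }

    /** TLS fields: both are comma-separated lists, as Kafka expects. */
    static void applyTlsLists(Properties props, String enabledProtocols, String enabledCipherSuites) {
        putIfPresent(props, "ssl.enabled.protocols", enabledProtocols);
        putIfPresent(props, "ssl.cipher.suites", enabledCipherSuites);
    }

    /** Truststore fields: Path, Password, Type, Algorithm. */
    static void applyTrustStore(Properties props, String path, String password, String type, String algorithm) {
        putIfPresent(props, "ssl.truststore.location", path);
        putIfPresent(props, "ssl.truststore.password", password);
        putIfPresent(props, "ssl.truststore.type", type);
        putIfPresent(props, "ssl.trustmanager.algorithm", algorithm);
    }

    /** Key Store fields: Path, Password, Key Password, Type, Algorithm. */
    static void applyKeyStore(Properties props, String path, String password, String keyPassword,
                              String type, String algorithm) {
        putIfPresent(props, "ssl.keystore.location", path);
        putIfPresent(props, "ssl.keystore.password", password);
        putIfPresent(props, "ssl.key.password", keyPassword);
        putIfPresent(props, "ssl.keystore.type", type);
        putIfPresent(props, "ssl.keymanager.algorithm", algorithm);
    }
}
```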

Standard Revocation Check

Field Type Description Default Value Required

Only End Entities

Boolean

Only verify the last element of the certificate chain.

Prefer Crls

Boolean

Try CRL instead of OCSP first.

No Fallback

Boolean

Do not use the secondary checking method (the one not selected before).

Soft Fail

Boolean

Avoid verification failure when the revocation server cannot be reached or is busy.

Custom OCSP Responder

Field Type Description Default Value Required

Url

String

The URL of the OCSP responder.

Cert Alias

String

Alias of the signing certificate for the OCSP response (must be in the truststore), if present.

CRL File

Field Type Description Default Value Required

Path

String

The path to the CRL file.

Topic Partition

Field Type Description Default Value Required

Topic

String

x

Partition

Number

x

Reconnection

Field Type Description Default Value Required

Fails Deployment

Boolean

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Reconnection Strategy

The reconnection strategy to use.

Reconnect

Field Type Description Default Value Required

Frequency

Number

How often in milliseconds to reconnect.

Count

Number

How many reconnection attempts to make.

Blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

true

Reconnect Forever

Field Type Description Default Value Required

Frequency

Number

How often in milliseconds to reconnect.

Blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

true
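The Frequency and Count fields combine into a simple retry loop. The hypothetical Java sketch below models the blocking variant of both strategies, using a count of -1 to stand in for Reconnect Forever; ReconnectSketch is not Mule's implementation. A non-blocking strategy would run the same loop on a separate thread instead of the caller's.

```java
import java.util.function.BooleanSupplier;

/**
 * Hypothetical sketch of the Reconnect and Reconnect Forever strategies.
 * It only shows how Frequency and Count interact.
 */
final class ReconnectSketch {

    /** Stand-in for Reconnect Forever: no upper bound on attempts. */
    static final int FOREVER = -1;

    private ReconnectSketch() {
    }

    /**
     * Tries to connect once, then retries every frequencyMillis up to count times
     * (or indefinitely when count is FOREVER). Returns the number of reconnection
     * attempts that were needed; throws when they are exhausted.
     */
    static int connect(BooleanSupplier attempt, long frequencyMillis, int count) {
        if (attempt.getAsBoolean()) {
            return 0; // connected without reconnecting
        }
        for (int reconnects = 1; count == FOREVER || reconnects <= count; reconnects++) {
            pause(frequencyMillis); // blocking = true: wait on the caller's thread
            if (attempt.getAsBoolean()) {
                return reconnects;
            }
        }
        throw new IllegalStateException("Reconnection attempts exhausted after " + count + " tries");
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            throw new IllegalStateException("Interrupted while waiting to reconnect", e);
        }
    }
}
```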

Record

Field Type Description Default Value Required

Attributes

Payload

Binary

Kafka Record Attributes

Field Type Description Default Value Required

Consumer Commit Key

String

Creation Timestamp

DateTime

Headers

Object

Key

Binary

Leader Epoch

Number

Log Append Timestamp

DateTime

Offset

Number

Partition

Number

Serialized Key Size

Number

Serialized Value Size

Number

Topic

String

Consumer Context

Field Type Description Default Value Required

Consumer Commit Key

String

Redelivery Policy

Field Type Description Default Value Required

Max Redelivery Count

Number

The maximum number of times a message can be redelivered and processed unsuccessfully before triggering a process-failed-message.

Use Secure Hash

Boolean

Whether to use a secure hash algorithm to identify a redelivered message.

Message Digest Algorithm

String

The secure hashing algorithm to use.

SHA-256

Id Expression

String

Defines one or more expressions to use to determine when a message has been redelivered. This property can be set only if useSecureHash is false.

Object Store

ObjectStore

The object store in which to store the redelivery counter for each message.
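The parameters above describe a counter keyed by a message identifier. The following Java sketch assumes Use Secure Hash is true: it derives the identifier from a digest of the payload (SHA-256 by default) and keeps the counts in a HashMap in place of the Object Store. RedeliveryTracker and its methods are hypothetical, and whether the limit is reached on the Nth or the (N+1)th redelivery is an assumption.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical redelivery tracker that identifies messages by a secure hash.
 * A HashMap stands in for the Object Store.
 */
final class RedeliveryTracker {

    private final int maxRedeliveryCount;
    private final String digestAlgorithm; // for example "SHA-256", the documented default
    private final Map<String, Integer> deliveries = new HashMap<>(); // message id -> delivery count

    RedeliveryTracker(int maxRedeliveryCount, String digestAlgorithm) {
        this.maxRedeliveryCount = maxRedeliveryCount;
        this.digestAlgorithm = digestAlgorithm;
    }

    /** Message id: Base64 of the payload digest, so identical payloads share one counter. */
    String messageId(byte[] payload) {
        try {
            byte[] digest = MessageDigest.getInstance(digestAlgorithm).digest(payload);
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalArgumentException("Unknown digest algorithm: " + digestAlgorithm, e);
        }
    }

    /**
     * Records a delivery and returns true while the number of redeliveries
     * (deliveries after the first) is at most maxRedeliveryCount.
     */
    boolean allowDelivery(byte[] payload) {
        int count = deliveries.merge(messageId(payload), 1, Integer::sum);
        return count - 1 <= maxRedeliveryCount;
    }

    /** Forgets the message once it has been processed successfully. */
    void markProcessed(byte[] payload) {
        deliveries.remove(messageId(payload));
    }
}
```

With maxRedeliveryCount set to 2, the first delivery and two redeliveries are allowed and the third redelivery returns false, which in this sketch marks the point where a process-failed-message would be triggered.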

Repeatable In Memory Stream

Field Type Description Default Value Required

Initial Buffer Size

Number

The amount of memory to allocate to consume the stream and provide random access to it. If the stream contains more data than can fit into this buffer, then the buffer expands according to the bufferSizeIncrement attribute, with an upper limit of maxInMemorySize.

Buffer Size Increment

Number

How much the buffer size expands if it exceeds its initial size. A value of zero or lower means that the buffer does not expand, so a STREAM_MAXIMUM_SIZE_EXCEEDED error is raised when the buffer is full.

Max Buffer Size

Number

The maximum amount of memory to use. If more than that is used, then a STREAM_MAXIMUM_SIZE_EXCEEDED error is raised. A value lower than or equal to zero means no limit.

Buffer Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit in which all these attributes are expressed.
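The interaction between Initial Buffer Size, Buffer Size Increment, and Max Buffer Size can be modeled as below. This Java sketch is hypothetical (InMemoryBufferPolicy is not a Mule class): it computes the capacity the buffer would reach for a given amount of data and throws an IllegalStateException where Mule raises STREAM_MAXIMUM_SIZE_EXCEEDED. It assumes growth is capped at the maximum rather than overshooting it.

```java
/**
 * Hypothetical model of the growth rules for a repeatable in-memory stream.
 * All sizes share one Buffer Unit. Not Mule's implementation.
 */
final class InMemoryBufferPolicy {

    private final long initialSize;
    private final long increment; // zero or lower: the buffer never expands
    private final long maxSize;   // zero or lower: no upper limit

    InMemoryBufferPolicy(long initialSize, long increment, long maxSize) {
        this.initialSize = initialSize;
        this.increment = increment;
        this.maxSize = maxSize;
    }

    /** Returns the buffer capacity reached after growing enough to hold {@code required} units. */
    long capacityFor(long required) {
        long capacity = initialSize;
        while (capacity < required) {
            if (increment <= 0) {
                throw exceeded("the buffer is full and cannot expand");
            }
            long grown = capacity + increment;
            if (maxSize > 0 && grown > maxSize) {
                grown = maxSize; // never grow past Max Buffer Size
            }
            if (grown <= capacity) {
                throw exceeded("Max Buffer Size of " + maxSize + " reached");
            }
            capacity = grown;
        }
        return capacity;
    }

    /** Stand-in for Mule's STREAM_MAXIMUM_SIZE_EXCEEDED error. */
    private static IllegalStateException exceeded(String detail) {
        return new IllegalStateException("STREAM_MAXIMUM_SIZE_EXCEEDED (simulated): " + detail);
    }
}
```

For example, with an initial size of 256, an increment of 256, and a maximum of 1024 (all in KB), 600 KB of data needs a 768 KB buffer, while 2000 KB raises the error.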

Repeatable File Store Stream

Field Type Description Default Value Required

In Memory Size

Number

Defines the maximum memory that the stream should use to keep data in memory. If more than that is consumed, the content is buffered on disk.

Buffer Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit in which maxInMemorySize is expressed.

Expiration Policy

Field Type Description Default Value Required

Max Idle Time

Number

A scalar time value for the maximum amount of time a dynamic configuration instance can remain idle before it is considered eligible for expiration.

Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

A time unit that qualifies the maxIdleTime attribute.
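The Max Idle Time and Time Unit pair amounts to a comparison against elapsed idle time. In the hypothetical Java helper below, timestamps are in milliseconds and an instance becomes eligible only when it has been idle for strictly longer than the limit; that boundary choice, and the name IdleExpiration, are assumptions.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical idle-expiration check for a dynamic configuration instance. */
final class IdleExpiration {

    private IdleExpiration() {
    }

    /**
     * True when the instance has been idle for longer than maxIdleTime.
     * Timestamps are in milliseconds, for example from System.currentTimeMillis().
     */
    static boolean isEligibleForExpiration(long lastUsedMillis, long nowMillis,
                                           long maxIdleTime, TimeUnit timeUnit) {
        long idleMillis = nowMillis - lastUsedMillis;
        return idleMillis > timeUnit.toMillis(maxIdleTime);
    }
}
```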

Kafka Message Metadata

Field Type Description Default Value Required

Offset

Number

Partition

Number

Serialized Key Size

Number

Serialized Value Size

Number

Timestamp

DateTime

Topic

String
