Contact Us 1-800-596-4880

Apache Kafka Connector 4.7 Reference - Mule 4

Anypoint Connector for Apache Kafka (Apache Kafka Connector) enables you to interact with the Apache Kafka messaging system. It provides seamless integration between your Mule app and an Apache Kafka cluster, using Mule runtime engine (Mule).

When a Message Listener or a Batch Message Listener uses the MANUAL acknowledgement mode, you must use a Commit operation at the end of the flow when the flow finishes successfully (either because there were no errors or because there is an On Error Continue component in the flow).
Mule server notifications are not supported because the Kafka library used in the connector manages network disruptions internally.

Consumer Configuration

Configuration for consumers for Apache Kafka Connector.

Name Type Description Default Value Required

Name

String

Name for this configuration. Connectors reference the configuration with this name.

x

Connection

Connection types for this configuration.

x

Default acknowledgement mode

Enumeration, one of

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka Broker instance is notified of the consumption of messages.

  • AUTO

    Commits messages automatically if the flow finishes successfully.

  • MANUAL

    The user must commit messages manually upon receipt by using the Commit operation.

  • IMMEDIATE

    Mule automatically commits the messages upon receipt before triggering the flow.

  • DUPS_OK

    Works in the same way as MANUAL, but the commits are asynchronous, which can lead to duplicate records.

AUTO

Default listener poll timeout

Number

Time to wait to perform a poll if data is not available in the buffer (fetched). If no value is set, the poll is returned immediately with any records that are currently available in the buffer or else returns empty if there is no data. Must not be negative.

100

Default listener poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default listener poll timeout field. This combines with Poll Timeout to define the total timeout for the polling.

MILLISECONDS

Default operation poll timeout

Number

Time to wait for an operation to finish. If no value is set or a negative value is set, the operation waits forever.

-1

Default operation poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default operation timeout field. This combines with Operation Timeout to define the total default timeout for the operations that use this configuration.

SECONDS

Zone ID

String

Converts the provided timestamps into ZonedLocalDateTimes in the results. The default value is provided by the system.

Expiration Policy

Configures the minimum amount of time that a dynamic configuration instance can remain idle before Mule considers it eligible for expiration.

Consumer Plaintext Connection Type

Use an unauthenticated and non-encrypted connection type.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all the Kafka consumers that use this configuration.

Consumer Amount

Number

Determines the number of consumers the connection initially creates.

1

Maximum polling interval

Number

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before the expiration of this timeout, then the consumer is considered failed and the group rebalances to reassign the partitions to another member. For consumers using a non-null group.instance.id which reaches this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats and partitions are reassigned after the expiration of session.timeout.ms.

60

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum polling interval field. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally.

  • READ_COMMITTED

    consumer.poll() returns only committed transactional messages are returned.

  • READ_UNCOMMITTED

    consumer.poll() returns all messages, even transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in the READ_COMMITTED mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction.

In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. Furthermore, when in READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED

Exclude internal topics

Boolean

Determines whether internal topics matching a subscribed pattern are excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server, for example, because the data was deleted.

  • EARLIEST

    Automatically reset the offset to the earliest offset.

  • LATEST

    Automatically reset the offset to the latest offset.

  • ERROR

    Throw an error if no previous offset is found for the consumer’s group.

LATEST

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled.

true

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Request Timeout

Number

Maximum amount of time the client waits for the response of a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Request Timeout field. You can override this parameter at the source level.

SECONDS

Default record limit

Number

Maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS then, when the lookup returns multiple IP addresses for a hostname, they attempt to connect before failing the connection. Applies to both bootstrap and advertised servers. If the value is RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY each entry is resolved and expanded into a list of canonical names.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Heartbeat interval

Number

Expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Heartbeat interval field.

SECONDS

Session Timeout

Number

Timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Session Timeout field.

SECONDS

Connection maximum idle time

Number

Close idle connections after the number of milliseconds specified by this configuration.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connection maximum idle time field.

SECONDS

TLS Configuration

TLS

Defines a TLS configuration used by both clients and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL. The default value is PLAINTEXT or SASL_PLAINTEXT for SASL authentication - kerberos/scram/plain. If the broker configures SSL as the protocol, then configure at least the keystore in the tls:context child element of the configuration and the connector automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the brokers certificate.

Topic Subscription Patterns

Array of String

List of subscription regular expressions to which to subscribe. Topics are automatically rebalanced between the topic consumers.

Assignments

Array of Topic Partition

List of topic-partition pairs to assign. Consumers are not automatically rebalanced.

Default fetch minimum size

Number

Minimum amount of data the server returns for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch minimum size field. You can override this parameter at the source level.

BYTE

Default fetch maximum size

Number

Maximum amount of data the server returns for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch maximum size field. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

Maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). See fetch.max.bytes for limiting the consumer request size. You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default maximum partition fetch size field. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

Maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Fetch Maximum Wait Timeout field.

MILLISECONDS

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Consumer Kerberos Connection Type

Use Kerberos configuration files.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all the Kafka consumers that use this configuration.

Consumer Amount

Number

Determines the number of consumers the connection initially creates.

1

Maximum polling interval

Number

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before the expiration of this timeout, then the consumer is considered failed and the group rebalances to reassign the partitions to another member. For consumers using a non-null group.instance.id which reaches this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats and partitions are reassigned after the expiration of session.timeout.ms.

60

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum polling interval field. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally.

  • READ_COMMITTED

    consumer.poll() returns only committed transactional messages are returned.

  • READ_UNCOMMITTED

    consumer.poll() returns all messages, even transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in the READ_COMMITTED mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction.

In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. Furthermore, when in READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED

Exclude internal topics

Boolean

Determines whether internal topics matching a subscribed pattern are excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server, for example, because the data was deleted.

  • EARLIEST

    Automatically reset the offset to the earliest offset.

  • LATEST

    Automatically reset the offset to the latest offset.

  • ERROR

    Throw an error if no previous offset is found for the consumer’s group.

LATEST

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled.

true

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Request Timeout

Number

Maximum amount of time the client waits for the response of a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Request Timeout field. You can override this parameter at the source level.

SECONDS

Default record limit

Number

Maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS then, when the lookup returns multiple IP addresses for a hostname, they attempt to connect before failing the connection. Applies to both bootstrap and advertised servers. If the value is RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY each entry is resolved and expanded into a list of canonical names.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Heartbeat interval

Number

Expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Heartbeat interval field.

SECONDS

Session Timeout

Number

Timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Session Timeout field.

SECONDS

Connection maximum idle time

Number

Close idle connections after the number of milliseconds specified by this configuration.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connection maximum idle time field.

SECONDS

TLS Configuration

TLS

Defines a TLS configuration used by both clients and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL. The default value is PLAINTEXT or SASL_PLAINTEXT for SASL authentication - kerberos/scram/plain. If the broker configures SSL as the protocol, then configure at least the keystore in the tls:context child element of the configuration and the connector automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the brokers certificate.

Topic Subscription Patterns

Array of String

List of subscription regular expressions to which to subscribe. Topics are automatically rebalanced between the topic consumers.

Assignments

Array of Topic Partition

List of topic-partition pairs to assign. Consumers are not automatically rebalanced.

Default fetch minimum size

Number

Minimum amount of data the server returns for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch minimum size field. You can override this parameter at the source level.

BYTE

Default fetch maximum size

Number

Maximum amount of data the server returns for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch maximum size field. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

Maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). See fetch.max.bytes for limiting the consumer request size. You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default maximum partition fetch size field. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

Maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Fetch Maximum Wait Timeout field.

MILLISECONDS

Principal

String

Entity that is authenticated by a computer system or a network. Principals can be individual people, computers, services, or computational entities such as processes and threads.

x

Service name

String

Kerberos principal name that Kafka runs as.

x

Kerberos configuration file (krb5.conf)

String

Path to the krb5.conf file, which contains Kerberos configuration information. This information includes the locations of KDCs and admin servers for the Kerberos realms of interest, defaults for the current realm, defaults for Kerberos applications, and the mappings of hostnames to Kerberos realms.

Use ticket cache

Boolean

Set this option to true to obtain the ticket-granting ticket (TGT) from the ticket cache. Set this option to false if you do not want to use the ticket cache. The connector searches for the ticket cache as follows:

  • On Solaris and Linux, the connector looks in /tmp/krb5cc_uid, in which the uid is the numeric user identifier.

  • If the ticket cache is not available in /tmp/krb5cc_uid or the app is on a Windows platform, the connector looks in {user.home}{file.separator}krb5cc_{user.name}. You can override the ticket cache location by setting a value for the Ticket cache field.

    In a Windows environment, if a ticket cannot be retrieved from the file ticket cache, Windows uses the Local Security Authority (LSA) API to get the ticket-granting ticket (TGT).

false

Ticket cache

String

Name of the ticket cache that contains the user’s ticket-granting ticket (TGT). If this value is set, Use ticket cache must also be set to true. Otherwise, a configuration error is returned.

Use keytab

Boolean

Set this option to true if you want the connector to obtain the principal’s key from the keytab. If you don’t set this value, the connector locates the keytab by using the Kerberos configuration file. If the keytab is not specified in the Kerberos configuration file, the connector looks for the {user.home}{file.separator}krb5.keytab file.

false

Keytab

String

Set this option to the file name of the keytab to obtain the principal’s secret key.

Store key

Boolean

Set this option to true to store the principal’s subject private credentials.

false

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Consumer SASL/SCRAM Connection Type

Use Salted Challenge Response Authentication Mechanism (SCRAM) or SASL/SCRAM, a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username and password authentication like PLAIN. Apache Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all the Kafka consumers that use this configuration.

Consumer Amount

Number

Determines the number of consumers the connection initially creates.

1

Maximum polling interval

Number

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before the expiration of this timeout, then the consumer is considered failed and the group rebalances to reassign the partitions to another member. For consumers using a non-null group.instance.id which reaches this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats and partitions are reassigned after the expiration of session.timeout.ms.

60

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum polling interval field. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally.

  • READ_COMMITTED

    consumer.poll() returns only committed transactional messages are returned.

  • READ_UNCOMMITTED

    consumer.poll() returns all messages, even transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in the READ_COMMITTED mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction.

In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. Furthermore, when in READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED

Exclude internal topics

Boolean

Determines whether internal topics matching a subscribed pattern are excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server, for example, because the data was deleted.

  • EARLIEST

    Automatically reset the offset to the earliest offset.

  • LATEST

    Automatically reset the offset to the latest offset.

  • ERROR

    Throw an error if no previous offset is found for the consumer’s group.

LATEST

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled.

true

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Request Timeout

Number

Maximum amount of time the client waits for the response of a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Request Timeout field. You can override this parameter at the source level.

SECONDS

Default record limit

Number

Maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS then, when the lookup returns multiple IP addresses for a hostname, they attempt to connect before failing the connection. Applies to both bootstrap and advertised servers. If the value is RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY each entry is resolved and expanded into a list of canonical names.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Heartbeat interval

Number

Expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Heartbeat interval field.

SECONDS

Session Timeout

Number

Timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Session Timeout field.

SECONDS

Connection maximum idle time

Number

Close idle connections after the number of milliseconds specified by this configuration.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connection maximum idle time field.

SECONDS

TLS Configuration

TLS

Defines a TLS configuration used by both clients and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL. The default value is PLAINTEXT or SASL_PLAINTEXT for SASL authentication - kerberos/scram/plain. If the broker configures SSL as the protocol, then configure at least the keystore in the tls:context child element of the configuration and the connector automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the brokers certificate.

Topic Subscription Patterns

Array of String

List of subscription regular expressions to which to subscribe. Topics are automatically rebalanced between the topic consumers.

Assignments

Array of Topic Partition

List of topic-partition pairs to assign. Consumers are not automatically rebalanced.

Default fetch minimum size

Number

Minimum amount of data the server returns for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch minimum size field. You can override this parameter at the source level.

BYTE

Default fetch maximum size

Number

Maximum amount of data the server returns for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch maximum size field. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

Maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). See fetch.max.bytes for limiting the consumer request size. You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default maximum partition fetch size field. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

Maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Fetch Maximum Wait Timeout field.

MILLISECONDS

Username

String

Username with which to login.

x

Password

String

Password with which to login.

x

Encryption type

Enumeration, one of:

  • SHA256

  • SHA512

Encryption algorithm used by SCRAM.

x

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Consumer SASL/PLAIN Connection Type

Use SASL authenticated with a username and password.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all the Kafka consumers that use this configuration.

Consumer Amount

Number

Determines the number of consumers the connection initially creates.

1

Maximum polling interval

Number

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before the expiration of this timeout, then the consumer is considered failed and the group rebalances to reassign the partitions to another member. For consumers using a non-null group.instance.id which reaches this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats and partitions are reassigned after the expiration of session.timeout.ms.

60

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum polling interval field. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally.

  • READ_COMMITTED

    consumer.poll() returns only committed transactional messages are returned.

  • READ_UNCOMMITTED

    consumer.poll() returns all messages, even transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in the READ_COMMITTED mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction.

In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. Furthermore, when in READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED

Exclude internal topics

Boolean

Determines whether internal topics matching a subscribed pattern are excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server, for example, because the data was deleted.

  • EARLIEST

    Automatically reset the offset to the earliest offset.

  • LATEST

    Automatically reset the offset to the latest offset.

  • ERROR

    Throw an error if no previous offset is found for the consumer’s group.

LATEST

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled.

true

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Request Timeout

Number

Maximum amount of time the client waits for the response of a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Request Timeout field. You can override this parameter at the source level.

SECONDS

Default record limit

Number

Maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS then, when the lookup returns multiple IP addresses for a hostname, they attempt to connect before failing the connection. Applies to both bootstrap and advertised servers. If the value is RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY each entry is resolved and expanded into a list of canonical names.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Heartbeat interval

Number

Expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Heartbeat interval field.

SECONDS

Session Timeout

Number

Timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Session Timeout field.

SECONDS

Connection maximum idle time

Number

Close idle connections after the number of milliseconds specified by this configuration.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connection maximum idle time field.

SECONDS

TLS Configuration

TLS

Defines a TLS configuration used by both clients and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL. The default value is PLAINTEXT or SASL_PLAINTEXT for SASL authentication - kerberos/scram/plain. If the broker configures SSL as the protocol, then configure at least the keystore in the tls:context child element of the configuration and the connector automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the brokers certificate.

Topic Subscription Patterns

Array of String

List of subscription regular expressions to which to subscribe. Topics are automatically rebalanced between the topic consumers.

Assignments

Array of Topic Partition

List of topic-partition pairs to assign. Consumers are not automatically rebalanced.

Default fetch minimum size

Number

Minimum amount of data the server returns for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch minimum size field. You can override this parameter at the source level.

BYTE

Default fetch maximum size

Number

Maximum amount of data the server returns for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch maximum size field. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

Maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). See fetch.max.bytes for limiting the consumer request size. You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default maximum partition fetch size field. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

Maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Fetch Maximum Wait Timeout field.

MILLISECONDS

Username

String

User used by the client to connect to the Kafka broker.

x

Password

String

Password used by the client to connect to the Kafka broker.

x

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Consumer SASL/TOKEN Connection Type

Use delegation tokens to authenticate to the Kafka cluster.

Due to security reasons, a delegation token cannot be renewed if the initial authentication uses a delegation token. For more information, refer to Delegation Token Support for Kafka.
Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all the Kafka consumers that use this configuration.

Consumer Amount

Number

Determines the number of consumers the connection initially creates.

1

Maximum polling interval

Number

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before the expiration of this timeout, then the consumer is considered failed and the group rebalances to reassign the partitions to another member. For consumers using a non-null group.instance.id which reaches this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats and partitions are reassigned after the expiration of session.timeout.ms.

60

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum polling interval field. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally.

  • READ_COMMITTED

    consumer.poll() returns only committed transactional messages are returned.

  • READ_UNCOMMITTED

    consumer.poll() returns all messages, even transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in the READ_COMMITTED mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction.

In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. Furthermore, when in READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED

Exclude internal topics

Boolean

Determines whether internal topics matching a subscribed pattern are excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server, for example, because the data was deleted.

  • EARLIEST

    Automatically reset the offset to the earliest offset.

  • LATEST

    Automatically reset the offset to the latest offset.

  • ERROR

    Throw an error if no previous offset is found for the consumer’s group.

LATEST

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled.

true

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Request Timeout

Number

Maximum amount of time the client waits for the response of a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Request Timeout field. You can override this parameter at the source level.

SECONDS

Default record limit

Number

Maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS then, when the lookup returns multiple IP addresses for a hostname, they attempt to connect before failing the connection. Applies to both bootstrap and advertised servers. If the value is RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY each entry is resolved and expanded into a list of canonical names.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Heartbeat interval

Number

Expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Heartbeat interval field.

SECONDS

Session Timeout

Number

Timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Session Timeout field.

SECONDS

Connection maximum idle time

Number

Close idle connections after the number of milliseconds specified by this configuration.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connection maximum idle time field.

SECONDS

TLS Configuration

TLS

Defines a TLS configuration used by both clients and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL. The default value is PLAINTEXT or SASL_PLAINTEXT for SASL authentication - kerberos/scram/plain. If the broker configures SSL as the protocol, then configure at least the keystore in the tls:context child element of the configuration and the connector automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the brokers certificate.

Topic Subscription Patterns

Array of String

List of subscription regular expressions to which to subscribe. Topics are automatically rebalanced between the topic consumers.

Assignments

Array of Topic Partition

List of topic-partition pairs to assign. Consumers are not automatically rebalanced.

Default fetch minimum size

Number

Minimum amount of data the server returns for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch minimum size field. You can override this parameter at the source level.

BYTE

Default fetch maximum size

Number

Maximum amount of data the server returns for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch maximum size field. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

Maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). See fetch.max.bytes for limiting the consumer request size. You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default maximum partition fetch size field. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

Maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Fetch Maximum Wait Timeout field.

MILLISECONDS

Token ID

String

ID of the token.

x

Token HMAC

String

Token HMAC.

x

Encryption type

Enumeration, one of:

  • SCRAM_SHA_256

  • SCRAM_SHA_512

Encryption algorithm used by SCRAM.

x

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.


Producer Configuration

Configuration for producers for Apache Kafka Connector.

Name Type Description Default Value Required

Name

String

Name for this configuration. Connectors reference the configuration with this name.

x

Connection

Connection types for this configuration.

x

Default topic

String

Default topic name to use by the producer operations, overridable at the operation’s configuration level.

defaultTopicName

Zone ID

String

Converts the provided timestamps into ZonedLocalDateTimes in the results. The default value is provided by the system.

Expiration Policy

Configures the minimum amount of time that a dynamic configuration instance can remain idle before Mule considers it eligible for expiration.

Producer Plaintext Connection Type

Use an unauthenticated and non-encrypted connection type.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

Producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of 0 disables batching entirely). A very large batch size can result in more wasteful use of memory as a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Batch size field.

KB

Buffer size

Number

Total bytes of memory the producer uses to buffer records waiting to send to the server. If records are sent faster than they are delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting generally corresponds to the total memory the producer uses, but is not exact because the memory used by the producer is not all used for buffering. Some additional memory is used for compression (if compression is enabled), as well as for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32MB), but this should be capped to align with expected values for Mule instances in CloudHub (v0.1 core).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Buffer size field.

KB

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS then, when the lookup returns multiple IP addresses for a hostname, they attempt to connect before failing the connection. Applies to both bootstrap and advertised servers. If the value is RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY each entry is resolved and expanded into a list of canonical names..

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

Compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE

Connections maximum idle time

Number

Close idle connections after the specified time is reached.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connections maximum idle time field.

SECONDS

Delivery Timeout

Number

Upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record is delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration must be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Delivery Timeout field.

SECONDS

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written in the stream. If false, the producer retries due to broker failures and might write duplicates of the retried message in the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0. and acks to be all. If these values are not explicitly set by the user, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false

Linger time

Number

Producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they are sent out. However, in some circumstances the client might want to reduce the number of requests, even under moderate load.

This setting accomplishes this by adding a small amount of artificial delay (rather than immediately sending out a record the producer waits for up to the given delay to allow other records to be sent so that the sends can be batched together). This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

After the specified batch.size worth of records is received for a partition, it is sent immediately regardless of this setting, however if fewer than the specified number of bytes accumulated for this partition is received, the producer "lingers" for the specified time waiting for more records to arrive.

This setting defaults to 0 (no delay). Setting linger.ms=5, for example, has the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Linger time field.

SECONDS

Maximum block time

Number

Controls for how long KafkaProducer.send() and KafkaProducer.partitionsFor() are blocked. These methods can be blocked either because the buffer is full or the metadata unavailable.Blocking in the user-supplied serializers or partitioner is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum block time field.

SECONDS

Maximum in flight requests

Number

Maximum number of unacknowledged requests the client sends on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (if retries are enabled).

5

Maximum request size

Number

Maximum size of a request in bytes. This setting limits the number of record batches the producer sends in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which might be different from this.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Maximum request size field.

MB

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

Number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than 0 causes the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 potentially changes the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch might appear first. Additionally, produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires before successful acknowledgment. It is recommended that users leave this configuration unset and instead use delivery.timeout.ms to control retry behavior.

1

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Default request timeout

Number

Maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value must be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default request timeout field.

SECONDS

Partitioner

Enumeration, one of:

  • DEFAULT

  • ROUND_ROBIN

  • UNIFORM_STICKY

Controls the partitioning strategy.

DEFAULT

TLS Configuration

TLS

Defines a TLS configuration, used by both clients and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL. If the broker configures SSL as the protocol, configure the keystore in the tls:context child element of the configuration and the connector will automatically use SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the brokers certificate.

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Producer Kerberos Connection Type

Use Kerberos configuration files.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

Producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of 0 disables batching entirely). A very large batch size can result in more wasteful use of memory as a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Batch size field.

KB

Buffer size

Number

Total bytes of memory the producer uses to buffer records waiting to send to the server. If records are sent faster than they are delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting generally corresponds to the total memory the producer uses, but is not exact because the memory used by the producer is not all used for buffering. Some additional memory is used for compression (if compression is enabled), as well as for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32MB), but this should be capped to align with expected values for Mule instances in CloudHub (v0.1 core).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Buffer size field.

KB

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS then, when the lookup returns multiple IP addresses for a hostname, they attempt to connect before failing the connection. Applies to both bootstrap and advertised servers. If the value is RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY each entry is resolved and expanded into a list of canonical names.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

Compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE

Connections maximum idle time

Number

Close idle connections after the specified time is reached.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connections maximum idle time field.

SECONDS

Delivery Timeout

Number

Upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record is delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration must be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Delivery Timeout field.

SECONDS

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written in the stream. If false, the producer retries due to broker failures and might write duplicates of the retried message in the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0. and acks to be all. If these values are not explicitly set by the user, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false

Linger time

Number

Producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they are sent out. However, in some circumstances the client might want to reduce the number of requests, even under moderate load.

This setting accomplishes this by adding a small amount of artificial delay (rather than immediately sending out a record the producer waits for up to the given delay to allow other records to be sent so that the sends can be batched together). This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

After the specified batch.size worth of records is received for a partition, it is sent immediately regardless of this setting, however if fewer than the specified number of bytes accumulated for this partition is received, the producer "lingers" for the specified time waiting for more records to arrive.

This setting defaults to 0 (no delay). Setting linger.ms=5, for example, has the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Linger time field.

SECONDS

Maximum block time

Number

Controls for how long KafkaProducer.send() and KafkaProducer.partitionsFor() are blocked. These methods can be blocked either because the buffer is full or the metadata unavailable.Blocking in the user-supplied serializers or partitioner is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum block time field.

SECONDS

Maximum in flight requests

Number

Maximum number of unacknowledged requests the client sends on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (if retries are enabled).

5

Maximum request size

Number

Maximum size of a request in bytes. This setting limits the number of record batches the producer sends in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which might be different from this.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Maximum request size field.

MB

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

Number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than 0 causes the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 potentially changes the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch might appear first. Additionally, produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires before successful acknowledgment. It is recommended that users leave this configuration unset and instead use delivery.timeout.ms to control retry behavior.

1

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Default request timeout

Number

Maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value must be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default request timeout field.

SECONDS

Partitioner

Enumeration, one of:

  • DEFAULT

  • ROUND_ROBIN

  • UNIFORM_STICKY

Controls the partitioning strategy.

DEFAULT

TLS Configuration

TLS

Defines a TLS configuration, used by both clients and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL. If the broker configures SSL as the protocol, configure the keystore in the tls:context child element of the configuration and the connector will automatically use SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the brokers certificate.

Principal

String

Entity that is authenticated by a computer system or a network. Principals can be individual people, computers, services, or computational entities such as processes and threads.

x

Service name

String

Kerberos principal name that Kafka runs as.

x

Kerberos configuration file (krb5.conf)

String

Path to the krb5.conf file, which contains Kerberos configuration information. This information includes the locations of KDCs and admin servers for the Kerberos realms of interest, defaults for the current realm, defaults for Kerberos applications, and the mappings of hostnames to Kerberos realms.

Use ticket cache

Boolean

Set this option to true to obtain the ticket-granting ticket (TGT) from the ticket cache. Set this option to false if you do not want to use the ticket cache. The connector searches for the ticket cache as follows:

  • On Solaris and Linux, the connector looks in /tmp/krb5cc_uid, in which the uid is the numeric user identifier.

  • If the ticket cache is not available in /tmp/krb5cc_uid or the app is on a Windows platform, the connector looks in {user.home}{file.separator}krb5cc_{user.name}. You can override the ticket cache location by setting a value for the Ticket cache field.

    In a Windows environment, if a ticket cannot be retrieved from the file ticket cache, Windows uses the Local Security Authority (LSA) API to get the ticket-granting ticket (TGT).

false

Ticket cache

String

Name of the ticket cache that contains the user’s ticket-granting ticket (TGT). If this value is set, Use ticket cache must also be set to true. Otherwise, a configuration error is returned.

Use keytab

Boolean

Set this option to true if you want the connector to obtain the principal’s key from the keytab. If you don’t set this value, the connector locates the keytab by using the Kerberos configuration file. If the keytab is not specified in the Kerberos configuration file, the connector looks for the {user.home}{file.separator}krb5.keytab file.

false

Keytab

String

Set this option to the file name of the keytab to obtain the principal’s secret key.

Store key

Boolean

Set this option to true to store the principal’s subject private credentials.

false

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Producer SASL/SCRAM Connection Type

Use Salted Challenge Response Authentication Mechanism (SCRAM) or SASL/SCRAM, a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username and password authentication like PLAIN. Apache Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

Producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of 0 disables batching entirely). A very large batch size can result in more wasteful use of memory as a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Batch size field.

KB

Buffer size

Number

Total bytes of memory the producer uses to buffer records waiting to send to the server. If records are sent faster than they are delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting generally corresponds to the total memory the producer uses, but is not exact because the memory used by the producer is not all used for buffering. Some additional memory is used for compression (if compression is enabled), as well as for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32MB), but this should be capped to align with expected values for Mule instances in CloudHub (v0.1 core).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Buffer size field.

KB

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS then, when the lookup returns multiple IP addresses for a hostname, they attempt to connect before failing the connection. Applies to both bootstrap and advertised servers. If the value is RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY each entry is resolved and expanded into a list of canonical names.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

Compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE

Connections maximum idle time

Number

Close idle connections after the specified time is reached.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connections maximum idle time field.

SECONDS

Delivery Timeout

Number

Upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record is delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration must be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Delivery Timeout field.

SECONDS

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written in the stream. If false, the producer retries due to broker failures and might write duplicates of the retried message in the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0. and acks to be all. If these values are not explicitly set by the user, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false

Linger time

Number

Producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they are sent out. However, in some circumstances the client might want to reduce the number of requests, even under moderate load.

This setting accomplishes this by adding a small amount of artificial delay (rather than immediately sending out a record the producer waits for up to the given delay to allow other records to be sent so that the sends can be batched together). This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

After the specified batch.size worth of records is received for a partition, it is sent immediately regardless of this setting, however if fewer than the specified number of bytes accumulated for this partition is received, the producer "lingers" for the specified time waiting for more records to arrive.

This setting defaults to 0 (no delay). Setting linger.ms=5, for example, has the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Linger time field.

SECONDS

Maximum block time

Number

Controls for how long KafkaProducer.send() and KafkaProducer.partitionsFor() are blocked. These methods can be blocked either because the buffer is full or the metadata unavailable.Blocking in the user-supplied serializers or partitioner is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum block time field.

SECONDS

Maximum in flight requests

Number

Maximum number of unacknowledged requests the client sends on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (if retries are enabled).

5

Maximum request size

Number

Maximum size of a request in bytes. This setting limits the number of record batches the producer sends in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which might be different from this.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Maximum request size field.

MB

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

Number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than 0 causes the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 potentially changes the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch might appear first. Additionally, produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires before successful acknowledgment. It is recommended that users leave this configuration unset and instead use delivery.timeout.ms to control retry behavior.

1

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Default request timeout

Number

Maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value must be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default request timeout field.

SECONDS

Partitioner

Enumeration, one of:

  • DEFAULT

  • ROUND_ROBIN

  • UNIFORM_STICKY

Controls the partitioning strategy.

DEFAULT

TLS Configuration

TLS

Defines a TLS configuration, used by both clients and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL. If the broker configures SSL as the protocol, configure the keystore in the tls:context child element of the configuration and the connector will automatically use SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the brokers certificate.

Username

String

Username with which to login.

x

Password

String

Password with which to login.

x

EncryptionType

Enumeration, one of:

  • SHA256

  • SHA512

Encryption algorithm used by SCRAM.

x

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Producer SASL/PLAIN Connection Type

Use SASL authenticated with a username and password.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

Producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of zero disables batching entirely). A very large batch size can result in more wasteful use of memory as a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Batch size field.

KB

Buffer size

Number

Total bytes of memory the producer uses to buffer records waiting to send to the server. If records are sent faster than they are delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting generally corresponds to the total memory the producer uses, but is not exact because the memory used by the producer is not all used for buffering. Some additional memory is used for compression (if compression is enabled), as well as for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32MB), but this should be capped to align with expected values for Mule instances in CloudHub (v0.1 core).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Buffer size field.

KB

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS then, when the lookup returns multiple IP addresses for a hostname, they attempt to connect before failing the connection. Applies to both bootstrap and advertised servers. If the value is RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY each entry is resolved and expanded into a list of canonical names.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

Compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE

Connections maximum idle time

Number

Close idle connections after the specified time is reached.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connections maximum idle time field.

SECONDS

Delivery Timeout

Number

Upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record is delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration must be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Delivery Timeout field.

SECONDS

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written in the stream. If false, the producer retries due to broker failures and might write duplicates of the retried message in the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0. and acks to be all. If these values are not explicitly set by the user, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false

Linger time

Number

Producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they are sent out. However, in some circumstances the client might want to reduce the number of requests, even under moderate load.

This setting accomplishes this by adding a small amount of artificial delay (rather than immediately sending out a record the producer waits for up to the given delay to allow other records to be sent so that the sends can be batched together). This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

After the specified batch.size worth of records is received for a partition, it is sent immediately regardless of this setting, however if fewer than the specified number of bytes accumulated for this partition is received, the producer "lingers" for the specified time waiting for more records to arrive.

This setting defaults to 0 (no delay). Setting linger.ms=5, for example, has the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Linger time field.

SECONDS

Maximum block time

Number

Controls for how long KafkaProducer.send() and KafkaProducer.partitionsFor() are blocked. These methods can be blocked either because the buffer is full or the metadata unavailable.Blocking in the user-supplied serializers or partitioner is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum block time field.

SECONDS

Maximum in flight requests

Number

Maximum number of unacknowledged requests the client sends on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (if retries are enabled).

5

Maximum request size

Number

Maximum size of a request in bytes. This setting limits the number of record batches the producer sends in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which might be different from this.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Maximum request size field.

MB

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

Number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than 0 causes the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 potentially changes the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch might appear first. Additionally, produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires before successful acknowledgment. It is recommended that users leave this configuration unset and instead use delivery.timeout.ms to control retry behavior.

1

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Default request timeout

Number

Maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value must be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default request timeout field.

SECONDS

Partitioner

Enumeration, one of:

  • DEFAULT

  • ROUND_ROBIN

  • UNIFORM_STICKY

Controls the partitioning strategy.

DEFAULT

TLS Configuration

TLS

Defines a TLS configuration, used by both clients and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL. If the broker configures SSL as the protocol, configure the keystore in the tls:context child element of the configuration and the connector will automatically use SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the brokers certificate.

Username

String

Username with which to login.

x

Password

String

Password with which to login.

x

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Producer SASL/TOKEN Connection Type

Use delegation tokens to authenticate to the Kafka cluster.

Due to security reasons, a delegation token cannot be renewed if the initial authentication uses a delegation token. For more information, refer to Delegation Token Support for Kafka.
Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

Producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of zero disables batching entirely). A very large batch size can result in more wasteful use of memory as a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Batch size field.

KB

Buffer size

Number

Total bytes of memory the producer uses to buffer records waiting to send to the server. If records are sent faster than they are delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting generally corresponds to the total memory the producer uses, but is not exact because the memory used by the producer is not all used for buffering. Some additional memory is used for compression (if compression is enabled), as well as for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32MB), but this should be capped to align with expected values for Mule instances in CloudHub (v0.1 core).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Buffer size field.

KB

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS then, when the lookup returns multiple IP addresses for a hostname, they attempt to connect before failing the connection. Applies to both bootstrap and advertised servers. If the value is RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY each entry is resolved and expanded into a list of canonical names.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

Compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE

Connections maximum idle time

Number

Close idle connections after the specified time is reached.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connections maximum idle time field.

SECONDS

Delivery Timeout

Number

Upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record is delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration must be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Delivery Timeout field.

SECONDS

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written in the stream. If false, the producer retries due to broker failures and might write duplicates of the retried message in the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0. and acks to be all. If these values are not explicitly set by the user, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false

Linger time

Number

Producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they are sent out. However, in some circumstances the client might want to reduce the number of requests, even under moderate load.

This setting accomplishes this by adding a small amount of artificial delay (rather than immediately sending out a record the producer waits for up to the given delay to allow other records to be sent so that the sends can be batched together). This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

After the specified batch.size worth of records is received for a partition, it is sent immediately regardless of this setting, however if fewer than the specified number of bytes accumulated for this partition is received, the producer "lingers" for the specified time waiting for more records to arrive.

This setting defaults to 0 (no delay). Setting linger.ms=5, for example, has the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Linger time field.

SECONDS

Maximum block time

Number

Controls for how long KafkaProducer.send() and KafkaProducer.partitionsFor() are blocked. These methods can be blocked either because the buffer is full or the metadata unavailable.Blocking in the user-supplied serializers or partitioner is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum block time field.

SECONDS

Maximum in flight requests

Number

Maximum number of unacknowledged requests the client sends on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (if retries are enabled).

5

Maximum request size

Number

Maximum size of a request in bytes. This setting limits the number of record batches the producer sends in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which might be different from this.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Maximum request size field.

MB

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

Number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than 0 causes the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 potentially changes the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch might appear first. Additionally, produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires before successful acknowledgment. It is recommended that users leave this configuration unset and instead use delivery.timeout.ms to control retry behavior.

1

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Default request timeout

Number

Maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value must be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default request timeout field.

SECONDS

Partitioner

Enumeration, one of:

  • DEFAULT

  • ROUND_ROBIN

  • UNIFORM_STICKY

Controls the partitioning strategy.

DEFAULT

TLS Configuration

TLS

Defines a TLS configuration, used by both clients and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL. If the broker configures SSL as the protocol, configure the keystore in the tls:context child element of the configuration and the connector will automatically use SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the brokers certificate.

Token ID

String

ID of the token.

x

Token HMAC

String

Token HMAC.

x

Encryption type

Enumeration, one of:

  • SCRAM_SHA_256

  • SCRAM_SHA_512

Encryption algorithm used by SCRAM.

x

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Consumer Sources

Batch Message Listener

<kafka:batch-message-listener>

This source supports the consumption of messages from a Kafka cluster, producing a list of messages to the flow.

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Poll timeout

Number

Amount of time to block. Defines the total timeout for polling.

Poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Poll timeout field.

Acknowledgement mode

Enumeration, one of:

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka Broker instance is notified of the consumption of messages.

  • AUTO

    Commits messages automatically if the flow finishes successfully.

  • MANUAL

    The user must commit messages manually upon receipt by using the Commit operation.

  • IMMEDIATE

    Mule automatically commits the messages upon receipt before triggering the flow.

  • DUPS_OK

    Works in the same way as MANUAL, but the commits are asynchronous, which can lead to duplicate records.

Amount of parallel consumers.

Number

Number of parallel consumers.

1

Primary Node Only

Boolean

Determines whether to execute this source on only the primary node when running Mule instances in a cluster.

Redelivery Policy

Defines a policy for processing the redelivery of the same message.

Reconnection Strategy

Retry strategy in case of connectivity errors.

Output

Type

Array of Record

Attributes Type

Associated Configurations

The Batch Message Listener source does not support configurable streaming strategies because the source takes a batch of records as an input stream. The streaming strategy configuration is non-repeatable-stream by default for the source.

Message Listener

<kafka:message-listener>

This source supports the consumption of messages from a Kafka cluster, producing a single message to the flow.

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Poll timeout

Number

Amount of time to block. Defines the total timeout for polling.

Poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Poll timeout field.

Acknowledgement mode

Enumeration, one of:

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka Broker instance is notified of the consumption of messages.

  • AUTO

    Commits messages automatically if the flow finishes successfully.

  • MANUAL

    The user must commit messages manually upon receipt by using the Commit operation.

  • IMMEDIATE

    Mule automatically commits the messages upon receipt before triggering the flow.

  • DUPS_OK

    Works in the same way as MANUAL, but the commits are asynchronous, which can lead to duplicate records.

Amount of parallel consumers.

Number

Number of parallel consumers.

1

Primary Node Only

Boolean

Determines whether to execute this source on only the primary node when running Mule instances in a cluster.

Streaming Strategy

Configures how Mule processes streams. Repeatable streams are the default behavior.

Redelivery Policy

Defines a policy for processing the redelivery of the same message.

Reconnection Strategy

Retry strategy in case of connectivity errors.

Output

Type

Binary

Attributes Type

Associated Configurations

Consumer Operations

Commit

<kafka:commit>

Commits the offsets associated to a message or batch of messages consumed in a Message Listener. This is a list or a single message consumed in the Batch Message Listener source.

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Consumer commit key

String

Commit key of the last poll. This operation is valid only when used inside a flow that uses one of the sources (Batch Message Listener or Message Listener) which inserts this value as an attribute in the Mule event.

x

Reconnection Strategy

Retry strategy in case of connectivity errors.

Associated Configurations

Throws

  • KAFKA:INVALID_ACK_MODE

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:ALREADY_COMMITTED

  • KAFKA:TIMEOUT

  • KAFKA:SESSION_NOT_FOUND

  • KAFKA:NOT_FOUND

  • KAFKA:CONNECTIVITY

Consume

<kafka:consume>

Receives messages from one or more Kafka topics. This operation works similarly to the Message Listener source, so all the operations that apply to that, apply to this operation as well.

The Consume operation does not return the consumerCommitKey.
Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Consumption timeout

Number

Number of time units that this operation waits for receiving messages.

Timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Consumption timeout field.

Operation Timeout

Number

Timeout for the operation to start executing.

Operation Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Operation Timeout field.

Acknowledgement mode

Enumeration, one of:

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka Broker instance is notified of the consumption of messages.

  • AUTO

    Commits messages automatically if the flow finishes successfully.

  • MANUAL

    The user must commit messages manually upon receipt by using the Commit operation.

  • IMMEDIATE

    Mule automatically commits the messages upon receipt before triggering the flow.

  • DUPS_OK

    Works in the same way as MANUAL, but the commits are asynchronous, which can lead to duplicate records.

IMMEDIATE

Streaming Strategy

Configures how Mule processes streams. Repeatable streams are the default behavior.

Target Variable

String

Name of the variable that stores the operation’s output.

Target Value

String

Expression that evaluates the operation’s output. The outcome of the expression is stored in the Target Variable field.

#[payload]

Reconnection Strategy

Retry strategy in case of connectivity errors.

Output

Type

Binary

Attributes Type

Associated Configurations

Throws

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:ILLEGAL_STATE

  • KAFKA:TIMEOUT

  • KAFKA:INVALID_OFFSET

  • KAFKA:INVALID_INPUT

  • KAFKA:NOT_FOUND

  • KAFKA:CONNECTIVITY

Seek

<kafka:seek>

Sets the current offset of the consumer for the given topic and partition to the provided offset value.

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Topic

String

Name of the topic on which the Seek operation is performed.

x

Partition

Number

Partition number that has its offset modified.

x

Offset

Number

Offset value to commit for the configured partition.

x

Operation Timeout

Number

Timeout for the operation to start executing.

Operation Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Operation Timeout field.

Reconnection Strategy

Retry strategy in case of connectivity errors.

Associated Configurations

Throws

  • KAFKA:INVALID_TOPIC

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:TIMEOUT

  • KAFKA:INVALID_OFFSET

  • KAFKA:INVALID_INPUT

  • KAFKA:NOT_FOUND

  • KAFKA:CONNECTIVITY

Producer Operation

Publish

<kafka:publish>

Publishes a message to the specified Kafka topic, optionally specifying the partition, key, and message content for it. This operation supports transactions.

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Topic

String

Topic to publish to.

Partition

Number

(Optional) Topic partition.

Key

Binary

(Optional) Key for the published message.

Message

Binary

(Optional) Message content of the message.

#[payload]

Headers

Object

(Optional) Headers for the message.

Transactional Action

Enumeration, one of:

  • ALWAYS_JOIN

  • JOIN_IF_POSSIBLE

  • NOT_SUPPORTED

Type of joining action that operations can take for transactions.

JOIN_IF_POSSIBLE

Target Variable

String

Name of the variable that stores the operation’s output.

Target Value

String

Expression that evaluates the operation’s output. The outcome of the expression is stored in the Target Variable field.

#[payload]

Reconnection Strategy

Retry strategy in case of connectivity errors.

Associated Configurations

Throws

  • KAFKA:INVALID_TOPIC_PARTITION

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:TIMEOUT

  • KAFKA:INPUT_TOO_LARGE

  • KAFKA:CONNECTIVITY

Object Types

Consumer Context

Type for the consumer context.

Field Type Description Default Value Required

Consumer Commit Key

String

Consumer commit key.

CRL File

Specifies the location of the certification revocation list (CRL) file.

Field Type Description Default Value Required

Path

String

Path to the CRL file.

Custom OCSP Responder

Configures a custom OCSP responder for certification revocation checks.

Field Type Description Default Value Required

Url

String

URL of the OCSP responder.

Cert Alias

String

Alias of the signing certificate for the OCSP response. If specified, the alias must be in the truststore.

Expiration Policy

Configures an expiration policy strategy.

Field Type Description Default Value Required

Max Idle Time

Number

Configures the maximum amount of time that a dynamic configuration instance can remain idle before Mule considers it eligible for expiration.

Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Max Idle Time field.

Kafka Message Metadata

Metadata of the Kafka message.

Field Type Description Default Value Required

Offset

Number

Offset of the message.

Partition

Number

Partition of the message.

Serialized Key Size

Number

Serialized key size of the message.

Serialized Value Size

Number

Serialized value size of the message.

Timestamp

DateTime

Timestamp of the message.

Topic

String

Topic of the message.

Kafka Record Attributes

Attributes of the Kafka record.

Field Type Description Default Value Required

Consumer Commit Key

String

Consumer commit key.

Creation Timestamp

DateTime

Timestamp of record creation.

Headers

Object

Map of HTTP headers in the message.

Key

Binary

Keys of the record.

Leader Epoch

Number

Leader epoch of the record.

Log Append Timestamp

DateTime

Log append timestamp of the record.

Offset

Number

Offset of the record.

Partition

Number

Partition of the record.

Serialized Key Size

Number

Serialized key size of the record.

Serialized Value Size

Number

Serialized value size of the record.

Topic

String

Topic of the record.

Keystore

Configures the keystore for the TLS protocol. The keystore you generate contains a private key and a public certificate.

Field Type Description Default Value Required

Path

String

Path to the keystore. Mule resolves the path relative to the current classpath and file system.

Type

String

Type of keystore.

Alias

String

Alias of the key to use when the keystore contains multiple private keys. By default, Mule uses the first key in the file.

Key Password

String

Password used to protect the private key.

Password

String

Password used to protect the keystore.

Algorithm

String

Encryption algorithm that the keystore uses.

Reconnect

Configures a standard reconnection strategy, which specifies how often to reconnect and how many reconnection attempts the connector source or operation can make.

Field Type Description Default Value Required

Frequency

Number

How often to attempt to reconnect, in milliseconds.

Blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

Count

Number

How many reconnection attempts the Mule app can make.

Reconnect Forever

Configures a forever reconnection strategy by which the connector source or operation attempts to reconnect at a specified frequency for as long as the Mule app runs.

Field Type Description Default Value Required

Frequency

Number

How often to attempt to reconnect, in milliseconds.

Blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

Reconnection

Configures a reconnection strategy for an operation.

Field Type Description Default Value Required

Fails Deployment

Boolean

What to do if, when an app is deployed, a connectivity test does not pass after exhausting the associated reconnection strategy:

  • true

    Allow the deployment to fail.

  • false

    Ignore the results of the connectivity test.

Reconnection Strategy

Reconnection strategy to use.

Record

Type for records.

Field Type Description Default Value Required

Attributes

Kafka record attributes.

Payload

Binary

Payload for the record.

Redelivery Policy

Configures the redelivery policy for executing requests that generate errors. You can add a redelivery policy to any source in a flow.

Field Type Description Default Value Required

Max Redelivery Count

Number

Maximum number of times that a redelivered request can be processed unsuccessfully before returning a REDELIVERY_EXHAUSTED error.

Use Secure Hash

Boolean

If true, Mule uses a secure hash algorithm to identify a redelivered message.

Message Digest Algorithm

String

Secure hashing algorithm to use if the Use Secure Hash field is true. If the payload of the message is a Java object, Mule ignores this value and returns the value that the payload’s hashCode() returned.

Id Expression

String

Defines one or more expressions to use to determine when a message has been redelivered. This property may only be set if Use Secure Hash is false.

Object Store

ObjectStore

Configures the object store that stores the redelivery counter for each message.

Repeatable File Store Stream

Configures the repeatable file-store streaming strategy by which Mule keeps a portion of the stream content in memory. If the stream content is larger than the configured buffer size, Mule backs up the buffer’s content to disk and then clears the memory.

Field Type Description Default Value Required

In Memory Size

Number

Maximum amount of memory that the stream can use for data. If the amount of memory exceeds this value, Mule buffers the content to disk. To optimize performance:

  • Configure a larger buffer size to avoid the number of times Mule needs to write the buffer on disk. This increases performance, but it also limits the number of concurrent requests your application can process, because it requires additional memory.

  • Configure a smaller buffer size to decrease memory load at the expense of response time.

Buffer Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit for the In Memory Size field.

Repeatable In Memory Stream

Configures the in-memory streaming strategy by which the request fails if the data exceeds the MAX buffer size. Always run performance tests to find the optimal buffer size for your specific use case.

Field Type Description Default Value Required

Initial Buffer Size

Number

Initial amount of memory to allocate to the data stream. If the streamed data exceeds this value, the buffer expands by Buffer Size Increment, with an upper limit of Max In Memory Size value.

Buffer Size Increment

Number

Amount by which the buffer size expands if it exceeds its initial size. Setting a value of 0 or lower specifies that the buffer can’t expand.

Max Buffer Size

Number

Maximum amount of memory to use. If more than that is used then a STREAM_MAXIMUM_SIZE_EXCEEDED error is raised. A value lower than or equal to zero means no limit.

Buffer Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit for the Initial Buffer Size, Buffer Size Increment, and Buffer Unit fields.

Standard Revocation Check

Configures standard revocation checks for TLS certificates.

Field Type Description Default Value Required

Only End Entities

Boolean

Which elements to verify in the certificate chain:

  • true

    Verify only the last element in the certificate chain.

  • false

    Verify all elements in the certificate chain.

Prefer Crls

Boolean

How to check certificate validity:

  • true

    Check the Certification Revocation List (CRL) for certificate validity.

  • false

    Use the Online Certificate Status Protocol (OCSP) to check certificate validity.

No Fallback

Boolean

Whether to use the secondary method to check certificate validity:

  • true

    Use the method that wasn’t specified in the Prefer Crls field (the secondary method) to check certificate validity.

  • false

    Do not use the secondary method to check certificate validity.

Soft Fail

Boolean

What to do if the revocation server can’t be reached or is busy:

  • true

    Avoid verification failure.

  • false

    Allow the verification to fail.

TLS

Configures TLS to provide secure communications for the Mule app.

Field Type Description Default Value Required

Enabled Protocols

String

Comma-separated list of protocols enabled for this context.

Enabled Cipher Suites

String

Comma-separated list of cipher suites enabled for this context.

Trust Store

Configures the TLS truststore.

Key Store

Configures the TLS keystore.

Revocation Check

Configures a revocation checking mechanism.

Topic Partition

Type for topic partition.

Field Type Description Default Value Required

Topic

String

Topic name.

x

Partition

Number

Number of partitions.

x

Truststore

Configures the truststore for TLS.

Field Type Description Default Value Required

Path

String

Path to the truststore. Mule resolves the path relative to the current classpath and file system.

Password

String

Password used to protect the truststore.

Type

String

Type of truststore.

Algorithm

String

Encryption algorithm that the truststore uses.

Insecure

Boolean

If true, Mule stops performing certificate validations. Setting this to true can make connections vulnerable to attacks.

View on GitHub