
Apache Kafka Connector 4.9 Reference - Mule 4

Anypoint Connector for Apache Kafka (Apache Kafka Connector) enables you to interact with the Apache Kafka messaging system. It provides seamless integration between your Mule app and an Apache Kafka cluster, using Mule runtime engine (Mule).

When a Message Listener or a Batch Message Listener uses the MANUAL acknowledgement mode, you must use a Commit operation at the end of the flow when the flow finishes successfully (either because there were no errors or because there is an On Error Continue component in the flow).
Mule server notifications are not supported because the Kafka library used in the connector manages network disruptions internally.

Consumer Configuration

Configuration for consumers for Apache Kafka Connector.

Name Type Description Default Value Required

Name

String

Name for this configuration. Connectors reference the configuration with this name.

x

Connection

Connection types for this configuration.

x

Default acknowledgement mode

Enumeration, one of

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka Broker instance is notified of the consumption of messages.

  • AUTO

    Commits messages automatically if the flow finishes successfully.

  • MANUAL

    The user must commit messages manually upon receipt by using the Commit operation.

  • IMMEDIATE

    Mule automatically commits the messages upon receipt before triggering the flow.

  • DUPS_OK

    Works in the same way as MANUAL, but the commits are asynchronous, which can lead to duplicate records.

AUTO
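To make the difference between the modes concrete, the following Python model shows when an offset commit happens relative to flow execution. It is a simplified sketch, not the connector's implementation: `run_listener`, the event strings, and the "one flow call per record" shape are illustrative assumptions.

```python
from typing import Callable, List

ACK_MODES = {"AUTO", "MANUAL", "IMMEDIATE", "DUPS_OK"}

def run_listener(mode: str, offsets: List[int], flow: Callable[[int], bool]) -> List[str]:
    """Simplified model of when offsets are committed for each acknowledgement mode.

    `flow(offset)` returns True when the flow finishes successfully
    (no error, or an error handled by On Error Continue).
    """
    if mode not in ACK_MODES:
        raise ValueError(f"unknown acknowledgement mode: {mode}")
    events: List[str] = []
    for offset in offsets:
        if mode == "IMMEDIATE":
            # Committed on receipt, before the flow is triggered.
            events.append(f"commit {offset}")
        ok = flow(offset)
        events.append(f"{'success' if ok else 'error'} {offset}")
        if not ok:
            # Nothing is committed after a failed flow (IMMEDIATE already committed).
            continue
        if mode == "AUTO":
            events.append(f"commit {offset}")  # Mule commits automatically
        elif mode == "MANUAL":
            events.append(f"commit {offset} via Commit operation")  # the flow must commit
        elif mode == "DUPS_OK":
            # Asynchronous commit: may not complete before a failure, so duplicates are possible.
            events.append(f"async commit {offset} via Commit operation")
    return events
```

For example, `run_listener("AUTO", [1, 2], lambda o: o != 2)` returns `['success 1', 'commit 1', 'error 2']`: offset 2 is never committed, so it can be delivered again, whereas in IMMEDIATE mode it would already have been committed before the flow failed.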

Default listener poll timeout

Number

Time to wait for a poll to return if data is not available in the buffer (fetched). If the value is 0, the poll returns immediately with any records currently available in the buffer, or returns empty if there is no data. Must not be negative.

100

Default listener poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default listener poll timeout field. This combines with Default listener poll timeout to define the total timeout for the polling.

MILLISECONDS

Default operation poll timeout

Number

Time to wait for an operation to finish. If no value is set or a negative value is set, the operation waits forever.

-1

Default operation poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default operation poll timeout field. This combines with Default operation poll timeout to define the total default timeout for the operations that use this configuration.

SECONDS
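Each numeric timeout is interpreted together with its time-unit field. The Python sketch below shows that pairing, plus the rules stated above: the listener timeout must not be negative, while an unset or negative operation timeout means "wait forever". The function names are hypothetical; the connector performs this conversion internally.

```python
from datetime import timedelta
from typing import Optional

# Factor from each time-unit enumeration value to microseconds
# (timedelta's finest resolution, so NANOSECONDS values are rounded).
_UNIT_TO_MICROS = {
    "NANOSECONDS": 0.001,
    "MICROSECONDS": 1,
    "MILLISECONDS": 1_000,
    "SECONDS": 1_000_000,
    "MINUTES": 60_000_000,
    "HOURS": 3_600_000_000,
    "DAYS": 86_400_000_000,
}

def to_timedelta(value: float, unit: str) -> timedelta:
    """Combine a numeric field with its time-unit field."""
    return timedelta(microseconds=value * _UNIT_TO_MICROS[unit])

def listener_poll_timeout(value: float = 100, unit: str = "MILLISECONDS") -> timedelta:
    if value < 0:
        raise ValueError("Default listener poll timeout must not be negative")
    return to_timedelta(value, unit)

def operation_timeout(value: Optional[float] = -1, unit: str = "SECONDS") -> Optional[timedelta]:
    """Return None to mean 'wait forever' (no value or a negative value)."""
    if value is None or value < 0:
        return None
    return to_timedelta(value, unit)
```

With the documented defaults, `listener_poll_timeout()` is 100 milliseconds and `operation_timeout()` is `None`, that is, operations wait indefinitely.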

Zone ID

String

Zone ID used to convert the provided timestamps into ZonedDateTime values in the results. If no value is set, the system default zone is used.
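Kafka record timestamps are epoch milliseconds. As a rough Python analogue of what this field controls (the connector itself works with Java's ZonedDateTime, and `to_zoned` is a hypothetical helper), the same instant is rendered in the configured zone, or in the system default zone when no Zone ID is set:

```python
from datetime import datetime, timezone
from typing import Optional
from zoneinfo import ZoneInfo

def to_zoned(timestamp_ms: int, zone_id: Optional[str] = None) -> datetime:
    """Convert a Kafka record timestamp (epoch milliseconds) to a zone-aware datetime."""
    instant = datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)
    if zone_id is None:
        # No Zone ID configured: use the system default zone.
        return instant.astimezone()
    # Zone IDs such as "Europe/Madrid" are resolved from the IANA time zone database.
    return instant.astimezone(ZoneInfo(zone_id))
```

Only the displayed offset changes between zones; `to_zoned(ts, "America/New_York")` and `to_zoned(ts)` represent the same instant.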

Expiration Policy

Configures the minimum amount of time that a dynamic configuration instance can remain idle before Mule considers it eligible for expiration.

Consumer Plaintext Connection Type

Use an unauthenticated and non-encrypted connection type.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all the Kafka consumers that use this configuration.

Consumer Amount

Number

Determines the number of consumers the connection initially creates.

1

Maximum polling interval

Number

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances to reassign the partitions to another member. For consumers with a non-null group.instance.id that reach this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats and partitions are reassigned after session.timeout.ms expires.

60

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum polling interval field. You can override this parameter at the source level.

SECONDS
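In practice, all records returned by one poll (up to Default record limit) must be processed before the next poll, or the consumer is considered failed. A back-of-the-envelope check as a Python sketch; the per-record processing time is an input you would measure yourself, and `poll_interval_headroom` is a hypothetical name.

```python
from datetime import timedelta

def poll_interval_headroom(record_limit: int, per_record: timedelta,
                           max_poll_interval: timedelta) -> timedelta:
    """Worst-case slack between finishing a full batch and the maximum polling interval.

    A negative result means a full batch can outlast the interval,
    so the consumer risks being considered failed and triggering a rebalance.
    """
    worst_case_batch = per_record * record_limit
    return max_poll_interval - worst_case_batch
```

With the defaults (500 records, 60 seconds) and an assumed 100 ms per record, the headroom is 10 seconds; at 200 ms per record it becomes negative, which suggests lowering Default record limit or raising Maximum polling interval.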

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally.

  • READ_COMMITTED

    consumer.poll() returns only transactional messages that have been committed.

  • READ_UNCOMMITTED

    consumer.poll() returns all messages, even transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in the READ_COMMITTED mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction.

In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. Furthermore, when in READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED
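The offset-order and LSO rules above can be illustrated with a simplified Python model of a single partition. `Record`, `visible_records`, and the transaction bookkeeping are hypothetical; the real broker and client also deal with transaction control markers and aborted-transaction indexes, which this sketch ignores.

```python
from dataclasses import dataclass
from typing import List, Optional, Set

@dataclass
class Record:
    offset: int
    txn: Optional[str] = None  # None for non-transactional records

def visible_records(log: List[Record], isolation: str,
                    open_txns: Set[str], aborted_txns: Set[str]) -> List[int]:
    """Offsets a poll could return from `log` under each isolation level."""
    if isolation == "READ_UNCOMMITTED":
        # Everything, including records of aborted or still-open transactions.
        return [r.offset for r in log]
    if isolation != "READ_COMMITTED":
        raise ValueError(f"unknown isolation level: {isolation}")
    result: List[int] = []
    for r in log:  # records are always returned in offset order
        if r.txn in open_txns:
            break  # first record of an open transaction: nothing past the LSO is returned
        if r.txn in aborted_txns:
            continue  # aborted transactional records are filtered out
        result.append(r.offset)
    return result
```

For a log `[Record(0), Record(1, "a"), Record(2), Record(3, "b"), Record(4)]` with transaction `a` aborted and `b` still open, READ_COMMITTED yields offsets 0 and 2 only: offset 4 is non-transactional but sits after the open transaction, so it is withheld.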

Exclude internal topics

Boolean

Determines whether internal topics matching a subscribed pattern are excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server, for example, because the data was deleted.

  • EARLIEST

    Automatically reset the offset to the earliest offset.

  • LATEST

    Automatically reset the offset to the latest offset.

  • ERROR

    Throw an error if no previous offset is found for the consumer’s group.

LATEST
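The reset policy only matters when there is no usable committed offset. A Python sketch of that decision, assuming `log_start` is the earliest retained offset and `log_end` is the offset of the next record to be written; `starting_offset` and `NoOffsetError` are hypothetical names.

```python
from typing import Optional

class NoOffsetError(Exception):
    """Raised for the ERROR policy when no valid offset exists."""

def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    policy: str = "LATEST") -> int:
    """Pick the offset a consumer starts reading from.

    A committed offset is used as-is while it is still within
    [log_start, log_end]; otherwise the reset policy applies.
    """
    if committed is not None and log_start <= committed <= log_end:
        return committed
    if policy == "EARLIEST":
        return log_start
    if policy == "LATEST":
        return log_end  # only records written from now on are consumed
    if policy == "ERROR":
        raise NoOffsetError("no valid committed offset for the consumer group")
    raise ValueError(f"unknown policy: {policy}")
```

A committed offset of 40 on a partition whose retained data starts at 100 (for example, because older data was deleted) falls back to the policy: EARLIEST resumes at 100, LATEST jumps to the end of the log.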

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS
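To illustrate what this field prevents, here is a generic fixed-backoff retry loop in Python. It is a hypothetical sketch of the pattern, not the Kafka client's internal retry logic; `with_fixed_backoff` is an illustrative name, and the injectable `sleep` exists only to make the behavior observable.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def with_fixed_backoff(request: Callable[[], T], retries: int, backoff_s: float = 0.1,
                       sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `request`, retrying up to `retries` times with a fixed pause in between."""
    if retries < 0:
        raise ValueError("retries must be non-negative")
    for attempt in range(retries + 1):
        try:
            return request()
        except ConnectionError:
            if attempt == retries:
                raise  # retries exhausted: surface the failure
            sleep(backoff_s)  # pause instead of retrying in a tight loop
    raise AssertionError("unreachable")
```

The 0.1-second default mirrors the documented 100 ms Retry Backoff Timeout.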

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled.

true
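The idea behind the check is to recompute a checksum over the received bytes and compare it with the stored value. A minimal Python sketch using `zlib.crc32`, which is the standard CRC-32; note that Kafka's current record-batch format actually uses the CRC-32C (Castagnoli) variant, so this illustrates the principle rather than reproducing Kafka's exact computation.

```python
import zlib

def checksum(payload: bytes) -> int:
    """Standard CRC-32 of the payload, as an unsigned 32-bit integer."""
    return zlib.crc32(payload) & 0xFFFFFFFF

def verify(payload: bytes, expected_crc: int) -> bool:
    """Return True if the payload still matches the checksum stored with it."""
    return checksum(payload) == expected_crc
```

A single flipped byte, such as `b"hellp"` instead of `b"hello"`, makes `verify` return False. This recomputation for every consumed record is the overhead mentioned above.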

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB
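Both buffer sizes combine a number with a unit, and -1 is a sentinel for the OS default. A Python sketch of that conversion, assuming binary multiples (1 KB = 1024 bytes): under that assumption, the documented defaults of 64 KB and 128 KB equal 65536 and 131072 bytes, the Kafka client's defaults for receive.buffer.bytes and send.buffer.bytes. `buffer_bytes` is a hypothetical helper.

```python
from typing import Optional

# Assumed binary multiples; confirm against your connector version.
_UNIT_BYTES = {"BYTE": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}

def buffer_bytes(value: int, unit: str) -> Optional[int]:
    """Convert a size and unit pair to bytes; None means 'use the OS default' (-1)."""
    if value == -1:
        return None
    if value < 0:
        raise ValueError("buffer size must be -1 or non-negative")
    return value * _UNIT_BYTES[unit]
```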

Request Timeout

Number

Maximum amount of time the client waits for the response of a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Request Timeout field. You can override this parameter at the source level.

SECONDS

Default record limit

Number

Maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS
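Conceptually, USE_ALL_DNS_IPS means "resolve the host name, then try each returned address in turn before giving up". A Python sketch of that idea using the standard socket module; `connect_all_ips` is a hypothetical helper, not part of the connector or the Kafka client, and the `resolve` parameter exists only so the lookup can be substituted in tests.

```python
import socket
from typing import Callable, List, Tuple

def connect_all_ips(host: str, port: int, timeout: float = 5.0,
                    resolve: Callable = socket.getaddrinfo) -> socket.socket:
    """Try every address the lookup returns before failing the connection."""
    errors: List[Tuple[str, OSError]] = []
    for family, socktype, proto, _, sockaddr in resolve(host, port, type=socket.SOCK_STREAM):
        sock = socket.socket(family, socktype, proto)
        sock.settimeout(timeout)
        try:
            sock.connect(sockaddr)
            return sock  # the first reachable address wins
        except OSError as exc:
            sock.close()
            errors.append((str(sockaddr[0]), exc))
    raise ConnectionError(f"could not connect to any address of {host}: {errors}")
```

A client that only tried the first resolved address would fail as soon as that one host was down, even if other addresses behind the same name were reachable.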

Heartbeat interval

Number

Expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Heartbeat interval field.

SECONDS

Session Timeout

Number

Timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Session Timeout field.

SECONDS
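The constraints on Heartbeat interval and Session Timeout can be checked up front. A Python sketch; the broker bounds are parameters because they come from your broker's group.min.session.timeout.ms and group.max.session.timeout.ms settings, and `check_group_timeouts` is a hypothetical name.

```python
from datetime import timedelta
from typing import List

def check_group_timeouts(heartbeat: timedelta, session: timedelta,
                         broker_min: timedelta, broker_max: timedelta) -> List[str]:
    """Return the problems found with a heartbeat interval and session timeout pair."""
    problems: List[str] = []
    if heartbeat >= session:
        problems.append("heartbeat interval must be lower than the session timeout")
    elif heartbeat > session / 3:
        problems.append("heartbeat interval is above the recommended 1/3 of the session timeout")
    if not (broker_min <= session <= broker_max):
        problems.append("session timeout is outside the broker's allowed range")
    return problems
```

The documented defaults (3 seconds and 10 seconds) pass when checked against example broker bounds of 6 seconds and 30 minutes; a 4-second heartbeat with the same session timeout is reported as above the recommended ratio.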

Connection maximum idle time

Number

Closes idle connections after the amount of time specified by this field, expressed in the unit set by Connection maximum idle time time unit.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connection maximum idle time field.

SECONDS

Additional Properties

Object

Additional properties used to customize the Kafka connection.

For example:

  • For serialization, set Key to key.serializer or value.serializer and Value to com.mulesoft.connectors.kafka.internal.model.serializer.MuleKafkaAvroSerializer.

  • For deserialization, set Key to key.deserializer or value.deserializer and Value to io.confluent.kafka.serializers.KafkaAvroDeserializer.

TLS Configuration

TLS

Defines a TLS configuration used by both the client and server sides to secure communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. The default value is PLAINTEXT, or SASL_PLAINTEXT for SASL authentication (Kerberos, SCRAM, or PLAIN). If the broker uses SSL as the protocol, configure at least the keystore in the tls:context child element of the configuration; the connector then automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.
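The protocol selection rule described above fits in a few lines. This Python function only restates the documented behavior for illustration; `security_protocol` is a hypothetical name, and the connector applies the rule itself.

```python
def security_protocol(sasl_enabled: bool, tls_keystore_configured: bool) -> str:
    """Derive security.protocol from whether SASL is used and whether TLS is configured."""
    if tls_keystore_configured:
        return "SASL_SSL" if sasl_enabled else "SSL"
    return "SASL_PLAINTEXT" if sasl_enabled else "PLAINTEXT"
```

For instance, a Kerberos connection with a tls:context keystore yields SASL_SSL, and a plaintext connection without TLS yields PLAINTEXT.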

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. When it is enabled, clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker's certificate.
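In the Kafka client, the value that enables this check is typically HTTPS (the ssl.endpoint.identification.algorithm property). As a loose analogy using Python's standard ssl module, hostname verification is a switch on top of certificate-chain verification; `client_ssl_context` is a hypothetical helper.

```python
import ssl

def client_ssl_context(endpoint_identification_algorithm: str = "") -> ssl.SSLContext:
    """Build a client TLS context; hostname verification is on only for "HTTPS"."""
    ctx = ssl.create_default_context()  # certificate chain verification stays enabled
    # An empty string (the default above) disables hostname verification.
    ctx.check_hostname = endpoint_identification_algorithm.upper() == "HTTPS"
    return ctx
```

With the empty default, a certificate signed by a trusted CA is accepted even if it was issued for a different host name, which is why enabling the check is usually recommended.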

Topic Subscription Patterns

Array of String

List of regular expressions that match the topics to subscribe to. Partitions of the matching topics are automatically rebalanced among the consumers.

Assignments

Array of Topic Partition

List of topic-partition pairs to assign. Consumers are not automatically rebalanced.
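Topic Subscription Patterns select topics by name, with Exclude internal topics filtering out Kafka's own topics, whereas Assignments name exact topic-partition pairs and involve no matching. A Python sketch of the pattern side, assuming whole-name matching; the internal-topic list and `subscribed_topics` are illustrative assumptions, and Python's re syntax differs slightly from Java regular expressions.

```python
import re
from typing import Iterable, List

# Illustrative set of Kafka internal topics.
INTERNAL_TOPICS = {"__consumer_offsets", "__transaction_state"}

def subscribed_topics(patterns: Iterable[str], topics: Iterable[str],
                      exclude_internal: bool = True) -> List[str]:
    """Return the topics, in sorted order, that a pattern subscription would pick up."""
    compiled = [re.compile(p) for p in patterns]
    selected: List[str] = []
    for topic in sorted(topics):
        if exclude_internal and topic in INTERNAL_TOPICS:
            continue
        # The whole topic name must match, not just a prefix or substring.
        if any(rx.fullmatch(topic) for rx in compiled):
            selected.append(topic)
    return selected
```

With patterns `orders.*` and `__.*`, the topics `orders` and `orders-eu` are selected; `__consumer_offsets` is added only when Exclude internal topics is false.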

Default fetch minimum size

Number

Minimum amount of data the server returns for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch minimum size field. You can override this parameter at the source level.

BYTE

Default fetch maximum size

Number

Maximum amount of data the server returns for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch maximum size field. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

Maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). To limit the size of the whole consumer request, use Default fetch maximum size (fetch.max.bytes). You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default maximum partition fetch size field. You can override this parameter at the source level.

MB
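Both fetch maximums are soft limits. The Python sketch below models how one fetch could pick record batches (given as sizes in bytes) under a per-partition limit and an overall limit, always letting the first batch of the first non-empty partition through. It is a hypothetical simplification of the behavior described above, not the broker's algorithm.

```python
from typing import Dict, List

def plan_fetch(batches: Dict[str, List[int]], fetch_max: int,
               partition_max: int) -> Dict[str, List[int]]:
    """Choose which record batches (sizes in bytes, per partition) a single fetch returns."""
    result: Dict[str, List[int]] = {}
    total = 0
    for partition, sizes in batches.items():
        taken: List[int] = []
        partition_total = 0
        for size in sizes:
            # Exception that guarantees progress: first batch of the first non-empty partition.
            first_overall = not result and not taken
            fits = partition_total + size <= partition_max and total + size <= fetch_max
            if not (fits or first_overall):
                break
            taken.append(size)
            partition_total += size
            total += size
        if taken:
            result[partition] = taken
    return result
```

With both limits at 1000 bytes, a single 5000-byte first batch is still returned on its own, which is why these settings are not absolute maximums.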

Fetch Maximum Wait Timeout

Number

Maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by Default fetch minimum size (fetch.min.bytes).

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Fetch Maximum Wait Timeout field.

MILLISECONDS
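Default fetch minimum size and Fetch Maximum Wait Timeout work as a pair: the broker answers as soon as enough data has accumulated or the wait expires, whichever comes first. A Python model of that timing; the arrival list and `fetch_response_time` are hypothetical inputs for illustration.

```python
from typing import List, Tuple

def fetch_response_time(arrivals: List[Tuple[int, int]], fetch_min_bytes: int,
                        max_wait_ms: int) -> int:
    """Return when (in ms after the request) the broker answers a fetch.

    `arrivals` lists (time_ms, bytes) of data becoming available, in time order.
    """
    available = 0
    for time_ms, size in arrivals:
        if time_ms > max_wait_ms:
            break  # data arriving after the deadline does not speed up the answer
        available += size
        if available >= fetch_min_bytes:
            return time_ms  # minimum reached: answer immediately
    return max_wait_ms  # minimum never reached: answer when the wait expires
```

With the defaults (1 byte, 500 ms), any data answers the fetch at once; raising the minimum to 1 KB makes a fetch that only sees 100 bytes wait the full 500 ms.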

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Consumer Kerberos Connection Type

Use Kerberos configuration files.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all the Kafka consumers that use this configuration.

Consumer Amount

Number

Determines the number of consumers the connection initially creates.

1

Maximum polling interval

Number

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances to reassign the partitions to another member. For consumers with a non-null group.instance.id that reach this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats and partitions are reassigned after session.timeout.ms expires.

60

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum polling interval field. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally.

  • READ_COMMITTED

    consumer.poll() returns only transactional messages that have been committed.

  • READ_UNCOMMITTED

    consumer.poll() returns all messages, even transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in the READ_COMMITTED mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction.

In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. Furthermore, when in READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED

Exclude internal topics

Boolean

Determines whether internal topics matching a subscribed pattern are excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server, for example, because the data was deleted.

  • EARLIEST

    Automatically reset the offset to the earliest offset.

  • LATEST

    Automatically reset the offset to the latest offset.

  • ERROR

    Throw an error if no previous offset is found for the consumer’s group.

LATEST

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled.

true

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Request Timeout

Number

Maximum amount of time the client waits for the response of a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Request Timeout field. You can override this parameter at the source level.

SECONDS

Default record limit

Number

Maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Heartbeat interval

Number

Expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Heartbeat interval field.

SECONDS

Session Timeout

Number

Timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Session Timeout field.

SECONDS

Connection maximum idle time

Number

Closes idle connections after the amount of time specified by this field, expressed in the unit set by Connection maximum idle time time unit.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connection maximum idle time field.

SECONDS

Additional Properties

Object

Additional properties used to customize the Kafka connection.

For example:

  • For serialization, set Key to key.serializer or value.serializer and Value to com.mulesoft.connectors.kafka.internal.model.serializer.MuleKafkaAvroSerializer.

  • For deserialization, set Key to key.deserializer or value.deserializer and Value to io.confluent.kafka.serializers.KafkaAvroDeserializer.

TLS Configuration

TLS

Defines a TLS configuration used by both the client and server sides to secure communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. The default value is PLAINTEXT, or SASL_PLAINTEXT for SASL authentication (Kerberos, SCRAM, or PLAIN). If the broker uses SSL as the protocol, configure at least the keystore in the tls:context child element of the configuration; the connector then automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. When it is enabled, clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker's certificate.

Topic Subscription Patterns

Array of String

List of regular expressions that match the topics to subscribe to. Partitions of the matching topics are automatically rebalanced among the consumers.

Assignments

Array of Topic Partition

List of topic-partition pairs to assign. Consumers are not automatically rebalanced.

Default fetch minimum size

Number

Minimum amount of data the server returns for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch minimum size field. You can override this parameter at the source level.

BYTE

Default fetch maximum size

Number

Maximum amount of data the server returns for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch maximum size field. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

Maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). To limit the size of the whole consumer request, use Default fetch maximum size (fetch.max.bytes). You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default maximum partition fetch size field. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

Maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by Default fetch minimum size (fetch.min.bytes).

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Fetch Maximum Wait Timeout field.

MILLISECONDS

Principal

String

Entity that is authenticated by a computer system or a network. Principals can be individual people, computers, services, or computational entities such as processes and threads.

x

Service name

String

Kerberos principal name that Kafka runs as.

x

Kerberos configuration file (krb5.conf)

String

Path to the krb5.conf file, which contains Kerberos configuration information. This information includes the locations of KDCs and admin servers for the Kerberos realms of interest, defaults for the current realm, defaults for Kerberos applications, and the mappings of hostnames to Kerberos realms.

Use ticket cache

Boolean

Set this option to true to obtain the ticket-granting ticket (TGT) from the ticket cache. Set this option to false if you do not want to use the ticket cache. The connector searches for the ticket cache as follows:

  • On Solaris and Linux, the connector looks in /tmp/krb5cc_uid, in which the uid is the numeric user identifier.

  • If the ticket cache is not available in /tmp/krb5cc_uid or the app is on a Windows platform, the connector looks in {user.home}{file.separator}krb5cc_{user.name}. You can override the ticket cache location by setting a value for the Ticket cache field.

    In a Windows environment, if a ticket cannot be retrieved from the file ticket cache, Windows uses the Local Security Authority (LSA) API to get the ticket-granting ticket (TGT).

false

Ticket cache

String

Name of the ticket cache that contains the user’s ticket-granting ticket (TGT). If this value is set, Use ticket cache must also be set to true. Otherwise, a configuration error is returned.

Use keytab

Boolean

Set this option to true if you want the connector to obtain the principal’s key from the keytab. If you don’t set the Keytab field, the connector locates the keytab by using the Kerberos configuration file. If the keytab is not specified in the Kerberos configuration file, the connector looks for the {user.home}{file.separator}krb5.keytab file.

false

Keytab

String

Set this option to the file name of the keytab to obtain the principal’s secret key.

Store key

Boolean

Set this option to true to store the principal’s subject private credentials.

false
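The Kerberos fields correspond closely to the options of Java's Krb5LoginModule (useTicketCache, ticketCache, useKeyTab, keyTab, storeKey, principal) and to the Kafka client's SASL/GSSAPI properties. The Python helper below is a hypothetical sketch that assembles equivalent client properties; the connector builds its own JAAS configuration, and the krb5.conf path is normally supplied to the JVM separately (for example, through the java.security.krb5.conf system property), so it does not appear here.

```python
from typing import Dict, Optional

def kerberos_client_properties(principal: str, service_name: str,
                               use_ticket_cache: bool = False,
                               ticket_cache: Optional[str] = None,
                               use_keytab: bool = False,
                               keytab: Optional[str] = None,
                               store_key: bool = False) -> Dict[str, str]:
    """Assemble Kafka client SASL/GSSAPI properties from the Kerberos fields."""
    if ticket_cache and not use_ticket_cache:
        # Mirrors the documented rule: Ticket cache requires Use ticket cache = true.
        raise ValueError("Ticket cache is set but Use ticket cache is false")
    options = [
        f"useTicketCache={str(use_ticket_cache).lower()}",
        f"useKeyTab={str(use_keytab).lower()}",
        f"storeKey={str(store_key).lower()}",
    ]
    if ticket_cache:
        options.append(f'ticketCache="{ticket_cache}"')
    if keytab:
        options.append(f'keyTab="{keytab}"')
    options.append(f'principal="{principal}"')
    jaas = "com.sun.security.auth.module.Krb5LoginModule required " + " ".join(options) + ";"
    return {
        "sasl.mechanism": "GSSAPI",
        "sasl.kerberos.service.name": service_name,  # the Service name field
        "sasl.jaas.config": jaas,
    }
```

The principal, keytab path, and service name in any usage are placeholders for your own Kerberos setup.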

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Consumer SASL/OAUTHBEARER - Client Credentials Connection Type

OAuth Bearer authentication is a mechanism for authenticating requests using bearer tokens, which are typically used to authenticate users and applications to access resources. Apache Kafka® supports OAuth Bearer authentication for establishing secure connections. The connector supports only the RFC 6749 (OAuth 2.0) standard.
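To show what the RFC 6749 client credentials grant involves, here is a Python sketch that builds, but does not send, the token request. The token endpoint URL, client ID, secret, and scope are placeholders, `client_credentials_request` is a hypothetical helper, and the connector performs this exchange itself, so its exact request may differ.

```python
import base64
from typing import Dict, Optional
from urllib.parse import quote_plus, urlencode
from urllib.request import Request

def client_credentials_request(token_url: str, client_id: str, client_secret: str,
                               scope: Optional[str] = None) -> Request:
    """Build an RFC 6749 (section 4.4) client credentials token request."""
    form: Dict[str, str] = {"grant_type": "client_credentials"}
    if scope:
        form["scope"] = scope
    # RFC 6749 section 2.3.1: form-encode the ID and secret, then send them as HTTP Basic.
    raw = f"{quote_plus(client_id)}:{quote_plus(client_secret)}"
    basic = base64.b64encode(raw.encode()).decode()
    return Request(
        token_url,
        data=urlencode(form).encode(),
        headers={"Authorization": f"Basic {basic}",
                 "Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )
```

Sending this request with `urllib.request.urlopen` would return a JSON body containing at least `access_token` and `token_type`; the client then presents that token to the Kafka broker as the bearer credential.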

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. When it is enabled, clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker's certificate.

Group ID

String

Default group ID for all the Kafka consumers that use this configuration.

Consumer Amount

Number

Determines the number of consumers the connection initially creates.

1

Maximum polling interval

Number

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances to reassign its partitions to another member. For consumers that use a non-null group.instance.id and reach this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats, and partitions are reassigned after session.timeout.ms expires.

60

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum polling interval field. You can override this parameter at the source level.

SECONDS
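Each Number field in this table is paired with a time unit field. Assuming the connector turns the pair into the millisecond value that the Kafka client property expects (for example, max.poll.interval.ms), the conversion amounts to the following sketch. The table and function names are hypothetical, and truncating sub-millisecond remainders (as Java's TimeUnit.toMillis does) is an assumption about the connector's behavior.

```python
# Nanoseconds in one unit of each time-unit enumeration value.
NANOS_PER_UNIT = {
    "NANOSECONDS": 1,
    "MICROSECONDS": 1_000,
    "MILLISECONDS": 1_000_000,
    "SECONDS": 1_000_000_000,
    "MINUTES": 60 * 1_000_000_000,
    "HOURS": 3_600 * 1_000_000_000,
    "DAYS": 86_400 * 1_000_000_000,
}


def to_millis(value: int, unit: str) -> int:
    """Convert a non-negative (value, time unit) pair to whole milliseconds.

    Integer floor division truncates any fraction of a millisecond.
    """
    return value * NANOS_PER_UNIT[unit] // 1_000_000
```

Under that assumption, the default Maximum polling interval of 60 SECONDS would correspond to a max.poll.interval.ms value of 60000.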

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally.

  • READ_COMMITTED

    consumer.poll() returns only committed transactional messages.

  • READ_UNCOMMITTED

    consumer.poll() returns all messages, even transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in the READ_COMMITTED mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction.

In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. Furthermore, when in READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED
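The visibility rules above can be illustrated with a small model of one partition log. This is a simplified sketch: the Record type and function name are hypothetical, each record is tagged directly with the state of the transaction that wrote it, and transaction control markers are ignored.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Record:
    offset: int
    # None for a non-transactional record; otherwise the state of the
    # transaction that wrote it: "committed", "aborted", or "open".
    txn_state: Optional[str] = None


def visible_offsets(log: List[Record], isolation_level: str) -> List[int]:
    """Offsets a poll could return from one partition (log sorted by offset)."""
    if isolation_level == "READ_UNCOMMITTED":
        # Every record is returned, including records from aborted or open transactions.
        return [r.offset for r in log]
    if isolation_level != "READ_COMMITTED":
        raise ValueError(f"unknown isolation level: {isolation_level}")
    # The first record of the first open transaction bounds what is readable:
    # nothing at or after it is returned until that transaction completes.
    boundary = next((r.offset for r in log if r.txn_state == "open"), None)
    visible = []
    for r in log:
        if boundary is not None and r.offset >= boundary:
            break
        if r.txn_state != "aborted":  # records from aborted transactions are skipped
            visible.append(r.offset)
    return visible
```

For a log with a non-transactional record at offset 0, a committed one at 1, an aborted one at 2, an open one at 3, and a non-transactional one at 4, READ_UNCOMMITTED returns all five offsets, while READ_COMMITTED returns only 0 and 1: offset 4 is withheld because it follows the open transaction.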

Exclude internal topics

Boolean

Determines whether internal topics matching a subscribed pattern are excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server, for example, because the data was deleted.

  • EARLIEST

    Automatically reset the offset to the earliest offset.

  • LATEST

    Automatically reset the offset to the latest offset.

  • ERROR

    Throw an error if no previous offset is found for the consumer’s group.

LATEST
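The Auto offset reset decision for a single partition can be sketched as follows. This assumes that a committed offset still present on the broker is always used as-is and that the policy applies only otherwise. OffsetResetError and starting_offset are hypothetical names used for illustration, not part of the connector's API.

```python
from typing import Optional


class OffsetResetError(Exception):
    """Hypothetical error type used to model the ERROR policy."""


def starting_offset(committed: Optional[int], log_start: int, log_end: int, policy: str) -> int:
    """Choose the offset a consumer group starts reading from on one partition.

    committed -- the group's committed offset, or None if there is none
    log_start -- earliest offset still present on the broker
    log_end   -- offset that the next produced record will receive
    """
    # A committed offset that still exists on the server is used unchanged.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise, the Auto offset reset policy decides.
    if policy == "EARLIEST":
        return log_start
    if policy == "LATEST":
        return log_end
    if policy == "ERROR":
        raise OffsetResetError(f"no valid offset for the group (committed={committed})")
    raise ValueError(f"unknown policy: {policy}")
```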

Retry Backoff Timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so you can disable it in situations that require extremely high performance.

true

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this field at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. You can override this field at the source level.

KB

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this field at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. You can override this field at the source level.

KB
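The size fields in this table are also paired with a unit. A minimal sketch of the conversion to bytes, assuming binary multiples (1 KB = 1024 bytes) and that -1 is passed through unchanged to mean "use the OS default"; the names are hypothetical.

```python
BYTES_PER_UNIT = {"BYTE": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def to_bytes(size: int, unit: str) -> int:
    """Convert a (size, unit) pair to bytes.

    -1 is returned unchanged because it means "use the OS default".
    """
    if size == -1:
        return -1
    return size * BYTES_PER_UNIT[unit]
```

Under that assumption, the defaults of 64 KB and 128 KB work out to 65536 and 131072 bytes, which match the Apache Kafka client defaults for receive.buffer.bytes and send.buffer.bytes.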

Request Timeout

Number

The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Request Timeout field. You can override this parameter at the source level.

SECONDS

Default record limit

Number

The maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS, when the lookup returns multiple IP addresses for a hostname, the client attempts to connect to each of them before failing the connection. This applies to both bootstrap and advertised servers. If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS
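The USE_ALL_DNS_IPS behavior amounts to trying every resolved address in turn and failing only when all of them fail. The sketch below models that loop. The try_connect callback is a hypothetical stand-in for an actual socket connection attempt, and the real client's retry and backoff handling is not modeled.

```python
from typing import Callable, List


def connect_first_reachable(addresses: List[str], try_connect: Callable[[str], bool]) -> str:
    """Model of USE_ALL_DNS_IPS: try every resolved IP address in order.

    try_connect is a caller-supplied (hypothetical) function that returns True
    when a connection to the given IP address succeeds.
    """
    failed = []
    for ip in addresses:
        if try_connect(ip):
            return ip
        failed.append(ip)
    # The connection fails only after every resolved address has been tried.
    raise ConnectionError(f"could not connect to any of {failed}")
```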

Heartbeat interval

Number

The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Heartbeat interval field.

SECONDS

Session timeout

Number

The timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for session timeout scalar.

SECONDS
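The relationship between Heartbeat interval and Session timeout can be checked before deployment. This is a minimal sketch: both inputs are assumed to be already converted to milliseconds from their time unit fields, and the function name and messages are hypothetical.

```python
from typing import List


def heartbeat_problems(heartbeat_ms: int, session_timeout_ms: int) -> List[str]:
    """Check the heartbeat interval against the session timeout."""
    # Hard rule: the heartbeat interval must be lower than the session timeout.
    if heartbeat_ms >= session_timeout_ms:
        return ["heartbeat interval must be lower than the session timeout"]
    # Recommendation: keep it at or below one third of the session timeout.
    if heartbeat_ms * 3 > session_timeout_ms:
        return ["heartbeat interval is above the recommended 1/3 of the session timeout"]
    return []
```

The defaults of 3 seconds and 10 seconds pass both checks, because 3 × 3 = 9 does not exceed 10.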

Connection maximum idle time

Number

Closes idle connections after the amount of time specified by this field, expressed in the unit set by Connection maximum idle time time unit.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for connections maximum idle time scalar.

SECONDS

Additional Properties

Object

Additional properties used to customize the Kafka connection.

For example:

  • For serialization, set Key to key.serializer or value.serializer and Value to com.mulesoft.connectors.kafka.internal.model.serializer.MuleKafkaAvroSerializer.

  • For deserialization, set Key to key.deserializer or value.deserializer and Value to io.confluent.kafka.serializers.KafkaAvroDeserializer.

TLS Configuration

TLS

Defines a TLS configuration, which can be used from both the client and server sides to secure communication for the Mule app. The connector automatically sets the security.protocol to use for communication. Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. If no TLS configuration is provided, the default value is PLAINTEXT (or SASL_PLAINTEXT for SASL authentication: Kerberos, SCRAM, PLAIN, or token). If the broker is configured to use SSL as the protocol, configure at least the keystore in the tls:context child element of this configuration; the connector then automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.
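The automatic choice of security.protocol described above reduces to a two-way decision. The following sketch restates the documented rule. The function name and boolean inputs are hypothetical; the connector's internal implementation is not shown here.

```python
def security_protocol(uses_sasl: bool, tls_configured: bool) -> str:
    """Pick security.protocol from the connection type and the presence of a TLS context."""
    if uses_sasl:  # Kerberos, SCRAM, PLAIN, or token-based connection types
        return "SASL_SSL" if tls_configured else "SASL_PLAINTEXT"
    return "SSL" if tls_configured else "PLAINTEXT"
```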

Topic Subscription Patterns

Array of String

The list of subscription regular expressions to subscribe to. Matching topics are automatically rebalanced among the consumers of the topic.
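Topic Subscription Patterns works together with Exclude internal topics. The sketch below shows which topics a set of patterns would select. It assumes that a pattern must match the whole topic name, in the way Java's Matcher.matches() does, and that the internal topics are Kafka's built-in __consumer_offsets and __transaction_state. The function name is hypothetical, and Python's re syntax stands in for Java regular expressions, which agree for simple patterns like these.

```python
import re
from typing import Iterable, List

# Kafka's built-in internal topics.
INTERNAL_TOPICS = {"__consumer_offsets", "__transaction_state"}


def matching_topics(patterns: Iterable[str], topics: Iterable[str],
                    exclude_internal: bool = True) -> List[str]:
    """Return, sorted, the topics that at least one subscription pattern matches."""
    compiled = [re.compile(p) for p in patterns]
    matched = []
    for topic in sorted(topics):
        if exclude_internal and topic in INTERNAL_TOPICS:
            continue
        # A pattern must match the whole topic name, not just a prefix.
        if any(c.fullmatch(topic) for c in compiled):
            matched.append(topic)
    return matched
```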

Assignments

Array of Topic Partition

The list of topic-partition pairs to assign. Consumers are not automatically rebalanced.

Default fetch minimum size

Number

The minimum amount of data the server returns for a fetch request. If insufficient data is available, the request waits for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to a value greater than 1 causes the server to wait for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

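Unit of measure for the Default fetch minimum size field. You can override this parameter at the source level.

BYTE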

Default fetch maximum size

Number

The maximum amount of data the server returns for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined by message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the Default fetch maximum size field. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

The maximum amount of data per partition that the server returns. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined by message.max.bytes (broker configuration) or max.message.bytes (topic configuration). See fetch.max.bytes for limiting the consumer request size. You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the Default maximum partition fetch size field. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for fetch maximum wait timeout scalar.

MILLISECONDS
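Default fetch minimum size and Fetch Maximum Wait Timeout together decide when the broker answers a fetch request: whichever condition is met first wins. The following is a simplified model of that rule, using the defaults of 1 byte and 500 ms. The function name is hypothetical, and the broker's actual request purgatory handling is more involved.

```python
def broker_should_answer(accumulated_bytes: int, waited_ms: int,
                         fetch_min_bytes: int = 1, fetch_max_wait_ms: int = 500) -> bool:
    """Model of when the broker answers a pending fetch request."""
    # Answer as soon as enough data has accumulated...
    if accumulated_bytes >= fetch_min_bytes:
        return True
    # ...or when the maximum wait time has elapsed, even with too little data.
    return waited_ms >= fetch_max_wait_ms
```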

Client ID Key

String

Specifies the configuration key used to define the OAuth client ID within the JAAS configuration.

clientId

Client ID

String

The client ID, which is the unique identifier assigned to the OAuth client and is used for secure authentication and authorization.

x

Client Secret Key

String

Specifies the configuration key used to define the OAuth client secret within the JAAS configuration.

clientSecret

Client Secret

String

The client secret, which is the confidential authentication credential assigned to the OAuth client.

x

OAuth Token Endpoint Key

String

Specifies the configuration key used to define the OAuth token endpoint within the JAAS configuration.

sasl.oauthbearer.token.endpoint.url

OAuth Token Endpoint

String

Describes the URL where clients can request access tokens from the authentication server.

x

Scope Key

String

Specifies the configuration key used to define the OAuth scope within the JAAS configuration.

scope

Scope

String

Defines the specific permissions and access rights granted to the client application by the authorization server.

Audience Key

String

Specifies the configuration key used to define the OAuth audience within the JAAS configuration.

audience

Audience

String

Comma-separated list of one or more target audiences for the access token.

Max Token Expiration Seconds

Number

Sets the maximum duration, in seconds, for which an access token remains valid.

Principal Name

String

Unique identifier associated with an authenticated user, often used for authorization decisions and resource access.

Credentials Placement

Enumeration, one of:

  • BASIC_AUTHENTICATION_HEADER

  • BODY

Defines whether to include client credentials in the basic authentication header or in the body of the authentication request.

BASIC_AUTHENTICATION_HEADER

OAuth Module Required

Boolean

Indicates whether this login module must be successfully authenticated for Kafka clients to establish a connection using OAuth 2.0 bearer tokens.

true

Include Accept Header

Boolean

Whether to include the Accept: application/json header in the authentication request.

true
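To show how Client ID, Client Secret, Scope, Credentials Placement, and Include Accept Header shape the token request, here is a sketch that builds the headers and form body of an RFC 6749 client-credentials request without sending it. The form field names and the encode-then-Base64 rule for the Basic header come from RFC 6749 (sections 2.3.1 and 4.4). Whether the connector encodes credentials exactly this way is an assumption, and the function name is hypothetical.

```python
import base64
from typing import Dict, Optional, Tuple
from urllib.parse import quote_plus, urlencode


def build_token_request(client_id: str, client_secret: str, scope: Optional[str] = None,
                        placement: str = "BASIC_AUTHENTICATION_HEADER",
                        include_accept: bool = True) -> Tuple[Dict[str, str], str]:
    """Return (headers, form body) for an RFC 6749 client-credentials token request."""
    form = {"grant_type": "client_credentials"}
    if scope:
        form["scope"] = scope
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    if include_accept:
        headers["Accept"] = "application/json"
    if placement == "BASIC_AUTHENTICATION_HEADER":
        # RFC 6749 section 2.3.1: form-encode the ID and secret, join with ":", then Base64.
        raw = f"{quote_plus(client_id)}:{quote_plus(client_secret)}"
        headers["Authorization"] = "Basic " + base64.b64encode(raw.encode("utf-8")).decode("ascii")
    elif placement == "BODY":
        # Credentials travel as ordinary form parameters instead.
        form["client_id"] = client_id
        form["client_secret"] = client_secret
    else:
        raise ValueError(f"unknown credentials placement: {placement}")
    return headers, urlencode(form)
```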

OAuth Extensions

Object

Represents a mapping of key-value pairs containing extensions for custom broker OAuth implementations. If you specify a key for the OAuth Extensions field, ensure that the input contains only alphabetic characters and is at least one character long. If you specify a value for OAuth Extensions field, ensure the input contains only printable ASCII characters or whitespace (including space, tab, carriage return, and newline), and the value is at least one character long.
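The OAuth Extensions rules can be checked client-side before deployment. This is a minimal sketch that assumes "alphabetic" means ASCII letters A to Z and a to z. It treats "printable ASCII" as the characters 0x21 through 0x7E plus space, tab, carriage return, and newline. The function name and messages are hypothetical.

```python
import re
from typing import Dict

# One or more ASCII letters.
KEY_RE = re.compile(r"[A-Za-z]+")
# One or more printable ASCII characters (0x21-0x7E) or space, tab, CR, LF.
VALUE_RE = re.compile(r"[\x21-\x7E \t\r\n]+")


def invalid_extensions(extensions: Dict[str, str]) -> Dict[str, str]:
    """Return {key: reason} for every extension entry that breaks the rules."""
    problems = {}
    for key, value in extensions.items():
        if not KEY_RE.fullmatch(key):
            problems[key] = "key must be one or more alphabetic characters"
        elif not VALUE_RE.fullmatch(value):
            problems[key] = "value must be one or more printable ASCII or whitespace characters"
    return problems
```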

Login Refresh Window Factor

Number

Fraction of the credential’s lifetime that the login refresh thread waits for before attempting to refresh the credential. Acceptable values range from 0.5 (50%) to 1.0 (100%), inclusive. If no value is specified, 0.8 (80%) is used.

0.8

Login Refresh Window Jitter

Number

Maximum amount of random jitter, relative to the credential’s lifetime, that is added to the login refresh thread’s sleep time. Acceptable values range from 0 (0%) to 0.25 (25%), inclusive. If no value is specified, 0.05 (5%) is used.

0.05

Login Refresh Minimum Period

Number

The desired minimum time in seconds for the login refresh thread to wait before refreshing a credential. Legal values are between 0 and 900 (15 minutes).

60

Login Refresh Buffer

Number

The amount of buffer time in seconds before credential expiration to maintain when refreshing a credential. Legal values are between 0 and 3600 (1 hour). A default value of 300 (5 minutes) is used if no value is specified.

300
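The four Login Refresh fields interact when the next refresh is scheduled. The following simplified model illustrates that interaction. It is not the Kafka client's actual algorithm, which resolves conflicting constraints (for example, a buffer longer than the whole lifetime) differently in detail. The function name is hypothetical, and rnd replaces the random draw so that the result is deterministic.

```python
def refresh_delay_seconds(lifetime_s: float,
                          window_factor: float = 0.8,
                          window_jitter: float = 0.05,
                          min_period_s: float = 60,
                          buffer_s: float = 300,
                          rnd: float = 0.0) -> float:
    """Seconds after a credential is obtained at which to try refreshing it.

    rnd stands in for a uniform random number in [0, 1).
    """
    if not 0.5 <= window_factor <= 1.0:
        raise ValueError("window factor must be between 0.5 and 1.0")
    if not 0.0 <= window_jitter <= 0.25:
        raise ValueError("window jitter must be between 0 and 0.25")
    # Nominal point: window factor plus random jitter, as a fraction of the lifetime.
    delay = lifetime_s * (window_factor + window_jitter * rnd)
    # Keep at least buffer_s seconds before expiration...
    delay = min(delay, lifetime_s - buffer_s)
    # ...but never wait less than the minimum refresh period.
    return max(delay, min_period_s)
```

With the defaults and a one-hour token, this model refreshes after 2880 seconds (80% of 3600). For a 10-minute token, the 300-second buffer takes over and moves the refresh to 300 seconds.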

Use Persistent Connections

Boolean

Indicates whether to use persistent connections:

  • true

    Mule uses persistent connections.

  • false

    Mule closes the connection after the first request completes.

true

Max Connections

Number

Maximum number of connections to open to the backend. HTTP requests are sent in parallel over multiple connections. Setting this value too high can impact latency and consume additional resources without increasing throughput. By default, the number of connections is unlimited.

-1

Connection Idle Timeout

Number

The number of milliseconds that a connection can remain idle before it is closed. The value of this attribute is only used when persistent connections are enabled.

30000

Stream Response

Boolean

Whether received responses are streamed, meaning that processing continues as soon as all headers are parsed and the body is streamed as it arrives. When enabled, the response must eventually be read because, depending on the configured buffer size, it might not fit into memory, and processing stops until space is available.

false

Response Buffer Size

Number

Size of the buffer that stores the HTTP response, in bytes. By default, the space is not limited.

-1

Client Socket Properties

Encapsulates the configurable properties for TCP client socket connections.

Proxy Config Params

Configures a proxy for outbound connections.

Follow Redirects

Boolean

Specifies whether to follow redirects or not.

false

Response Timeout

Number

Maximum time that the request element will block the execution of the flow waiting for the HTTP response.

60000

Reconnection

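Configures a reconnection strategy to use when a connector operation fails to connect to an external server. When the application is deployed, a connectivity test is performed on all connectors. If Fails deployment is set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.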

Consumer SASL/SCRAM Connection Type

Use Salted Challenge Response Authentication Mechanism (SCRAM) or SASL/SCRAM, a family of SASL mechanisms that addresses the security concerns with traditional mechanisms that perform username and password authentication like PLAIN. Apache Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all the Kafka consumers that use this configuration.

Consumer Amount

Number

Determines the number of consumers the connection initially creates.

1

Maximum polling interval

Number

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances to reassign its partitions to another member. For consumers that use a non-null group.instance.id and reach this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats, and partitions are reassigned after session.timeout.ms expires.

60

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum polling interval field. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally.

  • READ_COMMITTED

    consumer.poll() returns only committed transactional messages.

  • READ_UNCOMMITTED

    consumer.poll() returns all messages, even transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in the READ_COMMITTED mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction.

In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. Furthermore, when in READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED

Exclude internal topics

Boolean

Determines whether internal topics matching a subscribed pattern are excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server, for example, because the data was deleted.

  • EARLIEST

    Automatically reset the offset to the earliest offset.

  • LATEST

    Automatically reset the offset to the latest offset.

  • ERROR

    Throw an error if no previous offset is found for the consumer’s group.

LATEST

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled.

true

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Request Timeout

Number

Maximum amount of time the client waits for the response of a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Request Timeout field. You can override this parameter at the source level.

SECONDS

Default record limit

Number

Maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups:

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Heartbeat interval

Number

Expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Heartbeat interval field.

SECONDS

Session Timeout

Number

Timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Session Timeout field.

SECONDS

Connection maximum idle time

Number

Closes idle connections after the amount of time specified by this field, expressed in the unit set by Connection maximum idle time time unit.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connection maximum idle time field.

SECONDS

Additional Properties

Object

Additional properties used to customize the Kafka connection.

For example:

  • For serialization, set Key to key.serializer or value.serializer and Value to com.mulesoft.connectors.kafka.internal.model.serializer.MuleKafkaAvroSerializer.

  • For deserialization, set Key to key.deserializer or value.deserializer and Value to io.confluent.kafka.serializers.KafkaAvroDeserializer.

TLS Configuration

TLS

Defines a TLS configuration used by both the client and server sides to secure communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, or SASL_SSL. The default value is PLAINTEXT, or SASL_PLAINTEXT for SASL authentication (Kerberos, SCRAM, or PLAIN). If the broker is configured to use SSL as the protocol, configure at least the keystore in the tls:context child element of the configuration; the connector then automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker hostname matches the hostname in the broker’s certificate.

Topic Subscription Patterns

Array of String

List of subscription regular expressions to which to subscribe. Topics are automatically rebalanced between the topic consumers.

Assignments

Array of Topic Partition

List of topic-partition pairs to assign. Consumers are not automatically rebalanced.

Default fetch minimum size

Number

Minimum amount of data the server returns for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch minimum size field. You can override this parameter at the source level.

BYTE

Default fetch maximum size

Number

Maximum amount of data the server returns for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch maximum size field. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

Maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). See fetch.max.bytes for limiting the consumer request size. You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default maximum partition fetch size field. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

Maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Fetch Maximum Wait Timeout field.

MILLISECONDS

Username

String

Username with which to log in.

x

Password

String

Password with which to log in.

x

Encryption type

Enumeration, one of:

  • SHA256

  • SHA512

Encryption algorithm used by SCRAM.

x
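To clarify what the Encryption type selects, the following sketch derives the RFC 5802 SCRAM keys with SHA-256 or SHA-512. Per RFC 5802, a server needs to store only the salt, the iteration count, StoredKey, and ServerKey, not the password. The connector and the Kafka client perform this exchange internally; you supply only Username, Password, and Encryption type. The function name is hypothetical, and SASLprep normalization of the password is omitted.

```python
import hashlib
import hmac
from typing import Dict

# Maps the Encryption type values to hashlib algorithm names.
DIGESTS = {"SHA256": "sha256", "SHA512": "sha512"}


def scram_keys(password: str, salt: bytes, iterations: int,
               encryption_type: str = "SHA256") -> Dict[str, bytes]:
    """Derive the RFC 5802 SCRAM keys for one user (SASLprep omitted)."""
    digest = DIGESTS[encryption_type]
    # SaltedPassword = Hi(password, salt, i), which is PBKDF2 with HMAC as the PRF.
    salted = hashlib.pbkdf2_hmac(digest, password.encode("utf-8"), salt, iterations)
    # ClientKey = HMAC(SaltedPassword, "Client Key"); StoredKey = H(ClientKey)
    client_key = hmac.new(salted, b"Client Key", digest).digest()
    stored_key = hashlib.new(digest, client_key).digest()
    # ServerKey = HMAC(SaltedPassword, "Server Key")
    server_key = hmac.new(salted, b"Server Key", digest).digest()
    return {"salted_password": salted, "client_key": client_key,
            "stored_key": stored_key, "server_key": server_key}
```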

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Consumer SASL/PLAIN Connection Type

Use SASL/PLAIN authentication with a username and password.
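SASL/PLAIN sends the username and password in a single message, defined in RFC 4616 as an optional authorization identity, a NUL byte, the username, a NUL byte, and the password. The following sketch builds that message; the function name is hypothetical, and the Kafka client builds it for you. Because the password travels in clear text inside this message, PLAIN is normally combined with TLS (SASL_SSL).

```python
def plain_initial_response(username: str, password: str, authzid: str = "") -> bytes:
    """Build the RFC 4616 PLAIN message: [authzid] NUL authcid NUL passwd."""
    for part in (authzid, username, password):
        if "\0" in part:
            raise ValueError("NUL characters are not allowed in PLAIN credentials")
    return f"{authzid}\0{username}\0{password}".encode("utf-8")
```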

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all the Kafka consumers that use this configuration.

Consumer Amount

Number

Determines the number of consumers the connection initially creates.

1

Maximum polling interval

Number

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances to reassign its partitions to another member. For consumers that use a non-null group.instance.id and reach this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats, and partitions are reassigned after session.timeout.ms expires.

60

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum polling interval field. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally.

  • READ_COMMITTED

    consumer.poll() returns only committed transactional messages.

  • READ_UNCOMMITTED

    consumer.poll() returns all messages, even transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in the READ_COMMITTED mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction.

In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. Furthermore, when in READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED
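To make the READ_COMMITTED rules concrete, here is a minimal Python model of what a single poll exposes from one partition. It is a sketch, not the Kafka client's implementation: the Record type, the visible_records function, and the "OPEN", "COMMITTED", and "ABORTED" status strings are hypothetical, and transaction control records are ignored.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class Record:
    offset: int
    txn_id: Optional[str] = None  # None marks a non-transactional record


def visible_records(records: List[Record], txn_status: Dict[str, str],
                    isolation_level: str) -> List[Record]:
    """Return the records a poll would expose for one partition.

    txn_status maps each transaction ID to "OPEN", "COMMITTED", or "ABORTED".
    """
    ordered = sorted(records, key=lambda r: r.offset)  # always offset order
    if isolation_level == "READ_UNCOMMITTED":
        return ordered  # everything, including aborted transactional records

    # Offset of the first record that belongs to a still-open transaction.
    open_offsets = [r.offset for r in ordered
                    if r.txn_id is not None and txn_status[r.txn_id] == "OPEN"]
    first_open = min(open_offsets) if open_offsets else None

    visible = []
    for r in ordered:
        if first_open is not None and r.offset >= first_open:
            break  # withheld until the open transaction completes (LSO boundary)
        if r.txn_id is not None and txn_status[r.txn_id] == "ABORTED":
            continue  # aborted transactional records are skipped
        visible.append(r)
    return visible
```

With records at offsets 0 to 5, where offset 2 belongs to an aborted transaction and offset 4 to an open one, READ_COMMITTED returns offsets 0, 1, and 3 (offset 5 is withheld behind the open transaction), while READ_UNCOMMITTED returns all six.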

Exclude internal topics

Boolean

Determines whether internal topics matching a subscribed pattern are excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server, for example, because the data was deleted.

  • EARLIEST

    Automatically reset the offset to the earliest offset.

  • LATEST

    Automatically reset the offset to the latest offset.

  • ERROR

    Throw an error if no previous offset is found for the consumer’s group.

LATEST
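The Auto offset reset policy only matters when the group has no usable committed offset. A sketch of that decision for one partition, assuming the committed offset and the range of offsets still on the broker are known; NoValidOffsetError and starting_offset are hypothetical names, not part of the connector or the Kafka client.

```python
from typing import Optional


class NoValidOffsetError(Exception):
    """Hypothetical stand-in for the error raised by the ERROR policy."""


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    policy: str) -> int:
    """Pick the offset a consumer starts reading from in one partition.

    committed is the group's committed offset (None if never committed);
    log_start and log_end bound the offsets that still exist on the broker.
    """
    if committed is not None and log_start <= committed <= log_end:
        return committed  # a valid committed offset always wins
    if policy == "EARLIEST":
        return log_start
    if policy == "LATEST":
        return log_end
    if policy == "ERROR":
        raise NoValidOffsetError("no valid committed offset for this consumer group")
    raise ValueError(f"unknown auto offset reset policy: {policy}")
```

A committed offset of 5 on a partition whose data now starts at offset 10 counts as "no longer exists on the server", so the policy decides where reading resumes.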

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled.

true
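The idea behind Check CRC is that the consumer recomputes a checksum over each record and compares it with the stored value. The sketch below shows that idea with Python's zlib.crc32. The 4-byte framing is hypothetical, and Kafka's current record batch format uses CRC-32C rather than the plain CRC-32 that zlib computes, so this illustrates the check, not Kafka's wire format.

```python
import zlib


def frame(payload: bytes) -> bytes:
    """Prefix the payload with a 4-byte big-endian CRC-32 checksum."""
    return zlib.crc32(payload).to_bytes(4, "big") + payload


def verify(framed: bytes) -> bytes:
    """Recompute the checksum and reject the record if it does not match."""
    expected = int.from_bytes(framed[:4], "big")
    payload = framed[4:]
    if zlib.crc32(payload) != expected:
        raise ValueError("CRC mismatch: record is corrupted")
    return payload
```

Disabling Check CRC saves exactly the work done in verify(): one checksum computation per consumed record.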

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB
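The buffer size fields combine a number with a unit. Here is a sketch of how such a pair could map to the byte counts behind SO_RCVBUF and SO_SNDBUF, assuming binary multiples (1 KB = 1024 bytes). That assumption is consistent with the defaults in this table: 64 KB and 128 KB equal 65536 and 131072 bytes, the Kafka consumer defaults for receive.buffer.bytes and send.buffer.bytes. The function name is hypothetical.

```python
# Assumed binary multiples for the BYTE/KB/MB/GB enumeration.
UNIT_FACTORS = {"BYTE": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def socket_buffer_bytes(size: int, unit: str) -> int:
    """Convert a size/unit pair to bytes; -1 passes through as 'use the OS default'."""
    if size == -1:
        return -1
    if size < 0:
        raise ValueError("buffer size must be -1 or non-negative")
    return size * UNIT_FACTORS[unit]
```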

Request Timeout

Number

Maximum amount of time the client waits for the response of a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Request Timeout field. You can override this parameter at the source level.

SECONDS

Default record limit

Number

Maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS, when the lookup returns multiple IP addresses for a hostname, the client attempts to connect to each of them before failing the connection. This applies to both bootstrap and advertised servers. If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS
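With USE_ALL_DNS_IPS, a hostname that resolves to several addresses fails only after every address has been tried. A minimal sketch of that fallback loop, assuming the resolved addresses are already available (for example, from socket.getaddrinfo) and using a hypothetical try_connect callback in place of a real socket connection:

```python
from typing import Callable, List


def connect_use_all_dns_ips(addresses: List[str],
                            try_connect: Callable[[str], bool]) -> str:
    """Try each resolved address in order; fail only after all of them fail."""
    failures = []
    for address in addresses:
        if try_connect(address):
            return address  # first reachable address wins
        failures.append(address)
    raise ConnectionError(f"all resolved addresses failed: {failures}")
```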

Heartbeat interval

Number

Expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Heartbeat interval field.

SECONDS

Session Timeout

Number

Timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Session Timeout field.

SECONDS
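The heartbeat and session timeout rules above can be checked before deployment. A sketch, assuming the broker's group.min.session.timeout.ms and group.max.session.timeout.ms values are known and passed in; the function names are hypothetical.

```python
# Nanoseconds per unit of the time-unit enumeration, kept as integers to avoid rounding.
NS_PER_UNIT = {
    "NANOSECONDS": 1,
    "MICROSECONDS": 1_000,
    "MILLISECONDS": 1_000_000,
    "SECONDS": 1_000_000_000,
    "MINUTES": 60 * 1_000_000_000,
    "HOURS": 3_600 * 1_000_000_000,
    "DAYS": 86_400 * 1_000_000_000,
}


def to_ms(value: int, unit: str) -> int:
    return value * NS_PER_UNIT[unit] // 1_000_000


def check_group_timeouts(heartbeat: int, heartbeat_unit: str,
                         session: int, session_unit: str,
                         broker_min_ms: int, broker_max_ms: int) -> list:
    """Return a list of problems; an empty list means the values look consistent."""
    heartbeat_ms = to_ms(heartbeat, heartbeat_unit)
    session_ms = to_ms(session, session_unit)
    problems = []
    if heartbeat_ms >= session_ms:
        problems.append("heartbeat interval must be lower than the session timeout")
    elif heartbeat_ms * 3 > session_ms:
        problems.append("heartbeat interval exceeds the recommended 1/3 of the session timeout")
    if not broker_min_ms <= session_ms <= broker_max_ms:
        problems.append("session timeout is outside the broker's allowed range")
    return problems
```

The defaults in this table (3 seconds and 10 seconds) pass the 1/3 guideline, because 3 x 3000 ms = 9000 ms, which does not exceed 10000 ms.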

Connection maximum idle time

Number

Close idle connections after the time specified by this field, expressed in the unit set in the Connection maximum idle time time unit field.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connection maximum idle time field.

SECONDS

Additional Properties

Object

Additional properties used to customize the Kafka connection.

For example:

  • For serialization, set Key to key.serializer or value.serializer and Value to com.mulesoft.connectors.kafka.internal.model.serializer.MuleKafkaAvroSerializer.

  • For deserialization, set Key to key.deserializer or value.deserializer and Value to io.confluent.kafka.serializers.KafkaAvroDeserializer.

TLS Configuration

TLS

Defines a TLS configuration used by both the client and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. The default value is PLAINTEXT, or SASL_PLAINTEXT for SASL authentication (Kerberos, SCRAM, or PLAIN). If the broker uses SSL as the protocol, configure at least the keystore in the tls:context child element of the configuration; the connector then automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.
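The rule for choosing security.protocol reduces to a two-input decision. The sketch below restates the documented behavior; the function and parameter names are hypothetical, and this is not the connector's own code.

```python
def security_protocol(uses_sasl: bool, keystore_configured: bool) -> str:
    """Pick security.protocol following the rule described for the TLS field."""
    if keystore_configured:
        return "SASL_SSL" if uses_sasl else "SSL"
    return "SASL_PLAINTEXT" if uses_sasl else "PLAINTEXT"
```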

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname against the server certificate. The default value is an empty string, which disables endpoint identification. When an algorithm is set, clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate.

Topic Subscription Patterns

Array of String

List of regular expressions that match the topics to subscribe to. Topic partitions are automatically rebalanced among the consumers.
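Topic Subscription Patterns interacts with the Exclude internal topics field. The sketch below shows which topics a set of patterns would select. It assumes full-match semantics (as with Java's Pattern.matcher(...).matches()) and hard-codes two well-known Kafka internal topics, __consumer_offsets and __transaction_state; the function name is hypothetical.

```python
import re
from typing import Iterable, List

# Two well-known Kafka internal topics; a real cluster may have others.
INTERNAL_TOPICS = {"__consumer_offsets", "__transaction_state"}


def subscribed_topics(patterns: List[str], topics: Iterable[str],
                      exclude_internal: bool = True) -> List[str]:
    """Return the topics, in sorted order, that at least one pattern fully matches."""
    compiled = [re.compile(p) for p in patterns]
    selected = []
    for topic in sorted(topics):
        if exclude_internal and topic in INTERNAL_TOPICS:
            continue  # skipped when Exclude internal topics is true
        if any(c.fullmatch(topic) for c in compiled):
            selected.append(topic)
    return selected
```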

Assignments

Array of Topic Partition

List of topic-partition pairs to assign. Consumers are not automatically rebalanced.

Default fetch minimum size

Number

Minimum amount of data the server returns for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch minimum size field. You can override this parameter at the source level.

BYTE

Default fetch maximum size

Number

Maximum amount of data the server returns for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch maximum size field. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

Maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). See fetch.max.bytes for limiting the consumer request size. You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default maximum partition fetch size field. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

Maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Fetch Maximum Wait Timeout field.

MILLISECONDS
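The fetch fields work together on the broker side: a fetch is answered once enough data has accumulated or the wait timeout expires, and the maximum size is a soft limit because the first batch is always returned. A simplified single-partition sketch with hypothetical function names:

```python
from typing import List


def broker_should_respond(accumulated_bytes: int, waited_ms: int,
                          fetch_min_bytes: int, fetch_max_wait_ms: int) -> bool:
    """Answer as soon as fetch.min.bytes is available or fetch.max.wait.ms elapses."""
    return accumulated_bytes >= fetch_min_bytes or waited_ms >= fetch_max_wait_ms


def bytes_returned(batch_sizes: List[int], fetch_max_bytes: int) -> int:
    """Add whole record batches up to the limit; the first batch is always included."""
    total = 0
    for index, size in enumerate(batch_sizes):
        if index > 0 and total + size > fetch_max_bytes:
            break
        total += size
    return total
```

A single 2,000,000-byte batch is still returned under a 1 MB limit, which is why the description calls the fetch maximum "not an absolute maximum".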

Username

String

Username the client uses to connect to the Kafka broker.

x

Password

String

Password used by the client to connect to the Kafka broker.

x

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Consumer SASL/TOKEN Connection Type

Use delegation tokens to authenticate to the Kafka cluster.

For security reasons, a delegation token cannot be renewed if the initial authentication uses a delegation token. For more information, refer to Delegation Token Support for Kafka.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all the Kafka consumers that use this configuration.

Consumer Amount

Number

Determines the number of consumers the connection initially creates.

1

Maximum polling interval

Number

The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before this timeout expires, the consumer is considered failed and the group rebalances to reassign its partitions to another member. For consumers that use a non-null group.instance.id and reach this timeout, partitions are not immediately reassigned. Instead, the consumer stops sending heartbeats and partitions are reassigned after session.timeout.ms expires.

60

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum polling interval field. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally.

  • READ_COMMITTED

    consumer.poll() returns only committed transactional messages.

  • READ_UNCOMMITTED

    consumer.poll() returns all messages, even transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in the READ_COMMITTED mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction.

In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. Furthermore, when in READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED

Exclude internal topics

Boolean

Determines whether internal topics matching a subscribed pattern are excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server, for example, because the data was deleted.

  • EARLIEST

    Automatically reset the offset to the earliest offset.

  • LATEST

    Automatically reset the offset to the latest offset.

  • ERROR

    Throw an error if no previous offset is found for the consumer’s group.

LATEST

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled.

true

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Request Timeout

Number

Maximum amount of time the client waits for the response of a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Request Timeout field. You can override this parameter at the source level.

SECONDS

Default record limit

Number

Maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS, when the lookup returns multiple IP addresses for a hostname, the client attempts to connect to each of them before failing the connection. This applies to both bootstrap and advertised servers. If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Heartbeat interval

Number

Expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Heartbeat interval field.

SECONDS

Session Timeout

Number

Timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Session Timeout field.

SECONDS

Connection maximum idle time

Number

Close idle connections after the time specified by this field, expressed in the unit set in the Connection maximum idle time time unit field.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connection maximum idle time field.

SECONDS

Additional Properties

Object

Additional properties used to customize the Kafka connection.

For example:

  • For serialization, set Key to key.serializer or value.serializer and Value to com.mulesoft.connectors.kafka.internal.model.serializer.MuleKafkaAvroSerializer.

  • For deserialization, set Key to key.deserializer or value.deserializer and Value to io.confluent.kafka.serializers.KafkaAvroDeserializer.

TLS Configuration

TLS

Defines a TLS configuration used by both the client and server sides to secure the communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. The default value is PLAINTEXT, or SASL_PLAINTEXT for SASL authentication (Kerberos, SCRAM, or PLAIN). If the broker uses SSL as the protocol, configure at least the keystore in the tls:context child element of the configuration; the connector then automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname against the server certificate. The default value is an empty string, which disables endpoint identification. When an algorithm is set, clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate.

Topic Subscription Patterns

Array of String

List of regular expressions that match the topics to subscribe to. Topic partitions are automatically rebalanced among the consumers.

Assignments

Array of Topic Partition

List of topic-partition pairs to assign. Consumers are not automatically rebalanced.

Default fetch minimum size

Number

Minimum amount of data the server returns for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch minimum size field. You can override this parameter at the source level.

BYTE

Default fetch maximum size

Number

Maximum amount of data the server returns for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default fetch maximum size field. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

Maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). See fetch.max.bytes for limiting the consumer request size. You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default maximum partition fetch size field. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

Maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Fetch Maximum Wait Timeout field.

MILLISECONDS

Token ID

String

ID of the token.

x

Token HMAC

String

HMAC of the delegation token.

x

Encryption type

Enumeration, one of:

  • SCRAM_SHA_256

  • SCRAM_SHA_512

Encryption algorithm used by SCRAM.

x
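In the Apache Kafka documentation, a client authenticates with a delegation token by using the SCRAM login module, passing the token ID as the username and the token HMAC as the password, and adding tokenauth="true". Assuming the connector builds equivalent client properties from the Token ID, Token HMAC, and Encryption type fields (an assumption, not something this page states), the mapping looks roughly like this. The function name is hypothetical, and security.protocol is omitted.

```python
# Maps the Encryption type enumeration to Kafka's SASL mechanism names.
MECHANISMS = {"SCRAM_SHA_256": "SCRAM-SHA-256", "SCRAM_SHA_512": "SCRAM-SHA-512"}


def token_client_properties(token_id: str, token_hmac: str, encryption_type: str) -> dict:
    """Build SASL client properties for delegation-token authentication."""
    if '"' in token_id or '"' in token_hmac:
        raise ValueError("this sketch does not escape quotes inside JAAS values")
    jaas = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{token_id}" password="{token_hmac}" tokenauth="true";'
    )
    return {"sasl.mechanism": MECHANISMS[encryption_type], "sasl.jaas.config": jaas}
```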

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.


Producer Configuration

Configuration for producers for Apache Kafka Connector.

Name Type Description Default Value Required

Name

String

Name for this configuration. Connectors reference the configuration with this name.

x

Connection

Connection types for this configuration.

x

Default topic

String

Default topic name for the producer operations to use. You can override it at the operation level.

defaultTopicName

Zone ID

String

Converts the provided timestamps into ZonedLocalDateTimes in the results. The default value is provided by the system.

Expiration Policy

Configures the minimum amount of time that a dynamic configuration instance can remain idle before Mule considers it eligible for expiration.

Producer Plaintext Connection Type

Use an unauthenticated and non-encrypted connection type.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

The producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of 0 disables batching entirely). A very large batch size can result in more wasteful use of memory, as a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Batch size field.

KB

Buffer size

Number

Total bytes of memory the producer uses to buffer records waiting to be sent to the server. If records are sent faster than they are delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting generally corresponds to the total memory the producer uses, but is not exact because not all memory used by the producer is used for buffering. Some additional memory is used for compression (if compression is enabled), as well as for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32MB), but this should be capped to align with expected values for Mule instances in CloudHub (0.1 vCore).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Buffer size field.

KB

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS, when the lookup returns multiple IP addresses for a hostname, the client attempts to connect to each of them before failing the connection. This applies to both bootstrap and advertised servers. If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

Compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE
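Because compression runs over whole batches, larger batches give the codec more repetition to exploit. A quick illustration with Python's gzip module standing in for the GZIP codec; Kafka's batch framing is not reproduced, and the sample records are made up.

```python
import gzip
import json


def sample_records(count: int) -> list:
    """Hypothetical JSON records with a lot of repeated structure."""
    return [json.dumps({"orderId": i, "status": "CREATED", "region": "eu-west"}).encode()
            for i in range(count)]


def size_compressed_one_by_one(records: list) -> int:
    return sum(len(gzip.compress(r)) for r in records)


def size_compressed_as_batch(records: list) -> int:
    return len(gzip.compress(b"".join(records)))


records = sample_records(100)
print("one by one:", size_compressed_one_by_one(records), "bytes")
print("as a batch:", size_compressed_as_batch(records), "bytes")
```

Compressing records individually pays the codec's per-stream overhead on every record and cannot reuse repeated field names across records, which is why more batching means better compression.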

Connections maximum idle time

Number

Close idle connections after the specified time is reached.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connections maximum idle time field.

SECONDS

Delivery Timeout

Number

Upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record is delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration must be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Delivery Timeout field.

SECONDS
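The constraint at the end of the Delivery Timeout description is easy to violate when the fields use different time units. A sketch of the check that converts every value to milliseconds first; the function names are hypothetical, and the 30-second request timeout in the tests is an illustrative value.

```python
# Nanoseconds per unit of the time-unit enumeration, kept as integers to avoid rounding.
NS_PER_UNIT = {
    "NANOSECONDS": 1,
    "MICROSECONDS": 1_000,
    "MILLISECONDS": 1_000_000,
    "SECONDS": 1_000_000_000,
    "MINUTES": 60 * 1_000_000_000,
    "HOURS": 3_600 * 1_000_000_000,
    "DAYS": 86_400 * 1_000_000_000,
}


def to_ms(value: int, unit: str) -> int:
    return value * NS_PER_UNIT[unit] // 1_000_000


def check_delivery_timeout(delivery: int, delivery_unit: str,
                           request_timeout: int, request_unit: str,
                           linger: int, linger_unit: str) -> int:
    """Return delivery.timeout.ms, or raise if it is below request.timeout.ms + linger.ms."""
    delivery_ms = to_ms(delivery, delivery_unit)
    minimum_ms = to_ms(request_timeout, request_unit) + to_ms(linger, linger_unit)
    if delivery_ms < minimum_ms:
        raise ValueError(f"delivery timeout {delivery_ms} ms is below the required {minimum_ms} ms")
    return delivery_ms
```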

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written to the stream. When set to false, retries caused by broker failures might write duplicates of the retried message to the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0, and acks to be all. If the user does not set these values explicitly, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false
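The compatibility rules for Enable idempotence can be expressed as a small resolver: unset values get compatible defaults, and explicit conflicting values are rejected. In this sketch, ValueError stands in for the ConnectionException mentioned above, and the fallback values (5 in-flight requests, Java's Integer.MAX_VALUE retries, acks=all) are assumptions about what "suitable values" means. The function name is hypothetical.

```python
from typing import Optional

JAVA_INT_MAX = 2_147_483_647  # assumed retries fallback, Java's Integer.MAX_VALUE


def resolve_idempotent_settings(max_in_flight: Optional[int] = None,
                                retries: Optional[int] = None,
                                acks: Optional[str] = None) -> dict:
    """Fill unset values with compatible defaults, or reject incompatible ones."""
    settings = {
        "max.in.flight.requests.per.connection": 5 if max_in_flight is None else max_in_flight,
        "retries": JAVA_INT_MAX if retries is None else retries,
        "acks": "all" if acks is None else acks,
    }
    if settings["max.in.flight.requests.per.connection"] > 5:
        raise ValueError("idempotence requires max.in.flight.requests.per.connection <= 5")
    if settings["retries"] <= 0:
        raise ValueError("idempotence requires retries > 0")
    if settings["acks"] != "all":
        raise ValueError("idempotence requires acks=all")
    return settings
```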

Linger time

Number

The producer groups together any records that arrive in between request transmissions into a single batched request. Normally, this occurs only under load, when records arrive faster than they are sent out. However, in some circumstances the client might want to reduce the number of requests even under moderate load.

This setting accomplishes this by adding a small amount of artificial delay: rather than immediately sending out a record, the producer waits for up to the given delay to allow other records to be sent so that the sends can be batched together. This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

After batch.size worth of records has accumulated for a partition, the batch is sent immediately regardless of this setting. However, if fewer bytes than that have accumulated for the partition, the producer "lingers" for the specified time, waiting for more records to arrive.

This setting defaults to 0 (no delay). Setting linger.ms=5, for example, has the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Linger time field.

SECONDS
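The interplay between Batch size and Linger time can be sketched as a per-partition simulation: a batch is sent when it reaches the batch size, or when its first record has waited for the linger time. This is a simplified model with a hypothetical function name; it ignores sender threads, in-flight limits, and records larger than the batch size.

```python
from typing import List, Tuple


def batch_send_times(arrivals: List[Tuple[int, int]], batch_size: int,
                     linger_ms: int) -> List[Tuple[int, int]]:
    """Simulate batching for one partition.

    arrivals holds (arrival_time_ms, record_bytes) pairs in time order.
    Returns (send_time_ms, record_count) for each batch sent.
    """
    sends = []
    batch_start = 0
    batch_bytes = 0
    count = 0
    for time_ms, size in arrivals:
        # The open batch's linger deadline passed before this record arrived.
        if count and time_ms > batch_start + linger_ms:
            sends.append((batch_start + linger_ms, count))
            batch_bytes = count = 0
        if count == 0:
            batch_start = time_ms
        batch_bytes += size
        count += 1
        # A full batch is sent immediately, regardless of the linger time.
        if batch_bytes >= batch_size:
            sends.append((time_ms, count))
            batch_bytes = count = 0
    if count:
        sends.append((batch_start + linger_ms, count))
    return sends
```

For records arriving at 0, 2, 4, and 20 ms, a 5 ms linger sends the first three together at 5 ms, while no linger sends each record on its own.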

Maximum block time

Number

Controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() can block. These methods can block either because the buffer is full or because metadata is unavailable. Blocking in user-supplied serializers or partitioners is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum block time field.

SECONDS

Maximum in flight requests

Number

Maximum number of unacknowledged requests the client sends on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (if retries are enabled).

5

Maximum request size

Number

Maximum size of a request in bytes. This setting limits the number of record batches the producer sends in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which might be different from this.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Maximum request size field.

MB
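Size fields follow the same value/unit pattern. The sketch below assumes binary multiples (1 KB = 1024 bytes). That assumption is consistent with the Kafka client defaults of 1048576 bytes for max.request.size and 16384 bytes for batch.size. The helper name to_bytes is hypothetical.

```python
SIZE_UNITS = {
    "BYTE": 1,
    "KB": 1024,         # assumption: binary kilobytes
    "MB": 1024 ** 2,
    "GB": 1024 ** 3,
}


def to_bytes(value: int, unit: str) -> int:
    """Convert a (value, unit) pair, for example (1, "MB"), to a byte count."""
    try:
        return value * SIZE_UNITS[unit]
    except KeyError:
        raise ValueError(f"unknown size unit: {unit!r}") from None


print(to_bytes(1, "MB"))   # 1048576  (Maximum request size default)
print(to_bytes(16, "KB"))  # 16384    (Batch size default)
```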

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

Number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE
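The three modes correspond to the durability levels of the Kafka acks producer property. The mapping below is an assumption for illustration and is not taken from the connector source. It treats NONE as acks=0, LEADER_ONLY as acks=1, and ALL as acks=all.

```python
# Assumed mapping from the connector's enumeration to the Kafka "acks" property.
ACKS_BY_MODE = {
    "NONE": "0",         # do not wait for any broker acknowledgment
    "LEADER_ONLY": "1",  # wait for the partition leader to write the record
    "ALL": "all",        # wait for the full set of in-sync replicas
}


def acks_for(mode: str) -> str:
    """Return the acks value for a Producer Acknowledge Mode (hypothetical helper)."""
    return ACKS_BY_MODE[mode]


print(acks_for("ALL"))  # all
```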

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than 0 causes the client to resend any record whose send fails with a potentially transient error. This retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 can change the ordering of records. If two batches are sent to a single partition, and the first fails and is retried but the second succeeds, the records in the second batch might appear first. Additionally, produce requests fail before the number of retries is exhausted if the timeout configured by delivery.timeout.ms expires before a successful acknowledgment. In general, leave this configuration unset and use delivery.timeout.ms to control retry behavior.

1
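The reordering risk described above can be reproduced with a small simulation. This is a simplified, round-based model that assumes one partition, no idempotence, and a single transient failure per affected batch. The function name delivered_order is hypothetical.

```python
from typing import List, Set


def delivered_order(batches: List[str], fails_once: Set[str], max_in_flight: int) -> List[str]:
    """Return the order in which batches for one partition are written.

    Each round, up to max_in_flight batches are sent in queue order. A batch in
    fails_once fails on its first attempt and is queued for retry; every other
    send succeeds and is appended to the partition log.
    """
    if max_in_flight < 1:
        raise ValueError("max_in_flight must be at least 1")
    queue = list(batches)
    already_failed: Set[str] = set()
    log: List[str] = []
    while queue:
        window, queue = queue[:max_in_flight], queue[max_in_flight:]
        retries = []
        for batch in window:
            if batch in fails_once and batch not in already_failed:
                already_failed.add(batch)
                retries.append(batch)  # transient error: resend in a later round
            else:
                log.append(batch)      # acknowledged and written
        queue = retries + queue        # retried batches go back to the front
    return log


print(delivered_order(["A", "B"], {"A"}, max_in_flight=2))  # ['B', 'A']
print(delivered_order(["A", "B"], {"A"}, max_in_flight=1))  # ['A', 'B']
```

With two requests in flight, batch B is written before the retried batch A. Limiting in-flight requests to 1 preserves the send order at the cost of throughput.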

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100
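The retry settings interact with Delivery Timeout and Default request timeout. The sketch below assumes a worst case in which every attempt times out, and computes when each attempt starts. It is a simplified model, because the real client tracks these deadlines per batch and per request. The function name is hypothetical.

```python
from typing import List


def attempt_start_times(delivery_timeout_ms: int, request_timeout_ms: int,
                        retry_backoff_ms: int, retries: int) -> List[int]:
    """Start times (ms after send) of each attempt when every attempt times out.

    An attempt starts only if the retry budget allows it (1 initial attempt plus
    `retries` retries) and the delivery timeout has not yet expired.
    """
    starts: List[int] = []
    t = 0
    while len(starts) <= retries and t < delivery_timeout_ms:
        starts.append(t)
        # Wait for the request to time out, then back off before the next attempt.
        t += request_timeout_ms + retry_backoff_ms
    return starts


print(attempt_start_times(120_000, 30_000, 100, retries=1))   # [0, 30100]
print(attempt_start_times(120_000, 30_000, 100, retries=10))  # [0, 30100, 60200, 90300]
```

The model uses the defaults in this table: Delivery Timeout 120 s, Default request timeout 30 s, Retry Backoff Timeout 100 ms, and Retries amount 1. Under those defaults it allows only two attempts.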

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Default request timeout

Number

Maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value must be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default request timeout field.

SECONDS

Partitioner

Enumeration, one of:

  • DEFAULT

  • ROUND_ROBIN

  • UNIFORM_STICKY

Controls which partitioning strategy is used when sending messages without providing a key or a partition.

DEFAULT
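For records sent without a key or partition, ROUND_ROBIN spreads records evenly across partitions. The following minimal sketch shows that idea and assumes a fixed partition count. Kafka's own round-robin partitioner also accounts for partition availability, which this sketch ignores. The class name is illustrative.

```python
import itertools


class RoundRobinPartitioner:
    """Simplified round-robin assignment for records without a key or partition."""

    def __init__(self, num_partitions: int):
        if num_partitions < 1:
            raise ValueError("num_partitions must be at least 1")
        self._cycle = itertools.cycle(range(num_partitions))

    def partition(self) -> int:
        """Return the partition for the next record."""
        return next(self._cycle)


p = RoundRobinPartitioner(3)
print([p.partition() for _ in range(7)])  # [0, 1, 2, 0, 1, 2, 0]
```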

Additional Properties

Object

Additional properties used to customize the Kafka connection.

For example:

  • For serialization, set Key to key.serializer or value.serializer and Value to com.mulesoft.connectors.kafka.internal.model.serializer.MuleKafkaAvroSerializer.

  • For deserialization, set Key to key.deserializer or value.deserializer and Value to io.confluent.kafka.serializers.KafkaAvroDeserializer.

TLS Configuration

TLS

Defines a TLS configuration, used on both the client and server sides to secure communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. If SSL is configured as the protocol on the broker side, configure at least the keystore in the tls:context child element of the configuration. The connector then automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.
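The protocol selection described above can be summarized as a small decision function. This hypothetical helper reflects the documented behavior (TLS configured or not, SASL authentication or not). It is not the connector's actual code.

```python
def security_protocol(tls_configured: bool, sasl_authentication: bool) -> str:
    """Return the security.protocol value implied by the connection settings."""
    if sasl_authentication:
        return "SASL_SSL" if tls_configured else "SASL_PLAINTEXT"
    return "SSL" if tls_configured else "PLAINTEXT"


print(security_protocol(tls_configured=True, sasl_authentication=True))  # SASL_SSL
```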

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which disables endpoint identification. When endpoint identification is enabled, clients verify that the broker hostname matches the hostname in the broker’s certificate. This includes client connections created by the broker for inter-broker communication.
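As an analogy outside the connector, Python's standard ssl module exposes the same choice: hostname verification on or off, independent of certificate validation. The sketch assumes that the value https enables verification (the value the Kafka client recognizes) and that an empty string disables it. The helper client_context is hypothetical.

```python
import ssl


def client_context(endpoint_identification_algorithm: str) -> ssl.SSLContext:
    """Build a client TLS context whose hostname check mirrors the setting.

    Certificates are still validated (CERT_REQUIRED) either way; only the
    hostname-versus-certificate check is switched on or off.
    """
    ctx = ssl.create_default_context()  # verifies certificates and hostnames by default
    ctx.check_hostname = endpoint_identification_algorithm.lower() == "https"
    return ctx


print(client_context("https").check_hostname)  # True
print(client_context("").check_hostname)       # False
```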

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Producer Kerberos Connection Type

Use Kerberos configuration files.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

The producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers contain multiple batches, one for each partition with data available to send. A small batch size makes batching less common and can reduce throughput (a batch size of 0 disables batching entirely). A very large batch size can waste memory, because a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Batch size field.

KB

Buffer size

Number

Total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting roughly corresponds to the total memory the producer uses, but it is not a hard bound, because not all memory the producer uses is used for buffering. Some additional memory is used for compression (if compression is enabled) and for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32 MB). Cap this value to align with the expected resources of Mule instances in CloudHub (0.1 vCore).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Buffer size field.

KB

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups:

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, the client attempts to connect to each of them before failing the connection. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS
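The USE_ALL_DNS_IPS behavior can be sketched in two steps: resolve every address for the host, then try each one before failing. This simplified model uses Python's socket.getaddrinfo. The connect callable is a placeholder for whatever opens the broker connection, and both helper names are hypothetical.

```python
import socket
from typing import Callable, List, TypeVar

T = TypeVar("T")


def resolve_all(host: str, port: int) -> List[str]:
    """Return each distinct IP address the resolver reports for host, in order."""
    addresses: List[str] = []
    for _family, _type, _proto, _canonname, sockaddr in socket.getaddrinfo(
        host, port, type=socket.SOCK_STREAM
    ):
        if sockaddr[0] not in addresses:
            addresses.append(sockaddr[0])
    return addresses


def connect_to_any(addresses: List[str], connect: Callable[[str], T]) -> T:
    """Try each address in turn; fail only after every address has failed."""
    errors = []
    for address in addresses:
        try:
            return connect(address)
        except OSError as exc:
            errors.append(f"{address}: {exc}")
    raise ConnectionError("all addresses failed: " + "; ".join(errors))
```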

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

Compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE
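The claim that more batching means better compression is easy to check with Python's gzip module, used here as a stand-in for the GZIP codec. This illustrates only the compression effect, not the Kafka client's compression code path. The sample records are made up.

```python
import gzip
import json


def compressed_sizes(records):
    """Compare total gzip output for records compressed one by one vs. as one batch."""
    encoded = [json.dumps(r).encode("utf-8") for r in records]
    individual = sum(len(gzip.compress(e)) for e in encoded)
    batched = len(gzip.compress(b"".join(encoded)))
    return individual, batched


records = [{"orderId": i, "status": "SHIPPED", "warehouse": "EU-WEST"} for i in range(100)]
individual, batched = compressed_sizes(records)
print(individual, batched)  # batched is smaller than individual
```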

Connections maximum idle time

Number

Closes connections that have been idle for longer than the specified time.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connections maximum idle time field.

SECONDS
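Idle-connection expiry amounts to two steps: track when each connection was last used, and close those idle for longer than the configured time. The following minimal sketch uses an injectable clock. The class name and connection IDs are hypothetical.

```python
import time
from typing import Callable, Dict, List


class IdleConnectionReaper:
    """Tracks last-use times and expires connections idle longer than max_idle_s."""

    def __init__(self, max_idle_s: float, clock: Callable[[], float] = time.monotonic):
        self.max_idle_s = max_idle_s
        self.clock = clock
        self.last_used: Dict[str, float] = {}

    def touch(self, conn_id: str) -> None:
        """Record activity on a connection."""
        self.last_used[conn_id] = self.clock()

    def reap(self) -> List[str]:
        """Forget, and return the IDs of, connections idle for too long."""
        now = self.clock()
        expired = [c for c, t in self.last_used.items() if now - t > self.max_idle_s]
        for conn_id in expired:
            # A real client would also close the underlying socket here.
            del self.last_used[conn_id]
        return expired
```

With the default of 540 seconds, a connection last used at t=0 is reaped at t=541, while one used at t=100 is kept.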

Delivery Timeout

Number

Upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record is delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration must be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Delivery Timeout field.

SECONDS
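The documented constraint between Delivery Timeout, Default request timeout, and Linger time can be validated up front. The following minimal sketch assumes all three values have already been converted to milliseconds. The function name is hypothetical.

```python
def check_delivery_timeout(delivery_timeout_ms: int, request_timeout_ms: int, linger_ms: int) -> None:
    """Raise ValueError unless delivery.timeout.ms >= request.timeout.ms + linger.ms."""
    minimum = request_timeout_ms + linger_ms
    if delivery_timeout_ms < minimum:
        raise ValueError(
            f"delivery timeout {delivery_timeout_ms} ms must be >= "
            f"request timeout + linger time = {minimum} ms"
        )


# Defaults from this table: 120 s delivery, 30 s request timeout, 0 linger.
check_delivery_timeout(120_000, 30_000, 0)  # passes silently
```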

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written in the stream. If set to false, producer retries caused by broker failures might write duplicates of the retried message in the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0, and acks to be all. If these values are not explicitly set, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false
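The idempotence prerequisites listed above can be expressed as a validation helper. This sketch returns a list of problems rather than throwing the connector's ConnectionException. It checks only values that are set explicitly, and the function name is hypothetical.

```python
from typing import List


def idempotence_problems(enable_idempotence: bool, max_in_flight: int,
                         retries: int, acks: str) -> List[str]:
    """List the reasons explicitly set values conflict with idempotence."""
    if not enable_idempotence:
        return []
    problems = []
    if max_in_flight > 5:
        problems.append("max.in.flight.requests.per.connection must be <= 5")
    if retries <= 0:
        problems.append("retries must be > 0")
    if acks != "all":
        problems.append("acks must be all")
    return problems


print(idempotence_problems(True, max_in_flight=5, retries=1, acks="all"))  # []
```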

Linger time

Number

The producer groups together any records that arrive between request transmissions into a single batched request. Normally this occurs only under load, when records arrive faster than they can be sent out. However, in some circumstances the client might want to reduce the number of requests, even under moderate load.

This setting accomplishes this by adding a small amount of artificial delay: rather than immediately sending out a record, the producer waits for up to the given delay to allow other records to be sent, so that the sends can be batched together. This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

After batch.size worth of records accumulates for a partition, the batch is sent immediately regardless of this setting. However, if fewer than batch.size bytes have accumulated for the partition, the producer "lingers" for the specified time, waiting for more records to arrive.

This setting defaults to 0 (no delay). Setting linger.ms=5, for example, has the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Linger time field.

SECONDS

Maximum block time

Number

Controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() can block. These methods can block either because the buffer is full or because metadata is unavailable. Blocking in user-supplied serializers or partitioners is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum block time field.

SECONDS

Maximum in flight requests

Number

Maximum number of unacknowledged requests the client sends on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (if retries are enabled).

5

Maximum request size

Number

Maximum size of a request in bytes. This setting limits the number of record batches the producer sends in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which might be different from this.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Maximum request size field.

MB

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

Number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than 0 causes the client to resend any record whose send fails with a potentially transient error. This retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 can change the ordering of records. If two batches are sent to a single partition, and the first fails and is retried but the second succeeds, the records in the second batch might appear first. Additionally, produce requests fail before the number of retries is exhausted if the timeout configured by delivery.timeout.ms expires before a successful acknowledgment. In general, leave this configuration unset and use delivery.timeout.ms to control retry behavior.

1

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Default request timeout

Number

Maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value must be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default request timeout field.

SECONDS

Partitioner

Enumeration, one of:

  • DEFAULT

  • ROUND_ROBIN

  • UNIFORM_STICKY

Controls which partitioning strategy is used when sending messages without providing a key or a partition.

DEFAULT

Additional Properties

Object

Additional properties used to customize the Kafka connection.

For example:

  • For serialization, set Key to key.serializer or value.serializer and Value to com.mulesoft.connectors.kafka.internal.model.serializer.MuleKafkaAvroSerializer.

  • For deserialization, set Key to key.deserializer or value.deserializer and Value to io.confluent.kafka.serializers.KafkaAvroDeserializer.

TLS Configuration

TLS

Defines a TLS configuration, used on both the client and server sides to secure communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. If SSL is configured as the protocol on the broker side, configure at least the keystore in the tls:context child element of the configuration. The connector then automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which disables endpoint identification. When endpoint identification is enabled, clients verify that the broker hostname matches the hostname in the broker’s certificate. This includes client connections created by the broker for inter-broker communication.

Principal

String

Entity that is authenticated by a computer system or a network. Principals can be individual people, computers, services, or computational entities such as processes and threads.

x

Service name

String

Kerberos principal name that Kafka runs as.

x

Kerberos configuration file (krb5.conf)

String

Path to the krb5.conf file, which contains Kerberos configuration information. This information includes the locations of KDCs and admin servers for the Kerberos realms of interest, defaults for the current realm, defaults for Kerberos applications, and the mappings of hostnames to Kerberos realms.

Use ticket cache

Boolean

Set this option to true to obtain the ticket-granting ticket (TGT) from the ticket cache. Set this option to false if you do not want to use the ticket cache. The connector searches for the ticket cache as follows:

  • On Solaris and Linux, the connector looks in /tmp/krb5cc_uid, in which the uid is the numeric user identifier.

  • If the ticket cache is not available in /tmp/krb5cc_uid or the app is on a Windows platform, the connector looks in {user.home}{file.separator}krb5cc_{user.name}. You can override the ticket cache location by setting a value for the Ticket cache field.

    In a Windows environment, if a ticket cannot be retrieved from the file ticket cache, Windows uses the Local Security Authority (LSA) API to get the ticket-granting ticket (TGT).

false
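The ticket cache search order described above can be sketched as a function that returns candidate paths. It assumes the platform, user ID, home directory, user name, and file separator are passed in. It does not model the Windows LSA fallback, and the function name is hypothetical.

```python
from typing import List


def ticket_cache_candidates(platform: str, uid: int, user_home: str, user_name: str,
                            file_separator: str, ticket_cache: str = "") -> List[str]:
    """Return the ticket cache locations to check, in order."""
    if ticket_cache:
        # An explicit Ticket cache value overrides the default search.
        return [ticket_cache]
    candidates = []
    if platform in ("linux", "solaris"):
        candidates.append(f"/tmp/krb5cc_{uid}")
    # Fallback on Solaris and Linux, and the first choice on Windows.
    candidates.append(f"{user_home}{file_separator}krb5cc_{user_name}")
    return candidates


print(ticket_cache_candidates("linux", 1000, "/home/alice", "alice", "/"))
# ['/tmp/krb5cc_1000', '/home/alice/krb5cc_alice']
```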

Ticket cache

String

Name of the ticket cache that contains the user’s ticket-granting ticket (TGT). If this value is set, Use ticket cache must also be set to true. Otherwise, a configuration error is returned.

Use keytab

Boolean

Set this option to true if you want the connector to obtain the principal’s key from the keytab. If you don’t set the Keytab field, the connector locates the keytab by using the Kerberos configuration file. If the keytab is not specified in the Kerberos configuration file, the connector looks for the {user.home}{file.separator}krb5.keytab file.

false

Keytab

String

Set this option to the file name of the keytab to obtain the principal’s secret key.

Store key

Boolean

Set this option to true to store the keytab or the principal’s key in the Subject’s private credentials.

false
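These Kerberos fields resemble the options of the JDK's com.sun.security.auth.module.Krb5LoginModule: principal, useKeyTab, keyTab, useTicketCache, ticketCache, and storeKey. The connector's internals are not documented here, so treat the following as an assumption. If the connector assembles a JAAS entry from these fields, the entry might look like the output of this hypothetical helper. Service name is typically supplied separately through the Kafka sasl.kerberos.service.name property and is not shown.

```python
from typing import Optional


def krb5_jaas_entry(principal: str, use_keytab: bool = False, keytab: Optional[str] = None,
                    use_ticket_cache: bool = False, ticket_cache: Optional[str] = None,
                    store_key: bool = False) -> str:
    """Render a Krb5LoginModule JAAS entry from connector-style fields."""
    if ticket_cache and not use_ticket_cache:
        # Mirrors the rule above: Ticket cache requires Use ticket cache = true.
        raise ValueError("Ticket cache is set but Use ticket cache is false")
    options = [f'principal="{principal}"']
    if use_keytab:
        options.append("useKeyTab=true")
        if keytab:
            options.append(f'keyTab="{keytab}"')
    if use_ticket_cache:
        options.append("useTicketCache=true")
        if ticket_cache:
            options.append(f'ticketCache="{ticket_cache}"')
    if store_key:
        options.append("storeKey=true")
    return "com.sun.security.auth.module.Krb5LoginModule required " + " ".join(options) + ";"


print(krb5_jaas_entry("kafka-client@EXAMPLE.COM", use_keytab=True,
                      keytab="/etc/security/keytabs/client.keytab", store_key=True))
```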

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Producer SASL/OAUTHBEARER - Client Credentials Connection Type

OAuth Bearer authentication is a mechanism for authenticating requests using bearer tokens, which are typically used to authenticate users and applications to access resources. Apache Kafka® supports OAuth Bearer authentication for establishing connections securely.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Endpoint identification algorithm

String

The endpoint identification algorithm used by clients to validate the server host name. The default value is an empty string, which means it is disabled. When it is enabled, clients verify that the broker host name matches the host name in the broker’s certificate. This includes client connections created by the broker for inter-broker communication.

Batch size

Number

The producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers contain multiple batches, one for each partition with data available to be sent. A small batch size makes batching less common and may reduce throughput (a batch size of zero disables batching entirely). A very large batch size may use memory more wastefully, because a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the batch size scalar.

KB

Buffer size

Number

The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting should correspond roughly to the total memory the producer uses, but it is not a hard bound, because not all memory the producer uses is used for buffering. Some additional memory is used for compression (if compression is enabled) and for maintaining in-flight requests. The default value in the Kafka documentation is 33554432 (32 MB). Cap this value to align with the expected values for Mule instances in CloudHub (0.1 vCore).

1000

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the buffer size scalar.

KB

DNS lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS, when the lookup returns multiple IP addresses for a hostname, the client attempts to connect to each of them before failing the connection. This applies to both bootstrap and advertised servers. If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

The compression type for all data generated by the producer. The default is NONE (no compression). Valid values are NONE, GZIP, SNAPPY, LZ4, and ZSTD. Compression is applied to full batches of data, so the efficacy of batching also impacts the compression ratio (more batching means better compression).

NONE

Connections maximum idle time

Number

Closes connections that have been idle for longer than the value specified by this configuration.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the connections maximum idle scalar.

SECONDS

Delivery timeout

Number

An upper bound on the time to report success or failure after a call to send() returns. This limits the total time that a record will be delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retriable send failures. The producer may report failure to send a record earlier than this config if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch which reached an earlier delivery expiration deadline. The value of this config should be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the delivery timeout scalar.

SECONDS

Enable idempotence

Boolean

When set to 'true', the producer ensures that exactly one copy of each message is written in the stream. If set to 'false', producer retries caused by broker failures and similar errors might write duplicates of the retried message in the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0, and acks to be 'all'. If these values are not explicitly set by the user, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false

Linger time

Number

The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load, when records arrive faster than they can be sent out. However, in some circumstances the client may want to reduce the number of requests even under moderate load.

This setting accomplishes this by adding a small amount of artificial delay. Rather than immediately sending out a record, the producer waits for up to the given delay to allow other records to be sent, so that the sends can be batched together. This can be thought of as analogous to Nagle’s algorithm in TCP.

This setting gives the upper bound on the delay for batching. Once batch.size worth of records accumulates for a partition, the batch is sent immediately regardless of this setting. However, if fewer than this many bytes have accumulated for the partition, the producer 'lingers' for the specified time, waiting for more records to show up.

This setting defaults to 0 (no delay). Setting linger.ms=5, for example, would have the effect of reducing the number of requests sent but would add up to 5 ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the linger time scalar.

SECONDS

Maximum block time

Number

The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block. These methods can be blocked either because the buffer is full or because metadata is unavailable. Blocking in the user-supplied serializers or partitioner is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the maximum block time scalar.

SECONDS

Maximum in flight requests

Number

The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).

5

Maximum request size

Number

The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size which may be different from this.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the max request size scalar.

MB

Producer acknowledge mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. This parameter can be overridden at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. This parameter can be overridden at the source level.

KB

Retries amount

Number

Setting a value greater than zero causes the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 can change the ordering of records. If two batches are sent to a single partition, and the first fails and is retried but the second succeeds, the records in the second batch may appear first. Additionally, produce requests fail before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires before a successful acknowledgment. Users should generally leave this config unset and instead use delivery.timeout.ms to control retry behavior.

1

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the retry backoff timeout time scalar.

MILLISECONDS

Retry backoff timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. This parameter can be overridden at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. This parameter can be overridden at the source level.

KB

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the request timeout time scalar.

SECONDS

Default request timeout

Number

The configuration controls the maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Partitioner

Enumeration, one of:

  • DEFAULT

  • ROUND_ROBIN

  • UNIFORM_STICKY

The configuration controls which partitioning strategy is used when sending messages without providing a key or a partition.

DEFAULT

Additional Properties

Object

Additional properties used to customize the Kafka connection.

For example:

  • For serialization, set Key to key.serializer or value.serializer and Value to com.mulesoft.connectors.kafka.internal.model.serializer.MuleKafkaAvroSerializer.

  • For deserialization, set Key to key.deserializer or value.deserializer and Value to io.confluent.kafka.serializers.KafkaAvroDeserializer.

TLS Configuration

TLS

Defines a TLS configuration, which can be used on both the client and server sides to secure communication for the Mule app. The connector automatically sets the 'security.protocol' to use for communication. Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. When no configuration is provided, the default value is PLAINTEXT (or SASL_PLAINTEXT for SASL authentication: Kerberos, SCRAM, PLAIN, or token). If SSL is configured as the protocol on the broker side, configure at least the keystore in the 'tls:context' child element of this configuration. The connector then automatically uses SSL (or SASL_SSL for SASL authentication) as the 'security.protocol'.

Client ID Key

String

Specifies the configuration key used to define the OAuth client ID within the JAAS configuration.

clientId

Client ID

String

Unique identifier assigned to the OAuth client, used for secure authentication and authorization.

x

Client Secret Key

String

Specifies the configuration key used to define the OAuth client secret within the JAAS configuration.

clientSecret

Client Secret

String

Confidential authentication credential assigned to the OAuth client.

x

OAuth Token Endpoint Key

String

Specifies the configuration key used to define the OAuth token endpoint within the JAAS configuration.

sasl.oauthbearer.token.endpoint.url

OAuth Token Endpoint

String

URL where clients request access tokens from the authorization server.

x

Scope Key

String

Specifies the configuration key used to define the OAuth scope within the JAAS configuration.

scope

Scope

String

Defines the specific permissions and access rights granted to the client application by the authorization server.

Audience Key

String

Specifies the configuration key used to define the OAuth audience within the JAAS configuration.

audience

Audience

String

Comma-separated list of one or more target audiences for the access token.

Max Token Expiration Seconds

Number

Sets the maximum duration, in seconds, for which an access token remains valid.

Principal Name

String

Unique identifier associated with an authenticated user, often used for authorization decisions and resource access.

Credentials Placement

Enumeration, one of:

  • BASIC_AUTHENTICATION_HEADER

  • BODY

Defines whether to include client credentials in the basic authentication header or in the body of the authentication request.

BASIC_AUTHENTICATION_HEADER

OAuth Module Required

Boolean

Indicates whether this login module must be successfully authenticated for Kafka clients to establish a connection using OAuth 2.0 bearer tokens.

true

Include Accept Header

Boolean

Whether to include the Accept: application/json header in the authentication request.

true

OAuth Extensions

Object

Map of key-value pairs containing extensions for custom broker OAuth implementations. Each key must contain only alphabetic characters and be at least one character long. Each value must contain only printable ASCII characters or whitespace (including space, tab, carriage return, and newline) and be at least one character long.
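The key and value rules above can be checked before deployment with two regular expressions. This Python sketch is a hypothetical pre-flight check, not part of the connector; the example keys are made up.

```python
import re

# Keys: one or more ASCII letters.
_KEY_RE = re.compile(r"[A-Za-z]+")
# Values: one or more printable ASCII characters (space through tilde) or tab, CR, LF.
_VALUE_RE = re.compile(r"[\x20-\x7E\t\r\n]+")


def invalid_extensions(extensions: dict) -> list:
    """Return the keys whose key or value breaks the OAuth Extensions rules."""
    bad = []
    for key, value in extensions.items():
        # fullmatch rejects empty strings because both patterns require at least one character.
        if not _KEY_RE.fullmatch(key) or not _VALUE_RE.fullmatch(value):
            bad.append(key)
    return bad
```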

Login Refresh Window Factor

Number

Fraction of the credential's lifespan that the login refresh thread waits to pass before it attempts to refresh the credential. Acceptable values range from 0.5 (50%) to 1.0 (100%). If unspecified, the default is 0.8 (80%).

0.8

Login Refresh Window Jitter

Number

Maximum random jitter added to the login refresh thread's sleep time, expressed as a fraction of the credential's lifespan. Acceptable values range from 0 (0%) to 0.25 (25%). If unspecified, the default is 0.05 (5%).

0.05

Login Refresh Minimum Period

Number

Minimum time, in seconds, that the login refresh thread waits before refreshing a credential. Acceptable values are between 0 and 900 (15 minutes).

60

Login Refresh Buffer

Number

Buffer time, in seconds, to maintain before credential expiration when refreshing a credential. Acceptable values are between 0 and 3600 (1 hour). If unspecified, the default is 300 (5 minutes).

300
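The four Login Refresh settings combine to decide when a credential is refreshed. The following Python function is a simplified model of that interaction, under the assumption that the window factor and jitter propose a time, the buffer caps it, and the minimum period sets a floor. It is not Kafka's implementation, which may treat edge cases such as very short-lived credentials differently.

```python
import random


def seconds_until_refresh(lifetime_s, window_factor=0.8, window_jitter=0.05,
                          min_period_s=60, buffer_s=300, rng=random.random):
    """Seconds after a credential is issued at which a refresh is attempted.

    rng returns a float in [0, 1) and is injectable so results can be made
    deterministic.
    """
    # Wait for window_factor of the lifespan, plus up to window_jitter more.
    proposed = lifetime_s * (window_factor + window_jitter * rng())
    # Do not wait into the final buffer_s seconds before expiration.
    proposed = min(proposed, lifetime_s - buffer_s)
    # Always wait at least min_period_s.
    return max(proposed, min_period_s)
```

With the defaults, a one-hour token is refreshed somewhere between 48 and 51 minutes after issue in this model.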

Use Persistent Connections

Boolean

Indicates whether to use persistent connections:

  • true

Mule uses persistent connections.

  • false

Mule closes the connection after the first request completes.

true

Max Connections

Number

Maximum number of connections to open to the backend. HTTP requests are sent in parallel over multiple connections. Setting this value too high can impact latency and consume additional resources without increasing throughput. By default, the number of connections is unlimited.

-1

Connection Idle Timeout

Number

The number of milliseconds that a connection can remain idle before it is closed. The value of this attribute is only used when persistent connections are enabled.

30000

Stream Response

Boolean

If true, received responses are streamed: processing continues as soon as all headers are parsed, and the body is streamed as it arrives. When enabled, the response must eventually be read because, depending on the configured buffer size, it might not fit in memory, and processing stops until space is available.

false

Response Buffer Size

Number

Size of the buffer that stores the HTTP response, in bytes. By default, the space is not limited.

-1

Client Socket Properties

Encapsulates the configurable properties for TCP client socket connections.

Proxy Config Params

Configures a proxy for outbound connections.

Follow Redirects

Boolean

Specifies whether to follow redirects.

false

Response Timeout

Number

Maximum time that the request element blocks the execution of the flow while waiting for the HTTP response.

60000

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn't pass after exhausting the associated reconnection strategy.

Producer SASL/SCRAM Connection Type

Use SASL/SCRAM (Salted Challenge Response Authentication Mechanism), a family of SASL mechanisms that addresses the security concerns of traditional username and password mechanisms such as PLAIN. Apache Kafka supports SCRAM-SHA-256 and SCRAM-SHA-512.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

Producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of 0 disables batching entirely). A very large batch size can result in more wasteful use of memory as a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Batch size field.

KB
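Size fields such as Batch size are split into a number and a unit. The following minimal Python sketch converts them to bytes, assuming binary multiples (1 KB = 1024 bytes). That assumption is consistent with the 16 KB default matching Kafka's default batch.size of 16384 bytes, but the connector's exact conversion is not stated here, and the helper name is hypothetical.

```python
# Bytes per unit for the connector's size-unit enumeration.
# Assumption: binary multiples (1 KB = 1024 bytes).
BYTES_PER_UNIT = {"BYTE": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def to_bytes(value: int, unit: str) -> int:
    """Convert a (value, unit) pair into a byte count."""
    return value * BYTES_PER_UNIT[unit]
```

Under the same assumption, the Maximum request size default of 1 MB is 1048576 bytes.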

Buffer size

Number

Total bytes of memory the producer uses to buffer records waiting to send to the server. If records are sent faster than they are delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting generally corresponds to the total memory the producer uses, but is not exact because the memory used by the producer is not all used for buffering. Some additional memory is used for compression (if compression is enabled), as well as for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32MB), but this should be capped to align with expected values for Mule instances in CloudHub (v0.1 core).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Buffer size field.

KB

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups:

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS
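USE_ALL_DNS_IPS can be sketched as two steps: resolve every address for a host, then try each address in order and fail only when all attempts fail. In this Python sketch, resolve_all and connect_any are hypothetical helpers and try_connect stands in for whatever opens the actual connection; the Kafka client implements this behavior internally.

```python
import socket


def resolve_all(host: str, port: int) -> list:
    """Return every distinct IP address that DNS returns for host, in order."""
    infos = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    addresses = []
    for _family, _type, _proto, _canonname, sockaddr in infos:
        if sockaddr[0] not in addresses:
            addresses.append(sockaddr[0])
    return addresses


def connect_any(addresses, try_connect):
    """Try each address in turn and return the first successful connection.

    Raises ConnectionError only after every address has failed.
    """
    errors = []
    for address in addresses:
        try:
            return try_connect(address)
        except OSError as exc:
            errors.append(exc)
    raise ConnectionError(f"all {len(errors)} addresses failed")
```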

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

Compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE
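The claim that more batching means better compression can be checked with a quick experiment. This Python sketch compresses hypothetical JSON records with GZIP, once individually and once as a single batch. Kafka compresses whole record batches in its own format, so the absolute sizes differ from a real producer, but the trend is the same.

```python
import gzip
import json


def compressed_sizes(records):
    """Return (total size compressing each record alone, size compressing one batch)."""
    encoded = [json.dumps(r).encode("utf-8") for r in records]
    one_by_one = sum(len(gzip.compress(e)) for e in encoded)
    batched = len(gzip.compress(b"".join(encoded)))
    return one_by_one, batched
```

Repeated field names and values across records are what the compressor exploits, which is only possible when the records share a batch.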

Connections maximum idle time

Number

Closes connections that have been idle for longer than the specified time.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connections maximum idle time field.

SECONDS

Delivery Timeout

Number

Upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record is delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration must be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Delivery Timeout field.

SECONDS
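The Delivery Timeout constraint compares three fields that each carry their own time unit. A minimal Python sketch, with hypothetical helper names, that normalizes each (value, unit) pair with datetime.timedelta before checking delivery.timeout.ms >= request.timeout.ms + linger.ms:

```python
from datetime import timedelta


def as_timedelta(value, unit):
    """Convert a (value, unit) pair using the connector's unit names."""
    if unit == "NANOSECONDS":
        # timedelta has no nanosecond field; it rounds to the nearest microsecond.
        return timedelta(microseconds=value / 1000)
    # The remaining names lower-case to valid timedelta keywords, e.g. "seconds".
    return timedelta(**{unit.lower(): value})


def delivery_timeout_valid(delivery, request_timeout, linger):
    """Each argument is a (value, unit) pair."""
    return as_timedelta(*delivery) >= as_timedelta(*request_timeout) + as_timedelta(*linger)
```

With this connection type's defaults (120 SECONDS, 30 SECONDS, and 0 SECONDS), the check passes.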

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written in the stream. If false, producer retries caused by broker failures might write duplicates of the retried message in the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0, and acks to be all. If these values are not explicitly set by the user, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false
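The idempotence requirements can be expressed as a small check. This Python sketch is hypothetical: it uses Kafka's property values (acks as the string "all"), raises ValueError where the connector throws a ConnectionException, and the values it fills in for unset properties (5 in-flight requests, 2**31 - 1 retries) are illustrative assumptions rather than the connector's documented choices.

```python
def check_idempotence(max_in_flight=None, retries=None, acks=None):
    """Return effective (max_in_flight, retries, acks) for an idempotent producer.

    None means "not explicitly set"; explicitly set incompatible values raise.
    """
    problems = []
    if max_in_flight is not None and max_in_flight > 5:
        problems.append("max.in.flight.requests.per.connection must be <= 5")
    if retries is not None and retries <= 0:
        problems.append("retries must be > 0")
    if acks is not None and acks != "all":
        problems.append("acks must be all")
    if problems:
        raise ValueError("; ".join(problems))
    # Fill unset values with compatible choices (illustrative assumptions).
    return (
        5 if max_in_flight is None else max_in_flight,
        2 ** 31 - 1 if retries is None else retries,
        "all",
    )
```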

Linger time

Number

Producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they are sent out. However, in some circumstances the client might want to reduce the number of requests, even under moderate load.

The producer accomplishes this by adding a small amount of artificial delay: rather than immediately sending out a record, it waits for up to the given delay to allow other records to be sent so that the sends can be batched together. This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

After batch.size worth of records accumulates for a partition, the batch is sent immediately regardless of this setting. However, if fewer than batch.size bytes have accumulated for the partition, the producer "lingers" for the specified time, waiting for more records to arrive.

This setting defaults to 0 (no delay). For example, setting linger.ms=5 reduces the number of requests sent but adds up to 5 ms of latency to records sent in the absence of load.

0
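The interaction between batch size and linger time can be sketched as a simplified single-partition model: a batch is sent as soon as it is full, or when its linger time expires, whichever comes first. This hypothetical Python sketch ignores the producer's sender thread, in-flight limits, and other real-world timing.

```python
def simulate_linger(arrivals_ms, record_bytes, batch_size_bytes, linger_ms):
    """Group equally sized records (sorted arrival times) into batches.

    Returns a list of (send_time_ms, record_count) tuples.
    """
    batches = []
    opened_at = None  # arrival time of the first record in the open batch
    size = count = 0
    for t in arrivals_ms:
        # The open batch's linger time ran out before this record arrived.
        if opened_at is not None and t - opened_at >= linger_ms:
            batches.append((opened_at + linger_ms, count))
            opened_at, size, count = None, 0, 0
        if opened_at is None:
            opened_at = t
        size += record_bytes
        count += 1
        # A full batch is sent immediately, regardless of linger time.
        if size >= batch_size_bytes:
            batches.append((t, count))
            opened_at, size, count = None, 0, 0
    # Send whatever is left once its linger time expires.
    if opened_at is not None:
        batches.append((opened_at + linger_ms, count))
    return batches
```

In this model, four records arriving at 0, 1, 2, and 10 ms go out as two requests with a 5 ms linger time, but as four requests with a linger time of 0.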

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Linger time field.

SECONDS

Maximum block time

Number

Controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() block. These methods can block either because the buffer is full or because metadata is unavailable. Blocking in user-supplied serializers or partitioners is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum block time field.

SECONDS

Maximum in flight requests

Number

Maximum number of unacknowledged requests the client sends on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (if retries are enabled).

5

Maximum request size

Number

Maximum size of a request in bytes. This setting limits the number of record batches the producer sends in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which might be different from this.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Maximum request size field.

MB

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

Number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than 0 causes the client to resend any record whose send fails with a potentially transient error. This retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 can change the ordering of records: if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, the records in the second batch might appear first. Additionally, produce requests fail before the number of retries is exhausted if the timeout configured by delivery.timeout.ms expires before successful acknowledgment. The recommended practice is to leave this configuration unset and use delivery.timeout.ms to control retry behavior.

1
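The reordering risk described above can be shown with a toy model. In this hypothetical Python sketch, batches are sent in groups of max_in_flight, each batch in fails_once fails its first attempt, and its retry lands after the other batches in its group. The real producer pipelines requests continuously, so this only illustrates why a max_in_flight of 1 preserves order.

```python
def log_order(batches, fails_once, max_in_flight):
    """Return the order in which batches land in a partition's log."""
    log = []
    for i in range(0, len(batches), max_in_flight):
        window = batches[i:i + max_in_flight]
        # Batches that succeed on the first attempt are written in send order.
        log.extend(b for b in window if b not in fails_once)
        # Failed batches are retried after the rest of the window has landed.
        log.extend(b for b in window if b in fails_once)
    return log
```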

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Default request timeout

Number

Maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value must be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default request timeout field.

SECONDS

Partitioner

Enumeration, one of:

  • DEFAULT

  • ROUND_ROBIN

  • UNIFORM_STICKY

Controls the partitioning strategy.

DEFAULT

Additional Properties

Object

Additional properties used to customize the Kafka connection.

For example:

  • For serialization, set Key to key.serializer or value.serializer and Value to com.mulesoft.connectors.kafka.internal.model.serializer.MuleKafkaAvroSerializer.

  • For deserialization, set Key to key.deserializer or value.deserializer and Value to io.confluent.kafka.serializers.KafkaAvroDeserializer.

TLS Configuration

TLS

Defines a TLS configuration, which can be used on both the client and server sides to secure communication for the Mule app. The connector automatically sets the security.protocol to use for communication. Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. If the broker is configured to use SSL, configure the keystore in the tls:context child element of this configuration; the connector then automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which disables endpoint identification. When it is enabled, clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker's certificate.

Username

String

Username to log in with.

x

Password

String

Password to log in with.

x

EncryptionType

Enumeration, one of:

  • SHA256

  • SHA512

Hash algorithm used by SCRAM: SHA256 selects SCRAM-SHA-256 and SHA512 selects SCRAM-SHA-512.

x
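To show what the selected hash is used for, this Python sketch derives the SCRAM client and server keys as defined in RFC 5802. It is for illustration only: the Kafka client performs this exchange internally, the function name is hypothetical, and the RFC's SASLprep normalization of the password is omitted.

```python
import hashlib
import hmac


def scram_keys(password: str, salt: bytes, iterations: int, hash_name: str = "sha256"):
    """Return (ClientKey, StoredKey, ServerKey) per RFC 5802.

    hash_name is "sha256" for SCRAM-SHA-256 or "sha512" for SCRAM-SHA-512.
    """
    # SaltedPassword := Hi(password, salt, i), where Hi is PBKDF2 with HMAC-H.
    salted = hashlib.pbkdf2_hmac(hash_name, password.encode("utf-8"), salt, iterations)
    # ClientKey := HMAC(SaltedPassword, "Client Key")
    client_key = hmac.new(salted, b"Client Key", hash_name).digest()
    # StoredKey := H(ClientKey)
    stored_key = hashlib.new(hash_name, client_key).digest()
    # ServerKey := HMAC(SaltedPassword, "Server Key")
    server_key = hmac.new(salted, b"Server Key", hash_name).digest()
    return client_key, stored_key, server_key
```

Because the server only needs StoredKey and ServerKey (plus salt and iteration count), it can verify a client without storing the plain password.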

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Producer SASL/PLAIN Connection Type

Use SASL/PLAIN to authenticate with a username and password.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

Producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of zero disables batching entirely). A very large batch size can result in more wasteful use of memory as a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Batch size field.

KB

Buffer size

Number

Total bytes of memory the producer uses to buffer records waiting to send to the server. If records are sent faster than they are delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting generally corresponds to the total memory the producer uses, but is not exact because the memory used by the producer is not all used for buffering. Some additional memory is used for compression (if compression is enabled), as well as for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32MB), but this should be capped to align with expected values for Mule instances in CloudHub (v0.1 core).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Buffer size field.

KB

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups:

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

Compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE

Connections maximum idle time

Number

Closes connections that have been idle for longer than the specified time.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connections maximum idle time field.

SECONDS

Delivery Timeout

Number

Upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record is delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration must be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Delivery Timeout field.

SECONDS

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written in the stream. If false, producer retries caused by broker failures might write duplicates of the retried message in the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0, and acks to be all. If these values are not explicitly set by the user, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false

Linger time

Number

Producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they are sent out. However, in some circumstances the client might want to reduce the number of requests, even under moderate load.

The producer accomplishes this by adding a small amount of artificial delay: rather than immediately sending out a record, it waits for up to the given delay to allow other records to be sent so that the sends can be batched together. This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

After batch.size worth of records accumulates for a partition, the batch is sent immediately regardless of this setting. However, if fewer than batch.size bytes have accumulated for the partition, the producer "lingers" for the specified time, waiting for more records to arrive.

This setting defaults to 0 (no delay). For example, setting linger.ms=5 reduces the number of requests sent but adds up to 5 ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Linger time field.

SECONDS

Maximum block time

Number

Controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() block. These methods can block either because the buffer is full or because metadata is unavailable. Blocking in user-supplied serializers or partitioners is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum block time field.

SECONDS

Maximum in flight requests

Number

Maximum number of unacknowledged requests the client sends on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (if retries are enabled).

5

Maximum request size

Number

Maximum size of a request in bytes. This setting limits the number of record batches the producer sends in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which might be different from this.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Maximum request size field.

MB

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

Number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than 0 causes the client to resend any record whose send fails with a potentially transient error. This retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 can change the ordering of records: if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, the records in the second batch might appear first. Additionally, produce requests fail before the number of retries is exhausted if the timeout configured by delivery.timeout.ms expires before successful acknowledgment. The recommended practice is to leave this configuration unset and use delivery.timeout.ms to control retry behavior.

1

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Default request timeout

Number

Maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value must be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default request timeout field.

SECONDS

Partitioner

Enumeration, one of:

  • DEFAULT

  • ROUND_ROBIN

  • UNIFORM_STICKY

Controls the partitioning strategy.

DEFAULT

Additional Properties

Object

Additional properties used to customize the Kafka connection.

For example:

  • For serialization, set Key to key.serializer or value.serializer and Value to com.mulesoft.connectors.kafka.internal.model.serializer.MuleKafkaAvroSerializer.

  • For deserialization, set Key to key.deserializer or value.deserializer and Value to io.confluent.kafka.serializers.KafkaAvroDeserializer.

TLS Configuration

TLS

Defines a TLS configuration, which can be used on both the client and server sides to secure communication for the Mule app. The connector automatically sets the security.protocol to use for communication. Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. If the broker is configured to use SSL, configure the keystore in the tls:context child element of this configuration; the connector then automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

Endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which disables endpoint identification. When it is enabled, clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker's certificate.

Username

String

Username to log in with.

x

Password

String

Password to log in with.

x

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Producer SASL/TOKEN Connection Type

Use delegation tokens to authenticate to the Kafka cluster.

For security reasons, a delegation token cannot be renewed if the initial authentication uses a delegation token. For more information, refer to Delegation Token Support for Kafka.

Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

List of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

Producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of zero disables batching entirely). A very large batch size can result in more wasteful use of memory as a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Batch size field.

KB

Buffer size

Number

Total bytes of memory the producer uses to buffer records waiting to send to the server. If records are sent faster than they are delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting generally corresponds to the total memory the producer uses, but is not exact because the memory used by the producer is not all used for buffering. Some additional memory is used for compression (if compression is enabled), as well as for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32MB), but this should be capped to align with expected values for Mule instances in CloudHub (v0.1 core).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Buffer size field.

KB

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups.

  • USE_ALL_DNS_IPS

    When the lookup returns multiple IP addresses for a hostname, the client attempts to connect to each of them before failing the connection. Applies to both bootstrap and advertised servers.

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Each bootstrap server entry is resolved and expanded into a list of canonical names.

USE_ALL_DNS_IPS
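The USE_ALL_DNS_IPS behavior can be pictured with Python's standard `socket` module. This is a sketch of the idea only, not the Kafka client's code: `resolve_all` collects every address that `socket.getaddrinfo` returns, and the hypothetical `connect_any` tries each one in turn before giving up.

```python
import socket
from typing import Callable, List, Tuple

Address = Tuple[str, int]


def resolve_all(host: str, port: int) -> List[Address]:
    """Return every distinct IP address the lookup yields for host (TCP only)."""
    seen: List[Address] = []
    for _family, _type, _proto, _canon, sockaddr in socket.getaddrinfo(
            host, port, type=socket.SOCK_STREAM):
        addr = (sockaddr[0], sockaddr[1])
        if addr not in seen:
            seen.append(addr)
    return seen


def connect_any(addresses: List[Address], connect: Callable[[Address], object]) -> object:
    """Try each resolved IP address before failing the connection."""
    errors = []
    for addr in addresses:
        try:
            return connect(addr)
        except OSError as exc:
            errors.append(f"{addr}: {exc}")
    raise ConnectionError("all resolved addresses failed: " + "; ".join(errors))
```

For a real broker you might pass `lambda addr: socket.create_connection(addr, timeout=5)` as the `connect` callback; host and port are placeholders.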

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

Compression type for all data generated by the producer. The default is NONE (no compression). Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression.

NONE
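The remark that more batching means better compression is easy to check with the standard `gzip` module. The sketch below, using made-up sample records, compresses each record on its own and then the whole batch at once; it illustrates the principle rather than the producer's actual record-batch format.

```python
import gzip
import json

# 200 small, similar records, like a stream of events sent to one partition.
records = [json.dumps({"orderId": i, "status": "CREATED", "region": "us-east"}).encode()
           for i in range(200)]

# Compressing each record separately (no batching) vs. the whole batch at once.
individual = sum(len(gzip.compress(r)) for r in records)
batched = len(gzip.compress(b"".join(records)))

print(individual, batched)  # the batched total is much smaller
```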

Connections maximum idle time

Number

Closes connections that remain idle for the specified time.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Connections maximum idle time field.

SECONDS

Delivery Timeout

Number

Upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record is delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration must be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Delivery Timeout field.

SECONDS
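The constraint at the end of the Delivery Timeout description can be checked before deployment. A minimal sketch, assuming each Mule time field is converted to milliseconds before it reaches the Kafka client; `to_ms` and `check_delivery_timeout` are hypothetical helpers, and the values used (120 s delivery timeout, 30 s request timeout, 0 linger) are the defaults listed in this reference.

```python
NANOS_PER_UNIT = {
    "NANOSECONDS": 1,
    "MICROSECONDS": 1_000,
    "MILLISECONDS": 1_000_000,
    "SECONDS": 1_000_000_000,
    "MINUTES": 60 * 1_000_000_000,
    "HOURS": 3_600 * 1_000_000_000,
    "DAYS": 86_400 * 1_000_000_000,
}


def to_ms(value: int, unit: str) -> int:
    """Convert a time field and its unit to whole milliseconds (remainders truncated)."""
    return value * NANOS_PER_UNIT[unit] // 1_000_000


def check_delivery_timeout(delivery_ms: int, request_timeout_ms: int, linger_ms: int) -> None:
    """Reject settings where delivery.timeout.ms < request.timeout.ms + linger.ms."""
    if delivery_ms < request_timeout_ms + linger_ms:
        raise ValueError(
            f"delivery timeout {delivery_ms} ms must be >= "
            f"request timeout + linger ({request_timeout_ms + linger_ms} ms)"
        )


# Defaults: 120 s delivery timeout, 30 s request timeout, 0 s linger.
check_delivery_timeout(to_ms(120, "SECONDS"), to_ms(30, "SECONDS"), to_ms(0, "SECONDS"))
```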

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written to the stream. If false, producer retries caused by broker failures might write duplicates of the retried message to the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0, and acks to be all. If you do not set these values explicitly, suitable values are chosen. If incompatible values are set, a ConnectionException is thrown.

false
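The interplay between idempotence and the other producer settings can be expressed as a small validation step. In this sketch the keys are the Apache Kafka client property names quoted above; the values filled in when a setting is absent (5 in-flight requests, the maximum 32-bit integer for retries, acks=all) are assumptions for illustration, and `ValueError` stands in for the connector's ConnectionException.

```python
INT32_MAX = 2_147_483_647


def resolve_idempotence(props: dict) -> dict:
    """Fill in or validate the settings that idempotence depends on (simplified model)."""
    resolved = dict(props)
    if not resolved.get("enable.idempotence", False):
        return resolved  # nothing to enforce
    # Values chosen when the user did not set them (assumed for illustration).
    resolved.setdefault("max.in.flight.requests.per.connection", 5)
    resolved.setdefault("retries", INT32_MAX)
    resolved.setdefault("acks", "all")
    # Explicit but incompatible values are rejected.
    if resolved["max.in.flight.requests.per.connection"] > 5:
        raise ValueError("idempotence requires max.in.flight.requests.per.connection <= 5")
    if resolved["retries"] <= 0:
        raise ValueError("idempotence requires retries > 0")
    if resolved["acks"] != "all":
        raise ValueError("idempotence requires acks=all")
    return resolved


print(resolve_idempotence({"enable.idempotence": True, "retries": 1}))
```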

Linger time

Number

The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load, when records arrive faster than they can be sent out. However, in some circumstances the client might want to reduce the number of requests even under moderate load.

This setting accomplishes this by adding a small amount of artificial delay: rather than immediately sending out a record, the producer waits for up to the given delay to allow other records to be sent, so that the sends can be batched together. This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

When batch.size worth of records accumulates for a partition, the batch is sent immediately regardless of this setting. If fewer bytes than batch.size have accumulated for the partition, the producer "lingers" for the specified time, waiting for more records to arrive.

This setting defaults to 0 (no delay). Setting linger.ms=5, for example, reduces the number of requests sent but adds up to 5 ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Linger time field.

SECONDS
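The rule described for Linger time reduces to a two-condition check per partition. This simplified sketch uses a hypothetical `should_send` helper with sizes in bytes and times in milliseconds; the real producer can also send a batch earlier in other situations, which this model ignores.

```python
def should_send(batch_bytes: int, batch_size: int, waited_ms: int, linger_ms: int) -> bool:
    """Decide whether a partition's pending batch is sent now (simplified model)."""
    if batch_bytes >= batch_size:
        return True                  # full batch: sent immediately, regardless of linger
    return waited_ms >= linger_ms    # partial batch: wait up to linger_ms for more records


# Linger 0 ms: a partial batch goes out at once.
print(should_send(batch_bytes=512, batch_size=16_384, waited_ms=0, linger_ms=0))  # True
# Linger 5 ms: the same partial batch waits until 5 ms have passed.
print(should_send(512, 16_384, waited_ms=2, linger_ms=5))  # False
print(should_send(512, 16_384, waited_ms=5, linger_ms=5))  # True
```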

Maximum block time

Number

Controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() block. These methods can block either because the buffer is full or because metadata is unavailable. Blocking in user-supplied serializers or partitioners is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Maximum block time field.

SECONDS
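When the producer's buffer (see Buffer size) is full, send() blocks for at most Maximum block time and then fails. The toy model below reproduces that contract with the standard `queue` module: capacity is counted in records rather than bytes for simplicity, and `BoundedSendBuffer` is a hypothetical name, not part of the connector.

```python
import queue


class BoundedSendBuffer:
    """Toy stand-in for the producer's record buffer and max.block.ms."""

    def __init__(self, capacity_records: int, max_block_s: float):
        self._queue = queue.Queue(maxsize=capacity_records)
        self._max_block_s = max_block_s

    def send(self, record: bytes) -> None:
        try:
            # Block while the buffer is full, but no longer than max_block_s.
            self._queue.put(record, timeout=self._max_block_s)
        except queue.Full:
            raise TimeoutError(f"buffer still full after {self._max_block_s} s") from None


buf = BoundedSendBuffer(capacity_records=1, max_block_s=0.05)
buf.send(b"first")       # fits in the buffer
try:
    buf.send(b"second")  # nothing drains the buffer, so this times out
except TimeoutError as exc:
    print(exc)
```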

Maximum in flight requests

Number

Maximum number of unacknowledged requests the client sends on a single connection before blocking. If the value is greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (if retries are enabled).

5

Maximum request size

Number

Maximum size of a request, in the unit set in Maximum request size unit. This setting limits the number of record batches the producer sends in a single request, to avoid sending huge requests. It is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which might differ from this value.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Maximum request size field.

MB

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

Number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

Size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default receive buffer size field. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than 0 causes the client to resend any record whose send fails with a potentially transient error. This retry is no different from the client resending the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 can change the ordering of records: if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, the records in the second batch might appear first. Additionally, produce requests fail before the number of retries is exhausted if the timeout configured by delivery.timeout.ms expires before a successful acknowledgment. Apache Kafka recommends leaving this configuration unset and using delivery.timeout.ms to control retry behavior instead.

1
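The reordering risk described for Retries amount can be reproduced with a small simulation. In this simplified model (a hypothetical `simulate_log` helper, not the producer's real retry path), a window of up to `max_in_flight` batches is sent at once, batches listed in `fail_once` fail on their first attempt, and a retried batch goes out at the front of the next window.

```python
from typing import List, Set


def simulate_log(batches: List[int], fail_once: Set[int], max_in_flight: int) -> List[int]:
    """Return the order in which batches land in the partition log (simplified model)."""
    log: List[int] = []
    pending = list(batches)
    already_failed: Set[int] = set()
    while pending:
        window, rest = pending[:max_in_flight], pending[max_in_flight:]
        retry: List[int] = []
        for batch in window:
            if batch in fail_once and batch not in already_failed:
                already_failed.add(batch)  # first attempt fails transiently
                retry.append(batch)
            else:
                log.append(batch)          # attempt succeeds
        pending = retry + rest             # retried batches go out first next round
    return log


print(simulate_log([1, 2], fail_once={1}, max_in_flight=2))  # [2, 1]: reordered
print(simulate_log([1, 2], fail_once={1}, max_in_flight=1))  # [1, 2]: order kept
```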

Retry Backoff Timeout

Number

Amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Retry Backoff Timeout field.

MILLISECONDS
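Together, Retries amount, Retry Backoff Timeout, Default request timeout, and Delivery Timeout bound how many attempts a record can get. The sketch below assumes a worst case in which every attempt waits the full request timeout before failing, followed by a fixed backoff; both are simplifications, and `attempts_within_deadline` is a hypothetical helper.

```python
def attempts_within_deadline(retries: int, request_timeout_ms: int,
                             backoff_ms: int, delivery_timeout_ms: int) -> int:
    """Count send attempts that start before delivery.timeout.ms expires."""
    attempts = 0
    start_ms = 0
    # One initial attempt plus up to `retries` retries, each starting before the deadline.
    while attempts <= retries and start_ms < delivery_timeout_ms:
        attempts += 1
        start_ms += request_timeout_ms + backoff_ms  # attempt times out, then back off
    return attempts


# Defaults: 1 retry, 30 s request timeout, 100 ms backoff, 120 s delivery timeout.
print(attempts_within_deadline(1, 30_000, 100, 120_000))   # 2: retries run out first
print(attempts_within_deadline(10, 30_000, 100, 120_000))  # 4: the deadline stops retrying
```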

Default send buffer size

Number

Size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit of measure for the Default send buffer size field. You can override this parameter at the source level.

KB

Default request timeout

Number

Maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if retries are exhausted. This value should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Default request timeout field.

SECONDS

Partitioner

Enumeration, one of:

  • DEFAULT

  • ROUND_ROBIN

  • UNIFORM_STICKY

Controls the partitioning strategy.

DEFAULT

Additional Properties

Object

Additional properties used to customize the Kafka connection.

For example:

  • For serialization, set Key to key.serializer or value.serializer and Value to com.mulesoft.connectors.kafka.internal.model.serializer.MuleKafkaAvroSerializer.

  • For deserialization, set Key to key.deserializer or value.deserializer and Value to io.confluent.kafka.serializers.KafkaAvroDeserializer.

TLS Configuration

TLS

Defines a TLS configuration, used by both the client and server sides to secure communication for the Mule app. The connector automatically sets the security.protocol used for the communication. Valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. If the broker is configured to use SSL, configure the keystore in the tls:context child element of the configuration, and the connector automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.
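The protocol selection described for TLS Configuration amounts to a two-by-two decision. A minimal sketch, assuming the only inputs are whether a TLS context is configured and whether the connection authenticates with SASL (as the SCRAM-based delegation token connection does); `security_protocol` is a hypothetical helper.

```python
def security_protocol(tls_configured: bool, sasl_authentication: bool) -> str:
    """Pick the Kafka security.protocol value from the connection setup (sketch)."""
    if tls_configured:
        return "SASL_SSL" if sasl_authentication else "SSL"
    return "SASL_PLAINTEXT" if sasl_authentication else "PLAINTEXT"


print(security_protocol(tls_configured=True, sasl_authentication=True))  # SASL_SSL
```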

Endpoint identification algorithm

String

Endpoint identification algorithm that clients use to validate the server hostname against the server certificate. The default value is an empty string, which disables endpoint identification. When an algorithm such as HTTPS is set, clients, including client connections created by the broker for inter-broker communication, verify that the broker hostname matches the hostname in the broker’s certificate.

Token ID

String

ID of the token.

x

Token HMAC

String

HMAC of the delegation token.

x

Encryption type

Enumeration, one of:

  • SCRAM_SHA_256

  • SCRAM_SHA_512

Hash algorithm used by the SCRAM authentication mechanism.

x

Reconnection

Configures a reconnection strategy to use when a connector operation fails to connect to an external server.

Consumer Sources

Batch Message Listener

<kafka:batch-message-listener>

This source supports the consumption of messages from a Kafka cluster, producing a list of messages to the flow.

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Poll timeout

Number

Amount of time to block. Defines the total timeout for polling.

Poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Poll timeout field.

Acknowledgement mode

Enumeration, one of:

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka Broker instance is notified of the consumption of messages.

  • AUTO

    Commits messages automatically if the flow finishes successfully.

  • MANUAL

    The user must commit messages manually upon receipt by using the Commit operation.

  • IMMEDIATE

    Mule automatically commits the messages upon receipt before triggering the flow.

  • DUPS_OK

    Works in the same way as MANUAL, but the commits are asynchronous, which can lead to duplicate records.

Amount of parallel consumers

Number

Number of parallel consumers.

1

Primary Node Only

Boolean

Determines whether to execute this source on only the primary node when running Mule instances in a cluster.

Redelivery Policy

Defines a policy for processing the redelivery of the same message.

Reconnection Strategy

Retry strategy in case of connectivity errors.

Output

Type

Array of Record

Attributes Type

Associated Configurations

The Batch Message Listener source does not support configurable streaming strategies because it receives a batch of records as an input stream. The source always uses the non-repeatable-stream strategy.
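The acknowledgement modes listed for the listeners differ mainly in when the commit happens relative to the flow. The toy model below makes that ordering explicit; `run_poll` and the event strings are hypothetical, and passing a commit callback to the flow stands in for calling the Commit operation.

```python
from typing import Callable, List, Optional

CommitFn = Optional[Callable[[], None]]


def run_poll(ack_mode: str, flow: Callable[[CommitFn], None], events: List[str]) -> None:
    """Toy model of when one poll's offsets are committed under each mode.

    `flow` receives a commit callback only in MANUAL and DUPS_OK modes, where it
    stands in for the Commit operation; `events` records what happened, in order.
    """
    def sync_commit() -> None:
        events.append("commit")

    def async_commit() -> None:
        events.append("commit-async")  # not awaited, so duplicates are possible

    if ack_mode == "IMMEDIATE":
        sync_commit()       # committed on receipt, before the flow runs
        flow(None)
    elif ack_mode == "AUTO":
        flow(None)          # if the flow raises, the commit below never happens
        sync_commit()
    elif ack_mode == "MANUAL":
        flow(sync_commit)
    elif ack_mode == "DUPS_OK":
        flow(async_commit)
    else:
        raise ValueError(f"unknown acknowledgement mode: {ack_mode}")


events: List[str] = []


def successful_flow(commit: CommitFn) -> None:
    events.append("flow")
    if commit is not None:
        commit()  # the Commit operation at the end of a successful flow


run_poll("IMMEDIATE", successful_flow, events)
print(events)  # ['commit', 'flow']
```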

Message Listener

<kafka:message-listener>

This source supports the consumption of messages from a Kafka cluster, producing a single message to the flow.

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Poll timeout

Number

Amount of time to block. Defines the total timeout for polling.

Poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Poll timeout field.

Acknowledgement mode

Enumeration, one of:

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka Broker instance is notified of the consumption of messages.

  • AUTO

    Commits messages automatically if the flow finishes successfully.

  • MANUAL

    The user must commit messages manually upon receipt by using the Commit operation.

  • IMMEDIATE

    Mule automatically commits the messages upon receipt before triggering the flow.

  • DUPS_OK

    Works in the same way as MANUAL, but the commits are asynchronous, which can lead to duplicate records.

Amount of parallel consumers

Number

Number of parallel consumers.

1

Primary Node Only

Boolean

Determines whether to execute this source on only the primary node when running Mule instances in a cluster.

Streaming Strategy

Configures how Mule processes streams. Repeatable streams are the default behavior.

Redelivery Policy

Defines a policy for processing the redelivery of the same message.

Reconnection Strategy

Retry strategy in case of connectivity errors.

Output

Type

Binary

Attributes Type

Associated Configurations

Consumer Operations

Commit

<kafka:commit>

Commits the offsets associated with a single message consumed by the Message Listener source, or with a batch of messages consumed by the Batch Message Listener source.

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Consumer commit key

String

Commit key of the last poll. This operation is valid only inside a flow that uses the Batch Message Listener or Message Listener source, because those sources insert this value as an attribute in the Mule event.

x

Reconnection Strategy

Retry strategy in case of connectivity errors.

Associated Configurations

Throws

  • KAFKA:INVALID_ACK_MODE

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:ALREADY_COMMITTED

  • KAFKA:TIMEOUT

  • KAFKA:SESSION_NOT_FOUND

  • KAFKA:NOT_FOUND

  • KAFKA:CONNECTIVITY

Consume

<kafka:consume>

Receives messages from one or more Kafka topics. This operation works similarly to the Message Listener source, so the operations that apply to that source also apply to this operation.

The Consume operation does not return the consumerCommitKey.
Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Consumption timeout

Number

Number of time units that this operation waits to receive messages.

Timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Consumption timeout field.

Operation Timeout

Number

Timeout for the operation to start executing.

Operation Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Operation Timeout field.

Acknowledgement mode

Enumeration, one of:

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka Broker instance is notified of the consumption of messages.

  • AUTO

    Commits messages automatically if the flow finishes successfully.

  • MANUAL

    The user must commit messages manually upon receipt by using the Commit operation.

  • IMMEDIATE

    Mule automatically commits the messages upon receipt before triggering the flow.

  • DUPS_OK

    Works in the same way as MANUAL, but the commits are asynchronous, which can lead to duplicate records.

IMMEDIATE

Streaming Strategy

Configures how Mule processes streams. Repeatable streams are the default behavior.

Target Variable

String

Name of the variable that stores the operation’s output.

Target Value

String

Expression that evaluates the operation’s output. The outcome of the expression is stored in the Target Variable field.

#[payload]

Reconnection Strategy

Retry strategy in case of connectivity errors.

Output

Type

Binary

Attributes Type

Associated Configurations

Throws

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:ILLEGAL_STATE

  • KAFKA:TIMEOUT

  • KAFKA:INVALID_OFFSET

  • KAFKA:INVALID_INPUT

  • KAFKA:NOT_FOUND

  • KAFKA:CONNECTIVITY

Seek

<kafka:seek>

Sets the current offset of the consumer for the given topic and partition to the provided offset value.

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Topic

String

Name of the topic on which the Seek operation is performed.

x

Partition

Number

Number of the partition whose offset is modified.

x

Offset

Number

Offset value to set as the consumer’s current offset for the configured partition.

x

Operation Timeout

Number

Timeout for the operation to start executing.

Operation Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Operation Timeout field.

Reconnection Strategy

Retry strategy in case of connectivity errors.

Associated Configurations

Throws

  • KAFKA:INVALID_TOPIC

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:TIMEOUT

  • KAFKA:INVALID_OFFSET

  • KAFKA:INVALID_INPUT

  • KAFKA:NOT_FOUND

  • KAFKA:CONNECTIVITY
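The effect of Seek on subsequent reads can be illustrated with an in-memory model. This `ToyConsumer` is a hypothetical stand-in with made-up topic data, not the connector's API: it keeps one position per topic and partition, Seek overwrites that position, and the next poll resumes from it.

```python
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


class ToyConsumer:
    """In-memory model of consumer positions; not the connector's API."""

    def __init__(self, log: Dict[TopicPartition, List[str]]):
        self._log = log
        self._position: Dict[TopicPartition, int] = {tp: 0 for tp in log}

    def seek(self, topic: str, partition: int, offset: int) -> None:
        """Set the position for (topic, partition); the next poll starts there."""
        tp = (topic, partition)
        if tp not in self._log:
            raise KeyError(f"unknown topic partition: {tp}")
        if offset < 0:
            raise ValueError("offset must not be negative")
        self._position[tp] = offset

    def poll(self, topic: str, partition: int, max_records: int = 10) -> List[str]:
        """Return records from the current position and advance past them."""
        tp = (topic, partition)
        start = self._position[tp]
        records = self._log[tp][start:start + max_records]
        self._position[tp] = start + len(records)
        return records


consumer = ToyConsumer({("orders", 0): ["a", "b", "c", "d"]})
print(consumer.poll("orders", 0))   # ['a', 'b', 'c', 'd']
consumer.seek("orders", 0, 1)       # rewind partition 0 of "orders" to offset 1
print(consumer.poll("orders", 0))   # ['b', 'c', 'd']
```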

Producer Operations

Bulk Publish

<kafka:bulk-publish>

Publishes a batch of messages to the specified Kafka topic, optionally specifying the partition, key, and message content for each message. This operation supports transactions.

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Messages

Array of Kafka Message

List of messages to publish.

#[payload]

Transactional Action

Enumeration, one of:

  • ALWAYS_JOIN

  • JOIN_IF_POSSIBLE

  • NOT_SUPPORTED

Type of joining action that operations can take for transactions.

JOIN_IF_POSSIBLE

Target Variable

String

Name of the variable that stores the operation’s output.

Target Value

String

Expression that evaluates the operation’s output. The outcome of the expression is stored in the Target Variable field.

#[payload]

Reconnection Strategy

Retry strategy in case of connectivity errors.

Output

Type

Array of Kafka Message Metadata

Associated Configurations

Throws

  • KAFKA:INVALID_TOPIC_PARTITION

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:TIMEOUT

  • KAFKA:AUTHENTICATION_ERROR

  • KAFKA:INPUT_TOO_LARGE

  • KAFKA:CONNECTIVITY

Publish

<kafka:publish>

Publishes a message to the specified Kafka topic, optionally specifying the partition, key, and message content for it. This operation supports transactions.

Name Type Description Default Value Required

Configuration

String

Name of the configuration to use.

x

Topic

String

Topic to publish to.

Partition

Number

(Optional) Topic partition.

Key

Binary

(Optional) Key for the published message.

Message

Binary

(Optional) Content of the message.

#[payload]

Headers

Object

(Optional) Headers for the message.

Transactional Action

Enumeration, one of:

  • ALWAYS_JOIN

  • JOIN_IF_POSSIBLE

  • NOT_SUPPORTED

Type of joining action that operations can take for transactions.

JOIN_IF_POSSIBLE

Target Variable

String

Name of the variable that stores the operation’s output.

Target Value

String

Expression that evaluates the operation’s output. The outcome of the expression is stored in the Target Variable field.

#[payload]

Reconnection Strategy

Retry strategy in case of connectivity errors.

Associated Configurations

Throws

  • KAFKA:INVALID_TOPIC_PARTITION

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:TIMEOUT

  • KAFKA:INPUT_TOO_LARGE

  • KAFKA:CONNECTIVITY

Object Types

Consumer Context

Type for the consumer context.

Field Type Description Default Value Required

Consumer Commit Key

String

Consumer commit key.

CRL File

Specifies the location of the certificate revocation list (CRL) file.

Field Type Description Default Value Required

Path

String

Path to the CRL file.

Custom OCSP Responder

Configures a custom OCSP responder for certificate revocation checks.

Field Type Description Default Value Required

Url

String

URL of the OCSP responder.

Cert Alias

String

Alias of the signing certificate for the OCSP response. If specified, the alias must be in the truststore.

Expiration Policy

Configures an expiration policy strategy.

Field Type Description Default Value Required

Max Idle Time

Number

Configures the maximum amount of time that a dynamic configuration instance can remain idle before Mule considers it eligible for expiration.

Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Time unit for the Max Idle Time field.

Kafka Message

Represents a Kafka message.

Field Type Description Default Value Required

Attributes

Attributes associated with the Kafka message.

Payload

Binary

Payload of the Kafka message, represented as an InputStream.

Kafka Message Attributes

Represents the attributes of a Kafka message.

Field Type Description Default Value Required

Headers

Object

Headers of the Kafka message, represented as a map in which the key is a string and the value is an InputStream.

Key

Binary

Key of the Kafka message, represented as an InputStream.

Partition

Number

Partition of the topic where the Kafka message is sent.

Topic

String

Topic to which the Kafka message belongs.

Kafka Message Metadata

Metadata of the Kafka message.

Field Type Description Default Value Required

Offset

Number

Offset of the message.

Partition

Number

Partition of the message.

Serialized Key Size

Number

Serialized key size of the message.

Serialized Value Size

Number

Serialized value size of the message.

Timestamp

DateTime

Timestamp of the message.

Topic

String

Topic of the message.

Kafka Record Attributes

Attributes of the Kafka record.

Field Type Description Default Value Required

Consumer Commit Key

String

Consumer commit key.

Creation Timestamp

DateTime

Timestamp of record creation.

Headers

Object

Map of the headers in the record.

Key

Binary

Key of the record.

Leader Epoch

Number

Leader epoch of the record.

Log Append Timestamp

DateTime

Log append timestamp of the record.

Offset

Number

Offset of the record.

Partition

Number

Partition of the record.

Serialized Key Size

Number

Serialized key size of the record.

Serialized Value Size

Number

Serialized value size of the record.

Topic

String

Topic of the record.

Keystore

Configures the keystore for the TLS protocol. The keystore you generate contains a private key and a public certificate.

Field Type Description Default Value Required

Path

String

Path to the keystore. Mule resolves the path relative to the current classpath and file system.

Type

String

Type of keystore.

Alias

String

Alias of the key to use when the keystore contains multiple private keys. By default, Mule uses the first key in the file.

Key Password

String

Password used to protect the private key.

Password

String

Password used to protect the keystore.

Algorithm

String

Encryption algorithm that the keystore uses.

Proxy Config Params

Configures the proxy configuration parameters.

Field Type Description Default Value Required

Host

String

Hostname or IP address of the proxy server.

Port

Number

Port of the proxy server.

Username

String

Username to authenticate against the proxy server.

Password

String

Password to authenticate against the proxy server.

Reconnect

Configures a standard reconnection strategy, which specifies how often to reconnect and how many reconnection attempts the connector source or operation can make.

Field Type Description Default Value Required

Frequency

Number

How often to attempt to reconnect, in milliseconds.

Blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

Count

Number

How many reconnection attempts the Mule app can make.
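The Frequency and Count fields describe a bounded retry loop, and Reconnect Forever is the same loop without a bound. A simplified, blocking sketch (the Blocking field is not modeled); `connect_with_reconnect` and `flaky` are hypothetical, and `sleep` is injectable so the example runs instantly.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def connect_with_reconnect(connect: Callable[[], T], frequency_ms: int, count: Optional[int],
                           sleep: Callable[[float], None] = time.sleep) -> T:
    """Try `connect`, then retry every `frequency_ms` up to `count` times (None = forever)."""
    attempts_left = count
    while True:
        try:
            return connect()
        except ConnectionError:
            if attempts_left is not None:
                if attempts_left == 0:
                    raise            # reconnection attempts exhausted
                attempts_left -= 1
            sleep(frequency_ms / 1000)


def flaky(failures: int) -> Callable[[], str]:
    """Return a connect function that fails `failures` times before succeeding."""
    state = {"calls": 0}

    def connect() -> str:
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("broker unavailable")
        return "connected"
    return connect


print(connect_with_reconnect(flaky(2), frequency_ms=2000, count=2, sleep=lambda s: None))
```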

Reconnect Forever

Configures a forever reconnection strategy by which the connector source or operation attempts to reconnect at a specified frequency for as long as the Mule app runs.

Field Type Description Default Value Required

Frequency

Number

How often to attempt to reconnect, in milliseconds.

Blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

Reconnection

Configures a reconnection strategy for an operation.

Field Type Description Default Value Required

Fails Deployment

Boolean

What to do if, when an app is deployed, a connectivity test does not pass after exhausting the associated reconnection strategy:

  • true

    Allow the deployment to fail.

  • false

    Ignore the results of the connectivity test.

Reconnection Strategy

Reconnection strategy to use.

Record

Type for records.

Field Type Description Default Value Required

Attributes

Kafka record attributes.

Payload

Binary

Payload for the record.

Redelivery Policy

Configures the redelivery policy for executing requests that generate errors. You can add a redelivery policy to any source in a flow.

Field Type Description Default Value Required

Max Redelivery Count

Number

Maximum number of times that a redelivered request can be processed unsuccessfully before returning a REDELIVERY_EXHAUSTED error.

Use Secure Hash

Boolean

If true, Mule uses a secure hash algorithm to identify a redelivered message.

Message Digest Algorithm

String

Secure hashing algorithm to use if the Use Secure Hash field is true. If the payload of the message is a Java object, Mule ignores this value and returns the value that the payload’s hashCode() returned.

Id Expression

String

Defines one or more expressions to use to determine when a message has been redelivered. This property may only be set if Use Secure Hash is false.

Object Store

ObjectStore

Configures the object store that stores the redelivery counter for each message.
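A redelivery policy that uses a secure hash boils down to counting deliveries per message digest. In this sketch a dictionary plays the role of the object store, `hashlib` supplies the digest (SHA-256 is chosen for illustration, not as a documented default), and `RedeliveryExhausted` is a hypothetical stand-in for the REDELIVERY_EXHAUSTED error.

```python
import hashlib
from typing import Dict


class RedeliveryExhausted(Exception):
    """Stands in for the REDELIVERY_EXHAUSTED error."""


class RedeliveryTracker:
    """Count deliveries of the same payload, identified by a secure hash (simplified)."""

    def __init__(self, max_redelivery_count: int, algorithm: str = "sha256"):
        self.max_redelivery_count = max_redelivery_count
        self.algorithm = algorithm
        self._counts: Dict[str, int] = {}  # plays the role of the object store

    def _key(self, payload: bytes) -> str:
        return hashlib.new(self.algorithm, payload).hexdigest()

    def on_delivery(self, payload: bytes) -> int:
        """Register one delivery and return how many redeliveries happened so far."""
        key = self._key(payload)
        self._counts[key] = self._counts.get(key, 0) + 1
        redeliveries = self._counts[key] - 1
        if redeliveries > self.max_redelivery_count:
            raise RedeliveryExhausted(f"message {key[:12]} redelivered {redeliveries} times")
        return redeliveries

    def on_success(self, payload: bytes) -> None:
        """Forget the counter once the message is processed successfully."""
        self._counts.pop(self._key(payload), None)
```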

Repeatable File Store Stream

Configures the repeatable file-store streaming strategy by which Mule keeps a portion of the stream content in memory. If the stream content is larger than the configured buffer size, Mule backs up the buffer’s content to disk and then clears the memory.

Field Type Description Default Value Required

In Memory Size

Number

Maximum amount of memory that the stream can use for data. If the amount of memory exceeds this value, Mule buffers the content to disk. To optimize performance:

  • Configure a larger buffer size to reduce the number of times Mule needs to write the buffer to disk. This increases performance, but it also limits the number of concurrent requests your application can process, because it requires additional memory.

  • Configure a smaller buffer size to decrease memory load at the expense of response time.

Buffer Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit for the In Memory Size field.
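The file-store strategy can be pictured as a buffer that spills to disk once In Memory Size is exceeded. The hypothetical `FileStoreBuffer` below is a simplified model built on `tempfile.TemporaryFile`, with sizes in bytes; as a simplification, after the first overflow it writes all further data straight to disk.

```python
import io
import tempfile


class FileStoreBuffer:
    """Keep data in memory up to `in_memory_size` bytes, then spill it to a temp file."""

    def __init__(self, in_memory_size: int):
        self.in_memory_size = in_memory_size
        self._memory = bytearray()
        self._disk = None  # created on first overflow

    @property
    def on_disk(self) -> bool:
        return self._disk is not None

    def write(self, data: bytes) -> None:
        if self._disk is None and len(self._memory) + len(data) > self.in_memory_size:
            # Back up the in-memory content to disk, then clear the memory.
            self._disk = tempfile.TemporaryFile()
            self._disk.write(self._memory)
            self._memory.clear()
        if self._disk is not None:
            self._disk.write(data)
        else:
            self._memory.extend(data)

    def read_all(self) -> bytes:
        """Repeatable read: returns the full content every time it is called."""
        if self._disk is None:
            return bytes(self._memory)
        self._disk.seek(0)
        content = self._disk.read()
        self._disk.seek(0, io.SEEK_END)  # keep appending at the end afterwards
        return content

    def close(self) -> None:
        if self._disk is not None:
            self._disk.close()


buf = FileStoreBuffer(in_memory_size=8)
buf.write(b"abcd")
print(buf.on_disk)      # False
buf.write(b"efghij")    # 4 + 6 bytes > 8, so the buffer spills to disk
print(buf.on_disk)      # True
print(buf.read_all())   # b'abcdefghij'
buf.close()
```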

Repeatable In Memory Stream

Configures the in-memory streaming strategy by which the request fails if the data exceeds the Max Buffer Size value. Always run performance tests to find the optimal buffer size for your specific use case.

Field Type Description Default Value Required

Initial Buffer Size

Number

Initial amount of memory to allocate to the data stream. If the streamed data exceeds this value, the buffer expands by Buffer Size Increment, with an upper limit of the Max Buffer Size value.

Buffer Size Increment

Number

Amount by which the buffer size expands if it exceeds its initial size. Setting a value of 0 or lower specifies that the buffer can’t expand.

Max Buffer Size

Number

Maximum amount of memory to use. If more than that is used then a STREAM_MAXIMUM_SIZE_EXCEEDED error is raised. A value lower than or equal to zero means no limit.

Buffer Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

Unit for the Initial Buffer Size, Buffer Size Increment, and Max Buffer Size fields.
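The growth rules for the in-memory strategy can be traced with a few lines of arithmetic. In this sketch all sizes share one Buffer Unit, the buffer is capped at Max Buffer Size rather than overshooting it, and `MemoryError` stands in for STREAM_MAXIMUM_SIZE_EXCEEDED; `buffer_size_for` is a hypothetical helper.

```python
def buffer_size_for(data_size: int, initial: int, increment: int, maximum: int) -> int:
    """Return the buffer size reached while holding `data_size` units of data."""
    size = initial
    while size < data_size:
        if increment <= 0:
            raise MemoryError("buffer cannot expand: Buffer Size Increment <= 0")
        if maximum > 0 and size >= maximum:
            raise MemoryError("STREAM_MAXIMUM_SIZE_EXCEEDED")
        size += increment
        if maximum > 0:
            size = min(size, maximum)  # never grow past Max Buffer Size
    return size


print(buffer_size_for(700, initial=512, increment=512, maximum=1024))   # 1024
print(buffer_size_for(5000, initial=512, increment=512, maximum=0))     # 5120 (no limit)
```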

Standard Revocation Check

Configures standard revocation checks for TLS certificates.

Field Type Description Default Value Required

Only End Entities

Boolean

Which elements to verify in the certificate chain:

  • true

    Verify only the last element in the certificate chain.

  • false

    Verify all elements in the certificate chain.

Prefer Crls

Boolean

How to check certificate validity:

  • true

    Check the Certificate Revocation List (CRL) for certificate validity.

  • false

    Use the Online Certificate Status Protocol (OCSP) to check certificate validity.

No Fallback

Boolean

Whether to use the secondary method to check certificate validity:

  • true

    Use the method that wasn’t specified in the Prefer Crls field (the secondary method) to check certificate validity.

  • false

    Do not use the secondary method to check certificate validity.

Soft Fail

Boolean

What to do if the revocation server can’t be reached or is busy:

  • true

    Avoid verification failure.

  • false

    Allow the verification to fail.

Tcp Client Socket Params

Configures the TCP client socket parameters.

Field Type Description Default Value Required

Send Buffer Size

Number

Size of the buffer (in bytes) used when sending data, set on the socket itself.

Receive Buffer Size

Number

Size of the buffer (in bytes) used when receiving data, set on the socket itself.

Client Timeout

Number

Sets the SO_TIMEOUT value on sockets. Indicates the amount of time (in milliseconds) that the socket waits in a blocking operation before failing. A value of 0 (the default) means waiting indefinitely.

0

Send Tcp No Delay

Boolean

If set, data is sent immediately rather than being collected for efficiency, prioritizing latency over network traffic reduction. Despite the socket default being false, this option defaults to true because reducing network traffic is generally not a major concern.

true

Linger

Number

Sets the SO_LINGER value in milliseconds. This determines how long the socket remains open to ensure all remaining data is transmitted correctly before closing.

Keep Alive

Boolean

Enables the SO_KEEPALIVE behavior on open sockets. This option periodically checks idle socket connections and closes them if they become unavailable. It is used by server sockets to control whether connections are kept alive before being recycled.

false

Connection Timeout

Number

How long the connector waits before timing out when establishing a connection to the remote service.

30000
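The socket-level fields correspond to standard TCP socket options. The sketch below applies comparable settings to a plain Python socket; treating the fields as exactly these options is an assumption, `configure_client_socket` is hypothetical, and Linger is left out for brevity.

```python
import socket
from typing import Optional


def configure_client_socket(sock: socket.socket, send_buffer: Optional[int] = None,
                            receive_buffer: Optional[int] = None, client_timeout_ms: int = 0,
                            tcp_no_delay: bool = True, keep_alive: bool = False) -> None:
    """Apply settings comparable to Tcp Client Socket Params to a plain socket."""
    if send_buffer is not None:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buffer)
    if receive_buffer is not None:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, receive_buffer)
    # Client Timeout 0 means "wait indefinitely", which Python expresses as None;
    # settimeout(0) would instead make the socket non-blocking.
    sock.settimeout(None if client_timeout_ms == 0 else client_timeout_ms / 1000)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, int(tcp_no_delay))
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, int(keep_alive))


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    configure_client_socket(s, send_buffer=64 * 1024, client_timeout_ms=0)
    print(s.gettimeout())                                             # None
    print(s.getsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY) != 0)  # True
```

For Connection Timeout, `socket.create_connection((host, port), timeout=30)` plays a comparable role, with the 30000 ms default converted to seconds.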

TLS

Configures TLS to provide secure communications for the Mule app.

Field Type Description Default Value Required

Enabled Protocols

String

Comma-separated list of protocols enabled for this context.

Enabled Cipher Suites

String

Comma-separated list of cipher suites enabled for this context.

Trust Store

Configures the TLS truststore.

Key Store

Configures the TLS keystore.

Revocation Check

Configures a revocation checking mechanism.

Topic Partition

Type for topic partition.

Field Type Description Default Value Required

Topic

String

Topic name.

x

Partition

Number

Partition number.

x

Truststore

Configures the truststore for TLS.

Field Type Description Default Value Required

Path

String

Path to the truststore. Mule resolves the path relative to the current classpath and file system.

Password

String

Password used to protect the truststore.

Type

String

Type of truststore.

Algorithm

String

Encryption algorithm that the truststore uses.

Insecure

Boolean

If true, Mule stops performing certificate validations. Setting this to true can make connections vulnerable to attacks.
