Apache Kafka Connector 4.3 Reference - Mule 4
Anypoint Connector for Apache Kafka (Apache Kafka Connector) enables you to interact with the Apache Kafka messaging system. It provides seamless integration between your Mule app and an Apache Kafka cluster, using Mule runtime engine (Mule).
When a Message Listener or a Batch Message Listener uses MANUAL acknowledgment mode, you must use a Commit operation at the end of the flow when the flow finishes successfully (either because there were no errors or because there is an On Error Continue component in the flow).
Limitations
- Mule server notifications are not supported because the Kafka library used in the connector manages network disruptions internally.
- Apache Kafka Connector doesn’t support distributed tracing.
Configurations
Consumer Configuration
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Name |
The name for this configuration. Connectors reference the configuration with this name. |
x |
||
Connection |
The connection types to provide to this configuration. |
x |
||
Default acknowledgment mode |
Enumeration, one of:
|
Defines the way that the Kafka broker instance is notified of the consumption of messages.
|
|
|
Default listener poll timeout |
Number |
The time, in time units, to wait to perform a poll if data is not available in the buffer (fetched). If no value is set, the poll is returned immediately with any records that are currently available in the buffer or else returns empty if there is no data. Must not be negative. |
|
|
Default listener poll timeout time unit |
Enumeration, one of:
|
The time unit for the polling timeout. This combines with pollTimeout to define the total timeout for the polling. |
|
|
Default operation timeout |
Number |
The time, in time units, to wait for an operation to finish. If no value is set or a negative value is set, the operation waits forever. |
|
|
Default operation timeout time unit |
Enumeration, one of:
|
The time unit for the operation timeout. This combines with operationTimeout to define the total default timeout for the operations that use this configuration. |
SECONDS |
|
Zone ID |
String |
Converts the provided timestamps into |
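The timeout parameters in this table are each expressed as a number plus a separate time unit. As a rough illustration of how such a pair resolves to a single duration, the following Python sketch converts a value and a unit to milliseconds and applies the rules described above for unset and negative values. The helper names (`to_millis`, `poll_timeout_millis`, `operation_timeout_millis`) and the unit table are assumptions made for this example; they are not taken from the connector's implementation.

```python
# Hypothetical helpers; unit names mirror java.util.concurrent.TimeUnit.
# SECONDS and MILLISECONDS appear as defaults in this reference; MINUTES is
# included only for illustration.
MILLIS_PER_UNIT = {
    "MILLISECONDS": 1,
    "SECONDS": 1_000,
    "MINUTES": 60_000,
}


def to_millis(value, unit):
    """Convert a (value, unit) pair to milliseconds."""
    if unit not in MILLIS_PER_UNIT:
        raise ValueError(f"unsupported time unit: {unit}")
    return value * MILLIS_PER_UNIT[unit]


def poll_timeout_millis(value, unit):
    """Listener poll timeout: unset means return immediately; negatives are invalid."""
    if value is None:
        return 0
    if value < 0:
        raise ValueError("poll timeout must not be negative")
    return to_millis(value, unit)


def operation_timeout_millis(value, unit):
    """Operation timeout: unset or negative means wait forever (returned as None)."""
    if value is None or value < 0:
        return None
    return to_millis(value, unit)


print(poll_timeout_millis(100, "MILLISECONDS"))
print(operation_timeout_millis(30, "SECONDS"))
print(operation_timeout_millis(-1, "SECONDS"))
```

Returning `None` for "wait forever" is a design choice of this sketch; it keeps the unbounded case distinct from a zero-length timeout.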
Connection Types
Consumer Plaintext Connection
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Bootstrap Server URLs |
Array of String |
The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers. |
x |
|
Group ID |
String |
Default group ID for all the Kafka consumers that use this configuration. |
||
Consumer Amount |
Number |
Determines the number of consumers the connection will initially create. |
|
|
Maximum polling interval |
Number |
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before the expiration of this timeout, then the consumer is considered failed and the group rebalances to reassign the partitions to another member. For consumers using a non-null |
|
|
Maximum Polling Interval Time Unit |
Enumeration, one of:
|
The time unit for the maximum polling interval scalar. You can override this parameter at the source level. |
|
|
Isolation Level |
Enumeration, one of:
|
Controls how to read messages that are written transactionally. If set to In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, |
|
|
Exclude internal topics |
Boolean |
Whether internal topics matching a subscribed pattern should be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic. |
|
|
Auto offset reset |
Enumeration, one of:
|
Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server (for example, because the data was deleted):
|
|
|
Retry Backoff Timeout |
Number |
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. |
|
|
Retry Backoff Timeout Time Unit |
Enumeration, one of:
|
The time unit for the retry backoff timeout scalar. |
|
|
Check CRC |
Boolean |
Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled. |
|
|
Default receive buffer size |
Number |
The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is |
|
|
Default receive buffer size unit |
Enumeration, one of:
|
The unit of measure for the receive buffer size scalar. You can override this parameter at the source level. |
|
|
Default send buffer size |
Number |
The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is |
|
|
Default send buffer size unit |
Enumeration, one of:
|
The unit of measure for the send buffer size scalar. You can override this parameter at the source level. |
|
|
Request Timeout |
Number |
The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level. |
|
|
Request Timeout Time Unit |
Enumeration, one of:
|
The time unit for the request timeout scalar. You can override this parameter at the source level. |
|
|
Default record limit |
Number |
The maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level. |
|
|
DNS Lookups |
Enumeration, one of:
|
Controls how the client uses DNS lookups.
|
|
|
Heartbeat interval |
Number |
The expected time between heartbeats to the consumer coordinator when using Apache Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than |
|
|
Heartbeat Interval Time Unit |
Enumeration, one of:
|
The time unit for the heartbeat interval scalar. |
|
|
Session Timeout |
Number |
The timeout used to detect consumer failures when using Apache Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by |
|
|
Session timeout time unit |
Enumeration, one of:
|
The time unit for the session timeout scalar. |
|
|
Connection maximum idle time |
Number |
Closes idle connections after the number of milliseconds specified by this configuration |
|
|
Connection maximum idle time time unit |
Enumeration, one of:
|
The time unit for the connection’s maximum idle time scalar |
|
|
TLS Configuration |
Defines a TLS configuration used by both clients and server sides to secure the communication for the Mule app. The connector automatically sets the |
|||
Endpoint identification algorithm |
String |
The endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker hostname matches the hostname in the broker’s certificate. |
||
Topic Subscription Patterns |
Array of String |
The list of regular expressions that define the topics to subscribe to. Topics are automatically rebalanced among the consumers of the topic. |
||
Assignments |
Array of Topic Partition |
The list of topic-partition pairs to assign. Consumers are not automatically rebalanced. |
||
Default fetch minimum size |
Number |
The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level. |
|
|
Fetch Minimum Size Unit |
Enumeration, one of:
|
The unit of measure for the default fetch minimum size scalar. You can override this parameter at the source level. |
|
|
Default fetch maximum size |
Number |
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using |
|
|
Default maximum fetch size unit |
Enumeration, one of:
|
The unit of measure for the default fetch maximum size scalar. You can override this parameter at the source level. |
|
|
Default maximum partition fetch size |
Number |
The maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using |
1 |
|
Default maximum partition fetch unit |
Enumeration, one of:
|
The unit of measure for the maximum partition fetch size scalar. You can override this parameter at the source level. |
|
|
Fetch Maximum Wait Timeout |
Number |
The maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by |
|
|
Fetch Maximum Wait Timeout Unit |
Enumeration, one of:
|
The time unit for the fetch maximum wait timeout scalar |
MILLISECONDS |
|
Reconnection |
When the application is deployed, a connectivity test is performed on all connectors. If set to |
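Many parameters in this table correspond to standard Apache Kafka consumer properties. The Python sketch below collects a few of them into a property dictionary and enforces the documented Kafka rule that the heartbeat interval must be lower than the session timeout. The property keys are Apache Kafka's own consumer property names; the function name, the sample values, and the assumption that the connector forwards its parameters to these properties one-to-one are illustrative only. The `fetch_max_wait_ms` default of 500 is Kafka's documented default, not a value stated in this reference.

```python
def build_consumer_properties(bootstrap_servers, group_id,
                              heartbeat_interval_ms, session_timeout_ms,
                              fetch_min_bytes=1, fetch_max_wait_ms=500):
    """Assemble a dict of Kafka consumer properties (illustrative mapping)."""
    # Kafka requires heartbeat.interval.ms to be lower than session.timeout.ms.
    if heartbeat_interval_ms >= session_timeout_ms:
        raise ValueError("heartbeat interval must be lower than session timeout")
    if fetch_min_bytes < 1:
        raise ValueError("fetch minimum size must be at least 1 byte")
    return {
        # A partial list of brokers is enough to bootstrap the connection.
        "bootstrap.servers": ",".join(bootstrap_servers),
        "group.id": group_id,
        "heartbeat.interval.ms": heartbeat_interval_ms,
        "session.timeout.ms": session_timeout_ms,
        # Larger values trade latency for throughput, bounded by fetch.max.wait.ms.
        "fetch.min.bytes": fetch_min_bytes,
        "fetch.max.wait.ms": fetch_max_wait_ms,
    }


# Hypothetical broker addresses and group name.
props = build_consumer_properties(
    ["broker1:9092", "broker2:9092"], "orders-group",
    heartbeat_interval_ms=3000, session_timeout_ms=10000,
)
print(props["bootstrap.servers"])
```

Validating the heartbeat and session values together, before a connection is attempted, surfaces a misconfiguration early instead of at the first group rebalance.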
Consumer Kerberos Connection
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Bootstrap Server URLs |
Array of String |
The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers. |
x |
|
Group ID |
String |
Default group ID for the Kafka consumers that use this configuration |
||
Consumer Amount |
Number |
The number of consumers the connection initially creates |
|
|
Maximum polling interval |
Number |
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before the expiration of this timeout, the consumer is considered failed and the group rebalances to reassign the partitions to another member. |
|
|
Maximum Polling Interval Time Unit |
Enumeration, one of:
|
The time unit for the maximum polling interval scalar. You can override this parameter at the source level. |
|
|
Isolation Level |
Enumeration, one of:
|
Controls how to read messages that are written transactionally:
|
|
|
Exclude internal topics |
Boolean |
Whether to exclude from a subscription internal topics that match a subscribed pattern. It is always possible to explicitly subscribe to an internal topic. |
|
|
Auto offset reset |
Enumeration, one of:
|
Determines what to do if there is no initial offset in Kafka or if the current offset no longer exists on the server (for example, because the data was deleted):
|
|
|
Retry Backoff Timeout |
Number |
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. |
|
|
Retry Backoff Timeout Time Unit |
Enumeration, one of:
|
The time unit for the retry backoff timeout scalar |
|
|
Check CRC |
Boolean |
Automatically checks the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead. In situations that require extremely high performance, the check can be disabled. |
|
|
Default receive buffer size |
Number |
The size of the TCP receive buffer ( |
|
|
Default receive buffer size unit |
Enumeration, one of:
|
The unit of measure for the receive buffer size scalar. You can override this parameter at the source level. |
|
|
Default send buffer size |
Number |
The size of the TCP send buffer ( |
|
|
Default send buffer size unit |
Enumeration, one of:
|
The unit of measure for the send buffer size scalar. You can override this parameter at the source level. |
|
|
Request Timeout |
Number |
The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level. |
|
|
Request Timeout Time Unit |
Enumeration, one of:
|
The time unit for the request timeout scalar. You can override this parameter at the source level. |
|
|
Default record limit |
Number |
The maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level. |
|
|
DNS Lookups |
Enumeration, one of:
|
Controls how the client uses DNS lookups:
|
|
|
Heartbeat interval |
Number |
The expected time between heartbeats to the consumer coordinator when using Apache Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than the |
|
|
Heartbeat Interval Time Unit |
Enumeration, one of:
|
The time unit for the heartbeat interval scalar. |
|
|
Session Timeout |
Number |
The timeout used to detect consumer failures when using Apache Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by |
|
|
Session timeout time unit |
Enumeration, one of:
|
The time unit for session timeout scalar |
|
|
Connection maximum idle time |
Number |
Closes idle connections after the number of milliseconds specified by this configuration |
|
|
Connection maximum idle time time unit |
Enumeration, one of:
|
The time unit for the connection’s maximum idle time scalar. |
|
|
TLS Configuration |
Defines a TLS configuration used by both clients and server sides to secure the communication for the Mule app. The connector automatically sets the |
|||
Endpoint identification algorithm |
String |
The endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the endpoint identification algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker hostname matches the hostname in the broker’s certificate. |
||
Topic Subscription Patterns |
Array of String |
The list of regular expressions that define the topics to subscribe to. Topics are automatically rebalanced among the consumers of the topic. |
||
Assignments |
Array of Topic Partition |
The list of topic-partition pairs to assign. Consumers are not automatically rebalanced. |
||
Default fetch minimum size |
Number |
The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level. |
|
|
Fetch Minimum Size Unit |
Enumeration, one of:
|
The unit of measure for the default fetch minimum size scalar. You can override this parameter at the source level. |
|
|
Default fetch maximum size |
Number |
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using |
|
|
Default maximum fetch size unit |
Enumeration, one of:
|
The unit of measure for the default fetch maximum size scalar. You can override this parameter at the source level. |
|
|
Default maximum partition fetch size |
Number |
The maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using |
1 |
|
Default maximum partition fetch unit |
Enumeration, one of:
|
The unit of measure for the maximum partition fetch size scalar. You can override this parameter at the source level. |
|
|
Fetch Maximum Wait Timeout |
Number |
The maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by |
|
|
Fetch Maximum Wait Timeout Unit |
Enumeration, one of:
|
The time unit for the fetch maximum wait timeout scalar |
MILLISECONDS |
|
Principal |
String |
The entity that is authenticated by a computer system or network. Principals can be individual people, computers, services, or computational entities such as processes and threads. |
x |
|
Service name |
String |
The Kerberos principal name that Kafka runs as |
x |
|
Kerberos configuration file (krb5.conf) |
String |
The path to the |
||
Use ticket cache |
Boolean |
Set this option to
|
false |
|
Ticket cache |
String |
The name of the ticket cache that contains the user’s ticket-granting ticket (TGT). If this value is set, |
||
Use keytab |
Boolean |
Set this option to |
false |
|
Keytab |
String |
Set this option to the file name of the keytab to obtain the principal’s secret key. |
||
Store key |
Boolean |
Set this option to |
false |
|
Reconnection |
When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy. |
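The Kerberos parameters in this table (principal, ticket cache, keytab, store key) resemble the options of the JDK's `Krb5LoginModule`, which Kafka clients typically receive through the `sasl.jaas.config` property. The Python sketch below assembles such a JAAS entry as a string. The option names (`useTicketCache`, `ticketCache`, `useKeyTab`, `keyTab`, `storeKey`, `principal`) are real `Krb5LoginModule` options; the function name, the sample paths, and the assumption that the connector builds its login configuration this way are hypothetical.

```python
def kerberos_jaas_config(principal, use_ticket_cache=False, ticket_cache=None,
                         use_keytab=False, keytab=None, store_key=False):
    """Build a Krb5LoginModule JAAS entry (illustrative, no value escaping)."""
    if use_keytab and not keytab:
        raise ValueError("a keytab file is required when use_keytab is true")

    options = [f"useTicketCache={str(use_ticket_cache).lower()}"]
    if ticket_cache:
        options.append(f'ticketCache="{ticket_cache}"')
    options.append(f"useKeyTab={str(use_keytab).lower()}")
    if keytab:
        options.append(f'keyTab="{keytab}"')
    options.append(f"storeKey={str(store_key).lower()}")
    options.append(f'principal="{principal}"')

    return ("com.sun.security.auth.module.Krb5LoginModule required "
            + " ".join(options) + ";")


# Hypothetical principal and keytab path.
print(kerberos_jaas_config(
    "mule@EXAMPLE.COM",
    use_keytab=True,
    keytab="/etc/security/mule.keytab",
    store_key=True,
))
```

The Service name parameter would map separately to Kafka's `sasl.kerberos.service.name` property rather than to a login module option.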
Consumer SASL/SCRAM Connection
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
username |
String |
The username with which to log in |
x |
|
password |
String |
The password with which to log in |
x |
|
encryptionType |
Enumeration, one of:
|
The encryption algorithm used by SCRAM |
x |
|
Bootstrap Server URLs |
Array of String |
The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers. |
x |
|
Group ID |
String |
Default group ID for all Kafka consumers that use this configuration |
||
Consumer Amount |
Number |
The number of consumers the connection initially creates |
|
|
Maximum polling interval |
Number |
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before the expiration of this timeout, the consumer is considered failed and the group rebalances to reassign the partitions to another member. |
|
|
Maximum Polling Interval Time Unit |
Enumeration, one of:
|
The time unit for the maximum polling interval scalar. You can override this parameter at the source level. |
|
|
Isolation Level |
Enumeration, one of:
|
Controls how to read messages that are written transactionally:
Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Therefore, in |
|
|
Exclude internal topics |
Boolean |
Whether internal topics that match a subscribed pattern should be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic. |
|
|
Auto offset reset |
Enumeration, one of:
|
Determines what to do if there is no initial offset in Kafka or if the current offset no longer exists on the server (for example, because the data was deleted):
|
|
|
Retry Backoff Timeout |
Number |
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. |
|
|
Retry Backoff Timeout Time Unit |
Enumeration, one of:
|
The time unit for the retry backoff timeout scalar |
|
|
Check CRC |
Boolean |
Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, it can be disabled. |
|
|
Default receive buffer size |
Number |
The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is |
|
|
Default receive buffer size unit |
Enumeration, one of:
|
The unit of measure for the receive buffer size scalar. You can override this parameter at the source level. |
|
|
Default send buffer size |
Number |
The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is |
|
|
Default send buffer size unit |
Enumeration, one of:
|
The unit of measure for the send buffer size scalar. You can override this parameter at the source level. |
|
|
Request Timeout |
Number |
The maximum amount of time that the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or it fails the request if the retries are exhausted. You can override this parameter at the source level. |
|
|
Request Timeout Time Unit |
Enumeration, one of:
|
The time unit for the request timeout scalar. You can override this parameter at the source level. |
|
|
Default record limit |
Number |
The maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level. |
|
|
DNS Lookups |
Enumeration, one of:
|
Controls how the client uses DNS lookups:
|
|
|
Heartbeat interval |
Number |
The expected time between heartbeats to the consumer coordinator when using Apache Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than |
|
|
Heartbeat Interval Time Unit |
Enumeration, one of:
|
The time unit for the heartbeat interval scalar |
|
|
Session Timeout |
Number |
The timeout used to detect consumer failures when using Apache Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by |
|
|
Session timeout time unit |
Enumeration, one of:
|
The time unit for the session timeout scalar |
|
|
Connection maximum idle time |
Number |
Closes idle connections after the number of milliseconds specified by this configuration |
|
|
Connection maximum idle time time unit |
Enumeration, one of:
|
The time unit for the connection’s maximum idle time scalar |
|
|
TLS Configuration |
Defines a configuration for TLS, which can be used from both the client and server sides to secure communication for the Mule app. When using the HTTPS protocol, the HTTP communication is secured using TLS or SSL. If HTTPS is configured as the protocol then the user needs to configure at least the keystore in the |
|||
Endpoint identification algorithm |
String |
The endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker hostname matches the hostname in the broker’s certificate. |
||
Topic Subscription Patterns |
Array of String |
The list of subscription regular expressions to which to subscribe. Topics are automatically rebalanced between the consumers of the topic.
|
||
Assignments |
Array of Topic Partition |
The list of topic-partition pairs to assign to a consumer. Consumers are not automatically rebalanced.
|
||
Default fetch minimum size |
Number |
The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request.
|
|
|
Fetch Minimum Size Unit |
Enumeration, one of:
|
The unit of measure for the default fetch minimum size scalar. |
|
|
Default fetch maximum size |
Number |
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum.
|
|
|
Default maximum fetch size unit |
Enumeration, one of:
|
The unit of measure for the default fetch maximum size scalar. You can override this parameter at the source level. |
|
|
Default maximum partition fetch size |
Number |
The maximum amount of data per partition the server can return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress.
|
1 |
|
Default maximum partition fetch unit |
Enumeration, one of:
|
The unit of measure for the maximum partition fetch size scalar. You can override this parameter at the source level. |
|
|
Fetch Maximum Wait Timeout |
Number |
The maximum amount of time the server waits before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by |
|
|
Fetch Maximum Wait Timeout Unit |
Enumeration, one of:
|
The time unit for the fetch maximum wait timeout scalar. |
|
|
Reconnection |
When the application is deployed, a connectivity test is performed on all connectors. If set to |
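For SASL/SCRAM, a plain Kafka client is usually configured through three properties: `security.protocol`, `sasl.mechanism`, and `sasl.jaas.config`. The Python sketch below shows how the username, password, and SCRAM algorithm from this table could translate into those properties. The property keys, the `ScramLoginModule` class name, and the two mechanism names are standard Apache Kafka values; the function name, the `use_tls` flag, and the idea that the connector performs this exact translation are assumptions for illustration.

```python
SCRAM_MECHANISMS = ("SCRAM-SHA-256", "SCRAM-SHA-512")


def scram_properties(username, password, mechanism, use_tls=True):
    """Build Kafka client properties for SASL/SCRAM (illustrative sketch)."""
    if mechanism not in SCRAM_MECHANISMS:
        raise ValueError(f"unsupported SCRAM mechanism: {mechanism}")
    # Note: this sketch does not escape quotes inside the credentials.
    jaas = ("org.apache.kafka.common.security.scram.ScramLoginModule required "
            f'username="{username}" password="{password}";')
    return {
        # SASL_SSL wraps the SASL exchange in TLS; SASL_PLAINTEXT does not.
        "security.protocol": "SASL_SSL" if use_tls else "SASL_PLAINTEXT",
        "sasl.mechanism": mechanism,
        "sasl.jaas.config": jaas,
    }


# Hypothetical credentials for demonstration only.
props = scram_properties("mule-user", "s3cret", "SCRAM-SHA-512")
print(props["security.protocol"], props["sasl.mechanism"])
```

In a real deployment the password would come from a secure property store rather than a literal in code.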
Consumer SASL/PLAIN Connection
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Bootstrap Server URLs |
Array of String |
The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers. |
x |
|
Endpoint identification algorithm |
String |
The endpoint identification algorithm used by clients to validate the server host name. The default value is an empty string, which means the algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate. |
||
Group ID |
String |
Default group ID for the Kafka consumers that use this configuration |
||
Consumer Amount |
Number |
The number of consumers the connection initially creates |
|
|
Maximum polling interval |
Number |
The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before the expiration of this timeout, the consumer is considered failed and the group rebalances to reassign the partitions to another member. |
|
|
Maximum Polling Interval Time Unit |
Enumeration, one of:
|
The time unit for the maximum polling interval scalar. You can override this parameter at the source level. |
|
|
Isolation Level |
Enumeration, one of:
|
Controls how to read messages that are written transactionally:
|
|
|
Exclude internal topics |
Boolean |
Whether to exclude from a subscription internal topics that match a subscribed pattern. It is always possible to explicitly subscribe to an internal topic. |
|
|
Auto offset reset |
Enumeration, one of:
|
Determines what to do if there is no initial offset in Kafka or if the current offset no longer exists on the server (for example, because the data was deleted):
|
|
|
Retry Backoff Timeout |
Number |
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. |
|
|
Retry Backoff Timeout Time Unit |
Enumeration, one of:
|
The time unit for the retry backoff timeout scalar |
|
|
Check CRC |
Boolean |
Automatically checks the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead. In situations that require extremely high performance, the check can be disabled. |
|
|
Default receive buffer size |
Number |
The size of the TCP receive buffer ( |
|
|
Default receive buffer size unit |
Enumeration, one of:
|
The unit of measure for the receive buffer size scalar. You can override this parameter at the source level. |
|
|
Default send buffer size |
Number |
The size of the TCP send buffer ( |
|
|
Default send buffer size unit |
Enumeration, one of:
|
The unit of measure for the send buffer size scalar. You can override this parameter at the source level. |
|
|
Request Timeout |
Number |
The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level. |
|
|
Request Timeout Time Unit |
Enumeration, one of:
|
The time unit for the request timeout scalar. You can override this parameter at the source level. |
|
|
Default record limit |
Number |
The maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level. |
|
|
DNS Lookups |
Enumeration, one of:
|
Controls how the client uses DNS lookups:
|
|
|
Heartbeat interval |
Number |
The expected time between heartbeats to the consumer coordinator when using Apache Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than the |
|
|
Heartbeat Interval Time Unit |
Enumeration, one of:
|
The time unit for the heartbeat interval scalar. |
|
|
Session Timeout |
Number |
The timeout used to detect consumer failures when using Apache Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by |
|
|
Session timeout time unit |
Enumeration, one of:
|
The time unit for session timeout scalar |
|
|
Connection maximum idle time |
Number |
Closes idle connections after the amount of time specified by this parameter. |
|
|
Connection maximum idle time time unit |
Enumeration, one of:
|
The time unit for the connection’s maximum idle time scalar. |
|
|
TLS Configuration |
Defines a TLS configuration, which can be used from both the client and server sides to secure communication for the Mule app. When using the HTTPS protocol, the HTTP communication is secured using TLS or SSL. If HTTPS is configured as the protocol, you must configure at least the keystore in the |
|||
Topic Subscription Patterns |
Array of String |
The list of regular expressions that match the topics to subscribe to. Topics are automatically rebalanced among the consumers of the topic. |
||
Assignments |
Array of Topic Partition |
The list of topic-partition pairs to assign. Consumers are not automatically rebalanced. |
||
Default fetch minimum size |
Number |
The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. If you set this to a value greater than 1, the server waits for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level. |
|
|
Fetch Minimum Size Unit |
Enumeration, one of:
|
The unit of measure for the default fetch minimum size scalar. You can override this parameter at the source level. |
|
|
Default fetch maximum size |
Number |
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using |
|
|
Default maximum fetch size unit |
Enumeration, one of:
|
The unit of measure for the default fetch maximum size scalar. You can override this parameter at the source level. |
|
|
Default maximum partition fetch size |
Number |
The maximum amount of data per partition that the server returns. The consumer fetches records in batches. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using |
1 |
|
Default maximum partition fetch unit |
Enumeration, one of:
|
The unit of measure for the maximum partition fetch size scalar. You can override this parameter at the source level. |
|
|
Fetch Maximum Wait Timeout |
Number |
The maximum amount of time the server blocks before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by |
|
|
Fetch Maximum Wait Timeout Unit |
Enumeration, one of:
|
The time unit for the fetch maximum wait timeout scalar |
MILLISECONDS |
|
Username |
String |
The user used by the client to connect to the Kafka broker |
x |
|
Password |
String |
The password used by the client to connect to the Kafka broker |
x |
|
Reconnection |
When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy. |
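Most numeric parameters in the table above are paired with a separate time unit parameter, and the heartbeat interval must stay below the session timeout. The following Python sketch is illustrative only and is not part of the connector. It normalizes a value and its unit to milliseconds and checks that relationship. The unit names are assumed to match `java.util.concurrent.TimeUnit`, the helper names `to_millis` and `check_heartbeat` are hypothetical, and the one-third guideline comes from the Apache Kafka consumer documentation rather than from this reference.

```python
# Milliseconds per unit. The unit names are an assumption: they mirror
# java.util.concurrent.TimeUnit, which this page does not list explicitly.
MILLIS_PER_UNIT = {
    "MILLISECONDS": 1,
    "SECONDS": 1_000,
    "MINUTES": 60_000,
    "HOURS": 3_600_000,
    "DAYS": 86_400_000,
}


def to_millis(value, unit):
    """Combine a scalar and its time unit into milliseconds."""
    if value < 0:
        raise ValueError("timeout values must not be negative")
    return value * MILLIS_PER_UNIT[unit]


def check_heartbeat(heartbeat_ms, session_timeout_ms):
    """Return a list of problems with a heartbeat/session timeout pair."""
    problems = []
    if heartbeat_ms >= session_timeout_ms:
        problems.append("heartbeat interval must be lower than the session timeout")
    elif heartbeat_ms * 3 > session_timeout_ms:
        # Guideline from the Kafka documentation, not a hard limit.
        problems.append("heartbeat interval is usually at most one third of the session timeout")
    return problems


# Illustrative values, not the connector's defaults.
heartbeat_ms = to_millis(3, "SECONDS")
session_ms = to_millis(10, "SECONDS")
print(check_heartbeat(heartbeat_ms, session_ms))
```

An empty list means the pair is consistent. The broker-side limits on the session timeout are not modeled here.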
Associated Input Sources
Producer configuration
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Name |
String |
The name for this configuration. Connectors reference the configuration with this name. |
x |
|
Connection |
The connection types to provide to this configuration. |
x |
||
Default topic |
String |
The default topic name used by the producer operations. You can override it at the operation level. |
defaultTopicName |
|
Zone ID |
String |
Zone ID is used to convert the provided timestamps into |
||
Expiration Policy |
Configures the minimum amount of time that a dynamic configuration instance can remain idle before Mule runtime engine (Mule) considers it eligible for expiration. This does not mean that the instance expires at the exact moment that it becomes eligible. Mule purges the instances when appropriate. |
Connection Types
Producer Plaintext Connection
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Bootstrap server URLs |
Array of String |
The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers. |
x |
|
Batch size |
Number |
The producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of zero disables batching entirely). A very large batch size can result in more wasteful use of memory as a buffer of the specified batch size is always allocated in anticipation of additional records. |
|
|
The batch size unit of measure. |
Enumeration, one of:
|
The unit of measure for the batch size scalar. |
|
|
Buffer size |
Number |
The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server, the producer blocks for |
1 |
|
The buffer memory size unit of measure. |
Enumeration, one of:
|
The unit of measure for the buffer size scalar. |
|
|
DNS lookups |
Enumeration, one of:
|
Controls how the client uses DNS lookups. If set to |
|
|
Compression type |
Enumeration, one of:
|
The compression type for all data generated by the producer. The default is none (no compression). Valid values are none, gzip, snappy, lz4, or zstd. Compression is performed on full batches of data, so the efficacy of batching also impacts the compression ratio (more batching means better compression). |
|
|
Connections maximum idle time |
Number |
Closes idle connections after the specified time is reached. |
|
|
Connections maximum idle time unit |
Enumeration, one of:
|
Determines the time unit for the connection’s maximum idle scalar. |
|
|
Delivery Timeout |
Number |
An upper limit on the time to report success or failure after a call to |
|
|
Delivery Timeout Time Unit |
Enumeration, one of:
|
Determines the time unit for the delivery timeout scalar. |
SECONDS |
|
Enable idempotence |
Boolean |
When set to |
|
|
Linger time |
Number |
The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However, in some circumstances the client may want to reduce the number of requests, even under moderate load.
|
|
|
Linger Time Unit |
Enumeration, one of:
|
Determines the time unit for the linger time scalar |
|
|
Maximum block time |
Number |
The configuration controls for how long |
|
|
Maximum block time unit |
Enumeration, one of:
|
Determines the time unit for the maximum block time scalar |
|
|
Maximum in flight requests |
Number |
The maximum number of unacknowledged requests the client will send on a single connection before blocking. If the value is greater than |
|
|
Maximum request size |
Number |
The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which may be different from this. |
|
|
Maximum request size unit |
Enumeration, one of:
|
The unit of measure for the max request size scalar. |
|
|
Producer Acknowledge Mode |
Enumeration, one of:
|
The number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent. |
|
|
Default receive buffer size |
Number |
The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is |
|
|
Default receive buffer size unit |
Enumeration, one of:
|
The unit of measure for the receive buffer size scalar. You can override this parameter at the source level. |
|
|
Retries amount |
Number |
Setting a value greater than zero causes the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting |
|
|
Retry Backoff Timeout |
Number |
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. |
|
|
Retry Backoff Timeout Time Unit |
Enumeration, one of:
|
Determines the time unit for the retry backoff timeout scalar. |
|
|
Default send buffer size |
Number |
The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is |
|
|
Default send buffer size unit |
Enumeration, one of:
|
The unit of measure for the send buffer size scalar. You can override this parameter at the source level. |
|
|
Default request timeout |
Number |
The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses, the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than |
|
|
Default request timeout time unit |
Enumeration, one of:
|
The request timeout time unit. |
|
|
TLS Configuration |
Defines a configuration for TLS, which can be used from both the client and server sides to secure communication for the Mule app. When using the HTTPS protocol, the HTTP communication is secured using TLS or SSL. If HTTPS is configured as the protocol then the user needs to configure at least the keystore in the |
|||
Endpoint identification algorithm |
String |
The endpoint identification algorithm used by clients to validate the server host name. The default value is an empty string, which means the algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate. |
||
Reconnection |
When the application is deployed, a connectivity test is performed on all connectors. If set to |
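The Retries amount, Maximum in flight requests, Enable idempotence, and Producer Acknowledge Mode parameters interact with each other. The Python sketch below is illustrative only and does not use the connector. It encodes the rules stated in the Apache Kafka producer documentation: retries with more than one in-flight request can reorder records unless idempotence is enabled, and idempotence requires at most 5 in-flight requests, retries greater than zero, and acknowledgments from all in-sync replicas. The function names are hypothetical, and `acks` uses Kafka's own values (`"0"`, `"1"`, `"all"`) because the connector's enumeration members are not listed in this table.

```python
def may_reorder(retries, max_in_flight, idempotent):
    """True when a retried batch could be written after a later batch."""
    return retries > 0 and max_in_flight > 1 and not idempotent


def idempotence_problems(retries, max_in_flight, acks):
    """List the settings that conflict with an idempotent producer."""
    problems = []
    if max_in_flight > 5:
        problems.append("maximum in flight requests must be 5 or lower")
    if retries <= 0:
        problems.append("retries must be greater than zero")
    if acks != "all":
        problems.append("acknowledgments must be required from all in-sync replicas")
    return problems


# Illustrative settings: one retry, five in-flight requests, no idempotence.
print(may_reorder(retries=1, max_in_flight=5, idempotent=False))
print(idempotence_problems(retries=1, max_in_flight=5, acks="all"))
```

A check like this can run before deployment to catch combinations that would either fail at startup or silently weaken ordering guarantees.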
Producer Kerberos Connection
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Bootstrap Server URLs |
Array of String |
The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers. |
x |
|
Batch size |
Number |
The producer attempts to batch records together into fewer requests when multiple records are sent to the same partition. This helps performance on both the client and the server. This parameter controls the default batch size, in bytes. No attempt is made to batch records larger than this size.
|
16 |
|
The batch size unit of measure |
Enumeration, one of:
|
The unit of measure for the batch size scalar. |
KB |
|
Buffer size |
Number |
The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered, the producer blocks for |
1000 |
|
The buffer memory size unit of measure |
Enumeration, one of:
|
The unit of measure for the buffer size scalar. |
KB |
|
DNS lookups |
Enumeration, one of:
|
Controls how the client uses DNS lookups:
|
DEFAULT |
|
Compression type |
Enumeration, one of:
|
The compression type for all data generated by the producer. The default is no compression. Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression. |
NONE |
|
Connections maximum idle time |
Number |
Closes idle connections after the value specified by this parameter |
540 |
|
Connections maximum idle time unit |
Enumeration, one of:
|
The time unit for the connection’s maximum idle scalar |
SECONDS |
|
Delivery timeout |
Number |
An upper bound on the time to report success or failure after a call to |
120 |
|
Delivery Timeout Time Unit |
Enumeration, one of:
|
Determines the time unit for the delivery timeout scalar |
SECONDS |
|
Enable idempotence |
Boolean |
When set to |
false |
|
Linger time |
Number |
The producer groups records that arrive in between request transmissions into a single batched request. Normally, this occurs under load, when records arrive faster than they can be sent out. However, in some circumstances, the client might want to reduce the number of requests, even under a moderate load.
|
0 |
|
Linger Time Unit |
Enumeration, one of:
|
The time unit for the linger time scalar |
SECONDS |
|
Maximum block time |
Number |
Specifies for how long |
60 |
|
Maximum block time unit |
Enumeration, one of:
|
The time unit for the maximum block time scalar |
SECONDS |
|
Maximum in flight requests |
Number |
The maximum number of unacknowledged requests the client will send on a single connection before blocking. If the value is greater than |
5 |
|
Maximum request size |
Number |
The maximum size of a request, in bytes. This setting limits the number of record batches the producer sends in a single request. This effectively provides a cap on the maximum record batch size. The server has its own cap on record batch size, which can be different from this value. |
1 |
|
Maximum request size unit |
Enumeration, one of:
|
The unit of measure for the max request size scalar |
MB |
|
Producer acknowledge mode |
Enumeration, one of:
|
The number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent. |
NONE |
|
Default receive buffer size |
Number |
The size of the TCP receive buffer ( |
64 |
|
Default receive buffer size unit |
Enumeration, one of:
|
The unit of measure for the receive buffer size scalar. You can override this parameter at the source level. |
KB |
|
Retries amount |
Number |
Setting a value greater than zero causes the client to resend any record whose send failed with a potentially transient error. This retry is the same as having the client resend the record upon receiving the error. Allowing retries without setting
|
1 |
|
Retry Backoff Timeout Time Unit |
Enumeration, one of:
|
The time unit for the retry backoff timeout time scalar |
MILLISECONDS |
|
Retry backoff timeout |
Number |
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. |
100 |
|
Default send buffer size |
Number |
The size of the TCP send buffer ( |
128 |
|
Default send buffer size unit |
Enumeration, one of:
|
The unit of measure for the send buffer size scalar. You can override this parameter at the source level. |
KB |
|
Default request timeout time unit |
Enumeration, one of:
|
The time unit for the request timeout time scalar |
SECONDS |
|
Default request timeout |
Number |
The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value should be larger than |
30 |
|
TLS Configuration |
Protocol to use for communication. Valid values are HTTP (default) and HTTPS. When using HTTPS, the HTTP communication is secured using TLS or SSL. If HTTPS was configured as the protocol, then the user needs to configure at least the keystore in the |
|||
Reconnection |
When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy. |
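The delivery timeout, linger time, and request timeout defaults listed in the table above are related. According to the Apache Kafka producer documentation, the delivery timeout should be greater than or equal to the sum of the request timeout and the linger time. The Python sketch below is illustrative only. It checks that rule against the defaults shown in this table (120 seconds, 0 seconds, and 30 seconds). The helper names are hypothetical.

```python
# Milliseconds per unit for the units that appear in this table.
MILLIS_PER_UNIT = {"MILLISECONDS": 1, "SECONDS": 1_000, "MINUTES": 60_000}


def to_millis(value, unit):
    """Combine a scalar and its time unit into milliseconds."""
    return value * MILLIS_PER_UNIT[unit]


def delivery_timeout_is_consistent(delivery_ms, linger_ms, request_ms):
    """Kafka rule: delivery timeout >= linger time + request timeout."""
    return delivery_ms >= linger_ms + request_ms


# Defaults taken from the table above.
delivery_ms = to_millis(120, "SECONDS")
linger_ms = to_millis(0, "SECONDS")
request_ms = to_millis(30, "SECONDS")

print(delivery_timeout_is_consistent(delivery_ms, linger_ms, request_ms))
```

Lowering the delivery timeout below the request timeout, for example to 20 seconds, makes the check fail.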
Producer SASL/SCRAM Connection
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Username |
String |
The username with which to log in. |
x |
|
Password |
String |
The password with which to log in. |
x |
|
EncryptionType |
Enumeration, one of:
|
The encryption algorithm used by SCRAM. The only acceptable values are SHA-256 and SHA-512. |
x |
|
Bootstrap server URLs |
Array of String |
The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers. |
x |
|
Batch size |
Number |
The producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of zero disables batching entirely). A very large batch size can result in more wasteful use of memory as a buffer of the specified batch size is always allocated in anticipation of additional records. |
|
|
The batch size unit of measure. |
Enumeration, one of:
|
The unit of measure for the batch size scalar |
|
|
Buffer size |
Number |
The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server, the producer blocks for |
1 |
|
The buffer memory size unit of measure. |
Enumeration, one of:
|
The unit of measure for the buffer size scalar |
|
|
DNS lookups |
Enumeration, one of:
|
Controls how the client uses DNS lookups. If set to |
|
|
Compression type |
Enumeration, one of:
|
The compression type for all data generated by the producer. The default is none (no compression). Valid values are none, gzip, snappy, lz4, or zstd. Compression is performed on full batches of data, so the efficacy of batching also impacts the compression ratio (more batching means better compression). |
|
|
Connections maximum idle time |
Number |
Closes idle connections after the specified time is reached. |
|
|
Connections maximum idle time unit |
Enumeration, one of:
|
Determines the time unit for the connection’s maximum idle scalar. |
|
|
Delivery Timeout |
Number |
An upper limit on the time to report success or failure after a call to |
|
|
Delivery Timeout Time Unit |
Enumeration, one of:
|
Determines the time unit for the delivery timeout scalar. |
SECONDS |
|
Enable idempotence |
Boolean |
When set to |
|
|
Linger time |
Number |
The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However, in some circumstances the client may want to reduce the number of requests, even under moderate load. This setting accomplishes this by adding a small amount of artificial delay (rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together). This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching. After the specified |
|
|
Linger Time Unit |
Enumeration, one of:
|
Determines the time unit for the linger time scalar. |
|
|
Maximum block time |
Number |
The configuration controls how long |
|
|
Maximum block time unit |
Enumeration, one of:
|
Determines the time unit for the maximum block time scalar. |
|
|
Maximum in flight requests |
Number |
The maximum number of unacknowledged requests the client will send on a single connection before blocking. If the value is greater than |
|
|
Maximum request size |
Number |
The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which may be different from this. |
|
|
Maximum request size unit |
Enumeration, one of:
|
The unit of measure for the max request size scalar. |
|
|
Producer Acknowledge Mode |
Enumeration, one of:
|
The number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent. |
|
|
Default receive buffer size |
Number |
The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is |
|
|
Default receive buffer size unit |
Enumeration, one of:
|
The unit of measure for the receive buffer size scalar. You can override this parameter at the source level. |
|
|
Retries amount |
Number |
Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting |
|
|
Retry Backoff Timeout |
Number |
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. |
|
|
Retry Backoff Timeout Time Unit |
Enumeration, one of:
|
Determines the time unit for the retry backoff timeout scalar. |
|
|
Default send buffer size |
Number |
The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is |
|
|
Default send buffer size unit |
Enumeration, one of:
|
The unit of measure for the send buffer size scalar. You can override this parameter at the source level. |
|
|
Default request timeout |
Number |
The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses, the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than |
|
|
Default request timeout time unit |
Enumeration, one of:
|
The request timeout time unit. |
|
|
TLS Configuration |
Defines a configuration for TLS, which can be used from both the client and server sides to secure communication for the Mule app. When using the HTTPS protocol, the HTTP communication is secured using TLS or SSL. If HTTPS is configured as the protocol then the user needs to configure at least the keystore in the |
|||
Endpoint identification algorithm |
String |
The endpoint identification algorithm used by clients to validate the server host name. The default value is an empty string, which means the algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate. |
||
Reconnection |
When the application is deployed, a connectivity test is performed on all connectors. If set to |
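The Linger time description in the table above says that the producer waits up to the given delay so that sends can be batched together, and that the setting is an upper bound on that delay. The Python sketch below is a deliberately simplified model of that behavior for a single partition, not the Kafka producer's actual algorithm. It assumes fixed-size records, sends a batch as soon as it reaches the batch size, and otherwise sends it when the linger time expires. It ignores the batching that happens naturally under load. The function name and all values are hypothetical.

```python
def batch_send_times(arrivals_ms, record_bytes, batch_bytes, linger_ms):
    """Return (send_time_ms, record_count) for each batch of one partition."""
    batches = []
    opened_at = None  # arrival time of the first record in the open batch
    count = 0
    size = 0
    for t in arrivals_ms:
        # The open batch was sent when its linger time expired,
        # before this record arrived.
        if count and t >= opened_at + linger_ms:
            batches.append((opened_at + linger_ms, count))
            count, size = 0, 0
        if count == 0:
            opened_at = t
        count += 1
        size += record_bytes
        # A full batch is sent immediately, regardless of the linger time.
        if size >= batch_bytes:
            batches.append((t, count))
            count, size = 0, 0
    if count:
        batches.append((opened_at + linger_ms, count))
    return batches


# Three records arrive close together and one arrives much later.
arrivals = [0, 1, 2, 50]
print(batch_send_times(arrivals, record_bytes=100, batch_bytes=1000, linger_ms=5))
print(batch_send_times(arrivals, record_bytes=100, batch_bytes=1000, linger_ms=0))
```

In this model, a linger time of 5 ms groups the first three records into one request, while a linger time of 0 sends every record on its own.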
Producer SASL/PLAIN Connection
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Bootstrap Server URLs |
Array of String |
The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers. |
x |
|
Endpoint identification algorithm |
String |
The endpoint identification algorithm used by clients to validate the server host name. The default value is an empty string, which means the algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker host name matches the host name in the broker’s certificate. |
||
Batch size |
Number |
The producer attempts to batch records together into fewer requests when multiple records are sent to the same partition. This helps performance on both the client and the server. This parameter controls the default batch size, in bytes. No attempt is made to batch records larger than this size.
|
16 |
|
The batch size unit of measure |
Enumeration, one of:
|
The unit of measure for the batch size scalar |
KB |
|
Buffer size |
Number |
The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered, the producer blocks for |
1000 |
|
The buffer memory size unit of measure |
Enumeration, one of:
|
The unit of measure for the buffer size scalar |
KB |
|
DNS lookups |
Enumeration, one of:
|
Controls how the client uses DNS lookups:
|
DEFAULT |
|
Compression type |
Enumeration, one of:
|
The compression type for all data generated by the producer. The default is no compression. Compression works on full batches of data, so the efficacy of batching also impacts the compression ratio. More batching means better compression. |
NONE |
|
Connections maximum idle time |
Number |
Closes idle connections after the value specified by this parameter |
540 |
|
Connections maximum idle time unit |
Enumeration, one of:
|
The time unit for the connection’s maximum idle scalar |
SECONDS |
|
Delivery timeout |
Number |
An upper bound on the time to report success or failure after a call to |
120 |
|
Delivery Timeout Time Unit |
Enumeration, one of:
|
Determines the time unit for the delivery timeout scalar |
SECONDS |
|
Enable idempotence |
Boolean |
When set to |
false |
|
Linger time |
Number |
The producer groups records that arrive in between request transmissions into a single batched request. Normally, this occurs under load, when records arrive faster than they can be sent out. However, in some circumstances, the client might want to reduce the number of requests, even under a moderate load.
|
0 |
|
Linger Time Unit |
Enumeration, one of:
|
The time unit for the linger time scalar |
SECONDS |
|
Maximum block time |
Number |
Specifies for how long |
60 |
|
Maximum block time unit |
Enumeration, one of:
|
The time unit for the maximum block time scalar |
SECONDS |
|
Maximum in flight requests |
Number |
The maximum number of unacknowledged requests the client will send on a single connection before blocking. If the value is greater than |
5 |
|
Maximum request size |
Number |
The maximum size of a request, in bytes. This setting limits the number of record batches the producer sends in a single request. This effectively provides a cap on the maximum record batch size. The server has its own cap on record batch size, which can be different from this value. |
1 |
|
Maximum request size unit |
Enumeration, one of:
|
The unit of measure for the max request size scalar |
MB |
|
Producer acknowledge mode |
Enumeration, one of:
|
The number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent. |
NONE |
|
Default receive buffer size |
Number |
The size of the TCP receive buffer ( |
64 |
|
Default receive buffer size unit |
Enumeration, one of:
|
The unit of measure for the receive buffer size scalar. You can override this parameter at the source level. |
KB |
|
Retries amount |
Number |
Setting a value greater than zero causes the client to resend any record whose send failed with a potentially transient error. This retry is the same as having the client resend the record upon receiving the error. Allowing retries without setting
|
1 |
|
Retry Backoff Timeout Time Unit |
Enumeration, one of:
|
The time unit for the retry backoff timeout time scalar |
MILLISECONDS |
|
Retry backoff timeout |
Number |
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. |
100 |
|
Default send buffer size |
Number |
The size of the TCP send buffer ( |
128 |
|
Default send buffer size unit |
Enumeration, one of:
|
The unit of measure for the send buffer size scalar. You can override this parameter at the source level. |
KB |
|
Default request timeout time unit |
Enumeration, one of:
|
The time unit for the request timeout time scalar |
SECONDS |
|
Default request timeout |
Number |
The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary or fails the request if retries are exhausted. This value should be larger than |
30 |
|
TLS Configuration |
Protocol to use for communication. Valid values are HTTP (default) and HTTPS. When using HTTPS, the HTTP communication is secured using TLS or SSL. If HTTPS was configured as the protocol, then the user needs to configure at least the keystore in the |
|||
Username |
String |
The user used by the client to connect to the Kafka broker. |
x |
|
Password |
String |
The password used by the client to connect to the Kafka broker. |
x |
|
Reconnection |
When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy. |
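Each size parameter in the table above is expressed as a scalar plus a unit of measure. The Python sketch below is illustrative only. It converts the defaults listed in this table (batch size 16 KB, buffer size 1000 KB, maximum request size 1 MB) to bytes and derives how many full batches fit in the buffer and in a single request. It assumes that 1 KB equals 1024 bytes, which this page does not state, and the unit names other than KB and MB are also assumptions. Request overhead is ignored.

```python
# Bytes per unit. The 1024-based multipliers are an assumption.
BYTES_PER_UNIT = {"BYTE": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def to_bytes(value, unit):
    """Combine a scalar and its unit of measure into bytes."""
    return value * BYTES_PER_UNIT[unit]


# Defaults taken from the table above.
batch_bytes = to_bytes(16, "KB")
buffer_bytes = to_bytes(1000, "KB")
max_request_bytes = to_bytes(1, "MB")

# Upper bounds on full batches, ignoring protocol overhead.
full_batches_in_buffer = buffer_bytes // batch_bytes
full_batches_per_request = max_request_bytes // batch_bytes

print(batch_bytes, buffer_bytes, max_request_bytes)
print(full_batches_in_buffer, full_batches_per_request)
```

Working in bytes makes it easier to compare these settings with broker-side limits, which are configured in bytes.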
Operations
Commit
<kafka:commit>
Commits the offsets associated with a message or batch of messages consumed in a Message Listener. The committed input is either a list of messages or a single message consumed in the BatchMessageListenerSource.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
The name of the configuration to use. |
x |
|
Consumer commit key |
String |
The commitKey of the last poll. This operation is valid only inside a flow that uses one of the message listener sources (MessageListenerSource or BatchMessageListenerSource), which insert this value as an attribute in the Mule event. |
x |
|
Reconnection Strategy |
A retry strategy in case of connectivity errors. |
Consume
<kafka:consume>
Receives messages from one or more Kafka topics. This operation works similarly to the Message Listener source, so all the operations that apply to the source apply to this operation as well.
Note: The Consume operation works in IMMEDIATE mode only and does not return the consumerCommitKey.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
The name of the configuration to use. |
x |
|
Consumption timeout |
Number |
The amount of time, in the unit set by Timeout time unit, that this operation waits to receive messages. |
||
Timeout time unit |
Enumeration, one of:
|
The unit of time for the timeout property. |
||
Operation Timeout |
Number |
|||
Operation Timeout Time Unit |
Enumeration, one of:
|
|||
Streaming Strategy |
|
Configure to use repeatable streams. |
||
Target Variable |
String |
The name of a variable to store the operation’s output. |
||
Target Value |
String |
An expression to evaluate against the operation’s output and store the expression outcome in the target variable. |
|
|
Reconnection Strategy |
A retry strategy in case of connectivity errors. |
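As a rough illustration, the Consume operation can be called from inside a flow rather than used as a source. In this sketch the configuration name `Kafka_Consumer_Config` and the variable name `consumedRecord` are hypothetical, and the attribute names `consumptionTimeout` and `timeoutUnit`, as well as the `SECONDS` value, are assumptions derived from the parameter names above; confirm them against the connector's XML schema.

```xml
<flow name="consume-on-demand-flow">
    <!-- Waits up to 5 seconds for a record (assumed attribute names and unit value) -->
    <kafka:consume config-ref="Kafka_Consumer_Config"
                   consumptionTimeout="5"
                   timeoutUnit="SECONDS"
                   target="consumedRecord"/>

    <!-- The output is stored in the target variable, so the incoming payload is left untouched -->
    <logger level="INFO" message="#[vars.consumedRecord]"/>
</flow>
```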
Seek
<kafka:seek>
Sets the current offset of the consumer for the given topic and partition to the provided offset value.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
The name of the configuration to use. |
x |
|
Topic |
String |
The name of the topic on which the seek operation is performed. |
x |
|
Partition |
Number |
The partition number that will have its offset modified. |
x |
|
Operation Timeout |
Number |
|||
Operation Timeout Time Unit |
Enumeration, one of:
|
|||
Reconnection Strategy |
A retry strategy in case of connectivity errors. |
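A Seek call might look like the sketch below. The configuration name and topic name are hypothetical. The `offset` attribute is an assumption based on the operation description ("the provided offset value"); the parameter table above does not list it, so verify its name in the connector's XML schema.

```xml
<flow name="replay-from-offset-flow">
    <!-- Moves the consumer position for partition 0 of the topic to offset 42 (assumed attribute names) -->
    <kafka:seek config-ref="Kafka_Consumer_Config"
                topic="orders"
                partition="0"
                offset="42"/>
</flow>
```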
Publish
<kafka:publish>
Publishes a message to the specified Kafka topic, optionally specifying the partition, key, and message content. The Publish operation supports transactions.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
The name of the configuration to use. |
x |
|
Topic |
String |
The topic to publish to. |
||
Partition |
Number |
(Optional) The topic partition. |
||
Key |
Binary |
(Optional) Key for the published message. |
||
Message |
Binary |
(Optional) Content of the message. |
|
|
Headers |
Object |
(Optional) Headers for the message. |
||
Transactional Action |
Enumeration, one of:
|
The type of joining action that operations can take for transactions. |
|
|
Target Variable |
String |
The name of a variable to store the operation’s output. |
||
Target Value |
String |
An expression to evaluate against the operation’s output and store the expression outcome in the target variable. |
|
|
Reconnection Strategy |
A retry strategy in case of connectivity errors. |
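A minimal Publish sketch follows. It assumes a producer configuration named `Kafka_Producer_Config` (hypothetical), a topic named `orders` (hypothetical), and an incoming payload that has an `orderId` field (also hypothetical). The `kafka` namespace is assumed to be declared on the root `mule` element.

```xml
<flow name="publish-order-flow">
    <kafka:publish config-ref="Kafka_Producer_Config"
                   topic="orders"
                   key="#[payload.orderId]">
        <!-- Message content: the current payload serialized as JSON -->
        <kafka:message><![CDATA[#[write(payload, "application/json")]]]></kafka:message>
    </kafka:publish>
</flow>
```

Using a stable business identifier as the key keeps related messages on the same partition, which preserves their relative order.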
Input Sources
Batch Message Listener
<kafka:batch-message-listener>
This source supports the consumption of messages from a Kafka cluster, producing a list of messages to the flow.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
The name of the configuration to use. |
x |
|
Poll timeout |
Number |
The amount of time to block. Defines the total timeout for polling. |
||
Poll timeout time unit |
Enumeration, one of:
|
The time unit for the polling timeout. Used with poll timeout to define the total timeout for the polling. |
||
Acknowledgment mode |
Enumeration, one of:
|
Defines the way that the Kafka broker instance is notified of the consumption of messages.
|
||
Number of parallel consumers |
Number |
1 |
||
Primary Node Only |
Boolean |
Whether this source should be executed only on the primary node when running in a cluster. |
||
Redelivery Policy |
Defines a policy for processing the redelivery of the same message. |
|||
Reconnection Strategy |
A retry strategy in case of connectivity errors. |
Output
Type |
Array of Record |
Attributes Type |
For Configurations
The Batch Message Listener source does not support configurable streaming strategies because the source takes a batch of records as an input stream. The streaming strategy configuration is non-repeatable-stream by default for the source. |
Message Listener
<kafka:message-listener>
This source supports the consumption of messages from a Kafka cluster, producing a single message to the flow.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
The name of the configuration to use. |
x |
|
Poll timeout |
Number |
The amount of time to block. Defines the total timeout for polling. |
||
Poll timeout time unit |
Enumeration, one of:
|
The time unit for the polling timeout. Used with poll timeout to define the total timeout for the polling. |
||
Acknowledgment mode |
Enumeration, one of:
|
Defines the way that the Kafka broker instance is notified of the consumption of messages.
|
||
Number of parallel consumers |
Number |
|
||
Primary Node Only |
Boolean |
Whether this source should be executed only on the primary node when running in a cluster. |
||
Streaming Strategy |
|
Configure to use repeatable streams. |
||
Redelivery Policy |
Defines a policy for processing the redelivery of the same message. |
|||
Reconnection Strategy |
A retry strategy in case of connectivity errors. |
Types
TLS
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Enabled Protocols |
String |
A comma-separated list of protocols enabled for this context. |
||
Enabled Cipher Suites |
String |
A comma-separated list of cipher suites enabled for this context. |
||
Trust Store |
||||
Key Store |
||||
Revocation Check |
Truststore
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Path |
String |
The location (which will be resolved relative to the current classpath and file system, if possible) of the truststore. |
||
Password |
String |
The password used to protect the truststore. |
||
Type |
String |
The type of store used. |
||
Algorithm |
String |
The algorithm used by the truststore. |
||
Insecure |
Boolean |
If true, no certificate validations will be performed, rendering connections vulnerable to attacks. Use at your own risk. |
Key Store
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Path |
String |
The location (which will be resolved relative to the current classpath and file system, if possible) of the keystore. |
||
Type |
String |
The type of store used. |
||
Alias |
String |
When the keystore contains many private keys, this attribute indicates the alias of the key that should be used. If not defined, the first key in the file will be used by default. |
||
Key Password |
String |
The password used to protect the private key. |
||
Password |
String |
The password used to protect the keystore. |
||
Algorithm |
String |
The algorithm used by the key store. |
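The TLS, Truststore, and Key Store fields above map to a standard Mule TLS context. The sketch below assumes the `tls` namespace is declared on the root `mule` element; the context name, file names, alias, and property placeholders are all hypothetical.

```xml
<tls:context name="Kafka_TLS_Context" enabledProtocols="TLSv1.2,TLSv1.3">
    <!-- Certificates the client trusts when validating the broker -->
    <tls:trust-store path="truststore.jks"
                     password="${tls.truststore.password}"
                     type="jks"/>
    <!-- Client certificate and private key, selected by alias -->
    <tls:key-store path="keystore.jks"
                   alias="kafka-client"
                   keyPassword="${tls.key.password}"
                   password="${tls.keystore.password}"
                   type="jks"/>
</tls:context>
```

Keeping passwords in property placeholders rather than in the XML lets each environment supply its own values.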
Standard Revocation Check
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Only End Entities |
Boolean |
Only verify the last element of the certificate chain. |
||
Prefer Crls |
Boolean |
Try CRL instead of OCSP first. |
||
No Fallback |
Boolean |
Do not use the secondary checking method (the one not selected before). |
||
Soft Fail |
Boolean |
Avoid verification failure when the revocation server cannot be reached or is busy. |
Custom OCSP Responder
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Url |
String |
The URL of the OCSP responder. |
||
Cert Alias |
String |
Alias of the signing certificate for the OCSP response (must be in the truststore), if present. |
Reconnection
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Fails Deployment |
Boolean |
When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy. |
||
Reconnection Strategy |
The reconnection strategy to use. |
Reconnect
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Frequency |
Number |
How often in milliseconds to reconnect. |
||
Count |
Number |
How many reconnection attempts to make. |
||
Blocking |
Boolean |
If false, the reconnection strategy runs in a separate, non-blocking thread. |
|
Reconnect Forever
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Frequency |
Number |
How often in milliseconds to reconnect. |
||
Blocking |
Boolean |
If false, the reconnection strategy runs in a separate, non-blocking thread. |
|
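The Reconnection, Reconnect, and Reconnect Forever types combine as shown in this sketch, which would be nested inside a connection element. The values are illustrative only.

```xml
<reconnection failsDeployment="true">
    <!-- Retry every 3000 ms, giving up after 5 attempts -->
    <reconnect frequency="3000" count="5"/>
</reconnection>
```

To retry indefinitely, replace the `reconnect` element with `<reconnect-forever frequency="3000"/>`.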
Kafka Record Attributes
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Consumer Commit Key |
String |
|||
Creation Timestamp |
DateTime |
|||
Headers |
Object |
|||
Key |
Binary |
|||
Leader Epoch |
Number |
|||
Log Append Timestamp |
DateTime |
|||
Offset |
Number |
|||
Partition |
Number |
|||
Serialized Key Size |
Number |
|||
Serialized Value Size |
Number |
|||
Topic |
String |
Redelivery Policy
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Max Redelivery Count |
Number |
The maximum number of times a message can be redelivered and processed unsuccessfully before triggering a process-failed-message. |
||
Use Secure Hash |
Boolean |
Whether to use a secure hash algorithm to identify a redelivered message. |
||
Message Digest Algorithm |
String |
The secure hashing algorithm to use. |
SHA-256 |
|
Id Expression |
String |
Defines one or more expressions to use to determine when a message has been redelivered. This property can be set only if |
||
Object Store |
ObjectStore |
The object store in which to store the redelivery counter for each message. |
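A redelivery policy is declared as a child of the source. The sketch below assumes a consumer configuration named `Kafka_Consumer_Config` (hypothetical) and uses illustrative values.

```xml
<flow name="listener-with-redelivery-flow">
    <kafka:message-listener config-ref="Kafka_Consumer_Config">
        <!-- After 3 failed redeliveries of the same message, a redelivery-exhausted failure is raised -->
        <redelivery-policy maxRedeliveryCount="3"
                           useSecureHash="true"
                           messageDigestAlgorithm="SHA-256"/>
    </kafka:message-listener>

    <logger level="INFO" message="#[payload]"/>
</flow>
```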
Repeatable In Memory Stream
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Initial Buffer Size |
Number |
The amount of memory to allocate to consume the stream and provide random access to it. If the stream contains more data than can fit into this buffer, then the buffer expands according to the |
||
Buffer Size Increment |
Number |
This is by how much the buffer size expands if it exceeds its initial size. Setting a value of zero or lower means that the buffer should not expand, meaning that a |
||
Max Buffer Size |
Number |
The maximum amount of memory to use. If more than that is used, then a |
||
Buffer Unit |
Enumeration, one of:
|
The unit in which all these attributes are expressed. |
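As an illustration, a repeatable in-memory stream can be configured as a child element of a source or operation that supports a streaming strategy. The configuration name is hypothetical and the sizes are illustrative.

```xml
<flow name="listener-with-in-memory-stream-flow">
    <kafka:message-listener config-ref="Kafka_Consumer_Config">
        <!-- Start with 512 KB, grow in 256 KB steps, never exceed 2048 KB -->
        <repeatable-in-memory-stream initialBufferSize="512"
                                     bufferSizeIncrement="256"
                                     maxBufferSize="2048"
                                     bufferUnit="KB"/>
    </kafka:message-listener>

    <logger level="INFO" message="#[payload]"/>
</flow>
```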
Repeatable File Store Stream
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
In Memory Size |
Number |
Defines the maximum memory that the stream should use to keep data in memory. If more than that is consumed then content is buffered on disk. |
||
Buffer Unit |
Enumeration, one of:
|
The unit in which |
Expiration Policy
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Max Idle Time |
Number |
A scalar time value for the maximum amount of time a dynamic configuration instance should be allowed to be idle before it’s considered eligible for expiration. |
||
Time Unit |
Enumeration, one of:
|
A time unit that qualifies the maxIdleTime attribute. |