Apache Kafka Connector Reference - Mule 4
Support Category: Select
Apache Kafka Connector v4.0
Anypoint Connector for Apache Kafka (Apache Kafka Connector) enables you to interact with the Apache Kafka messaging system. It provides seamless integration between your Mule app and an Apache Kafka cluster, using Mule runtime engine (Mule).
Release Notes: Apache Kafka Connector
Configurations
Consumer Configuration
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Name |
The name for this configuration. Connectors reference the configuration with this name. |
x |
||
Connection |
The connection types to provide to this configuration. |
x |
||
Default acknowledgment mode |
Enumeration, one of:
|
Defines the way that the Kafka broker instance is notified of the consumption of messages.
|
|
|
Default listener poll timeout |
Number |
The time, in time units, to wait to perform a poll if data is not available in the buffer (fetched). If no value is set, the poll is returned immediately with any records that are currently available in the buffer or else returns empty if there is no data. Must not be negative. |
|
|
Default listener poll timeout time unit |
Enumeration, one of:
|
The time unit for the polling timeout. This combines with pollTimeout to define the total timeout for the polling. |
|
|
Default operation timeout |
Number |
The time, in time units, to wait for an operation to finish. If no value is set or a negative value is set, the operation waits forever. |
|
|
Default operation timeout time unit |
Enumeration, one of:
|
The time unit for the operation timeout. This combines with operationTimeout to define the total default timeout for the operations that use this configuration. |
SECONDS |
|
Zone ID |
String |
Converts the provided timestamps into |
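The timeout parameters above each pair a number with a time unit. The following is a minimal sketch of how such a pair could resolve to a total timeout, not the connector's implementation. The unit names are an assumption (they mirror `java.util.concurrent.TimeUnit`, because the enumeration values are not listed in this table), and the function names are hypothetical.

```python
# Unit names are an assumption: they mirror java.util.concurrent.TimeUnit,
# because the enumeration values are not listed in this reference.
MILLIS_PER_UNIT = {
    "NANOSECONDS": 1e-6,
    "MICROSECONDS": 1e-3,
    "MILLISECONDS": 1,
    "SECONDS": 1_000,
    "MINUTES": 60_000,
    "HOURS": 3_600_000,
    "DAYS": 86_400_000,
}


def poll_timeout_ms(value, unit):
    """Total poll timeout in milliseconds; an unset value means 'return immediately'."""
    if value is None:
        return 0
    if value < 0:
        raise ValueError("poll timeout must not be negative")
    return value * MILLIS_PER_UNIT[unit]


def operation_timeout_ms(value, unit="SECONDS"):
    """Total operation timeout in milliseconds; None means 'wait forever'."""
    if value is None or value < 0:
        return None
    return value * MILLIS_PER_UNIT[unit]
```

In this sketch, an unset poll timeout resolves to zero (the poll returns immediately), while an unset or negative operation timeout resolves to `None` (the operation waits forever), matching the two descriptions above.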
Connection Types
Consumer Plaintext Connection
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Bootstrap Server URLs |
Array of String |
The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers. |
x |
|
Group ID |
String |
Default group ID for all the Kafka consumers that use this configuration. |
||
Consumer Amount |
Number |
Determines the number of consumers the connection will initially create. |
|
|
Maximum polling interval |
Number |
The maximum delay allowed between polls when using consumer group management. If the consumer does not poll again before this interval elapses, it is considered failed and the group rebalances to reassign its partitions to another member. This parameter can be overridden at the source level. |
|
|
Maximum Polling Interval Time Unit |
Enumeration, one of:
|
Determines the time unit for the maximum polling interval scalar. This parameter can be overridden at the source level. |
|
|
Isolation Level |
Enumeration, one of:
|
Controls how to read messages that are written transactionally. If set to In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, |
|
|
Exclude internal topics |
Boolean |
Whether internal topics matching a subscribed pattern should be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic. |
|
|
Auto offset reset |
Enumeration, one of:
|
Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server (for example, because the data was deleted):
|
|
|
Retry Backoff Timeout |
Number |
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. |
|
|
Retry Backoff Timeout Time Unit |
Enumeration, one of:
|
Determines the time unit for the retry backoff timeout scalar. |
|
|
Check CRC |
Boolean |
Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled. |
|
|
Default receive buffer size |
Number |
The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is |
|
|
Default receive buffer size unit |
Enumeration, one of:
|
The unit of measure for the receive buffer size scalar. This parameter can be overridden at the source level. |
|
|
Default send buffer size |
Number |
The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is |
|
|
Default send buffer size unit |
Enumeration, one of:
|
The unit of measure for the send buffer size scalar. This parameter can be overridden at the source level. |
|
|
Request Timeout |
Number |
The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. This parameter can be overridden at the source level. |
|
|
Request Timeout Time Unit |
Enumeration, one of:
|
Determines the time unit for request timeout scalar. This parameter can be overridden at the source level. |
|
|
Default record limit |
Number |
The maximum number of records returned on a poll call to the Kafka cluster. This parameter can be overridden at the source level. |
|
|
DNS Lookups |
Enumeration, one of:
|
Controls how the client uses DNS lookups. If set to |
|
|
Heartbeat interval |
Number |
The expected time between heartbeats to the consumer coordinator when using Apache Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than |
|
|
Heartbeat Interval Time Unit |
Enumeration, one of:
|
Determines the time unit for the heartbeat interval scalar. |
|
|
Session Timeout |
Number |
The timeout used to detect consumer failures when using Apache Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by |
|
|
Session timeout time unit |
Enumeration, one of:
|
Determines the time unit for session timeout scalar. |
|
|
Connection maximum idle time |
Number |
Close idle connections after the amount of time specified by this configuration. |
|
|
Connection maximum idle time time unit |
Enumeration, one of:
|
Determines the time unit for connections maximum idle time scalar. |
|
|
TLS Configuration |
Protocol to use for communication. Valid values are HTTP (default) and HTTPS. When using HTTPS the HTTP communication is secured using TLS or SSL. If HTTPS is configured as the protocol then the user needs to configure at least the keystore in the |
|
||
Topic Subscription Patterns |
Array of String |
The list of regular expressions that select the topics to subscribe to. Topics are automatically rebalanced among the consumers of the topic. |
||
Assignments |
Array of Topic Partition |
The list of topic-partition pairs to assign. Consumers are not automatically rebalanced. |
||
Default fetch minimum size |
Number |
The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 causes the server to wait for larger amounts of data to accumulate, which can improve server throughput slightly, at the cost of some additional latency. This parameter can be overridden at the source level. |
|
|
Fetch Minimum Size Unit |
Enumeration, one of:
|
|
||
Default fetch maximum size |
Number |
The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using |
|
|
Default maximum fetch size unit |
Enumeration, one of:
|
The unit of measure for the maximum fetch size scalar. This parameter can be overridden at the source level. |
|
|
Default maximum partition fetch size |
Number |
The maximum amount of data per partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using |
1 |
|
Default maximum partition fetch unit |
Enumeration, one of:
|
The unit of measure for the maximum partition fetch size scalar. This parameter can be overridden at the source level. |
|
|
Fetch Maximum Wait Timeout |
Number |
The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by |
|
|
Fetch Maximum Wait Timeout Unit |
Enumeration, one of:
|
Determines the time unit for fetch maximum wait timeout scalar. |
|
|
Reconnection |
When the application is deployed, a connectivity test is performed on all connectors. If set to |
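The description of Default fetch maximum size notes that the limit is not absolute: the first record batch of the first non-empty partition is returned even when it is larger than the limit. The sketch below is a simplified model of that rule, not the broker's actual algorithm. The function name and the flat list of batch sizes are assumptions made for illustration.

```python
def batches_in_fetch(batch_sizes, fetch_max_bytes):
    """Return the sizes of the record batches included in one fetch response.

    The first non-empty batch is always returned, even when it alone exceeds
    fetch_max_bytes; later batches are added only while the total stays within
    the limit.
    """
    included = []
    total = 0
    for size in batch_sizes:
        if size <= 0:
            continue  # skip empty batches
        if included and total + size > fetch_max_bytes:
            break  # adding this batch would exceed the limit
        included.append(size)
        total += size
    return included
```

With a limit of 1000 bytes, a leading 5000-byte batch is still returned on its own, which is what allows the consumer to make progress.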
Supported Operations
Producer configuration
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Name |
String |
The name for this configuration. Connectors reference the configuration with this name. |
x |
|
Connection |
The connection types to provide to this configuration. |
x |
||
Default topic |
String |
The default topic name used by the producer operations. It can be overridden at the operation level. |
defaultTopicName |
|
Zone ID |
String |
Zone ID is used to convert the provided timestamps into |
||
Expiration Policy |
Configures the minimum amount of time that a dynamic configuration instance can remain idle before Mule runtime engine (Mule) considers it eligible for expiration. This does not mean that the instance expires at the exact moment that it becomes eligible. Mule purges the instances when appropriate. |
Connection Types
Producer Plaintext Connection
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Bootstrap server URLs |
Array of String |
The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers. |
x |
|
Batch size |
Number |
The producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of zero disables batching entirely). A very large batch size can result in more wasteful use of memory as a buffer of the specified batch size is always allocated in anticipation of additional records. |
|
|
The batch size unit of measure. |
Enumeration, one of:
|
The unit of measure for the batch size scalar. |
|
|
Buffer size |
Number |
The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server, the producer will block for |
1 |
|
The buffer memory size unit of measure. |
Enumeration, one of:
|
The unit of measure for the buffer size scalar. |
|
|
DNS lookups |
Enumeration, one of:
|
Controls how the client uses DNS lookups. If set to |
|
|
Compression type |
Enumeration, one of:
|
The compression type for all data generated by the producer. The default is none (no compression). Valid values are none, gzip, snappy, lz4, or zstd. Compression is performed on full batches of data, so the efficacy of batching also impacts the compression ratio (more batching means better compression). |
|
|
Connections maximum idle time |
Number |
Close idle connections after the specified time is reached. |
|
|
Connections maximum idle time unit |
Enumeration, one of:
|
Determines the time unit for the connections maximum idle scalar. |
|
|
Delivery Timeout |
Number |
An upper limit on the time to report success or failure after a call to |
|
|
Delivery Timeout Time Unit |
Enumeration, one of:
|
Determines the time unit for the delivery timeout scalar. |
SECONDS |
|
Enable idempotence |
Boolean |
When set to |
|
|
Linger time |
Number |
The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However, in some circumstances the client may want to reduce the number of requests, even under moderate load. This setting accomplishes this by adding a small amount of artificial delay (rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together). This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching. After the specified |
|
|
Linger Time Unit |
Enumeration, one of:
|
Determines the time unit for the linger time scalar. |
|
|
Maximum block time |
Number |
The configuration controls how long |
|
|
Maximum block time unit |
Enumeration, one of:
|
Determines the time unit for the maximum block time scalar. |
|
|
Maximum in flight requests |
Number |
The maximum number of unacknowledged requests the client will send on a single connection before blocking. If this setting is set to be greater than |
|
|
Maximum request size |
Number |
The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which may be different from this. |
|
|
Maximum request size unit. |
Enumeration, one of:
|
The unit of measure for the max request size scalar. |
|
|
Producer Acknowledge Mode |
Enumeration, one of:
|
The number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent. |
|
|
Default receive buffer size |
Number |
The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is |
|
|
Default receive buffer size unit |
Enumeration, one of:
|
The unit of measure for the receive buffer size scalar. This parameter can be overridden at the source level. |
|
|
Retries amount |
Number |
Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting |
|
|
Retry Backoff Timeout |
Number |
The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. |
|
|
Retry Backoff Timeout Time Unit |
Enumeration, one of:
|
Determines the time unit for the retry backoff timeout scalar. |
|
|
Default send buffer size |
Number |
The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is |
|
|
Default send buffer size unit |
Enumeration, one of:
|
The unit of measure for the send buffer size scalar. This parameter can be overridden at the source level. |
|
|
Default request timeout |
Number |
The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses, the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than |
|
|
Default request timeout time unit |
Enumeration, one of:
|
The request timeout time unit. |
|
|
TLS Configuration |
Protocol to use for communication. Valid values are HTTP (default) and HTTPS. When using HTTPS, the HTTP communication is secured using TLS or SSL. If HTTPS was configured as the protocol, then the user needs to configure at least the keystore in the |
|||
Reconnection |
When the application is deployed, a connectivity test is performed on all connectors. If set to |
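Batch size and Linger time work together: a batch for a partition is sent when it is full or when the linger delay has passed, whichever comes first. The following sketch models that interaction for a single partition under simplifying assumptions (the linger window opens with the first record of each batch, and sends are instantaneous). It is an illustration, not the producer's real accumulator, and `plan_batches` is a hypothetical name.

```python
def plan_batches(records, batch_size, linger_ms):
    """Group (arrival_ms, size_bytes) records for one partition into batches.

    A batch is sent when the next record would overflow batch_size or would
    arrive after the linger window that opened with the batch's first record.
    """
    batches = []
    current, current_bytes, opened_at = [], 0, None
    for arrival_ms, size in sorted(records):
        fits = current_bytes + size <= batch_size
        in_window = opened_at is not None and arrival_ms <= opened_at + linger_ms
        if current and not (fits and in_window):
            # Send the open batch and start a new one with this record.
            batches.append(current)
            current, current_bytes, opened_at = [], 0, None
        if opened_at is None:
            opened_at = arrival_ms
        current.append(size)
        current_bytes += size
    if current:
        batches.append(current)
    return batches
```

For example, `plan_batches([(0, 40), (1, 40), (2, 40), (50, 10)], batch_size=100, linger_ms=5)` produces three batches: the third record would overflow the first batch, and the fourth arrives after the linger window has closed. A batch size of zero sends every record on its own, which corresponds to batching being disabled.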
Operations
Commit
<kafka:commit>
Commits the offsets associated with a message or a batch of messages consumed by a message listener source, that is, either a single message or the list of messages consumed by the BatchMessageListenerSource.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
The name of the configuration to use. |
x |
|
Consumer commit key |
String |
The commitKey of the last poll. This operation is valid only inside a flow that uses one of the message listener sources (MessageListenerSource or BatchMessageListenerSource), which insert this value as an attribute in the Mule event. |
x |
|
Reconnection Strategy |
A retry strategy in case of connectivity errors. |
Consume
<kafka:consume>
This operation receives messages from one or more Kafka topics. It works much like the Message Listener source, so everything that applies to that source also applies to this operation.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
The name of the configuration to use. |
x |
|
Consumption timeout |
Number |
The amount of time, expressed in the timeout time unit, that this operation waits to receive messages. |
||
Timeout time unit |
Enumeration, one of:
|
The unit of time for the timeout property. |
||
Operation Timeout |
Number |
|||
Operation Timeout Time Unit |
Enumeration, one of:
|
|||
Streaming Strategy |
|
Configure to use repeatable streams. |
||
Target Variable |
String |
The name of a variable to store the operation’s output. |
||
Target Value |
String |
An expression to evaluate against the operation’s output and store the expression outcome in the target variable. |
|
|
Reconnection Strategy |
A retry strategy in case of connectivity errors. |
Seek
<kafka:seek>
Sets the current offset of the consumer for the given topic and partition to the provided offset value.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
The name of the configuration to use. |
x |
|
Topic |
String |
The name of the topic on which the seek operation will be performed. |
x |
|
Partition |
Number |
The partition number that will have its offset modified. |
x |
|
Operation Timeout |
Number |
|||
Operation Timeout Time Unit |
Enumeration, one of:
|
|||
Reconnection Strategy |
A retry strategy in case of connectivity errors. |
Publish
<kafka:publish>
Publishes a message to the specified Kafka topic, optionally specifying the partition, key, and message content. The publish operation supports transactions.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
The name of the configuration to use. |
x |
|
Topic |
String |
The topic to publish to. |
||
Partition |
Number |
(Optional) The topic partition. |
||
Key |
Binary |
(Optional) Key for the published message. |
||
Message |
Binary |
(Optional) Content of the message. |
|
|
Headers |
Object |
(Optional) Headers for the message. |
||
Transactional Action |
Enumeration, one of:
|
The type of joining action that operations can take regarding transactions. |
|
|
Target Variable |
String |
The name of a variable to store the operation’s output. |
||
Target Value |
String |
An expression to evaluate against the operation’s output and store the expression outcome in the target variable. |
|
|
Reconnection Strategy |
A retry strategy in case of connectivity errors. |
Sources
Batch Message Listener
<kafka:batch-message-listener>
This source supports the consumption of messages from a Kafka cluster, producing a list of messages to the flow.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
The name of the configuration to use. |
x |
|
Poll timeout |
Number |
The amount of time to block. Defines the total timeout for polling. |
||
Poll timeout time unit |
Enumeration, one of:
|
The time unit for the polling timeout. Used with poll timeout to define the total timeout for the polling. |
||
Acknowledgment mode |
Enumeration, one of:
|
Defines the way that the Kafka broker instance is notified of the consumption of messages.
|
||
Number of parallel consumers |
Number |
1 |
||
Primary Node Only |
Boolean |
Whether this source should be executed only on the primary node when running in a cluster. |
||
Redelivery Policy |
Defines a policy for processing the redelivery of the same message. |
|||
Reconnection Strategy |
A retry strategy in case of connectivity errors. |
Output
Type |
Array of Record |
Attributes Type |
Message Listener
<kafka:message-listener>
This source supports the consumption of messages from a Kafka cluster, producing a single message to the flow.
Parameters
Name | Type | Description | Default Value | Required |
---|---|---|---|---|
Configuration |
String |
The name of the configuration to use. |
x |
|
Poll timeout |
Number |
The amount of time to block. Defines the total timeout for polling. |
||
Poll timeout time unit |
Enumeration, one of:
|
The time unit for the polling timeout. Used with poll timeout to define the total timeout for the polling. |
||
Acknowledgment mode |
Enumeration, one of:
|
Defines the way that the Kafka broker instance is notified of the consumption of messages.
|
||
Number of parallel consumers |
Number |
|
||
Primary Node Only |
Boolean |
Whether this source should be executed only on the primary node when running in a cluster. |
||
Streaming Strategy |
|
Configure to use repeatable streams. |
||
Redelivery Policy |
Defines a policy for processing the redelivery of the same message. |
|||
Reconnection Strategy |
A retry strategy in case of connectivity errors. |
Types
TLS
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Enabled Protocols |
String |
A comma-separated list of protocols enabled for this context. |
||
Enabled Cipher Suites |
String |
A comma-separated list of cipher suites enabled for this context. |
||
Trust Store |
||||
Key Store |
||||
Revocation Check |
Truststore
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Path |
String |
The location (which will be resolved relative to the current classpath and file system, if possible) of the truststore. |
||
Password |
String |
The password used to protect the truststore. |
||
Type |
String |
The type of store used. |
||
Algorithm |
String |
The algorithm used by the truststore. |
||
Insecure |
Boolean |
If true, no certificate validations will be performed, rendering connections vulnerable to attacks. Use at your own risk. |
Key Store
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Path |
String |
The location (which will be resolved relative to the current classpath and file system, if possible) of the keystore. |
||
Type |
String |
The type of store used. |
||
Alias |
String |
When the keystore contains many private keys, this attribute indicates the alias of the key that should be used. If not defined, the first key in the file will be used by default. |
||
Key Password |
String |
The password used to protect the private key. |
||
Password |
String |
The password used to protect the keystore. |
||
Algorithm |
String |
The algorithm used by the key store. |
Standard Revocation Check
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Only End Entities |
Boolean |
Only verify the last element of the certificate chain. |
||
Prefer Crls |
Boolean |
Try CRL instead of OCSP first. |
||
No Fallback |
Boolean |
Do not use the secondary checking method (the one not selected before). |
||
Soft Fail |
Boolean |
Avoid verification failure when the revocation server cannot be reached or is busy. |
Custom OCSP Responder
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Url |
String |
The URL of the OCSP responder. |
||
Cert Alias |
String |
Alias of the signing certificate for the OCSP response (must be in the truststore), if present. |
Reconnection
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Fails Deployment |
Boolean |
When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy. |
||
Reconnection Strategy |
The reconnection strategy to use. |
Reconnect
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Frequency |
Number |
How often in milliseconds to reconnect. |
||
Count |
Number |
How many reconnection attempts to make. |
||
blocking |
Boolean |
If false, the reconnection strategy runs in a separate, non-blocking thread. |
|
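The Frequency and Count fields describe a bounded retry loop. A minimal sketch of that behavior follows, assuming that `Count` is the total number of attempts and that the pause happens between attempts. The `reconnect` function and its `connect` callback are hypothetical, and the `blocking` field is not modeled.

```python
import time


def reconnect(connect, frequency_ms, count, sleep=time.sleep):
    """Call connect() up to `count` times, pausing frequency_ms between attempts.

    Returns the number of attempts used; re-raises the last error when all
    attempts fail.
    """
    if count < 1:
        raise ValueError("count must be at least 1")
    for attempt in range(1, count + 1):
        try:
            connect()
            return attempt
        except ConnectionError:
            if attempt == count:
                raise  # attempts exhausted, surface the failure
            sleep(frequency_ms / 1000)  # time.sleep expects seconds
```

The `sleep` argument exists only so that the pause can be replaced in tests or demonstrations.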
Reconnect Forever
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Frequency |
Number |
How often in milliseconds to reconnect. |
||
blocking |
Boolean |
If |
|
Kafka Record Attributes
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Consumer Commit Key | String | | | |
Creation Timestamp | DateTime | | | |
Headers | Object | | | |
Key | Binary | | | |
Leader Epoch | Number | | | |
Log Append Timestamp | DateTime | | | |
Offset | Number | | | |
Partition | Number | | | |
Serialized Key Size | Number | | | |
Serialized Value Size | Number | | | |
Topic | String | | | |
Redelivery Policy
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Max Redelivery Count |
Number |
The maximum number of times a message can be redelivered and processed unsuccessfully before triggering a process-failed-message. |
||
Use Secure Hash |
Boolean |
Whether to use a secure hash algorithm to identify a redelivered message. |
||
Message Digest Algorithm |
String |
The secure hashing algorithm to use. |
SHA-256 |
|
Id Expression |
String |
Defines one or more expressions to use to determine when a message has been redelivered. This property can be set only if |
||
Object Store |
The object store in which to store the redelivery counter for each message. |
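The fields above combine a message identifier, a per-message counter, and a limit. The sketch below shows one way these pieces could fit together, assuming the message is identified by a SHA-256 digest of its payload and that the limit is exceeded once the failure count is greater than Max Redelivery Count. The `RedeliveryTracker` class is hypothetical, and a plain dictionary stands in for the object store.

```python
import hashlib


class RedeliveryTracker:
    """Counts failed deliveries per message, keyed by a SHA-256 digest."""

    def __init__(self, max_redelivery_count, store=None):
        self.max_redelivery_count = max_redelivery_count
        # A plain dict stands in for the object store that holds the counters.
        self.store = {} if store is None else store

    def message_id(self, payload: bytes) -> str:
        """Identify a message by the hex digest of its payload."""
        return hashlib.sha256(payload).hexdigest()

    def record_failure(self, payload: bytes) -> bool:
        """Register one failed attempt; return True once the limit is exceeded."""
        key = self.message_id(payload)
        self.store[key] = self.store.get(key, 0) + 1
        return self.store[key] > self.max_redelivery_count
```

When `record_failure` returns `True`, a real policy would stop redelivering the message and treat it as failed. Whether the connector counts the first delivery toward the limit is not stated here, so the comparison above is an assumption.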
Repeatable In Memory Stream
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Initial Buffer Size |
Number |
The amount of memory to allocate to consume the stream and provide random access to it. If the stream contains more data than can fit into this buffer, then the buffer expands according to the |
||
Buffer Size Increment |
Number |
This is by how much the buffer size expands if it exceeds its initial size. Setting a value of zero or lower means that the buffer should not expand, meaning that a |
||
Max Buffer Size |
Number |
The maximum amount of memory to use. If more than that is used, then a |
||
Buffer Unit |
Enumeration, one of:
|
The unit in which all these attributes are expressed. |
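The three size fields describe a buffer that starts at an initial size, grows in fixed increments, and stops at a maximum. A rough sketch of that growth rule follows. `final_buffer_size` is a hypothetical helper, `OverflowError` is only a stand-in for whatever error the runtime raises, and capping the last increment at the maximum is an assumption.

```python
def final_buffer_size(stream_length, initial_size, increment, max_size):
    """Return the buffer capacity needed to hold stream_length units of data.

    Raises OverflowError (a stand-in for the runtime's own error) when the
    buffer cannot grow enough.
    """
    capacity = initial_size
    while capacity < stream_length:
        if increment <= 0:
            raise OverflowError("buffer is full and is not allowed to expand")
        if capacity >= max_size:
            raise OverflowError("buffer would exceed the maximum size")
        # Grow by one increment, but never past the configured maximum.
        capacity = min(capacity + increment, max_size)
    return capacity
```

All sizes are expressed in the same unit, as the Buffer Unit field requires.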
Repeatable File Store Stream
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
In Memory Size |
Number |
Defines the maximum memory that the stream should use to keep data in memory. If more than that is consumed then content is buffered on disk. |
||
Buffer Unit |
Enumeration, one of:
|
The unit in which |
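The In Memory Size field describes a buffer that keeps data in memory up to a threshold and moves it to disk beyond that. Python's standard `tempfile.SpooledTemporaryFile` behaves in a comparable way, so it can serve as an analogy. This is not how Mule implements the file store stream, and `buffer_stream` is a hypothetical helper.

```python
import tempfile


def buffer_stream(chunks, in_memory_size):
    """Copy byte chunks into a buffer that moves to disk past in_memory_size bytes."""
    buffer = tempfile.SpooledTemporaryFile(max_size=in_memory_size)
    for chunk in chunks:
        buffer.write(chunk)
    buffer.seek(0)  # rewind so the content can be read again from the start
    return buffer
```

The caller reads the returned buffer like any file object and closes it when done, regardless of whether the content ended up in memory or on disk.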
Expiration Policy
Field | Type | Description | Default Value | Required |
---|---|---|---|---|
Max Idle Time |
Number |
A scalar time value for the maximum amount of time that a dynamic configuration instance is allowed to be idle before it is considered eligible for expiration. |
||
Time Unit |
Enumeration, one of:
|
A time unit that qualifies the maxIdleTime attribute.