Apache Kafka Connector Reference 4.1 - Mule 4

Apache Kafka Connector v4.1

Anypoint Connector for Apache Kafka (Apache Kafka Connector) enables you to interact with the Apache Kafka messaging system. It provides seamless integration between your Mule app and an Apache Kafka cluster, using Mule runtime engine (Mule).

Release Notes: Apache Kafka Connector

Configurations


Consumer Configuration

Parameters

Name Type Description Default Value Required

Name

String

The name for this configuration. Connectors reference the configuration with this name.

x

Connection

The connection types to provide to this configuration.

x

Default acknowledgment mode

Enumeration, one of:

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka broker instance is notified of the consumption of messages.

  • AUTO: Messages are committed only if the flow finishes successfully.

  • MANUAL: The user must commit manually through the Commit operation.

  • IMMEDIATE: Mule automatically commits the messages upon reception and before triggering the flow.

  • DUPS_OK: Same as the MANUAL mode, but the commit is made asynchronously, which can lead to duplicate records.

AUTO
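The following Python sketch models, under simplifying assumptions, when the offset of a received batch is committed in each acknowledgment mode. It is not the connector's implementation. `AckMode`, `run_listener`, and the event strings are hypothetical names: `flow` stands in for the Mule flow, and the `commit` callback stands in for the Commit operation.

```python
from enum import Enum
from typing import Callable, List


class AckMode(Enum):
    AUTO = "AUTO"
    MANUAL = "MANUAL"
    IMMEDIATE = "IMMEDIATE"
    DUPS_OK = "DUPS_OK"


def run_listener(mode: AckMode, next_offset: int,
                 flow: Callable[[Callable[[], None]], None]) -> List[str]:
    """Return the ordered events for one received batch (hypothetical model).

    next_offset is the offset a commit would record. The flow receives a
    commit callback that stands in for the Commit operation.
    """
    events: List[str] = []

    def commit() -> None:
        # MANUAL commits synchronously; DUPS_OK only issues an asynchronous
        # commit, so a failure before it completes can cause redelivery.
        kind = "async-commit" if mode is AckMode.DUPS_OK else "commit"
        events.append(f"{kind}:{next_offset}")

    def no_op() -> None:
        pass

    if mode is AckMode.IMMEDIATE:
        events.append(f"commit:{next_offset}")  # committed before the flow runs
    # Only MANUAL and DUPS_OK let the flow commit explicitly
    user_commit = commit if mode in (AckMode.MANUAL, AckMode.DUPS_OK) else no_op
    events.append("flow-start")
    try:
        flow(user_commit)
    except Exception:
        # No commit after a failure; under AUTO the batch is therefore redelivered
        events.append("flow-error")
        return events
    events.append("flow-ok")
    if mode is AckMode.AUTO:
        events.append(f"commit:{next_offset}")  # only after the flow succeeds
    return events
```

For example, `run_listener(AckMode.AUTO, 5, lambda commit: 1 / 0)` records no commit because the flow fails, while IMMEDIATE records the commit before the flow even starts.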

Default listener poll timeout

Number

The amount of time, in the configured time unit, to wait during a poll if data is not available in the fetch buffer. If no value is set, the poll returns immediately with any records currently available in the buffer, or returns empty if there is no data. Must not be negative.

100

Default listener poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the polling timeout. This combines with pollTimeout to define the total timeout for the polling.

MILLISECONDS
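Every time-unit enumeration in this reference uses the same names as Java's `java.util.concurrent.TimeUnit`. As a sketch of how a value and its unit combine into the millisecond figures that Kafka client properties use, the hypothetical helper below converts a (value, unit) pair, assuming truncation as `TimeUnit.toMillis` does. It is not part of the connector.

```python
# Nanoseconds per unit, keyed by the enumeration names used in this reference
_NANOS_PER_UNIT = {
    "NANOSECONDS": 1,
    "MICROSECONDS": 1_000,
    "MILLISECONDS": 1_000_000,
    "SECONDS": 1_000_000_000,
    "MINUTES": 60 * 1_000_000_000,
    "HOURS": 3_600 * 1_000_000_000,
    "DAYS": 86_400 * 1_000_000_000,
}


def to_millis(value: int, unit: str) -> int:
    """Convert a (value, unit) pair to whole milliseconds, truncating.

    Negative values are passed through unchanged because they carry special
    meaning here (for example, -1 for the operation timeout means wait forever).
    """
    if unit not in _NANOS_PER_UNIT:
        raise ValueError(f"unknown time unit: {unit}")
    if value < 0:
        return value
    return value * _NANOS_PER_UNIT[unit] // 1_000_000
```

With the defaults in this reference, `to_millis(100, "MILLISECONDS")` gives 100, and a session timeout of 10 SECONDS gives 10000.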

Default operation timeout

Number

The amount of time, in the configured time unit, to wait for an operation to finish. If no value or a negative value is set, the operation waits indefinitely.

-1

Default operation timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the operation timeout. This combines with operationTimeout to define the total default timeout for the operations that use this configuration.

SECONDS

Zone ID

String

The zone ID used to convert the provided timestamps into ZonedLocalDateTimes in the results. The default value is the one provided by the system.

Connection Types

Consumer Plaintext Connection
Parameters
Name Type Description Default Value Required

Bootstrap Server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all the Kafka consumers that use this configuration.

Consumer Amount

Number

The number of consumers that the connection initially creates.

1

Maximum polling interval

Number

The maximum delay between poll invocations when using consumer group management. If the consumer does not poll again before this interval elapses, it is considered failed and the group rebalances to reassign its partitions to another member. You can override this parameter at the source level.

300

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the maximum polling interval scalar. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally:

  • If set to READ_COMMITTED, consumer.poll() returns committed transactional messages only.

  • If set to READ_UNCOMMITTED (default), consumer.poll() returns all messages, including transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Therefore, in READ_COMMITTED mode, consumer.poll() returns messages up to the last stable offset (LSO) only, which is one less than the offset of the first open transaction.

Messages that appear after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. When the isolation level is set to READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED
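The following Python sketch models these visibility rules for a single partition. `Record`, `visible_offsets`, and the transaction states are hypothetical; real brokers track transactions with control markers, which this model leaves out.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass(frozen=True)
class Record:
    offset: int
    txn: Optional[str] = None  # None marks a non-transactional record


def visible_offsets(log: List[Record], txn_state: Dict[str, str],
                    isolation: str) -> List[int]:
    """Offsets one poll could return from a partition (simplified model).

    txn_state maps each transaction ID to "committed", "aborted", or "open".
    """
    if isolation == "READ_UNCOMMITTED":
        return [r.offset for r in log]  # everything, even aborted records
    # READ_COMMITTED: nothing at or beyond the first open transaction's offset
    open_offsets = [r.offset for r in log
                    if r.txn is not None and txn_state[r.txn] == "open"]
    first_open = min(open_offsets) if open_offsets else None
    visible: List[int] = []
    for r in log:
        if first_open is not None and r.offset >= first_open:
            break  # withheld until the open transaction completes
        if r.txn is None or txn_state[r.txn] == "committed":
            visible.append(r.offset)  # aborted records are skipped
    return visible
```

With transaction `a` committed, `b` aborted, and `c` still open at offset 3, READ_COMMITTED returns only offsets 0 and 1: offset 2 belongs to the aborted transaction, and the non-transactional record at offset 4 is withheld because it follows the open transaction.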

Exclude internal topics

Boolean

Whether internal topics matching a subscribed pattern should be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do when there is no initial offset in Kafka or if the current offset no longer exists on the server (for example, because the data was deleted):

  • EARLIEST: Automatically reset the offset to the earliest offset.

  • LATEST: Automatically reset the offset to the latest offset.

  • ERROR: Throw an error if no previous offset is found for the consumer’s group.

LATEST
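A minimal sketch of this decision, assuming a single partition whose retained offsets run from `log_start` to `log_end` (the next offset to be written). `starting_offset` and `NoOffsetError` are hypothetical names, not connector APIs.

```python
from typing import Optional


class NoOffsetError(Exception):
    """Stands in for the error raised under the ERROR policy."""


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    policy: str) -> int:
    """Choose the offset a consumer group starts reading from (simplified)."""
    if committed is not None and log_start <= committed <= log_end:
        return committed  # a valid committed offset always takes precedence
    # No committed offset, or it points at data that no longer exists
    if policy == "EARLIEST":
        return log_start
    if policy == "LATEST":
        return log_end
    if policy == "ERROR":
        raise NoOffsetError("no valid committed offset for this consumer group")
    raise ValueError(f"unknown policy: {policy}")
```

A committed offset of 5 on a partition whose retention has already deleted offsets below 10 falls back to the policy, so EARLIEST resumes at 10.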

Retry Backoff Timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the retry backoff timeout scalar.

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, this can be disabled.

true

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. You can override this parameter at the source level.

KB

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. You can override this parameter at the source level.

KB
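The size parameters combine a number with a BYTE, KB, MB, or GB unit. The defaults above (64 KB and 128 KB) equal Kafka's receive.buffer.bytes and send.buffer.bytes defaults of 65536 and 131072 bytes only if KB means 1024 bytes, so the hypothetical helper below assumes 1024-based units. It is a sketch, not the connector's conversion code.

```python
# Bytes per unit; 1024-based is an assumption consistent with the defaults above
_BYTES_PER_UNIT = {"BYTE": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def to_bytes(value: int, unit: str) -> int:
    """Convert a (value, unit) size pair to bytes; -1 (OS default) passes through."""
    if value == -1:
        return -1  # let the operating system choose the socket buffer size
    if value < 0:
        raise ValueError("size must be -1 or non-negative")
    return value * _BYTES_PER_UNIT[unit]
```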

Request Timeout

Number

The maximum amount of time the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for request timeout scalar. You can override this parameter at the source level.

SECONDS

Default record limit

Number

The maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups:

  • If set to USE_ALL_DNS_IPS and the lookup returns multiple IP addresses for a hostname, the connector tries to connect to all of the IP addresses before the connection fails. This functionality applies to both bootstrap and advertised servers.

  • If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

DEFAULT

Heartbeat interval

Number

The expected time between heartbeats to the consumer coordinator when using Apache Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the heartbeat interval scalar.

SECONDS

Session Timeout

Number

The timeout used to detect consumer failures when using Apache Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for session timeout scalar.

SECONDS
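The heartbeat and session timeout rules above can be checked before deployment. The following Python sketch does so on values already converted to milliseconds. `check_group_timeouts` is a hypothetical helper, and `broker_min_ms` and `broker_max_ms` must be the group.min.session.timeout.ms and group.max.session.timeout.ms values configured on your broker.

```python
from typing import List


def check_group_timeouts(heartbeat_ms: int, session_ms: int,
                         broker_min_ms: int, broker_max_ms: int) -> List[str]:
    """Return the problems found; an empty list means the values look consistent."""
    problems: List[str] = []
    if heartbeat_ms >= session_ms:
        problems.append("heartbeat interval must be lower than the session timeout")
    elif heartbeat_ms * 3 > session_ms:
        # Allowed, but above the recommended 1/3 of the session timeout
        problems.append("heartbeat interval is above 1/3 of the session timeout")
    if not broker_min_ms <= session_ms <= broker_max_ms:
        problems.append("session timeout is outside the broker's allowed range")
    return problems
```

The defaults in this reference (3 seconds and 10 seconds) satisfy the 1/3 guideline, so only the broker range can produce a problem for them.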

Connection maximum idle time

Number

Closes idle connections after the time specified by this parameter.

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for connections maximum idle time scalar.

SECONDS

TLS Configuration

TLS

Defines a TLS configuration used by both the client and server sides to secure communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. The default value is PLAINTEXT, or SASL_PLAINTEXT for SASL/SCRAM authentication. If the broker uses SSL as the protocol, configure at least the keystore in the tls:context child element of the configuration, and the connector automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.
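The protocol selection described here reduces to two inputs: whether a tls:context is configured, and whether the connection uses SASL/SCRAM. A minimal Python sketch of that rule, using the hypothetical function name `security_protocol`:

```python
def security_protocol(tls_configured: bool, sasl_scram: bool) -> str:
    """Derive security.protocol from the connection settings (sketch)."""
    if sasl_scram:
        return "SASL_SSL" if tls_configured else "SASL_PLAINTEXT"
    return "SSL" if tls_configured else "PLAINTEXT"
```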

Endpoint identification algorithm

String

The endpoint identification algorithm that clients use to validate the server hostname. The default value is an empty string, which means the algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker hostname matches the hostname in the broker's certificate.

Topic Subscription Patterns

Array of String

The list of subscription regular expressions to subscribe to. Topics are automatically rebalanced among the consumers of the topic.
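To preview which topics a set of subscription patterns would cover, the following sketch applies the patterns to a list of topic names and honors the Exclude internal topics setting. It assumes that a pattern must match the whole topic name and treats `__consumer_offsets` and `__transaction_state` as the internal topics. `matching_topics` is a hypothetical helper, and Python's `re` syntax differs from Java regular expressions in some edge cases.

```python
import re
from typing import Iterable, List

# Internal topics maintained by Kafka itself (examples, not an exhaustive list)
INTERNAL_TOPICS = {"__consumer_offsets", "__transaction_state"}


def matching_topics(patterns: Iterable[str], topics: Iterable[str],
                    exclude_internal: bool = True) -> List[str]:
    """Return, sorted, the topics that at least one pattern fully matches."""
    compiled = [re.compile(p) for p in patterns]
    selected: List[str] = []
    for topic in sorted(topics):
        if exclude_internal and topic in INTERNAL_TOPICS:
            continue
        if any(c.fullmatch(topic) for c in compiled):
            selected.append(topic)
    return selected
```

Because the whole name must match, a pattern such as `order` selects nothing from `orders`; use `orders.*` instead.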

Assignments

Array of Topic Partition

The list of topic-partition pairs to assign. Consumers are not automatically rebalanced.

Default fetch minimum size

Number

The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request.

The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available, or when the fetch request times out waiting for data to arrive.

Setting this value to a number greater than 1 causes the server to wait for larger amounts of data to accumulate, which can improve server throughput slightly at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the minimum fetch size scalar.

BYTE

Default fetch maximum size

Number

The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the maximum fetch size scalar. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

The maximum amount of data per partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress. The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). See fetch.max.bytes for limiting the consumer request size. You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the maximum partition fetch size scalar. You can override this parameter at the source level.

MB
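The following Python sketch models how the maximum fetch size and the maximum partition fetch size limit one fetch response, including the rule that the first record batch of the first non-empty partition is returned even if it exceeds the limits. Sizes are in bytes. `plan_fetch` is hypothetical and ignores details such as the broker-side message.max.bytes limit.

```python
from typing import Dict, List, Tuple


def plan_fetch(partitions: Dict[str, List[int]], fetch_max: int,
               partition_max: int) -> List[Tuple[str, int]]:
    """Return the (partition, batch_size) pairs one fetch response carries.

    partitions maps each partition, in fetch order, to its pending batch sizes.
    Whole batches only; the first batch overall is always included so that
    the consumer can make progress.
    """
    response: List[Tuple[str, int]] = []
    total = 0
    for name, batches in partitions.items():
        taken = 0  # bytes already taken from this partition
        for size in batches:
            first_overall = not response
            fits = taken + size <= partition_max and total + size <= fetch_max
            if not (fits or first_overall):
                break  # stop reading this partition
            response.append((name, size))
            taken += size
            total += size
    return response
```

With limits of 1000 and 600 bytes, a 3000-byte first batch is still returned on its own, which is why neither setting is an absolute maximum.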

Fetch Maximum Wait Timeout

Number

The maximum amount of time the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for fetch maximum wait timeout scalar.

MILLISECONDS
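The interaction between the fetch minimum size and the fetch maximum wait timeout can be sketched as follows: the broker answers as soon as enough data has accumulated, or when the wait timeout elapses, whichever comes first. The function name and the `arrivals` input format are hypothetical.

```python
from typing import List, Tuple


def fetch_response_time_ms(arrivals: List[Tuple[int, int]], min_bytes: int,
                           max_wait_ms: int) -> int:
    """Return when the broker answers a fetch request (simplified model).

    arrivals lists (time_ms, size_bytes) pairs, in time order, for data that
    becomes available after the request reaches the broker at time 0.
    """
    available = 0
    for time_ms, size in arrivals:
        if time_ms >= max_wait_ms:
            break  # the wait timeout fires first
        available += size
        if available >= min_bytes:
            return time_ms  # enough data has accumulated
    return max_wait_ms
```

With the defaults (1 byte and 500 ms), any available data is returned at once; raising the minimum to 1 KB makes the broker hold the response until two 512-byte writes have arrived or 500 ms have passed.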

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Consumer SASL SCRAM Connection
Parameters
Name Type Description Default Value Required

username

String

The username with which to log in

x

password

String

The password with which to log in

x

encryptionType

Enumeration, one of:

  • SHA256

  • SHA512

The encryption algorithm used by SCRAM

x
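As a sketch, the three SCRAM parameters correspond to standard Kafka client properties: sasl.mechanism set to SCRAM-SHA-256 or SCRAM-SHA-512, and a sasl.jaas.config entry for Kafka's `ScramLoginModule`. Whether the connector builds exactly these properties internally is an assumption, and `scram_properties` is a hypothetical helper.

```python
from typing import Dict

# Maps this reference's encryptionType values to Kafka SASL mechanism names
_MECHANISMS = {"SHA256": "SCRAM-SHA-256", "SHA512": "SCRAM-SHA-512"}


def scram_properties(username: str, password: str, encryption_type: str,
                     tls_configured: bool) -> Dict[str, str]:
    """Build Kafka client properties for SASL/SCRAM (illustrative only)."""
    if '"' in username or '"' in password:
        # The JAAS line below quotes the values; reject input that would break it
        raise ValueError("quotes are not supported in this sketch")
    jaas = ("org.apache.kafka.common.security.scram.ScramLoginModule required "
            f'username="{username}" password="{password}";')
    return {
        "security.protocol": "SASL_SSL" if tls_configured else "SASL_PLAINTEXT",
        "sasl.mechanism": _MECHANISMS[encryption_type],
        "sasl.jaas.config": jaas,
    }
```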

Bootstrap Server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Group ID

String

Default group ID for all Kafka consumers that use this configuration

Consumer Amount

Number

The number of consumers the connection initially creates

1

Maximum polling interval

Number

The maximum delay between poll invocations when using consumer group management. If the consumer does not poll again before this interval elapses, it is considered failed and the group rebalances to reassign its partitions to another member. You can override this parameter at the source level.

300

Maximum Polling Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the maximum polling interval scalar. You can override this parameter at the source level.

SECONDS

Isolation Level

Enumeration, one of:

  • READ_UNCOMMITTED

  • READ_COMMITTED

Controls how to read messages that are written transactionally:

  • If set to READ_COMMITTED, consumer.poll() returns committed transactional messages only.

  • If set to READ_UNCOMMITTED (default), consumer.poll() returns all messages, including transactional messages that were aborted.

Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Therefore, in READ_COMMITTED mode, consumer.poll() returns messages up to the last stable offset (LSO) only, which is one less than the offset of the first open transaction.

Messages that appear after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, READ_COMMITTED consumers are not able to read up to the high watermark when there are in-flight transactions. When the isolation level is set to READ_COMMITTED, the seekToEnd method returns the LSO.

READ_UNCOMMITTED

Exclude internal topics

Boolean

Whether internal topics that match a subscribed pattern should be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

true

Auto offset reset

Enumeration, one of:

  • EARLIEST

  • LATEST

  • ERROR

Determines what to do if there is no initial offset in Kafka or if the current offset no longer exists on the server (for example, because the data was deleted):

  • EARLIEST: Automatically reset the offset to the earliest offset

  • LATEST: Automatically reset the offset to the latest offset

  • ERROR: Throw an error if no previous offset is found for the consumer’s group

LATEST

Retry Backoff Timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

100

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the retry backoff timeout scalar

MILLISECONDS

Check CRC

Boolean

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in situations that require extremely high performance, it can be disabled.

true

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. You can override this parameter at the source level.

KB

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. You can override this parameter at the source level.

KB

Request Timeout

Number

The maximum amount of time that the client waits for the response to a request. If the response is not received before the timeout elapses, the client resends the request if necessary, or it fails the request if the retries are exhausted. You can override this parameter at the source level.

30

Request Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the request timeout scalar. You can override this parameter at the source level.

SECONDS

Default record limit

Number

The maximum number of records returned on a poll call to the Kafka cluster. You can override this parameter at the source level.

500

DNS Lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups:

  • If set to USE_ALL_DNS_IPS and the lookup returns multiple IP addresses for a hostname, the connector tries to connect to all of the IP addresses before the connection fails. This functionality applies to both bootstrap and advertised servers.

  • If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

DEFAULT

Heartbeat interval

Number

The expected time between heartbeats to the consumer coordinator when using Apache Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted lower to control the expected time for normal rebalances.

3

Heartbeat Interval Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the heartbeat interval scalar

SECONDS

Session Timeout

Number

The timeout used to detect consumer failures when using Apache Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its aliveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

10

Session timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the session timeout scalar

SECONDS

Connection maximum idle time

Number

Closes idle connections after the time specified by this parameter

540

Connection maximum idle time time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the connection’s maximum idle time scalar

SECONDS

TLS Configuration

TLS

Defines a TLS configuration used by both the client and server sides to secure communication for the Mule app. The connector automatically sets the security.protocol to use for the communication. The valid values are PLAINTEXT, SSL, SASL_PLAINTEXT, and SASL_SSL. The default value is PLAINTEXT, or SASL_PLAINTEXT for SASL/SCRAM authentication. If the broker uses SSL as the protocol, configure at least the keystore in the tls:context child element of the configuration, and the connector automatically uses SSL (or SASL_SSL for SASL authentication) as the security.protocol.

Endpoint identification algorithm

String

The endpoint identification algorithm used by clients to validate the server hostname. The default value is an empty string, which means the algorithm is disabled. Clients, including client connections created by the broker for inter-broker communication, verify that the broker hostname matches the hostname in the broker's certificate.

Topic Subscription Patterns

Array of String

The list of subscription regular expressions to subscribe to. Topics are automatically rebalanced among the consumers of the topic.

Assignments

Array of Topic Partition

The list of topic-partition pairs to assign to a consumer. Consumers are not automatically rebalanced.

Default fetch minimum size

Number

The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request waits for the specified minimum amount of data to accumulate before answering the request.

The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available, unless the fetch request times out waiting for data to arrive.

Setting this value to a number greater than 1 causes the server to wait for larger amounts of data to accumulate. This can improve server throughput slightly, at the cost of some additional latency. You can override this parameter at the source level.

1

Fetch Minimum Size Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the minimum fetch size scalar.

BYTE

Default fetch maximum size

Number

The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch is still returned to ensure that the consumer can make progress. As such, this is not an absolute maximum.

The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). The consumer performs multiple fetches in parallel. You can override this parameter at the source level.

1

Default maximum fetch size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the maximum fetch size scalar. You can override this parameter at the source level.

MB

Default maximum partition fetch size

Number

The maximum amount of data per partition the server can return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch is still returned to ensure that the consumer can make progress.

The maximum record batch size accepted by the broker is defined using message.max.bytes (broker configuration) or max.message.bytes (topic configuration). See fetch.max.bytes for limiting the consumer request size. You can override this parameter at the source level.

1

Default maximum partition fetch unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the maximum partition fetch size scalar. You can override this parameter at the source level.

MB

Fetch Maximum Wait Timeout

Number

The maximum amount of time the server waits before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement specified by fetch.min.bytes.

500

Fetch Maximum Wait Timeout Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the fetch maximum wait timeout scalar.

MILLISECONDS

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Supported Operations (Consumer)

Associated Input Sources


Producer Configuration

Parameters

Name Type Description Default Value Required

Name

String

The name for this configuration. Connectors reference the configuration with this name.

x

Connection

The connection types to provide to this configuration.

x

Default topic

String

The default topic name for producer operations to use. You can override it at the operation level.

defaultTopicName

Zone ID

String

Zone ID is used to convert the provided timestamps into ZonedLocalDateTimes in the results. The default value is the one provided by the system.

Expiration Policy

Configures the minimum amount of time that a dynamic configuration instance can remain idle before Mule runtime engine (Mule) considers it eligible for expiration. This does not mean that the instance expires at the exact moment that it becomes eligible. Mule purges the instances when appropriate.

Connection Types

Producer Plaintext Connection
Parameters
Name Type Description Default Value Required

Bootstrap server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x

Batch size

Number

The producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This parameter controls the default batch size. No attempt is made to batch records larger than this size. Requests sent to brokers contain multiple batches, one for each partition with data available to send. A small batch size makes batching less common and can reduce throughput (a batch size of zero disables batching entirely). A very large batch size can waste memory, because a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the batch size scalar.

KB

Buffer size

Number

The total amount of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting should generally correspond to the total memory the producer uses, but it is not exact, because not all memory the producer uses is used for buffering. Some additional memory is used for compression (if compression is enabled) and for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32 MB), but this value should be capped to align with expected values for Mule instances in CloudHub (0.1 vCore).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the buffer memory size scalar.

KB

DNS lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups:

  • If set to USE_ALL_DNS_IPS and the lookup returns multiple IP addresses for a hostname, the connector tries to connect to all of the IP addresses before the connection fails. This functionality applies to both bootstrap and advertised servers.

  • If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

DEFAULT

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

The compression type for all data generated by the producer. The default is NONE (no compression). Compression is performed on full batches of data, so the efficacy of batching also affects the compression ratio (more batching means better compression).

NONE
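The effect of batching on compression can be illustrated with Python's standard `gzip` module standing in for the producer's codec. The sketch compresses the same JSON records one at a time and then as a single batch. `compressed_sizes` and the sample records are hypothetical, and exact byte counts vary by codec and data.

```python
import gzip
import json
from typing import Dict, List, Tuple


def compressed_sizes(records: List[Dict[str, object]]) -> Tuple[int, int]:
    """Return (bytes when compressed one by one, bytes when compressed as one batch)."""
    encoded = [json.dumps(r).encode("utf-8") for r in records]
    individually = sum(len(gzip.compress(e)) for e in encoded)
    batched = len(gzip.compress(b"".join(encoded)))
    return individually, batched
```

For repetitive records, the batched figure is much smaller, because each separate compression pays its own header overhead and cannot reuse patterns from neighboring records.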

Connections maximum idle time

Number

Close idle connections after the specified time is reached.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the connections maximum idle time scalar.

SECONDS

Delivery Timeout

Number

An upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record will be delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration should be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the delivery timeout scalar.

SECONDS
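A minimal check of the delivery timeout constraint, written as a hypothetical Python helper that takes (value, unit) pairs. Only three units are included to keep the sketch short; extend the table for the others.

```python
from typing import Tuple

# Milliseconds per unit (subset of the time-unit enumeration)
_MS_PER_UNIT = {"MILLISECONDS": 1, "SECONDS": 1_000, "MINUTES": 60_000}

Duration = Tuple[int, str]


def _ms(duration: Duration) -> int:
    value, unit = duration
    return value * _MS_PER_UNIT[unit]


def delivery_timeout_ok(delivery: Duration, request_timeout: Duration,
                        linger: Duration) -> bool:
    """True if delivery.timeout.ms >= request.timeout.ms + linger.ms."""
    return _ms(delivery) >= _ms(request_timeout) + _ms(linger)
```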

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written in the stream. If set to false, producer retries caused by broker failures and similar issues may write duplicates of the retried message in the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0, and acks to be all. If these values are not explicitly set, suitable values are chosen. If incompatible values are set, a Connection Exception is thrown.

false
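The idempotence prerequisites can be written as a small validation step. In this Python sketch, `ProducerSettings`, `resolve_idempotence`, and `IncompatibleIdempotenceSettings` are all hypothetical names. When a value is unset, the sketch falls back to the Apache Kafka client defaults: 5 in-flight requests, Integer.MAX_VALUE retries, and acks=all. The connector may not choose exactly these values.

```python
from dataclasses import dataclass, replace
from typing import Optional


class IncompatibleIdempotenceSettings(ValueError):
    """Hypothetical stand-in for the Connection Exception described above."""


@dataclass(frozen=True)
class ProducerSettings:
    enable_idempotence: bool = False
    max_in_flight: Optional[int] = None  # None means "not explicitly set"
    retries: Optional[int] = None
    acks: Optional[str] = None           # "NONE", "LEADER_ONLY", or "ALL"


def resolve_idempotence(s: ProducerSettings) -> ProducerSettings:
    """Fill in unset values and reject incompatible ones when idempotence is on."""
    if not s.enable_idempotence:
        return s  # no constraints apply
    # Fallbacks mirror Apache Kafka client defaults; the connector's choices may differ.
    resolved = replace(
        s,
        max_in_flight=5 if s.max_in_flight is None else s.max_in_flight,
        retries=2**31 - 1 if s.retries is None else s.retries,
        acks="ALL" if s.acks is None else s.acks,
    )
    if resolved.max_in_flight > 5:
        raise IncompatibleIdempotenceSettings("max in flight requests must be <= 5")
    if resolved.retries <= 0:
        raise IncompatibleIdempotenceSettings("retries must be greater than 0")
    if resolved.acks != "ALL":
        raise IncompatibleIdempotenceSettings("acknowledge mode must be ALL")
    return resolved
```

For example, `resolve_idempotence(ProducerSettings(enable_idempotence=True))` returns settings with `acks="ALL"`. Passing `acks="LEADER_ONLY"` instead raises the exception.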

Linger time

Number

The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However, in some circumstances the client may want to reduce the number of requests, even under moderate load.

This setting accomplishes this by adding a small amount of artificial delay (rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together). This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

Once batch.size worth of records has accumulated for a partition, the batch is sent immediately regardless of this setting. If fewer bytes than that have accumulated for the partition, the producer "lingers" for the specified time, waiting for more records to arrive. This setting defaults to 0 (no delay). Setting linger.ms=5, for example, reduces the number of requests sent but adds up to 5 ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the linger time scalar.

SECONDS
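The linger decision for one partition can be modeled in a few lines of Python. A full batch is sent at once, and a partial batch waits no longer than the linger time. `should_send_batch` is a hypothetical helper. The real producer also sends early in other situations, for example when a flush is requested, and this simplified model ignores those cases.

```python
def should_send_batch(accumulated_bytes: int, batch_size_bytes: int,
                      waited_ms: float, linger_ms: float) -> bool:
    """Simplified linger decision for a single partition's batch."""
    if accumulated_bytes >= batch_size_bytes:
        return True                 # full batch: send regardless of linger time
    return waited_ms >= linger_ms   # partial batch: wait up to linger_ms


# With the default linger time of 0, a partial batch is sent immediately.
print(should_send_batch(100, 16_384, waited_ms=0, linger_ms=0))  # True
# With linger.ms=5, a partial batch that has waited 2 ms keeps lingering.
print(should_send_batch(100, 16_384, waited_ms=2, linger_ms=5))  # False
```

A batch size of zero makes every batch "full", which matches the statement that a zero batch size disables batching.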

Maximum block time

Number

This configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() block. These methods can block either because the buffer is full or because metadata is unavailable. Blocking in user-supplied serializers or partitioners is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the maximum block time scalar.

SECONDS

Maximum in flight requests

Number

The maximum number of unacknowledged requests the client sends on a single connection before blocking. If this is set to a value greater than 1 and there are failed sends, there is a risk of message reordering due to retries (if retries are enabled).

5

Maximum request size

Number

The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which may be different from this.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the max request size scalar.

MB

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

The number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE
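The three acknowledge modes appear to correspond to the standard Kafka `acks` producer settings. In the following Python sketch, the mapping is an assumption inferred from the mode names: NONE as acks=0, LEADER_ONLY as acks=1, and ALL as acks=all. `ACKS_BY_MODE` and `acks_property` are hypothetical names.

```python
# Assumed mapping from the connector's enumeration to Kafka's `acks` property.
ACKS_BY_MODE = {
    "NONE": "0",         # producer does not wait for any acknowledgment
    "LEADER_ONLY": "1",  # leader acknowledges after writing to its own log
    "ALL": "all",        # leader waits for the full set of in-sync replicas
}


def acks_property(mode: str) -> dict:
    """Return the Kafka client property for a Producer Acknowledge Mode value."""
    return {"acks": ACKS_BY_MODE[mode]}


print(acks_property("ALL"))  # {'acks': 'all'}
```

Durability increases from NONE to ALL. The trade-off is that the producer waits longer for each request to complete.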

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than zero causes the client to resend any record whose send fails with a potentially transient error. This retry is no different from the client resending the record itself upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 can change the ordering of records: if two batches are sent to a single partition and the first fails and is retried while the second succeeds, the records in the second batch may appear first. Additionally, produce requests fail before the number of retries is exhausted if the timeout configured by delivery.timeout.ms expires before a successful acknowledgment. It is recommended that you leave this configuration unset and use delivery.timeout.ms to control retry behavior instead.

1
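A toy Python simulation shows the reordering risk that arises when retries are combined with more than one in-flight request. Batches are sent in windows of up to `max_in_flight` unacknowledged requests, and a batch whose first attempt fails is retried in the next window. `delivery_order` is a hypothetical function. It ignores idempotence, backoff, and timeouts.

```python
from typing import Iterable, List, Set


def delivery_order(batches: List[int], failing_once: Iterable[int],
                   max_in_flight: int) -> List[int]:
    """Toy simulation of the order in which batches land in one partition.

    batches: batch ids in send order.
    failing_once: ids whose first attempt fails with a transient error.
    max_in_flight: unacknowledged requests allowed at once on the connection.
    """
    remaining_failures: Set[int] = set(failing_once)
    log: List[int] = []
    pending = list(batches)
    while pending:
        # Send up to max_in_flight batches before waiting for acknowledgments.
        window, pending = pending[:max_in_flight], pending[max_in_flight:]
        retry = []
        for batch in window:
            if batch in remaining_failures:
                remaining_failures.discard(batch)  # the retry will succeed
                retry.append(batch)
            else:
                log.append(batch)
        pending = retry + pending  # retried batches go out in the next window
    return log


print(delivery_order([1, 2], failing_once={1}, max_in_flight=1))  # [1, 2]
print(delivery_order([1, 2], failing_once={1}, max_in_flight=5))  # [2, 1]
```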

Retry Backoff Timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

1000

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the retry backoff timeout scalar.

MILLISECONDS

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. You can override this parameter at the source level.

KB

Default request timeout

Number

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses, the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The request timeout time unit.

SECONDS

TLS Configuration

TLS

Defines a configuration for TLS, which can be used from both the client and server sides to secure communication for the Mule app. When using the HTTPS protocol, the HTTP communication is secured using TLS or SSL. If HTTPS is configured as the protocol, you must configure at least the keystore in the tls:context child element of the listener-config.

Endpoint identification algorithm

String

The endpoint identification algorithm used by clients to validate the server host name. The default value is an empty string, which means that validation is disabled. When validation is enabled, clients (including client connections created by the broker for inter-broker communication) verify that the broker host name matches the host name in the broker's certificate.

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Producer SASL SCRAM Connection
Parameters
Name Type Description Default Value Required

Username

String

The username with which to log in.

x

Password

String

The password with which to log in.

x

EncryptionType

Enumeration, one of:

  • SHA256

  • SHA512

The encryption algorithm used by SCRAM. The only acceptable values are SHA-256 and SHA-512.

x

Bootstrap server URLs

Array of String

The list of servers to bootstrap the connection with the Kafka cluster. This can be a partial list of the available servers.

x
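The SCRAM connection fields correspond closely to standard Apache Kafka client properties: `bootstrap.servers`, `security.protocol`, `sasl.mechanism`, and `sasl.jaas.config` with `ScramLoginModule`. The Python sketch below builds those properties from the connection fields. The mapping of SHA256 and SHA512 to the `SCRAM-SHA-256` and `SCRAM-SHA-512` mechanism names is an assumption about how the connector translates EncryptionType. `scram_client_properties` is a hypothetical helper.

```python
from typing import Dict, List

# Kafka SASL mechanism name for each EncryptionType value (assumed mapping).
SCRAM_MECHANISMS = {"SHA256": "SCRAM-SHA-256", "SHA512": "SCRAM-SHA-512"}


def scram_client_properties(username: str, password: str, encryption_type: str,
                            bootstrap_servers: List[str],
                            use_tls: bool = False) -> Dict[str, str]:
    """Build standard Kafka client properties for a SASL/SCRAM connection."""
    if '"' in username or '"' in password:
        # Keep the sketch simple: quotes would need escaping inside the JAAS string.
        raise ValueError("quotes in credentials are not handled by this sketch")
    jaas = (
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        # A partial list of brokers is enough; the client discovers the rest.
        "bootstrap.servers": ",".join(bootstrap_servers),
        "security.protocol": "SASL_SSL" if use_tls else "SASL_PLAINTEXT",
        "sasl.mechanism": SCRAM_MECHANISMS[encryption_type],
        "sasl.jaas.config": jaas,
    }
```

Usage: `scram_client_properties("alice", "secret", "SHA512", ["b1:9092", "b2:9092"], use_tls=True)` yields `sasl.mechanism` `SCRAM-SHA-512` over `SASL_SSL`.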

Batch size

Number

The producer attempts to batch records together into fewer requests whenever multiple records are sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt is made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each partition with the data that is available to send. A small batch size makes batching less common and can reduce throughput (a batch size of zero disables batching entirely). A very large batch size can result in more wasteful use of memory as a buffer of the specified batch size is always allocated in anticipation of additional records.

16

The batch size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the batch size scalar.

KB
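Size fields in this reference combine a scalar with a unit of measure. The Python sketch below converts them to bytes. It assumes that KB, MB, and GB are 1024-based. That assumption is consistent with the Kafka client defaults: a 16 KB batch size equals 16384 bytes and a 1 MB maximum request size equals 1048576 bytes. However, this reference does not document the connector's own conversion. All function names are hypothetical.

```python
# Assumption: KB, MB, and GB are binary (1024-based) multiples.
BYTES_PER_UNIT = {"BYTE": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}


def to_bytes(value: int, unit: str) -> int:
    """Convert a (scalar, unit of measure) pair to bytes."""
    return value * BYTES_PER_UNIT[unit]


def socket_buffer_bytes(value: int, unit: str) -> int:
    """Receive/send buffer sizes treat -1 as 'use the OS default', so keep it."""
    return -1 if value == -1 else to_bytes(value, unit)


def batching_enabled(batch_size: int, unit: str) -> bool:
    """A batch size of zero disables batching entirely."""
    return to_bytes(batch_size, unit) > 0


print(to_bytes(16, "KB"))             # 16384 (Batch size default)
print(to_bytes(1, "MB"))              # 1048576 (Maximum request size default)
print(socket_buffer_bytes(-1, "KB"))  # -1 (OS default)
print(batching_enabled(0, "KB"))      # False
```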

Buffer size

Number

The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server, the producer blocks for max.block.ms, after which it throws an exception. This setting should generally correspond to the total memory the producer uses, but it is not exact because not all memory the producer uses is used for buffering. Some additional memory is used for compression (if compression is enabled) and for maintaining in-flight requests. The default value in the Apache Kafka documentation is 33554432 (32 MB), but this value should be capped to align with expected values for Mule instances in CloudHub (v0.1 core).

1

The buffer memory size unit of measure.

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the buffer size scalar.

KB
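This toy Python model shows how Buffer size and Maximum block time interact. An append blocks while the buffer is full and fails once the maximum block time has passed. `BoundedRecordBuffer` is a hypothetical class. It counts bytes only and ignores batching, compression, and in-flight request memory.

```python
import threading
import time


class BoundedRecordBuffer:
    """Toy stand-in for the producer's record buffer (Buffer size)."""

    def __init__(self, capacity_bytes: int):
        self.capacity = capacity_bytes
        self.used = 0
        self._cond = threading.Condition()

    def append(self, size: int, max_block_s: float) -> None:
        """Reserve `size` bytes, blocking at most max_block_s while the buffer is full."""
        deadline = time.monotonic() + max_block_s
        with self._cond:
            while self.used + size > self.capacity:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError("buffer full: blocked longer than max block time")
                self._cond.wait(remaining)  # woken when memory is released
            self.used += size

    def release(self, size: int) -> None:
        """Free memory after a batch has been sent, waking blocked appenders."""
        with self._cond:
            self.used -= size
            self._cond.notify_all()
```

In the real producer, the sender thread frees memory as batches are delivered. In this sketch, `release` plays that role.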

DNS lookups

Enumeration, one of:

  • DEFAULT

  • USE_ALL_DNS_IPS

  • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

Controls how the client uses DNS lookups. If set to USE_ALL_DNS_IPS and the lookup returns multiple IP addresses for a hostname, a connection is attempted to each of the IP addresses before the connection fails. This applies to both bootstrap and advertised servers. If set to RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY, each entry is resolved and expanded into a list of canonical names.

DEFAULT
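The Python sketch below illustrates the USE_ALL_DNS_IPS behavior: it resolves every address for a host and tries each one before failing. `resolved_addresses` and `connect_trying_all_ips` are hypothetical helpers built on the standard `socket` module. They are not the Kafka client's implementation.

```python
import socket
from typing import List, Optional


def resolved_addresses(host: str, port: int) -> List[str]:
    """All distinct IP addresses the resolver returns for host, in order."""
    seen: List[str] = []
    for _family, _type, _proto, _canon, sockaddr in socket.getaddrinfo(
            host, port, type=socket.SOCK_STREAM):
        ip = sockaddr[0]
        if ip not in seen:
            seen.append(ip)
    return seen


def connect_trying_all_ips(host: str, port: int, timeout: float = 2.0) -> socket.socket:
    """Try each resolved address in turn; fail only when every address fails."""
    last_error: Optional[OSError] = None
    for ip in resolved_addresses(host, port):
        try:
            return socket.create_connection((ip, port), timeout=timeout)
        except OSError as exc:
            last_error = exc  # remember the failure and move on to the next IP
    raise last_error or OSError(f"no addresses resolved for {host}")
```

With DEFAULT, by contrast, the Kafka client attempts only one resolved address, so this loop is what the USE_ALL_DNS_IPS setting adds.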

Compression type

Enumeration, one of:

  • NONE

  • GZIP

  • SNAPPY

  • LZ4

  • ZSTD

The compression type for all data generated by the producer. The default is none (no compression). Valid values are none, gzip, snappy, lz4, or zstd. Compression is performed on full batches of data, so the efficacy of batching also impacts the compression ratio (more batching means better compression).

NONE

Connections maximum idle time

Number

Closes connections that have been idle for longer than the specified time.

540

Connections maximum idle time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the connections maximum idle scalar.

SECONDS

Delivery Timeout

Number

An upper limit on the time to report success or failure after a call to send() returns. This limits the total time that a record will be delayed prior to sending, the time to await acknowledgment from the broker (if expected), and the time allowed for retrying send failures. The producer might report failure to send a record earlier than this configuration if either an unrecoverable error is encountered, the retries have been exhausted, or the record is added to a batch that reached an earlier delivery expiration deadline. The value of this configuration should be greater than or equal to the sum of request.timeout.ms and linger.ms.

120

Delivery Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the delivery timeout scalar.

SECONDS

Enable idempotence

Boolean

When set to true, the producer ensures that exactly one copy of each message is written to the stream. When set to false, retries caused by broker failures and similar errors may write duplicates of the retried message to the stream. Enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5, retries to be greater than 0, and acks to be all. If these values are not explicitly set, suitable values are chosen. If incompatible values are set, a Connection Exception is thrown.

false

Linger time

Number

The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However, in some circumstances the client may want to reduce the number of requests, even under moderate load.

This setting accomplishes this by adding a small amount of artificial delay (rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together). This is analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching.

Once batch.size worth of records has accumulated for a partition, the batch is sent immediately regardless of this setting. If fewer bytes than that have accumulated for the partition, the producer "lingers" for the specified time, waiting for more records to arrive. This setting defaults to 0 (no delay). Setting linger.ms=5, for example, reduces the number of requests sent but adds up to 5 ms of latency to records sent in the absence of load.

0

Linger Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the linger time scalar.

SECONDS

Maximum block time

Number

This configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() block. These methods can block either because the buffer is full or because metadata is unavailable. Blocking in user-supplied serializers or partitioners is not counted against this timeout.

60

Maximum block time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the maximum block time scalar.

SECONDS

Maximum in flight requests

Number

The maximum number of unacknowledged requests the client sends on a single connection before blocking. If this is set to a value greater than 1 and there are failed sends, there is a risk of message reordering due to retries (if retries are enabled).

5

Maximum request size

Number

The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Note that the server has its own cap on record batch size, which may be different from this.

1

Maximum request size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the max request size scalar.

MB

Producer Acknowledge Mode

Enumeration, one of:

  • NONE

  • LEADER_ONLY

  • ALL

The number of acknowledgments the producer requires the leader to receive before considering a request complete. This controls the durability of records that are sent.

NONE

Default receive buffer size

Number

The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. You can override this parameter at the source level.

64

Default receive buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the receive buffer size scalar. You can override this parameter at the source level.

KB

Retries amount

Number

Setting a value greater than zero causes the client to resend any record whose send fails with a potentially transient error. This retry is no different from the client resending the record itself upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 can change the ordering of records: if two batches are sent to a single partition and the first fails and is retried while the second succeeds, the records in the second batch may appear first. Additionally, produce requests fail before the number of retries is exhausted if the timeout configured by delivery.timeout.ms expires before a successful acknowledgment. It is recommended that you leave this configuration unset and use delivery.timeout.ms to control retry behavior instead.

1

Retry Backoff Timeout

Number

The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

1000

Retry Backoff Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Determines the time unit for the retry backoff timeout scalar.

MILLISECONDS

Default send buffer size

Number

The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. You can override this parameter at the source level.

128

Default send buffer size unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit of measure for the send buffer size scalar. You can override this parameter at the source level.

KB

Default request timeout

Number

The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses, the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries.

30

Default request timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The request timeout time unit.

SECONDS

TLS Configuration

TLS

Defines a configuration for TLS, which can be used from both the client and server sides to secure communication for the Mule app. When using the HTTPS protocol, the HTTP communication is secured using TLS or SSL. If HTTPS is configured as the protocol, you must configure at least the keystore in the tls:context child element of the listener-config.

Endpoint identification algorithm

String

The endpoint identification algorithm used by clients to validate the server host name. The default value is an empty string, which means that validation is disabled. When validation is enabled, clients (including client connections created by the broker for inter-broker communication) verify that the broker host name matches the host name in the broker's certificate.

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Supported Operations

Operations

Commit

<kafka:commit>

Commits the offsets associated with a message or batch of messages consumed by a message listener source: a single message consumed by the Message Listener, or a List of messages consumed by the Batch Message Listener.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Consumer commit key

String

The commitKey of the last poll. This operation is valid only inside a flow that uses one of the message listener sources (Message Listener or Batch Message Listener), which insert this value as an attribute of the Mule event.

x

Reconnection Strategy

A retry strategy in case of connectivity errors.

For Configurations

Throws

  • KAFKA:INVALID_ACK_MODE

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:TIMEOUT

  • KAFKA:SESSION_NOT_FOUND

  • KAFKA:NOT_FOUND

  • KAFKA:CONNECTIVITY

Consume

<kafka:consume>

Receives messages from one or more Kafka topics. This operation works much like the Message Listener source, so everything that applies to that source also applies to this operation.

Note: The Consume operation works in IMMEDIATE mode only and does not return the consumerCommitKey.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Consumption timeout

Number

The amount of time, expressed in the Timeout time unit, that this operation waits to receive messages.

Timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The unit of time for the timeout property.

Operation Timeout

Number

Operation Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Streaming Strategy

Configure to use repeatable streams.

Target Variable

String

The name of a variable to store the operation’s output.

Target Value

String

An expression to evaluate against the operation’s output and store the expression outcome in the target variable.

#[payload]

Reconnection Strategy

A retry strategy in case of connectivity errors.

Output

Type

Binary

Attributes Type

For Configurations

Throws

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:ILLEGAL_STATE

  • KAFKA:TIMEOUT

  • KAFKA:INVALID_OFFSET

  • KAFKA:INVALID_INPUT

  • KAFKA:NOT_FOUND

  • KAFKA:CONNECTIVITY

Seek

<kafka:seek>

Sets the current offset of the consumer for the given topic and partition to the provided offset value.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Topic

String

The name of the topic on which the seek operation will be performed.

x

Partition

Number

The partition number that will have its offset modified.

x

Operation Timeout

Number

Operation Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

Reconnection Strategy

A retry strategy in case of connectivity errors.

For Configurations

Throws

  • KAFKA:INVALID_TOPIC

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:TIMEOUT

  • KAFKA:INVALID_OFFSET

  • KAFKA:INVALID_INPUT

  • KAFKA:NOT_FOUND

  • KAFKA:CONNECTIVITY

Publish

<kafka:publish>

Publishes a message to the specified Kafka topic, optionally specifying the partition, key, and message content. This operation supports transactions.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Topic

String

The topic to publish to.

Partition

Number

(Optional) The topic partition.

Key

Binary

(Optional) Key for the published message.

Message

Binary

(Optional) Content of the message.

#[payload]

Headers

Object

(Optional) Headers for the message.

Transactional Action

Enumeration, one of:

  • ALWAYS_JOIN

  • JOIN_IF_POSSIBLE

  • NOT_SUPPORTED

The type of joining action that operations can take regarding transactions.

JOIN_IF_POSSIBLE

Target Variable

String

The name of a variable to store the operation’s output.

Target Value

String

An expression to evaluate against the operation’s output and store the expression outcome in the target variable.

#[payload]

Reconnection Strategy

A retry strategy in case of connectivity errors.

For Configurations

Throws

  • KAFKA:INVALID_TOPIC_PARTITION

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:TIMEOUT

  • KAFKA:CONNECTIVITY
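Publish accepts an explicit partition, a key, or neither. The Python sketch below shows how a target partition could be chosen, assuming the connector leaves partitioning to the Apache Kafka Java client. An explicit partition takes precedence. Otherwise, a keyed record goes to `murmur2(key) & 0x7fffffff` modulo the partition count, which is how the Kafka client places keyed records. With neither a partition nor a key, the client picks a partition itself. `choose_partition` is hypothetical, and its range check only loosely mirrors KAFKA:INVALID_TOPIC_PARTITION.

```python
from typing import Optional

MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """Port of the 32-bit murmur2 hash used by the Kafka Java client
    (org.apache.kafka.common.utils.Utils.murmur2), as an unsigned value."""
    m, r = 0x5BD1E995, 24
    length = len(data)
    h = (0x9747B28C ^ length) & MASK32
    for i in range(0, length - length % 4, 4):
        k = int.from_bytes(data[i:i + 4], "little")  # 4 bytes, little-endian
        k = (k * m) & MASK32
        k ^= k >> r
        k = (k * m) & MASK32
        h = ((h * m) & MASK32) ^ k
    tail, rem = length - length % 4, length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h


def choose_partition(num_partitions: int, partition: Optional[int] = None,
                     key: Optional[bytes] = None) -> Optional[int]:
    """Hypothetical partition choice for a publish call."""
    if partition is not None:
        if not 0 <= partition < num_partitions:
            raise ValueError("partition out of range for this topic")
        return partition
    if key is not None:
        # Same key, same partition: this keeps per-key ordering within a partition.
        return (murmur2(key) & 0x7FFFFFFF) % num_partitions
    return None  # no key and no partition: left to the Kafka client
```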

Input Sources

Batch Message Listener

<kafka:batch-message-listener>

This source supports the consumption of messages from a Kafka cluster, producing a list of messages to the flow.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Poll timeout

Number

The amount of time to block. Defines the total timeout for polling.

Poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the polling timeout. Used with poll timeout to define the total timeout for the polling.

Acknowledgment mode

Enumeration, one of:

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka broker instance is notified of the consumption of messages.

  • AUTO: Messages are committed only if the flow finishes successfully.

  • MANUAL: The user must commit manually through the Commit operation.

  • IMMEDIATE: Mule automatically commits the messages upon reception and before triggering the flow.

  • DUPS_OK: Same as the MANUAL mode, but the commit is made asynchronously, which can lead to duplicate records.
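The four acknowledgment modes differ in who commits the offsets and when. The short Python function below restates the descriptions above. The `AckMode` enum and the `commit_plan` function are hypothetical illustrations, not connector types.

```python
from enum import Enum


class AckMode(Enum):
    AUTO = "AUTO"
    MANUAL = "MANUAL"
    IMMEDIATE = "IMMEDIATE"
    DUPS_OK = "DUPS_OK"


def commit_plan(mode: AckMode, flow_succeeded: bool) -> str:
    """Summarize when offsets are committed for a received message or batch."""
    if mode is AckMode.IMMEDIATE:
        # Committed on reception, so a later flow failure does not undo the commit.
        return "committed before the flow runs"
    if mode is AckMode.AUTO:
        return "committed after the flow" if flow_succeeded else "not committed"
    if mode is AckMode.MANUAL:
        return "committed synchronously when the flow calls Commit"
    # DUPS_OK: like MANUAL, but asynchronous, so duplicate records are possible.
    return "committed asynchronously when the flow calls Commit"


print(commit_plan(AckMode.AUTO, flow_succeeded=False))  # not committed
```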

Number of parallel consumers

Number

1

Primary Node Only

Boolean

Whether this source should be executed only on the primary node when running in a cluster.

Redelivery Policy

Defines a policy for processing the redelivery of the same message.

Reconnection Strategy

A retry strategy in case of connectivity errors.

Output

Type

Array of Record

Attributes Type

For Configurations

Message Listener

<kafka:message-listener>

This source supports the consumption of messages from a Kafka cluster, producing a single message to the flow.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Poll timeout

Number

The amount of time to block. Defines the total timeout for polling.

Poll timeout time unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the polling timeout. Used with poll timeout to define the total timeout for the polling.

Acknowledgment mode

Enumeration, one of:

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka broker instance is notified of the consumption of messages.

  • AUTO: Messages are committed only if the flow finishes successfully.

  • MANUAL: The user must commit manually through the Commit operation.

  • IMMEDIATE: Mule automatically commits the messages upon reception and before triggering the flow.

  • DUPS_OK: Same as the MANUAL mode, but the commit is made asynchronously, which can lead to duplicate records.

Number of parallel consumers

Number

1

Primary Node Only

Boolean

Whether this source should be executed only on the primary node when running in a cluster.

Streaming Strategy

Configure to use repeatable streams.

Redelivery Policy

Defines a policy for processing the redelivery of the same message.

Reconnection Strategy

A retry strategy in case of connectivity errors.

Output

Type

Binary

Attributes Type

For Configurations

Types

TLS

Field Type Description Default Value Required

Enabled Protocols

String

A comma-separated list of protocols enabled for this context.

Enabled Cipher Suites

String

A comma-separated list of cipher suites enabled for this context.

Trust Store

Key Store

Revocation Check

Truststore

Field Type Description Default Value Required

Path

String

The location of the truststore, which is resolved relative to the current classpath and file system, if possible.

Password

String

The password used to protect the truststore.

Type

String

The type of store used.

Algorithm

String

The algorithm used by the truststore.

Insecure

Boolean

If true, no certificate validations will be performed, rendering connections vulnerable to attacks. Use at your own risk.

Key Store

Field Type Description Default Value Required

Path

String

The location of the keystore, which is resolved relative to the current classpath and file system, if possible.

Type

String

The type of store used.

Alias

String

When the keystore contains many private keys, this attribute indicates the alias of the key that should be used. If not defined, the first key in the file will be used by default.

Key Password

String

The password used to protect the private key.

Password

String

The password used to protect the keystore.

Algorithm

String

The algorithm used by the key store.

Standard Revocation Check

Field Type Description Default Value Required

Only End Entities

Boolean

Only verify the last element of the certificate chain.

Prefer Crls

Boolean

Try CRL instead of OCSP first.

No Fallback

Boolean

Do not use the secondary checking method (the one not selected before).

Soft Fail

Boolean

Avoid verification failure when the revocation server cannot be reached or is busy.

Custom OCSP Responder

Field Type Description Default Value Required

Url

String

The URL of the OCSP responder.

Cert Alias

String

Alias of the signing certificate for the OCSP response (must be in the truststore), if present.

CRL File

Field Type Description Default Value Required

Path

String

The path to the CRL file.

Topic Partition

Field Type Description Default Value Required

Topic

String

x

Partition

Number

x

Reconnection

Field Type Description Default Value Required

Fails Deployment

Boolean

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Reconnection Strategy

The reconnection strategy to use.

Reconnect

Field Type Description Default Value Required

Frequency

Number

How often to reconnect, in milliseconds.

Count

Number

How many reconnection attempts to make.

blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

true
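The Reconnect fields (Frequency and Count) describe a fixed-delay retry loop. The Python sketch below implements a blocking version of that loop. It assumes that Count is the number of reconnection attempts made after the first failed connection. Passing `count=None` models Reconnect Forever. `reconnect` is a hypothetical helper, and its `sleep` parameter is injectable so the behavior can be checked without waiting.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def reconnect(connect: Callable[[], T], frequency_ms: int, count: Optional[int],
              sleep: Callable[[float], None] = time.sleep) -> T:
    """Call `connect` until it succeeds, waiting frequency_ms between attempts.

    count: reconnection attempts allowed after the first failure; None retries
    forever. The loop runs on the calling thread, like blocking=true.
    """
    attempts = 0
    while True:
        try:
            return connect()
        except ConnectionError:
            if count is not None and attempts >= count:
                raise  # attempts exhausted: surface the last connectivity error
            attempts += 1
            sleep(frequency_ms / 1000)
```

With blocking set to false, the same loop would run on a separate thread instead of holding up the caller.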

Reconnect Forever

Field Type Description Default Value Required

Frequency

Number

How often to reconnect, in milliseconds.

blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

true

Record

Field Type Description Default Value Required

Attributes

Payload

Binary

Kafka Record Attributes

Field Type Description Default Value Required

Consumer Commit Key

String

Creation Timestamp

DateTime

Headers

Object

Key

Binary

Leader Epoch

Number

Log Append Timestamp

DateTime

Offset

Number

Partition

Number

Serialized Key Size

Number

Serialized Value Size

Number

Topic

String

Consumer Context

Field Type Description Default Value Required

Consumer Commit Key

String

Redelivery Policy

Field Type Description Default Value Required

Max Redelivery Count

Number

The maximum number of times a message can be redelivered and processed unsuccessfully before triggering a process-failed-message.

Use Secure Hash

Boolean

Whether to use a secure hash algorithm to identify a redelivered message.

Message Digest Algorithm

String

The secure hashing algorithm to use.

SHA-256

Id Expression

String

Defines one or more expressions to use to determine when a message has been redelivered. This property can be set only if useSecureHash is false.

Object Store

The object store in which to store the redelivery counter for each message.
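The following Python toy sketches how a redelivery policy with Use Secure Hash can work: each message is identified by a digest of its payload, and deliveries are counted in a store. A dict stands in for the object store, and `hashlib`'s `sha256` stands in for the SHA-256 Message Digest Algorithm. `RedeliveryTracker` is hypothetical, and the exact counting rule that Mule applies may differ from this one.

```python
import hashlib
from collections import defaultdict


class RedeliveryTracker:
    """Toy redelivery counter keyed by a digest of the message payload."""

    def __init__(self, max_redelivery_count: int, algorithm: str = "sha256"):
        self.max_redelivery_count = max_redelivery_count
        self.algorithm = algorithm
        self.deliveries = defaultdict(int)  # stands in for the object store

    def message_id(self, payload: bytes) -> str:
        # Use Secure Hash: identical payloads map to the same id.
        return hashlib.new(self.algorithm, payload).hexdigest()

    def on_delivery(self, payload: bytes) -> bool:
        """Return True if the message may be processed, False once redelivery is exhausted."""
        key = self.message_id(payload)
        self.deliveries[key] += 1
        redeliveries = self.deliveries[key] - 1  # the first delivery is not a redelivery
        return redeliveries <= self.max_redelivery_count

    def on_success(self, payload: bytes) -> None:
        """A successfully processed message no longer needs a counter."""
        self.deliveries.pop(self.message_id(payload), None)
```

With `RedeliveryTracker(2)`, the same payload is accepted on its first three deliveries and rejected on the fourth, which is the point at which a process-failed-message would be triggered.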

Repeatable In Memory Stream

Field Type Description Default Value Required

Initial Buffer Size

Number

The amount of memory to allocate to consume the stream and provide random access to it. If the stream contains more data than can fit into this buffer, then the buffer expands according to the bufferSizeIncrement attribute, with an upper limit of maxInMemorySize.

Buffer Size Increment

Number

The amount by which the buffer size expands if it exceeds its initial size. Setting a value of zero or lower means that the buffer does not expand, and a STREAM_MAXIMUM_SIZE_EXCEEDED error is raised when the buffer gets full.

Max Buffer Size

Number

The maximum amount of memory to use. If more than that is used, then a STREAM_MAXIMUM_SIZE_EXCEEDED error is raised. A value lower than or equal to zero means no limit.

Buffer Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit in which all these attributes are expressed.
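The growth rule for the repeatable in-memory buffer can be traced with a small Python function. `grown_capacity` and `StreamMaximumSizeExceeded` are hypothetical names. The logic follows the field descriptions above:

- The buffer expands by Buffer Size Increment.
- It never grows past Max Buffer Size.
- It does not expand when the increment is zero or lower.
- There is no limit when the maximum is zero or lower.

```python
class StreamMaximumSizeExceeded(Exception):
    """Hypothetical stand-in for the STREAM_MAXIMUM_SIZE_EXCEEDED error."""


def grown_capacity(initial: int, increment: int, maximum: int, needed: int) -> int:
    """Return the buffer capacity after growing it to hold `needed` units.

    All arguments share one Buffer Unit. increment <= 0 disables growth;
    maximum <= 0 means there is no upper limit.
    """
    capacity = initial
    while capacity < needed:
        if increment <= 0:
            raise StreamMaximumSizeExceeded("buffer is full and cannot expand")
        if maximum > 0 and capacity >= maximum:
            raise StreamMaximumSizeExceeded("buffer reached Max Buffer Size")
        capacity += increment
        if maximum > 0:
            capacity = min(capacity, maximum)  # never grow past the maximum
    return capacity


print(grown_capacity(512, 512, 1024, 700))  # 1024
print(grown_capacity(512, 512, 0, 3000))    # 3072 (no upper limit)
```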

Repeatable File Store Stream

Field Type Description Default Value Required

In Memory Size

Number

Defines the maximum memory that the stream should use to keep data in memory. If more than that is consumed then content is buffered on disk.

Buffer Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit in which maxInMemorySize is expressed.

Expiration Policy

Field Type Description Default Value Required

Max Idle Time

Number

A scalar time value for the maximum amount of time a dynamic configuration instance is allowed to be idle before it is considered eligible for expiration.

Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

A time unit that qualifies the maxIdleTime attribute.

Kafka Message Metadata

Field Type Description Default Value Required

Offset

Number

Partition

Number

Serialized Key Size

Number

Serialized Value Size

Number

Timestamp

DateTime

Topic

String