Contact Free trial Login

Apache Kafka Connector Reference

Support Category: Select

Apache Kafka Connector v3.0

Configurations


Consumer Configuration

Parameters

Name Type Description Default Value Required

Name

String

The name for this configuration. Connectors reference the configuration with this name.

x

Connection

The connection types to provide to this configuration.

x

Expiration Policy

Configures the minimum amount of time that a dynamic configuration instance can remain idle before the runtime considers it eligible for expiration. This does not mean that the platform expires the instance at the exact moment that it becomes eligible. Mule purges the instances as appropriate.

Connection Types

Kafka Basic Consumer Connection
Parameters
Name Type Description Default Value Required

Consumer Partitions

Number

The number of partitions to use for the consumer.

1

Group Id

String

A unique string that identifies the consumer group this consumer belongs to.

x

Bootstrap Servers

String

Comma-separated host-port pairs used for establishing the initial connection to the Kafka cluster. The same as the bootstrap.servers value you must provide to Kafka clients (producer and consumer).

x

Additional Properties

Object

Additional properties such as a key-value pair that you need for your connection.

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Kafka Kerberos Consumer Connection
Parameters
Name Type Description Default Value Required

Consumer Partitions

Number

The number of partitions to use for the consumer.

1

Group Id

String

A unique string that identifies the consumer group this consumer belongs to.

x

Bootstrap Servers

String

Comma-separated host-port pairs used for establishing the initial connection to the Kafka cluster. The same as the bootstrap.servers value you must provide to Kafka clients (producer and consumer).

x

Additional Properties

Object

Additional properties such as a key-value pair that you need for your connection.

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Kerberos Config File

String

Kerberos configuration file. The essential Kerberos configuration information is the default realm and the default KDC.

Principal

String

Kerberos principal

x

Keytab

String

Path to keytab file associated with the principal.

x

Service Name

String

The Kerberos principal name that the Kafka broker runs as.

x

Additional JAAS Properties

Object

Additional properties such as a key-value pair that you need to set in sasl.jaas.config and that you usually include in the JAAS configuration file.

Kafka Kerberos SSL Consumer Connection
Parameters
Name Type Description Default Value Required

Consumer Partitions

Number

The number of partitions to use for the consumer.

1

Group Id

String

A unique string that identifies the consumer group this consumer belongs to.

x

Bootstrap Servers

String

Comma-separated host-port pairs used for establishing the initial connection to the Kafka cluster. The same as the bootstrap.servers value you must provide to Kafka clients (producer and consumer).

x

Additional Properties

Object

Additional properties such as a key-value pair that you need for your connection.

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Key Store Type

String

The file format of the key store file. This is optional and the default value is JKS.

JKS

Key Store Password

String

The store password for the key store file. This is optional and needed only if Key Store Location is configured.

Key Store Location

String

The location of the key store file. This is optional and can be used for two-way authentication for the connector.

Trust Store Type

String

The file format of the trust store file.

JKS

Trust Store Password

String

The password for the trust store file.

x

Trust Store Location

String

The location of the trust store file.

x

Kerberos Config File

String

Kerberos configuration file. The essential Kerberos configuration information is the default realm and the default KDC.

Principal

String

Kerberos principal

x

Keytab

String

Path to keytab file associated with the principal.

x

Service Name

String

The Kerberos principal name that the Kafka broker runs as.

x

Additional JAAS Properties

Object

Additional properties such as a key-value pair that you need to set in sasl.jaas.config and that you usually include in the JAAS configuration file.

Kafka SSL Consumer Connection
Parameters
Name Type Description Default Value Required

Consumer Partitions

Number

The number of partitions to use for the consumer.

1

Group Id

String

A unique string that identifies the consumer group this consumer belongs to.

x

Bootstrap Servers

String

Comma-separated host-port pairs used for establishing the initial connection to the Kafka cluster. The same as the bootstrap.servers value you must provide to Kafka clients (producer and consumer).

x

Additional Properties

Object

Additional properties such as a key-value pair that you need for your connection.

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Key Store Type

String

The file format of the key store file. This is optional and the default value is JKS.

JKS

Key Store Password

String

The store password for the key store file. This is optional and needed only if Key Store Location is configured.

Key Store Location

String

The location of the key store file. This is optional and can be used for two-way authentication for the connector.

Trust Store Type

String

The file format of the trust store file.

JKS

Trust Store Password

String

The password for the trust store file.

x

Trust Store Location

String

The location of the trust store file.

x

Associated Sources


Producer Configuration

Parameters

Name Type Description Default Value Required

Name

String

The name for this configuration. Connectors reference the configuration with this name.

x

Connection

The connection types to provide to this configuration.

x

Expiration Policy

Configures the minimum amount of time that a dynamic configuration instance can remain idle before the runtime considers it eligible for expiration. This does not mean that the platform expires the instance at the exact moment that it becomes eligible. Mule purges the instances as appropriate.

Connection Types

Kafka Basic Producer Connection
Parameters
Name Type Description Default Value Required

Bootstrap Servers

String

Comma-separated host-port pairs used for establishing the initial connection to the Kafka cluster. The same as the bootstrap.servers value you must provide to Kafka clients (producer and consumer).

x

Additional Properties

Object

Additional properties such as a key-value pair that you need for your connection.

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Kafka Kerberos Producer Connection
Parameters
Name Type Description Default Value Required

Bootstrap Servers

String

Comma-separated host-port pairs used for establishing the initial connection to the Kafka cluster. The same as the bootstrap.servers value you must provide to Kafka clients (producer and consumer).

x

Additional Properties

Object

Additional properties such as a key-value pair that you need for your connection.

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Kerberos Config File

String

Kerberos configuration file. The essential Kerberos configuration information is the default realm and the default KDC.

Principal

String

Kerberos principal

x

Keytab

String

Path to keytab file associated with the principal.

x

Service Name

String

The Kerberos principal name that the Kafka broker runs as.

x

Additional JAAS Properties

Object

Additional properties such as a key-value pair that you need to set in sasl.jaas.config and that you usually include in the JAAS configuration file.

Kafka Kerberos SSL Producer Connection
Parameters
Name Type Description Default Value Required

Bootstrap Servers

String

Comma-separated host-port pairs used for establishing the initial connection to the Kafka cluster. The same as the bootstrap.servers value you must provide to Kafka clients (producer and consumer).

x

Additional Properties

Object

Additional properties such as a key-value pair that you need for your connection.

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Key Store Type

String

The file format of the key store file. This is optional and the default value is JKS.

JKS

Key Store Password

String

The store password for the key store file. This is optional and needed only if Key Store Location is configured.

Key Store Location

String

The location of the key store file. This is optional and can be used for two-way authentication for the connector.

Trust Store Type

String

The file format of the trust store file.

JKS

Trust Store Password

String

The password for the trust store file.

x

Trust Store Location

String

The location of the trust store file.

x

Kerberos Config File

String

Kerberos configuration file. The essential Kerberos configuration information is the default realm and the default KDC.

Principal

String

Kerberos principal.

x

Keytab

String

Path to keytab file associated with the principal.

x

Service Name

String

The Kerberos principal name that the Kafka broker runs as.

x

Additional JAAS Properties

Object

Additional properties such as a key-value pair that you need to set in sasl.jaas.config and that you usually include in the JAAS configuration file.

Kafka SSL Producer Connection
Parameters
Name Type Description Default Value Required

Bootstrap Servers

String

Comma-separated host-port pairs used for establishing the initial connection to the Kafka cluster. The same as the bootstrap.servers value you must provide to Kafka clients (producer and consumer).

x

Additional Properties

Object

Additional properties such as a key-value pair that you need for your connection.

Reconnection

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Key Store Type

String

The file format of the key store file. This is optional and the default value is JKS.

JKS

Key Store Password

String

The store password for the key store file. This is optional and needed only if Key Store Location is configured.

Key Store Location

String

The location of the key store file. This is optional and can be used for two-way authentication for the connector.

Trust Store Type

String

The file format of the trust store file.

JKS

Trust Store Password

String

The password for the trust store file.

x

Trust Store Location

String

The location of the trust store file.

x

Supported Operations

Operations

Publish Message

<kafka:producer>

Sends messages to a Kafka topic.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Topic

String

Topic to send the message to

x

Key

String

Key belonging to the message that is going to be sent.

x

Message

Binary

Message to be sent

#[payload]

Reconnection Strategy

A retry strategy in case of connectivity errors.

For Configurations

Throws

  • KAFKA:CONNECTIVITY

  • KAFKA:RETRY_EXHAUSTED

  • KAFKA:UNABLE_TO_SEND_MESSAGE

  • KAFKA:UNKNOWN

  • KAFKA:VALIDATION

Sources

Message Consumer

<kafka:consumer>

Source that consumes messages from a given topic.

Parameters

Name Type Description Default Value Required

Configuration

String

The name of the configuration to use.

x

Topic

String

Name of the topic to consume messages from.

x

Partition Offsets

Array of Offset

List of offsets for configuring partitions. For each element in the list you have to specify partition index and offset.

Primary Node Only

Boolean

Whether this source should be executed only on the primary node when running in a cluster.

Streaming Strategy

Configure to use repeatable streams.

Redelivery Policy

Defines a policy for processing the redelivery of the same message.

Reconnection Strategy

A retry strategy in case of connectivity errors.

Output

Type

Binary

Attributes Type

For Configurations

Types

Reconnection

Field Type Description Default Value Required

Fails Deployment

Boolean

When the application is deployed, a connectivity test is performed on all connectors. If set to true, deployment fails if the test doesn’t pass after exhausting the associated reconnection strategy.

Reconnection Strategy

The reconnection strategy to use.

Reconnect

Field Type Description Default Value Required

Frequency

Number

How often to reconnect (in milliseconds).

Count

Number

The number of reconnection attempts to make.

blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

true

Reconnect Forever

Field Type Description Default Value Required

Frequency

Number

How often in milliseconds to reconnect.

blocking

Boolean

If false, the reconnection strategy runs in a separate, non-blocking thread.

true

Expiration Policy

Field Type Description Default Value Required

Max Idle Time

Number

A scalar time value for the maximum amount of time a dynamic configuration instance is allowed to remain idle before it’s considered eligible for expiration.

Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAY

A time unit that qualifies the maxIdleTime attribute.

Kafka Message Attributes

Field Type Description Default Value Required

Topic

String

x

Key

String

x

Partition

Number

x

Offset

Number

x

Offset

Field Type Description Default Value Required

Partition Number

String

Partition Offset

String

Repeatable In Memory Stream

Field Type Description Default Value Required

Initial Buffer Size

Number

The amount of memory to allocate to provide random access and to consume the stream. If the stream contains more data than can be fit into this buffer, then the buffer expands according to the value in the Buffer Size Increment attribute, with an upper limit of maxInMemorySize.

Buffer Size Increment

Number

The amount the buffer size expands if it exceeds its initial size. Setting a value of zero or lower means that the buffer should not expand, and to raise a STREAM_MAXIMUM_SIZE_EXCEEDED error if the buffer gets full.

Max Buffer Size

Number

The maximum amount of memory to use. If more than to use before a STREAM_MAXIMUM_SIZE_EXCEEDED error is raised. A value lower than or equal to zero means no maximum limit.

Buffer Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit in which all these attributes are expressed.

Repeatable File Store Stream

Field Type Description Default Value Required

Max In Memory Size

Number

Defines the maximum memory that the stream should use to keep data in memory. If more than that is consumed then it will start to buffer the content on disk.

Buffer Unit

Enumeration, one of:

  • BYTE

  • KB

  • MB

  • GB

The unit in which maxInMemorySize is expressed.

Redelivery Policy

Field Type Description Default Value Required

Max Redelivery Count

Number

The maximum number of times a message can be redelivered and processed unsuccessfully before triggering a process-failed message.

Use Secure Hash

Boolean

Whether to use a secure hash algorithm to identify a redelivered message.

Message Digest Algorithm

String

The secure hashing algorithm to use. If not set, the default is SHA-256.

Id Expression

String

Defines one or more expressions to use to determine when a message has been redelivered. This property may be set only if useSecureHash is false.

Object Store

Object Store

The object store where the redelivery counter for each message is stored.

We use cookies to make interactions with our websites and services easy and meaningful, to better understand how they are used and to tailor advertising. You can read more and make your cookie choices here. By continuing to use this site you are giving us your consent to do this.