Contact Free trial Login

Use Flow Designer to Configure Apache Kafka Connector v4.2 - Mule 4

Design Center enables you to create apps visually. To use Design Center, work with your Anypoint Platform administrator to ensure that you have a Design environment. For more information, see the Flow Designer Tour.

To create an app in Design Center:

  • Configure the input source (trigger) for your app.

  • Add the connector as a component to process the input for the app.

For information on field values, see the Apache Kafka Connector Reference.

Configure the Trigger

  1. In Design Center, click Create new.

  2. Click Create new application.

  3. Enter a Project name, and select the Target Environment.

  4. Click Create.

  5. Click Go straight to canvas to exit from Let’s get started.

  6. Click the name of the trigger card.

  7. If you are using the Anypoint Connector for Apache Kafka (Apache Kafka Connector) as a trigger, search for the connector; otherwise, search for HTTP Listener or Scheduler.

  8. Select the source.
    Apache Kafka Connector provides Batch message listener and Message listener as sources.

  9. To configure Consumer, set the field values:

Name Type Description Default Value Required

Name

String

The name for this configuration. Connectors reference the configuration with this name.

x

Default Acknowledgment Mode

Enumeration, one of:

  • AUTO

  • MANUAL

  • IMMEDIATE

  • DUPS_OK

Defines the way that the Kafka broker instance is notified of message consumption.

  • AUTO: Messages are committed only if the flow finishes successfully.

  • MANUAL: The user must commit manually through the Commit operation.

  • IMMEDIATE: Mule automatically commits the messages upon reception and before triggering the flow.

  • DUPS_OK: Same as the MANUAL mode, but the commit is made asynchronously, which can lead to duplicate records.

AUTO

Default Listener Poll Timeout

Number

The time, in time units, to wait to perform a poll if data is not available in the buffer (fetched). If no value is set, the poll is returned immediately with any records that are currently available in the buffer or else returns empty if there is no data. Must not be negative.

100

Default Listener Poll Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the polling timeout. This combines with pollTimeout to define the total timeout for the polling.

MILLISECONDS

Default Listener Poll Timeout

Number

The time, in time units, to wait for an operation to finish. If no value is set or a negative value is set, the operation waits forever.

-1

Default Operation Timeout Time Unit

Enumeration, one of:

  • NANOSECONDS

  • MICROSECONDS

  • MILLISECONDS

  • SECONDS

  • MINUTES

  • HOURS

  • DAYS

The time unit for the operation timeout. This combines with operationTimeout to define the total default timeout for the operations that use this configuration.

SECONDS

Zone ID

String

Converts the provided timestamps into ZonedLocalDateTimes in the results. Default value is the one provided by the system.

Design Center automatically saves changes you make in a session.

Kafka Consumer Plaintext Connection

Add a connection configuration:

  1. Click Add Connection.

  2. Enter a unique Connection Name.

  3. Optionally, select Share this connection with my business group.

  4. Select the Connection Type, and enter the values for the connection:

    Name Type Description Default Value Required

    Bootstrap Server URLs

    Array of String

    The list of servers to use to bootstrap the connection with the Apache Kafka cluster. This can be a partial list of the available servers.

    x

    Group ID

    String

    Default group ID for all the Kafka consumers that use this configuration.

    Consumer Amount

    Number

    Determines the number of consumers the connection will initially create.

    1

    Maximum Polling Interval

    Number

    Controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client resends the request; or the request fails if the specified number of retries are exhausted. This parameter can be overridden at the source level.

    300

    Maximum Polling Interval Time Unit

    Enumeration, one of:

    • NANOSECONDS

    • MICROSECONDS

    • MILLISECONDS

    • SECONDS

    • MINUTES

    • HOURS

    • DAYS

    Determines the time unit for request timeout scalar. This parameter can be overridden at source level.

    SECONDS

    Isolation Level

    Enumeration, one of:

    • READ_UNCOMMITTED

    • READ_COMMITTED

    Controls how to read messages that are written transactionally.

    If set to read_committed, consumer.poll() will return only transactional messages that have been committed. If set to read_uncommitted’ (default), `consumer.poll() returns all messages, even transactional messages that were aborted. Non-transactional messages are returned unconditionally in either mode. Messages are always returned in offset order. Hence, in read_committed mode, consumer.poll() returns only messages up to the last stable offset (LSO), which is the one less than the offset of the first open transaction.

    In particular, any messages appearing after messages belonging to ongoing transactions are withheld until the relevant transaction is completed. As a result, read_committed consumers are not able to read up to the high watermark when there are in-flight transactions. Further, when in read_committed the seekToEnd method returns the LSO.

    READ_UNCOMMITTED

    Exclude Internal Topics

    Boolean

    Whether internal topics matching a subscribed pattern should be excluded from the subscription. It is always possible to explicitly subscribe to an internal topic.

    true

    Auto Offset Reset

    Enumeration, one of:

    • EARLIEST

    • LATEST

    • ERROR

    Determines what to do when there is no initial offset in Apache Kafka or if the current offset no longer exists on the server (for example, because the data was deleted):

    • EARLIEST: Automatically reset the offset to the earliest offset.

    • LATEST: Automatically reset the offset to the latest offset.

    • ERROR: Throw an error if no previous offset is found for the consumer’s group.

    LATEST

    Retry Backoff Timeout

    Number

    The amount of time to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios.

    100

    Retry Backoff Timeout Time Unit

    Enumeration, one of:

    • NANOSECONDS

    • MICROSECONDS

    • MILLISECONDS

    • SECONDS

    • MINUTES

    • HOURS

    • DAYS

    Determines the time unit for the reconnect backoff timeout scalar.

    MILLISECONDS

    Check CRC

    Boolean

    Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so in cases seeking extreme performance, this can be disabled.

    true

    Default Receive Buffer Size

    Number

    The size of the TCP receive buffer (SO_RCVBUF) to use when reading data. If the value is -1, the OS default is used. This parameter can be overridden at the source level.

    64

    Default Receive Buffer Size Unit

    Enumeration, one of:

    • BYTE

    • KB

    • MB

    • GB

    The unit of measure for the receive buffer size scalar. This parameter can be overridden at the source level.

    KB

    Default Send Buffer Size

    Number

    The size of the TCP send buffer (SO_SNDBUF) to use when sending data. If the value is -1, the OS default is used. This parameter can be overridden at the source level.

    128

    Default Send Buffer Size Unit

    Enumeration, one of:

    • BYTE

    • KB

    • MB

    • GB

    The unit of measure for the send buffer size scalar. This parameter can be overridden at the source level.

    KB

    Request Timeout

    Number

    The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client resends the request if necessary or fails the request if the retries are exhausted. This parameter can be overridden at the source level.

    30

    Request Timeout Time Unit

    Enumeration, one of:

    • NANOSECONDS

    • MICROSECONDS

    • MILLISECONDS

    • SECONDS

    • MINUTES

    • HOURS

    • DAYS

    Determines the time unit for request timeout scalar. This parameter can be overridden at the source level.

    SECONDS

    Default Record Limit

    Number

    The maximum number of records returned on a poll call to the Apache Kafka cluster. You can override this parameter at the source level.

    500

    DNS Lookups

    Enumeration, one of:

    • DEFAULT

    • USE_ALL_DNS_IPS

    • RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY

    Controls how the client uses DNS lookups. If set to use_all_dns_ips then, when the lookup returns multiple IP addresses for a hostname, a connection is attempted to all of the IP addresses before the connection fails. Applies to both bootstrap and advertised servers. If the value is resolve_canonical_bootstrap_servers_only each entry is resolved and expanded into a list of canonical names.

    DEFAULT

    Heartbeat Interval

    Number

    The expected time between heartbeats to the consumer coordinator when using Apache Kafka’s group management facilities. Heartbeats are used to ensure that the consumer’s session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically no higher than 1/3 of that value. You can adjust it to be even lower to control the expected time for normal rebalances.

    3

    Heartbeat Interval Time Unit

    Enumeration, one of:

    • NANOSECONDS

    • MICROSECONDS

    • MILLISECONDS

    • SECONDS

    • MINUTES

    • HOURS

    • DAYS

    Determines the time unit for fetching the heartbeat interval time scalar.

    SECONDS

    Session Timeout

    Number

    The timeout used to detect consumer failures when using Apache Kafka’s group management facility. The consumer sends periodic heartbeats to confirm to the broker that it is still alive. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker removes this consumer from the group and initiates a rebalance. The value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

    10

    Session Timeout Time Unit

    Enumeration, one of:

    • NANOSECONDS

    • MICROSECONDS

    • MILLISECONDS

    • SECONDS

    • MINUTES

    • HOURS

    • DAYS

    Determines the time unit for session timeout scalar.

    SECONDS

    Connection Maximum Idle Time

    Number

    Close idle connections after the number of milliseconds specified by this configuration.

    540

    Connection Maximum Idle Time Time Unit

    Enumeration, one of:

    • NANOSECONDS

    • MICROSECONDS

    • MILLISECONDS

    • SECONDS

    • MINUTES

    • HOURS

    • DAYS

    Determines the time unit for connections maximum idle time scalar.

    SECONDS

  5. Click Save.

Add Topics and Assignments

  1. In Topics, next to Topic Subscription Patterns, click Add.

  2. In the Add Item dialog, enter the Topic Subscription Pattern.

  3. To add topic-partition pairs to assign, next to Assignments, click Add.

Add a Component

  1. Click + next to the trigger card.

  2. In Select a component, search for the connector name.

  3. Select Apache Kafka Connector as the component.

  4. Select the Publish operation and configure the following values:

    • Topic
      Enter the topic to which to send a message.

    • Partition
      Enter the number for the partition that will have its offset modified.

    • Key
      Enter the key of the message to send.

    • Message
      Enter the content for the message to send.

    • Headers
      Optionally, click Add to add a header for the message to send.

      kafka publish dc config

Next

Now that you have completed configuring Design Center, see the Apache Kafka Connector topic for more information.