Apache Kafka Connector の 4.x へのアップグレードおよび移行

Apache Kafka 用 Anypoint Connector (Apache Kafka Connector) をバージョン 4.x にアップグレードします。

サポートされているアップグレードパス

開始バージョン 終了バージョン

3.2.x

4.x

このリリースでの変更

Apache Kafka Connector 4.x には、効率を向上するための多くの変更が含まれます。たとえば、新しい操作を使用すると、コネクタの使用方法についての理解を深めることができます。

このバージョンに含まれるすべての変更により、後方互換性が失われます。

このリリースには、次の変更が含まれています。

  • 新しいパラメーターを使用する 4 個の新しい操作が追加されました。

    • Commit
      この操作では、Message Listener (ソース) または Batch Message Listener (ソース) 内でコンシュームされるメッセージまたはメッセージのバッチに関連付けられたオフセットをコミットします。この操作は、MANUAL Ack モードでのみ機能します。

    • Consume
      この操作を使用して、1 つ以上の Kafka トピックからメッセージを受信します。Consume 操作は Message Listener ソースと同様に動作します。そのため、Message Listener ソースに適用されるすべての操作がこの操作にも適用されます。

    • Publish
      この操作を使用すると、メッセージを Kafka トピックに送信できます。これは非ブロックであり、この動作は接続パラメーターの特定の値に応じて異なります。この操作はトランザクションをサポートするため、Kafka プロデューサーのトランザクション ID がランダムに生成されます。

    • Seek
      この操作を使用して、特定のトピックおよびパーティションに対するコンシューマーの現在のオフセットを、指定されたオフセット値に設定します。
      「新規操作」​の表を参照してください。

  • Message Listener は Message Consumer ソースを置き換えます。
    「変更されたソース」​の表を参照してください。

  • Batch Message Listener ソースが追加されました。これは Message Listener と同様に動作しますが、一度に 1 つのメッセージではなくメッセージのリストを処理するという点で異なります。ポーリングで取得されたメッセージリストは Mule フローにより 1 つのイベントとして処理されます。
    新しいパラメーターについての詳細は、​「新規ソース」​の表を参照してください。

  • コンシューマー設定にさまざまな肯定応答モードが追加されました。

    • AUTO
      Mule は、フローが正常に終了した場合にのみ、メッセージに肯定応答します。

    • MANUAL
      ユーザーはフロー内でメッセージに手動で肯定応答する必要があります。

    • DUPS_OK​
      AUTO モードと同じですが、コミットが非同期で実行されるため、重複するレコードが生成される可能性があります。

    • IMMEDIATE
      Mule は受信時にメッセージに自動的に肯定応答します。

  • トランザクションをサポートする 1 つのプレーンテキスト接続種別が追加されました。

    コネクタのバージョン 3.x では、接続に関連するカスタマイズ可能な設定プロパティセットを追加できました。コネクタのバージョン 4.x では、この機能は削除されます。プロパティを設定できる一連の接続種別がコネクタに追加されました。

変更されたソース

Kafka ソース 以前のバージョン パラメーター

Message Listener

Message Consumer

  • Poll timeout (ポーリングタイムアウト)

  • Poll timeout time unit (ポーリングタイムアウト時間単位)

  • Acknowledgment mode (肯定応答モード)

  • Number of parallel consumers (並列コンシューマー数)

新規操作

このリリースには、次の新しい操作が含まれています。

Kafka 操作 説明 パラメーター

Commit

メッセージリスナーがコンシュームされるメッセージまたはメッセージのバッチに関連付けられているオフセットをコミットします。

コンシューマーコミットキー。メッセージをコミットするために使用するコンシューマーコミットキー。

Consume

1 つ以上の Kafka トピックからメッセージを受信できます。これは Message Listener ソースと同様に動作します。そのため、Message Listener ソースに適用されるすべての操作がこの操作にも適用されます。
[NOTE] タイムアウトパラメーターを指定する必要があります。指定しない場合、フローがブロックされます。

  • Consumption timeout (コンシュームタイムアウト)

  • Timeout time unit (タイムアウト時間単位)

Publish

Kafka トピックにメッセージを送信できます。また、これは非ブロックです。この動作は接続パラメーターの特定の値に応じて異なります。この操作はトランザクションをサポートします。つまり、Kafka プロデューサーのトランザクション ID がランダムに生成され、接続内で処理されます。
[NOTE] タイムアウトパラメーターを指定する必要があります。指定しない場合、フローがブロックされます。

  • Topic (トピック) (トピック名)

  • Partition (パーティション) (パーティション名)

  • キー

  • Message (メッセージ)

  • Headers (ヘッダー)

Seek

特定のトピックおよびパーティションのコンシューマーの現在のオフセットを指定のオフセット値に設定します。
[NOTE] タイムアウトパラメーターを指定する必要があります。指定しない場合、フローがブロックされます。

  • Topic (トピック) (トピック名)

  • Partition (パーティション) (パーティション名)

  • Offset (オフセット) (シークの対象のオフセット)

Commit 操作を除き、​timeout​ パラメーターが指定されていない場合、フローはブロックされます。

新規ソース

Kafka 操作 説明 パラメーター

Batch Message Listener

一度に 1 つのメッセージではなくメッセージのリストを処理するという点を除き、Message Listener と同様に動作します。ポーリングで取得されたメッセージリストはフローにより 1 つのイベントとして処理されるため、同時実行の処理が、単純な Message Listener よりも簡単になります。つまり、Kafka コンシューマーでコミットをコールすると、すべてのメッセージに対してメッセージのコミットがまとめて実行されます。

  • Poll timeout (ポーリングタイムアウト)

  • Poll timeout time unit (ポーリングタイムアウト時間単位)

  • Acknowledgment mode (肯定応答モード)

  • Number of parallel consumers (並列コンシューマー数)

アップグレード前提条件

アップグレードを実行する前に、以前のバージョンに戻す必要がある場合に備えて、ファイル、データ、および設定のバックアップを作成する必要があります。

アップグレード手順

以下の手順に従って、Apache Kafka Connector 4.0.x へのアップグレードを実行します。

  1. Studio で Mule プロジェクトを作成します。

  2. [Mule Palette (Mule パレット)] ビューで、​[Search in Exchange (Exchange 内を検索)]​ をクリックします。

  3. [Add Modules to Project (モジュールをプロジェクトに追加)] で、検索項目に「Apache Kafka」と入力します。

  4. [Available modules (使用可能なモジュール)] で、​[Apache Kafka Connector]​ を選択して ​[Add (追加)]​ をクリックします。

  5. [Finish (完了)]​ をクリックします。
    Anypoint Studio はコネクタを自動的にアップグレードします。

  6. pom.xml で ​kafka-connector​ 連動関係バージョンが ​4.0.0​ であることを確認します。

アップグレード後の手順

コネクタの最新バージョンをインストールしたら、次の手順に従ってアップグレードを完了します。

  1. Anypoint Studio で、​[Problems (問題)]​ ビューまたは ​[Console (コンソール)]​ ビューにエラーがないことを確認します。

  2. プロジェクトの pom.xml を確認し、問題がないことを確認します。

  3. 接続をテストして、操作が機能することを確認します。

トラブルシューティング

パラメーターのキャッシュおよびメタデータのキャッシュで問題がある場合、Anypoint Studio の再起動をお試しください。

アップグレードの取り消し

前のバージョンの Apache Kafka Connector に戻す必要がある場合、Anypoint Studio で、プロジェクトの pom.xml 内の ​kafka-connector​ 連動関係バージョン ​4.0.0​ を前のバージョンに変更します。