Flex Gateway新着情報
Governance新着情報
Monitoring API ManagerApache Kafka 用 Anypoint Connector (Apache Kafka Connector) をバージョン 4.x にアップグレードします。
開始バージョン | 終了バージョン |
---|---|
3.2.x |
4.x |
Apache Kafka Connector 4.x には、効率を向上するための多くの変更が含まれます。たとえば、新しい操作を使用すると、コネクタの使用方法についての理解を深めることができます。
このバージョンに含まれるすべての変更により、後方互換性が失われます。
このリリースには、次の変更が含まれています。
新しいパラメーターを使用する 4 個の新しい操作が追加されました。
Commit
この操作では、Message Listener (ソース) または Batch Message Listener (ソース) 内でコンシュームされるメッセージまたはメッセージのバッチに関連付けられたオフセットをコミットします。この操作は、MANUAL Ack モードでのみ機能します。
Consume
この操作を使用して、1 つ以上の Kafka トピックからメッセージを受信します。Consume 操作は Message Listener ソースと同様に動作します。そのため、Message Listener ソースに適用されるすべての操作がこの操作にも適用されます。
Publish
この操作を使用すると、メッセージを Kafka トピックに送信できます。これは非ブロックであり、この動作は接続パラメーターの特定の値に応じて異なります。この操作はトランザクションをサポートするため、Kafka プロデューサーのトランザクション ID がランダムに生成されます。
Seek
この操作を使用して、特定のトピックおよびパーティションに対するコンシューマーの現在のオフセットを、指定されたオフセット値に設定します。
「新規操作」の表を参照してください。
Message Listener は Message Consumer ソースを置き換えます。
「変更されたソース」の表を参照してください。
Batch Message Listener ソースが追加されました。これは Message Listener と同様に動作しますが、一度に 1 つのメッセージではなくメッセージのリストを処理するという点で異なります。ポーリングで取得されたメッセージリストは Mule フローにより 1 つのイベントとして処理されます。
新しいパラメーターについての詳細は、「新規ソース」の表を参照してください。
コンシューマー設定にさまざまな肯定応答モードが追加されました。
AUTO
Mule は、フローが正常に終了した場合にのみ、メッセージに肯定応答します。
MANUAL
ユーザーはフロー内でメッセージに手動で肯定応答する必要があります。
DUPS_OK
AUTO モードと同じですが、コミットが非同期で実行されるため、重複するレコードが生成される可能性があります。
IMMEDIATE
Mule は受信時にメッセージに自動的に肯定応答します。
トランザクションをサポートする 1 つのプレーンテキスト接続種別が追加されました。
コネクタのバージョン 3.x では、接続に関連するカスタマイズ可能な設定プロパティセットを追加できました。コネクタのバージョン 4.x では、この機能は削除されます。プロパティを設定できる一連の接続種別がコネクタに追加されました。
Kafka ソース | 以前のバージョン | パラメーター |
---|---|---|
Message Listener |
Message Consumer |
|
このリリースには、次の新しい操作が含まれています。
Kafka 操作 | 説明 | パラメーター |
---|---|---|
Commit |
メッセージリスナーがコンシュームされるメッセージまたはメッセージのバッチに関連付けられているオフセットをコミットします。 |
コンシューマーコミットキー。メッセージをコミットするために使用するコンシューマーコミットキー。 |
Consume |
1 つ以上の Kafka トピックからメッセージを受信できます。これは Message Listener ソースと同様に動作します。そのため、Message Listener ソースに適用されるすべての操作がこの操作にも適用されます。 |
|
Publish |
Kafka トピックにメッセージを送信できます。また、これは非ブロックです。この動作は接続パラメーターの特定の値に応じて異なります。この操作はトランザクションをサポートします。つまり、Kafka プロデューサーのトランザクション ID がランダムに生成され、接続内で処理されます。 |
|
Seek |
特定のトピックおよびパーティションのコンシューマーの現在のオフセットを指定のオフセット値に設定します。 |
|
Commit 操作を除き、timeout パラメーターが指定されていない場合、フローはブロックされます。
|
Kafka 操作 | 説明 | パラメーター |
---|---|---|
Batch Message Listener |
一度に 1 つのメッセージではなくメッセージのリストを処理するという点を除き、Message Listener と同様に動作します。ポーリングで取得されたメッセージリストはフローにより 1 つのイベントとして処理されるため、同時実行の処理が、単純な Message Listener よりも簡単になります。つまり、Kafka コンシューマーでコミットをコールすると、すべてのメッセージに対してメッセージのコミットがまとめて実行されます。 |
|
アップグレードを実行する前に、以前のバージョンに戻す必要がある場合に備えて、ファイル、データ、および設定のバックアップを作成する必要があります。
以下の手順に従って、Apache Kafka Connector 4.0.x へのアップグレードを実行します。
Studio で Mule プロジェクトを作成します。
[Mule Palette (Mule パレット)] ビューで、[Search in Exchange (Exchange 内を検索)] をクリックします。
[Add Modules to Project (モジュールをプロジェクトに追加)] で、検索項目に「Apache Kafka」と入力します。
[Available modules (使用可能なモジュール)] で、[Apache Kafka Connector] を選択して [Add (追加)] をクリックします。
[Finish (完了)] をクリックします。
Anypoint Studio はコネクタを自動的にアップグレードします。
pom.xml で kafka-connector
連動関係バージョンが 4.0.0
であることを確認します。
コネクタの最新バージョンをインストールしたら、次の手順に従ってアップグレードを完了します。
Anypoint Studio で、[Problems (問題)] ビューまたは [Console (コンソール)] ビューにエラーがないことを確認します。
プロジェクトの pom.xml を確認し、問題がないことを確認します。
接続をテストして、操作が機能することを確認します。
パラメーターのキャッシュおよびメタデータのキャッシュで問題がある場合、Anypoint Studio の再起動をお試しください。
前のバージョンの Apache Kafka Connector に戻す必要がある場合、Anypoint Studio で、プロジェクトの pom.xml 内の kafka-connector
連動関係バージョン 4.0.0
を前のバージョンに変更します。