Mule アプリケーションでのストリーミング

Mule 4 では、データストリームを処理するための新しいフレームワークが追加されています。Mule 4 での変更点を理解するためには、従来はデータストリームがどのようにコンシュームされていたかを理解することが必要です。

  • データストリームは 1 回しかコンシュームできません。

    たとえば、次のフローがあるとします。

    streaming about 49d23

    Mule 3 アプリケーションでは、このフローによって最初のファイルは正しく書き込まれますが、2 つ目のファイルはコンテンツが空になります。その理由は、ストリームをコンシュームする各コンポーネントは、それぞれ新しいストリームを受信することを前提としているためです。最初の Write 操作の実行後、2 つ目の Write 操作は空のストリームを受信するため、ファイルに書き込むコンテンツがありません。

    DataWeave 変換の前後でペイロードをログに記録しようとした場合でも、同様の結果となります。次の例を考えてみましょう。

    streaming about 5f1b3

    このアプリケーションは、Transform Message プロセッサの前にペイロードをログに記録しますが、ロガーがストリームをコンシュームしてメモリに読み込んでしまうため、処理後のペイロードは記録されません。ストリームが Transform Message プロセッサに到達した時点では、ストリームのコンテンツがメモリに格納されているため、Transform Message プロセッサはコンテンツをコンシュームできます。しかし、2 つ目のロガーは空のストリームを受け取ることになります。

  • データストリームを同時にコンシュームすることはできません。

    Scatter-Gather ルータを使用してデータストリームを分割し、ペイロードのログへの記録とファイルへの書き込みを同時に実行する Mule 3 アプリケーションがあるとします。

    streaming about 6af9b

    データストリームのコンテンツを同時に複数のプロセッサチェーンで処理することはできないため、このアプリケーションはエラーとなります。

反復可能ストリーム

Mule 4.0 では、ストリームを処理するためのデフォルトフレームワークとして反復可能ストリームが追加されています。反復可能ストリームを使用することで:

  • ストリームを複数回読み込むことができます。

  • ストリームに同時にアクセスできます。

コンポーネントがストリームをコンシュームすると、Mule はそのコンテンツを一時バッファに格納します。以後、ランタイムはストリームのコンテンツをテンポラリバッファからコンポーネントに供給することで、すでにストリームが他のコンポーネントによって何度コンシュームされていようとも、各コンポーネントが完全なストリームを受信できるようにします。この処理は自動的に行われるため、ユーザは特別な設定をする必要はなく、繰り返しアクセスするためにストリームを格納しておく方法を考える必要もありません。この設定により、上記の 3 つの Mule アプリケーションの例の問題のうち、2 つが自動的に解決されます。

すべての反復可能ストリームは並列アクセスをサポートします。そのため、異なるスレッドで動作している 2 つのコンポーネントが同じストリームに同時にアクセスしないようにと頭を悩ませる必要はありません。Mule は、コンポーネント A がストリームを読み込んでもコンポーネント B に対して一切の副作用を発生させないことを自動的に保証します。そのため、上記の 3 番目の例のような処理も可能になります。

Mule が反復可能ストリームをどのように処理するかは、ストリーミング戦略を使用して設定できます。

ストリーミング戦略

File Stored Repeatable Stream (ファイルに格納された反復ストリーム)

これは Mule Runtime Enterprise Edition のデフォルトストリーミング戦略です。

このオプションは Mule EE でのみ使用できます。

デフォルトでは、メモリ内のバッファサイズとして 512 KB を使用します。ストリームがこのサイズを超えると、ディスク上に一時ファイルを作成して、メモリをオーバーフローさせることなくコンテンツを格納します。

扱うファイルのサイズの大小が事前に分かっている場合は、デフォルトのバッファサイズを変更することでパフォーマンスを最適化できます。
バッファサイズを大きくすると、ランタイムがバッファをディスクに書き込む回数を減らすことができるため、パフォーマンスが向上します。ただし、アプリケーションが同時に処理できる要求の数が制限されます。
同様に、バッファサイズを小さくすると、メモリの負荷が減ります。
また、バッファの測定単位を設定することで、単位の変換を不要にすることができます。

たとえば、常に 1 MB 前後のファイルを読み込むことが分かっているのであれば、次のように 1 MB のバッファを設定できます。

<file:read path="bigFile.json">
  <repeatable-file-store-stream
    inMemorySize="1"
    bufferUnit="MB"/>
</file:read>

あるいは、処理するファイルのサイズが 10 KB を超えることがないということが分かっていれば、次のように メモリを節約できます。

<file:read path="smallFile.json">
  <repeatable-file-store-stream
    inMemorySize="10"
    bufferUnit="KB"/>
</file:read>

パフォーマンステストにより、この戦略のデフォルトである 512 KB のバッファサイズ設定が、ほとんどのシナリオにおいてパフォーマンスに大きな影響を与えないことが分かっています。
実際にテストを行って、ニーズに最適なバッファサイズの設定を見つけてください。

In Memory Repeatable Stream (メモリ内の反復可能ストリーム)

この設定は、Mule Runtime Community Edition のデフォルトです。
デフォルトのバッファサイズは 512 KB です。ストリームがこのサイズを超えると、最大バッファサイズに到達するまで、デフォルトでは 512 KB 単位でメモリが拡張されます。ストリームが最大バッファサイズを超えると、アプリケーションはエラーとなります。

この動作は、初期バッファサイズ、バッファの拡張単位、最大バッファサイズ、そして測定単位を設定することでカスタマイズできます。
たとえば、次の例では、メモリ内の反復可能ストリームの初期バッファサイズを 512 KB、拡張単位を 256 KB、そしてメモリに読み込めるコンテンツの最大サイズを 2 MB に設定しています。

<file:read path="exampleFile.json">
  <repeatable-in-memory-stream
    initialBufferSize="512"
    bufferSizeIncrement="256"
    maxinMemorySize="2000"
    bufferUnit="KB"/>
</file:read>

パフォーマンステストにより、この戦略のデフォルトである 512 KB のバッファサイズと 512 KB の拡張単位が、ほとんどのシナリオにおいてパフォーマンスに大きな影響を与えないことが分かっています。
実際にテストを行って、ニーズに最適なバッファサイズと拡張単位の設定を見つけてください。

Non Repeatable Stream (反復不可能ストリーム)

この戦略は、反復可能ストリームを無効にします。この戦略では、入力ストリームを 1 回だけ読み込めるようにします。 反復可能ストリームによるメモリ消費やパフォーマンスオーバーヘッドが望ましくないユースケース向けです。ストリームがメモリに格納されないため、最もパフォーマンスの高い戦略となります。

<file:read path="exampleFile.json">
  <non-repeatable-stream />
</file:read>

この設定では、実際のストリーミングコンポーネントが実行される前に大きな入力ストリームにアクセスするコンポーネントが存在すると、フローは直ちにエラーとなります。

InputStream または Streamable コレクションを返す Mule 4.0 のすべてのコンポーネントは、反復可能ストリーミングをサポートします。
これらのコンポーネントの一例です。

  • ファイルコネクタ

  • FTP コネクタ

  • データベースコネクタ

  • HTTP コネクタ

  • ソケット

  • SalesForce コネクタ

オブジェクトのストリーミング

自動ページングを使用するように Anypoint のコネクタを設定すると、同じようなシナリオが発生します。Mule 4.0 は、反復可能自動ページングを使用して、ページングされたコネクタ出力を自動的に処理します。
コネクタがオブジェクトを受信した時点で Mule は設定可能なメモリ内バッファを使用してオブジェクトを格納するため、このフレームワークは反復可能ストリーミングと似ています。
ただし、反復可能ストリーミングはバッファサイズをバイト単位で測定しますが、オブジェクトを処理する場合には、ランタイムはインスタンス数でバッファサイズを測定します。

オブジェクトをストリーミングすると、メモリ内のバッファサイズはインスタンス数で測定されます。

反復可能自動ページングでメモリ内のバッファサイズを計算する場合は、メモリ不足にならないように、各インスタンスが使用するメモリ量を見積もる必要があります。

反復可能ストリーミングと同じように、異なる戦略を使用して、Mule が反復可能自動ページングを処理する方法を設定できます。

Repeatable File Store Iterable (反復可能なファイルストアイテラブル)

この設定は、Mule Runtime Enterprise Edition のデフォルトです。
デフォルトのメモリ内バッファサイズは、オブジェクト 500 個分です。このバッファサイズよりも多くの結果がクエリで返されると、Mule はオブジェクトをシリアル化して、ディスクに書き込みます。
メモリ内のバッファに格納されるオブジェクト数は設定可能です。より多くのオブジェクトをメモリに格納することで、ディスクへの書き込みが減るため、パフォーマンスが向上します。

たとえば、次のように、SalesForce Connector のクエリに対してメモリ内のバッファサイズをオブジェクト 100 個分に設定できます。

<sfdc:query query="dsql:...">
  <ee:repeatable-file-store-iterable inMemoryObjects="100"/>
</sfdc:query>

このインターフェースは、Kryo フレームワークを使用してオブジェクトをシリアル化し、ディスクに書き込めるようにします。
オブジェクトでシリアル化可能なインターフェースが実装されていない場合、Plain Old Java Object (POJO) のシリアル化は失敗します。ただし、シリアル化可能なインターフェースを実装していないオブジェクトが含まれていても、Kryo であれば (必ずではありませんが) 成功する可能性が高いと言えます。たとえば、POJO に org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl が含まれているとします。Kryo シリアライザは、JVM がデフォルトではシリアル化できないオブジェクトを Mule でシリアル化できるようにしますが、一部はシリアル化できません。オブジェクトは単純にすることをお勧めします。

このオプションは Mule EE でのみ使用できます。

Repeatable In-Memory Iterable (反復可能なメモリ内イテラブル)

この設定は、Mule Runtime Community Edition のデフォルトです。
デフォルトのバッファサイズは、オブジェクト 500 個分です。クエリ結果がこのサイズを超えると、最大バッファサイズに到達するまで、デフォルトでは オブジェクト 100 個分ずつメモリが拡張されます。ストリームが最大バッファサイズを超えると、アプリケーションはエラーとなります。
初期バッファサイズ、バッファの拡張単位、そして最大バッファサイズをカスタマイズできます。

次の例では、メモリ内のバッファサイズをオブジェクト 100 個分、拡張単位をオブジェクト 100 個分、最大メモリサイズをオブジェクト 500 個分に設定します。

<sfdc:query query="dsql:...">
  <repeatable-in-memory-iterable
    initialBufferSize="100"
    bufferSizeIncrement="100"
    maxBufferSize="500" />
</sfdc:query>

Was this article helpful?

💙 Thanks for your feedback!

Edit on GitHub