Flex Gateway新着情報
Governance新着情報
Monitoring API ManagerDataWeave は、Mule アプリケーション内のフローを通してエンドツーエンドのストリーミングをサポートします。ストリーミングにより、大きなドキュメントでもメモリをオーバーフローさせることなく処理をスピードアップできます。
DataWeave は、ドキュメント全体をスキャンしてインデックス化するのではなく、ストリーミングされたデータのバイトが到着するたびに処理します。遅延モードでは、DataWeave はストリーミングされた出力データをディスクに保存せずに、メッセージプロセッサーに直接渡すこともできます。この動作によって DataWeave と Mule は、デフォルトのプロセスでデータを読み書きするよりも速くデータを処理し、リソースコンシューム量を少なく抑えることができます。
データを正常にストリーミングするためには、以下を理解することが重要です。
ストリームの基本単位はデータ形式ごとに異なります。
CSV ドキュメントであればレコード、JSON ドキュメントであれば配列の要素、XML ドキュメントであればコレクションが基本単位となります。
ストリーミングでは、各ストリーム単位に順次アクセスします。
ストリーミングでは、ドキュメントへのランダムアクセスはサポートされません。
ストリーミングはデフォルトでは有効化されていません。2 つの設定プロパティを使用して、サポートされているデータ形式でデータをストリーミングできます。
ソースデータをストリームとして読み取るための streaming
プロパティ
出力ストリームをフロー内の次のメッセージプロセッサーに直接渡すための deferred
ライタープロパティ
DataWeave がソースデータをストリームとして読み取るには、データソースで streaming
リーダープロパティを true
に設定する必要があります。Mule アプリケーション内で、この設定を MIME Type (MIME タイプ) プロパティ outputMimeType
または mimeType
の値に追加します。HTTP Listener 操作、HTTP Request 操作、On New or Updated File 操作、Set Payload コンポーネントなど、データを生成する任意のコネクタ操作または Mule コンポーネントでプロパティを設定できます。DataWeave は、ストリーミングプロパティを設定したアプリケーション内のポイントから、DataWeave 式とスクリプトを含むすべてのダウンストリームコンポーネントとコネクタ操作を介してデータをストリーミングデータとして読み取ります。
<flow name="dw-streaming-example" >
<http:listener doc:name="Listener"
outputMimeType="application/json; streaming=true"
config-ref="HTTP_Listener_config" path="/input"/>
</flow>
streaming=true
は outputMimeType
値の一部ですので注意してください。File コンポーネントや FTP コンポーネントなどの他の多くの Mule コンポーネントも MIME タイプ設定をサポートします。
ストリーミングされた出力を次のメッセージプロセッサーに渡すには、DataWeave スクリプトで output
ディレクティブを次のように使用します。
output application/json deferred=true
deferred
プロパティの使い方の詳細は、出力のストリーミング を参照してください。
CSV は、その構造により最もシンプルなストリーミング形式となります。CSV ヘッダーの下にある各行は、ストリーム可能なレコードです。次の CSV の例は、name
、lastName
、age
の各値を含むレコードから構成されています。
name,lastName,age
mariano,achaval,37
leandro,shokida,30
pedro,achaval,4
christian,chibana,25
sara,achaval,2
matias,achaval,8
この CSV の例をストリーミングするため、次のスクリプトは各レコードから値を選択しています。このスクリプトは、map
関数を使用して、ドキュメント内の各レコードを反復処理しています。
payload map (record) ->
{
FullName: record.lastName ++ "," ++ record.name,
Age: record.age
}
ストリーミングでは、ドキュメント全体へのランダムアクセスはサポートされませんが、各レコードはメモリに読み込まれるため、DataWeave スクリプトは各レコード内のデータにはランダムにアクセスできます。たとえば、record.lastName "," record.name,
という式により、入力での値の順序が逆であっても、name
値にアクセスする前に lastName
値にアクセスできます。
ただし、ストリーミングは次のスクリプトでは機能しません。入力とは異なる順序で要素を返すためには、スクリプトでドキュメント全体へのランダムアクセスが必要になるためです。
[payload[-2], payload[-1], payload[3]]
JSON ストリーミングの単位は配列の各要素です。
DataWeave 2.2.0 を使用して Mule 4.2 で JSON をストリーミングする場合には、配列が入力のルートに存在する必要があります。Mule 4.3 で JSON をストリーミングする場合には、配列が入力のルートに存在しなくても構いません。
{
"name" : "Mariano",
"lastName": "Achaval",
"family": [
{"name": "Sara", "age": 2},
{"name": "Pedro", "age": 4},
{"name": "Matias", "age": 8}
],
"age": 37
}
この例では、DataWeave は payload.family
をストリーミングして、その配列の各要素にランダムアクセスします。ただし、DataWeave はコンテナオブジェクトにはランダムアクセスすることはできません。たとえば、age
は family
の後にあり、DataWeave は後戻りすることはできないため、{ a: payload.age , b: payload.family}
をストリーミングすることはできません。
XML ではドキュメントに配列が含まれないため、JSON より複雑になります。
XML をストリーミングするため、DataWeave は次のリーダープロパティを使用して、ストリーミングするドキュメント内の場所を定義します。
collectionPath
たとえば、次の XML 入力があるとします。
<order>
<header>
<date>Wed Nov 15 13:45:28 EST 2006</date>
<customer number="123123">Joe</customer>
</header>
<order-items>
<order-item id="31">
<product>111</product>
<quantity>2</quantity>
<price>8.90</price>
</order-item>
<order-item id="31">
<product>222</product>
<quantity>7</quantity>
<price>5.20</price>
</order-item>
<order-item id="31">
<product>111</product>
<quantity>2</quantity>
<price>8.90</price>
</order-item>
<order-item id="31">
<product>222</product>
<quantity>7</quantity>
<price>5.20</price>
</order-item>
<order-item id="31">
<product>222</product>
<quantity>7</quantity>
<price>5.20</price>
</order-item>
</order-items>
</order>
この XML ソースデータに対して、outputMimeType
値で collectionPath=order.order-items
を設定することにより、ストリーミング単位を <order-item/>
に設定できます。
<flow name="dw-streaming-example" >
<http:listener doc:name="Listener"
outputMimeType="application/xml; collectionpath=order.order-items; streaming=true"
config-ref="HTTP_Listener_config" path="/input"/>
</flow>
streaming=true
と collectionPath
値の両方を設定する必要があります。いずれかがない場合、コンテンツはストリーミングされません。
次の DataWeave スクリプトは、各 <order-items/>
要素をストリーム可能単位として使用することで、XML 入力をストリーミングします。
%dw 2.0
output application/xml
---
{
salesorder: {
itemList: payload.order."order-items".*"order-item" map {
("i_" ++ $$) : {
id: $.@id,
productId: $.product,
quantity: $.quantity,
price: $.price
}
}
}
}
このスクリプトの XML 出力は次のようになります。
<?xml version='1.0' encoding='UTF-8'?>
<salesorder>
<itemList>
<i_0>
<id>31</id>
<quantity>2</quantity>
<productId>111</productId>
<price>8.90</price>
</i_0>
</itemList>
<itemList>
<i_1>
<id>31</id>
<quantity>7</quantity>
<productId>222</productId>
<price>5.20</price>
</i_1>
</itemList>
<itemList>
<i_2>
<id>31</id>
<quantity>2</quantity>
<productId>111</productId>
<price>8.90</price>
</i_2>
</itemList>
<itemList>
<i_3>
<id>31</id>
<quantity>7</quantity>
<productId>222</productId>
<price>5.20</price>
</i_3>
</itemList>
<itemList>
<i_4>
<id>31</id>
<quantity>7</quantity>
<productId>222</productId>
<price>5.20</price>
</i_4>
</itemList>
</salesorder>
コードが入力ストリームを正常に処理できることを確認するため、DataWeave は次の高度で実験的なアノテーションと関連ディレクティブを提供しています。
@StreamCapable()
このアノテーションを使用して、スクリプトが変数 (特に payload
変数) に順次アクセスするかどうかを確認してください。
input
ディレクティブ:
@StreamCapable()
アノテーションでは、データソースの MIME タイプを識別するための入力ディレクティブを DataWeave スクリプトで使用する必要があります (例: input payload application/xml
)。
DataWeave バリデーター (スクリプト内の @StreamCapable
アノテーションでトリガーされます) は、スクリプトが以下の条件を満たしているかどうかを確認します。
変数が 1 回だけ参照される。
インデックスセレクターが負の値にアクセスしていない (例: [-1]
)。
ネストされたラムダで変数への参照がない。
すべての条件が満たされていれば、選択されたデータはストリーム可能です。
次の例は、正常に検証されます。このスクリプトは、JSON ストリームセクションのJSON 入力を処理します。
%dw 2.0
@StreamCapable()
input payload application/json
output application/json
---
payload.family filter (member) -> member.age > 3
スクリプトは検証を正しく実行し、次の出力を返します。
[
{
"name": "Pedro",
"age": 4
},
{
"name": "Matias",
"age": 8
}
]
いずれかの条件が満足していないとバリデーターは検証に失敗します。
ストリーミングが機能しても検証に失敗する場合がありますので注意してください。指定したデータソースの入力変数に順次アクセスするようにスクリプトを作成しても、スクリプトが機能しない場合があります。たとえば、JSON では、オブジェクト内のキーの順序に制約を設けていません。JSON ドキュメント内のキーがスクリプトの想定とは異なる順序で入力されていると、ストリーミングは失敗します。アノテーションプロセッサーは、書式設定のルールに従い、キーが常に同じ順序で入力されるとは想定できません。
スクリプトが同じ変数を複数回参照しようとすると、検証は失敗します。
次のスクリプトは、JSON ストリームセクションのJSON 入力を処理します。スクリプトが payload
変数を複数回参照しようとしているため、検証は失敗します:
%dw 2.0
@StreamCapable()
input payload application/json
output application/json
---
{
family: payload.family filter (member) -> member.age > 3,
name: payload.name
}
スクリプトは次のエラーを返して失敗します。
4| input payload application/json streaming=true
^^^^^^^
Parameter `payload` is not stream capable.
Reasons:
- Variable payload is referenced more than once. Locations:
---------------------------
8| family: payload.family filter (member) -> member.age > 3,
^^^^^^^
---------------------------
9| name: payload.name
^^^^^^^ at
4| input payload application/json streaming=true
スクリプトが、変数が定義されているスコープとは異なるスコープの変数を参照しようとすると、検証は失敗します。
次のスクリプトは、payload
変数がラムダ式 [1,2,3] map ((item, index) → payload)
内から参照されているために失敗します。式は [1] map ((item, index) → payload
ですが、payload
が間違ったスコープにあるため、ストリーミングは失敗します。
%dw 2.0
@StreamCapable()
input payload application/json
output application/json
---
[1,2,3] map ((item, index) -> payload)
この例は次のエラーを返して失敗します。
4| input payload application/json
^^^^^^^
Parameter `payload` is not stream capable.
Reasons:
- Variable payload is referenced in a different scope from where it was defined. Locations:
---------------------------
9| [1,2,3] map ((item, index) -> payload)
^^^^^^^^^^^^^^^^^^^^^^^^ at
4| input payload application/json
ストリーミングデータの処理後は、別のメッセージプロセッサーに出力を直接ストリーミングすることができます。この動作を簡単に実装するには、deferred
ライタープロパティを DataWeave スクリプトの output ディレクティブで使用します (例: output application/json deferred=true
)。
deferred = true
を設定すると、例外は処理されません。たとえば、フローで例外が発生したときに、Studio でこの動作を確認できます。Studio デバッグモードでアプリケーションを実行中に、deferred = true
に設定されていて Transform Message コンポーネントで例外が発生した場合、コンソールにエラーは記録されますが、フローは Tranform Message コンポーネントで停止しません。
「JSON のストリーミング」の例に基づいた次のフローは、DataWeave スクリプトを使用して、ストリーミングされた入力を絞り込み、出力を Write 操作に直接ストリーミングします。
<flow name="dw-streamingexample">
<file:listener doc:name="On New or Updated File"
config-ref="File_Config" directory="/Users/me/testing/json" recursive="false" outputMimeType="application/json;
streaming=true">
<scheduling-strategy>
<fixed-frequency timeUnit="SECONDS" />
</scheduling-strategy>
<file:matcher />
</file:listener>
<ee:transform doc:name="Transform Message">
<ee:message>
<ee:set-payload><![CDATA[%dw 2.0
@StreamCapable()
input payload application/json
output application/json deferred = true
---
{
family: payload.family filter (member) -> member.age > 1
}]]></ee:set-payload>
</ee:message>
</ee:transform>
<file:write doc:name="Write"
config-ref="File_Config2"
path="/Users/me/testing/output.json"/>
</flow>
フローは次の設定を使用します。
リスナー (<file:listener>
) は streaming=true
を使用して、受信した JSON データをストリームします。
<ee:transform/>
の DataWeave スクリプトは、ストリーミングデータのレコードを絞り込み、deferred = true
プロパティを使用して、結果レコードをフロー内の次のプロセッサーに直接ストリーミングします。
フロー内の次のコンポーネントである <file:write/>
は、絞り込まれたストリームを直接受け取り、レコードをファイルに書き込みます。