DataWeave でのストリーミング

DataWeave は、Mule アプリケーション内のフローを通してエンドツーエンドのストリーミングをサポートします。ストリーミングにより、大きなドキュメントでもメモリをオーバーフローさせることなく処理をスピードアップできます。

DataWeave は、ドキュメント全体をスキャンしてインデックス化するのではなく、ストリーミングされたデータのバイトが到着するたびに処理します。遅延モードでは、DataWeave はストリーミングされた出力データをディスクに保存せずに、メッセージプロセッサーに直接渡すこともできます。この動作によって DataWeave と Mule は、デフォルトのプロセスでデータを読み書きするよりも速くデータを処理し、リソース消費量を少なく抑えることができます。

データを正常にストリーミングするためには、以下を理解することが重要です。

  • ストリームの基本単位はデータ形式ごとに異なります。
    CSV ドキュメントであればレコード、JSON ドキュメントであれば配列の要素、XML ドキュメントであればコレクションが基本単位となります。

  • ストリーミングでは、各ストリーム単位に順次アクセスします。
    ストリーミングでは、ドキュメントへのランダムアクセスはサポートされません。

ストリーミングの有効化

ストリーミングはデフォルトでは有効化されていません。2 つの設定プロパティを使用して、サポートされているデータ形式でデータをストリーミングできます。

  • ソースデータをストリームとして読み取るための ​streaming​ プロパティ

  • 出力ストリームをフロー内の次のメッセージプロセッサーに直接渡すための ​deferred​ ライタープロパティ

ストリーミングでソースデータを読み取れるようにするには、データソースで ​streaming​ リーダープロパティを ​true​ に設定する必要があります。データソースでストリーミングを有効化するには、​MIME タイプ​設定の ​outputMimeType​ を使用します。このプロパティは、HTTP リスナーや On New 操作または Updated File 操作など、ソースデータを受け入れる Mule コンポーネントで設定できます。

<flow name="dw-streaming-example" >
  <http:listener doc:name="Listener"
      outputMimeType="application/json; streaming=true"
      config-ref="HTTP_Listener_config" path="/input"/>
</flow>

streaming=true​ は ​outputMimeType​ 値の一部ですので注意してください。File コンポーネントや FTP コンポーネントなどの他の Mule コンポーネントも MIME タイプ設定をサポートします。

ストリーミングされた出力を次のメッセージプロセッサーに渡すには、DataWeave スクリプトで ​output​ ディレクティブを次のように使用します。

output application/json deferred=true

deferred​ プロパティの使い方の詳細は、​出力のストリーミング​ を参照してください。

CSV のストリーミング

CSV は、その構造により最もシンプルなストリーミング形式となります。CSV ヘッダーの下にある各行は、ストリーム可能なレコードです。次の CSV の例は、​name​、​lastName​、​age​ の各値を含むレコードから構成されています。 ​

name,lastName,age
mariano,achaval,37
leandro,shokida,30
pedro,achaval,4
christian,chibana,25
sara,achaval,2
matias,achaval,8

この CSV の例をストリーミングするため、次のスクリプトは各レコードから値を選択しています。このスクリプトは、​map​ 関数を使用して、ドキュメント内の各レコードを反復処理しています。 ​

payload map (record) ->
    {
        FullName: record.lastName ++ "," ++ record.name,
        Age: record.age
    }

ストリーミングでは、ドキュメント全体へのランダムアクセスはサポートされませんが、各レコードはメモリに読み込まれるため、DataWeave スクリプトは​各レコード内の​データにはランダムにアクセスできます。たとえば、​record.lastName "," record.name,​ という式により、入力での値の順序が逆であっても、​name​ 値にアクセスする前に ​lastName​ 値にアクセスできます。

ただし、ストリーミングは次のスクリプトでは機能しません。入力とは異なる順序で要素を返すためには、スクリプトでドキュメント全体へのランダムアクセスが必要になるためです。 ​

[payload[-2], payload[-1], payload[3]]

JSON のストリーミング

JSON ストリーミングの単位は配列の各要素です。

DataWeave 2.2.0 を使用して Mule 4.2 で JSON をストリーミングする場合には、配列が入力のルートに存在する必要があります。Mule 4.3 で JSON をストリーミングする場合には、配列が入力のルートに存在しなくても構いません。

{
    "name" : "Mariano",
    "lastName": "Achaval",
    "family": [
        {"name": "Sara", "age": 2},
        {"name": "Pedro", "age": 4},
        {"name": "Matias", "age": 8}
    ],
    "age": 37
}

この例では、DataWeave は ​payload.family​ をストリーミングして、その配列の各要素にランダムアクセスします。ただし、DataWeave はコンテナオブジェクトにはランダムアクセスすることはできません。たとえば、​age​ は ​family​ の後にあり、DataWeave は後戻りすることはできないため、​{ a: payload.age , b: payload.family}​ をストリーミングすることはできません。 ​

XML のストリーミング

​ XML ではドキュメントに配列が含まれないため、JSON より複雑になります。

XML をストリーミングするため、DataWeave は次のリーダープロパティを使用して、ストリーミングするドキュメント内の場所を定義します。

  • collectionPath

たとえば、次の XML 入力があるとします。 ​

<order>
    <header>
        <date>Wed Nov 15 13:45:28 EST 2006</date>
        <customer number="123123">Joe</customer>
    </header>
    <order-items>
        <order-item id="31">
            <product>111</product>
            <quantity>2</quantity>
            <price>8.90</price>
        </order-item>
        <order-item id="31">
            <product>222</product>
            <quantity>7</quantity>
            <price>5.20</price>
        </order-item>
        <order-item id="31">
            <product>111</product>
            <quantity>2</quantity>
            <price>8.90</price>
        </order-item>
        <order-item id="31">
            <product>222</product>
            <quantity>7</quantity>
            <price>5.20</price>
        </order-item>
        <order-item id="31">
            <product>222</product>
            <quantity>7</quantity>
            <price>5.20</price>
        </order-item>
    </order-items>
</order>

​ この XML ソースデータに対して、​outputMimeType​ 値で ​collectionPath=order.order-items​ を設定することにより、ストリーミング単位を ​<order-item/>​ に設定できます。

<flow name="dw-streaming-example" >
  <http:listener doc:name="Listener"
      outputMimeType="application/xml; collectionpath=order.order-items; streaming=true"
      config-ref="HTTP_Listener_config" path="/input"/>
</flow>

streaming=true​ と ​collectionPath​ 値の両方を設定する必要があります。いずれかがない場合、コンテンツはストリーミングされません。 次の DataWeave スクリプトは、各 ​<order-items/>​ 要素をストリーム可能単位として使用することで、XML 入力をストリーミングします。

%dw 2.0
output application/xml
---
{
  salesorder: {
    itemList: payload.order."order-items".*"order-item" map {
      ("i_" ++ $$) : {
        id: $.@id,
        productId: $.product,
        quantity: $.quantity,
        price: $.price
      }
    }
  }
}

このスクリプトの XML 出力は次のようになります。

<?xml version='1.0' encoding='UTF-8'?>
<salesorder>
  <itemList>
    <i_0>
      <id>31</id>
      <quantity>2</quantity>
      <productId>111</productId>
      <price>8.90</price>
    </i_0>
  </itemList>
  <itemList>
    <i_1>
      <id>31</id>
      <quantity>7</quantity>
      <productId>222</productId>
      <price>5.20</price>
    </i_1>
  </itemList>
  <itemList>
    <i_2>
      <id>31</id>
      <quantity>2</quantity>
      <productId>111</productId>
      <price>8.90</price>
    </i_2>
  </itemList>
  <itemList>
    <i_3>
      <id>31</id>
      <quantity>7</quantity>
      <productId>222</productId>
      <price>5.20</price>
    </i_3>
  </itemList>
  <itemList>
    <i_4>
      <id>31</id>
      <quantity>7</quantity>
      <productId>222</productId>
      <price>5.20</price>
    </i_4>
  </itemList>
</salesorder>

ストリーミングされたデータのスクリプトの検証 (実験的機能)

コードが入力ストリームを正常に処理できることを確認するため、DataWeave は次の​高度で実験的な​アノテーションと関連ディレクティブを提供しています。

  • @StreamCapable()
    このアノテーションを使用して、スクリプトが変数 (特に ​payload​ 変数) に順次アクセスするかどうかを確認してください。

  • input​ ディレクティブ:
    @StreamCapable()​ アノテーションでは、データソースの MIME タイプを識別するための入力ディレクティブを DataWeave スクリプトで使用する必要があります (例: input payload application/xml​)。

DataWeave バリデーター (スクリプト内の ​@StreamCapable​ アノテーションでトリガーされます) は、スクリプトが以下の条件を満たしているかどうかを確認します。

  • 変数が 1 回だけ参照される。

  • インデックスセレクターが負の値にアクセスしていない (例: [-1]​)

  • ネストされたラムダで変数への参照がない。

すべての条件が満たされていれば、選択されたデータはストリーム可能です。

次の例は、正常に検証されます。このスクリプトは、​JSON ストリーム​セクションの​JSON 入力​を処理します。

%dw 2.0

@StreamCapable()
input payload application/json
output application/json
---
payload.family filter (member) -> member.age > 3

スクリプトは検証を正しく実行し、次の出力を返します。

[
  {
    "name": "Pedro",
    "age": 4
  },
  {
    "name": "Matias",
    "age": 8
  }
]

検証の失敗

いずれかの条件が満足していないとバリデーターは検証に失敗します。

ストリーミングが機能しても検証に失敗する場合がありますので注意してください。指定したデータソースの入力変数に順次アクセスするようにスクリプトを作成しても、スクリプトが機能しない場合があります。たとえば、JSON では、オブジェクト内のキーの順序に制約を設けていません。JSON ドキュメント内のキーがスクリプトの想定とは異なる順序で入力されていると、ストリーミングは失敗します。アノテーションプロセッサーは、書式設定のルールに従い、キーが常に同じ順序で入力されるとは想定できません。

Error: Variable Is Referenced More Than Once (エラー: 変数が複数回参照されています)

スクリプトが同じ変数を複数回参照しようとすると、検証は失敗します。

次のスクリプトは、​JSON ストリーム​セクションの​JSON 入力​を処理します。スクリプトが ​payload​ 変数を複数回参照しようとしているため、検証は失敗します:

%dw 2.0

@StreamCapable()
input payload application/json
output application/json
---
 {
     family: payload.family filter (member) -> member.age > 3,
     name: payload.name
 }

スクリプトは次のエラーを返して失敗します。

4| input payload application/json streaming=true
         ^^^^^^^
Parameter `payload` is not stream capable.
Reasons:
 - Variable payload is referenced more than once. Locations:
 ---------------------------

8|      family: payload.family filter (member) -> member.age > 3,
                ^^^^^^^
 ---------------------------

9|      name: payload.name
              ^^^^^^^ at
4| input payload application/json streaming=true

Error: Wrong Scope Reference (エラー: スコープ参照が正しくありません)

スクリプトが、変数が定義されているスコープとは異なるスコープの変数を参照しようとすると、検証は失敗します。

次のスクリプトは、​payload​ 変数がラムダ式 ​[1,2,3] map ((item, index) → payload)​ 内から参照されているために失敗します。式は ​[1] map ((item, index) → payload​ ですが、​payload​ が間違ったスコープにあるため、ストリーミングは失敗します。

%dw 2.0

@StreamCapable()
input payload application/json
output application/json
---
[1,2,3] map ((item, index) -> payload)

この例は次のエラーを返して失敗します。

4| input payload application/json
         ^^^^^^^
Parameter `payload` is not stream capable.
Reasons:
 - Variable payload is referenced in a different scope from where it was defined. Locations:
 ---------------------------

9| [1,2,3] map ((item, index) -> payload)
                ^^^^^^^^^^^^^^^^^^^^^^^^ at
4| input payload application/json

出力のストリーミング

ストリーミングデータの処理後は、別のメッセージプロセッサーに出力を直接ストリーミングすることができます。この動作を簡単に実装するには、​deferred​ ライタープロパティを DataWeave スクリプトの output ディレクティブで使用します (例: output application/json deferred=true​)。

NOTE

deferred = true​ を設定すると、例外は処理されません。たとえば、フローで例外が発生したときに、Studio でこの動作を確認できます。Studio デバッグモードでアプリケーションを実行中に、​deferred = true​ に設定されていて Transform Message コンポーネントで例外が発生した場合、コンソールにエラーは記録されますが、フローは Tranform Message コンポーネントで停止しません。

「JSON のストリーミング」​の例に基づいた次のフローは、DataWeave スクリプトを使用して、ストリーミングされた入力を絞り込み、出力を Write 操作に直接ストリーミングします。

<flow name="dw-streamingexample">
  <file:listener doc:name="On New or Updated File"
    config-ref="File_Config" directory="/Users/me/testing/json" recursive="false" outputMimeType="application/json;
    streaming=true">
    <scheduling-strategy>
      <fixed-frequency timeUnit="SECONDS" />
    </scheduling-strategy>
    <file:matcher />
  </file:listener>
  <ee:transform doc:name="Transform Message">
    <ee:message>
      <ee:set-payload><![CDATA[%dw 2.0

@StreamCapable()
input payload application/json
output application/json deferred = true
---
{
   family: payload.family filter (member) -> member.age > 1
}]]></ee:set-payload>
    </ee:message>
  </ee:transform>
  <file:write doc:name="Write"
      config-ref="File_Config2"
      path="/Users/me/testing/output.json"/>
</flow>

フローは次の設定を使用します。

  1. リスナー (​<file:listener>​) は ​streaming=true​ を使用して、受信した JSON データをストリームします。

  2. <ee:transform/>​ の DataWeave スクリプトは、ストリーミングデータのレコードを絞り込み、​deferred = true​ プロパティを使用して、結果レコードをフロー内の次のプロセッサーに直接ストリーミングします。

  3. フロー内の次のコンポーネントである ​<file:write/>​ は、絞り込まれたストリームを直接受け取り、レコードをファイルに書き込みます。