Streaming in DataWeave

DataWeave supports end-to-end streaming through a flow in a Mule application. Streaming speeds the processing of large documents without overloading memory.

DataWeave processes streamed data as its bytes arrive instead of scanning the entire document to index it. When in deferred mode, DataWeave can also pass streamed output data directly to a message processor without saving it to the disk. This behavior enables DataWeave and Mule to process data faster and consume fewer resources than the default processes for reading and writing data.

To stream successfully, it is important to understand the following:

  • The basic unit of the stream is specific to the data format.
    The unit is a record in a CSV document, an element of an array in a JSON document, or a collection in an XML document.

  • Streaming accesses each unit of the stream sequentially.
    Streaming does not support random access to a document.

Enabling Streaming

Streaming is not enabled by default. You can use two configuration properties to stream data in a supported data format:

  • streaming reader property, for reading source data as a stream

  • deferred writer property, for passing an output stream directly to the next message processor in a flow

To enable streaming when reading source data, you must set the streaming reader property to true on the data source. You can enable streaming on a data source through the MIME Type setting outputMimeType. You can set the property in a Mule component that accepts source data, such as an HTTP listener or an On New or Updated File operation.

<flow name="dw-streaming-example" >
  <http:listener doc:name="Listener"
      outputMimeType="application/json; streaming=true"
      config-ref="HTTP_Listener_config" path="/input"/>
</flow>

Notice that streaming=true is part of the outputMimeType value. Other Mule components, such as the File and FTP components, also support the MIME Type setting.
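As a minimal sketch of the same setting on a File component, the following flow reads a CSV file as a stream. The flow name, the File_Config configuration name, and the file path are hypothetical and serve only as illustration:

```xml
<flow name="dw-streaming-file-example">
  <!-- Hypothetical config name and path; adjust both to your application -->
  <file:read doc:name="Read"
      config-ref="File_Config"
      path="input/people.csv"
      outputMimeType="application/csv; streaming=true"/>
</flow>
```

As with the HTTP listener, the streaming=true reader property travels inside the outputMimeType value rather than as a separate attribute.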

To pass the streamed output to the next message processor, you can use the output directive in a DataWeave script, for example:

output application/json deferred=true

For more detail on the use of the deferred property, see Streaming Output.

Streaming CSV

CSV is the simplest format for streaming because of its structure. Each row below the CSV header is a streamable record. The following CSV example consists of records that contain name, lastName, and age values:

name,lastName,age
mariano,achaval,37
leandro,shokida,30
pedro,achaval,4
christian,chibana,25
sara,achaval,2
matias,achaval,8

To stream this CSV example, the following script selects values from each record. It uses the map function to iterate over each record in the document.

payload map (record) ->
    {
        FullName: record.lastName ++ "," ++ record.name,
        Age: record.age
    }

Although streaming does not support random access to the entire document, a DataWeave script can access data randomly within each record because each record is loaded into memory. For example, the expression record.lastName ++ "," ++ record.name can access a lastName value before it accesses a name value, even though the values appear in the opposite order in the input.
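The following sketch extends the same idea, assuming the CSV input shown above is read with streaming=true. It references payload only once and chains filter and map, so each record can be handled as it arrives. The age threshold of 18 is an arbitrary value chosen for illustration:

```dataweave
%dw 2.0
output application/json
---
payload
  // CSV values are read as strings, so coerce age before comparing
  filter ((record) -> (record.age as Number) >= 18)
  // Fields within a record can be read in any order
  map ((record) -> {
    FullName: record.lastName ++ "," ++ record.name,
    Age: record.age as Number
  })
```

With the sample input, only the records for mariano, leandro, and christian pass the filter.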

However, streaming does not work in the following script. The script requires random access to the entire document to return the elements in a different order than they are given in the input.

[payload[-2], payload[-1], payload[3]]

Streaming JSON

The unit of a JSON stream is each element in an array.

Note that in DataWeave 2.2.0 (Mule 4.2), JSON streaming was limited by the requirement that the root of the document be an array. DataWeave in Mule 4.3 can also stream arrays that are not at the root of the input.

{
    "name" : "Mariano",
    "lastName": "Achaval",
    "family": [
        {"name": "Sara", "age": 2},
        {"name": "Pedro", "age": 4},
        {"name": "Matias", "age": 8}
    ],
    "age": 37
}

In this example, DataWeave can stream payload.family and perform random access within each element of that array. However, DataWeave cannot randomly access the container object. For example, it is not possible to stream { a: payload.age, b: payload.family } because age follows family in the input, and DataWeave cannot go backwards.
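As a minimal sketch of a script that stays within these limits, the following example reads only payload.family and accesses fields freely within each element. It assumes the JSON input above is read with streaming=true. The isMinor field and the threshold of 18 are hypothetical additions for illustration:

```dataweave
%dw 2.0
output application/json
---
// payload is referenced once, and only the family array is selected
payload.family map ((member) -> {
  name: member.name,
  isMinor: member.age < 18
})
```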

Streaming XML

Streaming XML is more complicated than streaming JSON because XML has no arrays to mark the repeating unit of the document.

To enable XML streaming, DataWeave provides the following reader property to define the location in the document to stream:

  • collectionPath

For example, assume the following XML input:

<order>
    <header>
        <date>Wed Nov 15 13:45:28 EST 2006</date>
        <customer number="123123">Joe</customer>
    </header>
    <order-items>
        <order-item id="31">
            <product>111</product>
            <quantity>2</quantity>
            <price>8.90</price>
        </order-item>
        <order-item id="31">
            <product>222</product>
            <quantity>7</quantity>
            <price>5.20</price>
        </order-item>
        <order-item id="31">
            <product>111</product>
            <quantity>2</quantity>
            <price>8.90</price>
        </order-item>
        <order-item id="31">
            <product>222</product>
            <quantity>7</quantity>
            <price>5.20</price>
        </order-item>
        <order-item id="31">
            <product>222</product>
            <quantity>7</quantity>
            <price>5.20</price>
        </order-item>
    </order-items>
</order>

Given this XML source data, you can set the unit of the stream to <order-item/> by setting collectionPath='order.order-items' in the outputMimeType value:

<flow name="dw-streaming-example" >
  <http:listener doc:name="Listener"
      outputMimeType="application/xml; collectionpath='order.order-items'; streaming=true"
      config-ref="HTTP_Listener_config" path="/input"/>
</flow>

The following DataWeave script streams the XML input using each <order-item/> element as the streamable unit.

%dw 2.0
output application/xml
---
{
  salesorder: {
    itemList: payload.order."order-items".*"order-item" map {
      ("i_" ++ $$) : {
        id: $.@id,
        productId: $.product,
        quantity: $.quantity,
        price: $.price
      }
    }
  }
}

The script produces the following XML output:

<?xml version='1.0' encoding='UTF-8'?>
<salesorder>
  <itemList>
    <i_0>
      <id>31</id>
      <productId>111</productId>
      <quantity>2</quantity>
      <price>8.90</price>
    </i_0>
  </itemList>
  <itemList>
    <i_1>
      <id>31</id>
      <productId>222</productId>
      <quantity>7</quantity>
      <price>5.20</price>
    </i_1>
  </itemList>
  <itemList>
    <i_2>
      <id>31</id>
      <productId>111</productId>
      <quantity>2</quantity>
      <price>8.90</price>
    </i_2>
  </itemList>
  <itemList>
    <i_3>
      <id>31</id>
      <productId>222</productId>
      <quantity>7</quantity>
      <price>5.20</price>
    </i_3>
  </itemList>
  <itemList>
    <i_4>
      <id>31</id>
      <productId>222</productId>
      <quantity>7</quantity>
      <price>5.20</price>
    </i_4>
  </itemList>
</salesorder>

Validate a Script for Streamed Data (Experimental Feature)

To check that your code can process an input stream successfully, DataWeave provides the following advanced, experimental annotation and a related directive:

  • @StreamCapable()
    Use this annotation to validate whether the script can sequentially access a variable (typically the payload variable).

  • input directive
    The @StreamCapable() annotation requires the use of an input directive in the DataWeave script that identifies the MIME type of the data source, for example, input payload application/xml.

The DataWeave validator (which is triggered by the @StreamCapable annotation in the script) checks a script against the following criteria:

  • The variable is referenced only once.

  • No index selector is set for negative access, such as [-1].

  • No reference to the variable is found in a nested lambda.

If all criteria are met, the selected data is streamable.

The following example validates successfully. The script is designed to act on the JSON input from the Streaming JSON section:

%dw 2.0

@StreamCapable()
input payload application/json
output application/json
---
payload.family filter (member) -> member.age > 3

The script successfully validates and returns the following output:

[
  {
    "name": "Pedro",
    "age": 4
  },
  {
    "name": "Matias",
    "age": 8
  }
]
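By contrast, a script that breaks one of the criteria is expected to fail validation. The following sketch, which is an illustration rather than an example taken from a validator run, uses a negative index selector on the same JSON input. Because the last element is known only after the whole array is read, the second criterion is not met. The exact error text is not reproduced here:

```dataweave
%dw 2.0

@StreamCapable()
input payload application/json
output application/json
---
// Negative index: requires reading the entire family array first
payload.family[-1]
```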

Validation Failures

If any of the criteria that the validator checks is false, the validation fails.

Before proceeding, note that validation can fail even in cases where streaming works in practice. A script that happens to access the input sequentially for one particular document streams that document successfully, but the same script might not stream every valid document of that format. For example, JSON does not place a restriction on the order of the keys in an object. If the keys in some JSON documents arrive in a different order than the script expects, streaming fails for those documents. The annotation processor follows the rules of the format and cannot assume that the keys always arrive in the same order.

Error: Variable Is Referenced More Than Once

Validation fails if a script attempts to reference the same variable more than once.

The following script is designed to act on the JSON input from the Streaming JSON section. Validation fails because the script attempts to reference the payload variable more than once:

%dw 2.0

@StreamCapable()
input payload application/json
output application/json
---
 {
     family: payload.family filter (member) -> member.age > 3,
     name: payload.name
 }

The script fails with the following error:

4| input payload application/json streaming=true
         ^^^^^^^
Parameter `payload` is not stream capable.
Reasons:
 - Variable payload is referenced more than once. Locations:
 ---------------------------

8|      family: payload.family filter (member) -> member.age > 3,
                ^^^^^^^
 ---------------------------

9|      name: payload.name
              ^^^^^^^ at
4| input payload application/json streaming=true

Error: Wrong Scope Reference

Validation fails if a script attempts to reference a variable from a scope that is different from the scope in which the variable is defined.

The following script fails because the payload variable is referenced from within the lambda expression in [1,2,3] map ((item, index) -> payload). Even if the expression is [1] map ((item, index) -> payload), validation fails because payload is referenced in the wrong scope.

%dw 2.0

@StreamCapable()
input payload application/json
output application/json
---
[1,2,3] map ((item, index) -> payload)

The example fails with the following error:

4| input payload application/json
         ^^^^^^^
Parameter `payload` is not stream capable.
Reasons:
 - Variable payload is referenced in a different scope from where it was defined. Locations:
 ---------------------------

9| [1,2,3] map ((item, index) -> payload)
                ^^^^^^^^^^^^^^^^^^^^^^^^ at
4| input payload application/json

Streaming Output

After processing streamed data, you can stream the output directly to another message processor. To facilitate this behavior, use the deferred writer property in the output directive of the DataWeave script, for example, output application/json deferred=true.

NOTE

Exceptions are not handled when you set deferred = true. For example, you can see this behavior in Studio when a flow throws an exception. If you are running an application in Studio debug mode and an exception occurs in a Transform Message component when deferred = true, the console logs the error, but the flow does not stop at the Transform Message component.

Building on the example in Streaming JSON, the following flow uses a DataWeave script to filter streamed input and then streams the output directly to a Write operation:

<flow name="dw-streamingexample">
  <file:listener doc:name="On New or Updated File"
    config-ref="File_Config" directory="/Users/me/testing/json" recursive="false"
    outputMimeType="application/json; streaming=true">
    <scheduling-strategy>
      <fixed-frequency timeUnit="SECONDS" />
    </scheduling-strategy>
    <file:matcher />
  </file:listener>
  <ee:transform doc:name="Transform Message">
    <ee:message>
      <ee:set-payload><![CDATA[%dw 2.0

@StreamCapable()
input payload application/json
output application/json deferred = true
---
{
   family: payload.family filter (member) -> member.age > 1
}]]></ee:set-payload>
    </ee:message>
  </ee:transform>
  <file:write doc:name="Write"
      config-ref="File_Config2"
      path="/Users/me/testing/output.json"/>
</flow>

The flow provides the following configuration:

  1. The listener (<file:listener>) uses streaming=true to stream the incoming JSON data.

  2. The DataWeave script in <ee:transform/> filters records in the streamed data and uses the deferred = true property to stream the resulting records directly to the next processor in the flow.

  3. The next component in the flow, <file:write/>, receives the filtered stream directly and writes the records to a file.
