
DataWeave Streaming

DataWeave can process files without loading their entire content into memory, which is especially helpful when dealing with large files. Streaming is supported for all input formats except XML, and for all output formats. No additional configuration is required: as long as the file input is a stream, DataWeave handles it as one, and all output types are automatically handled as streams.

Input Streams

For DataWeave to process a file as a stream, the input must enter DataWeave as a stream. For example, a flow that starts with a file transport inbound endpoint must have streaming enabled.

The example below details a CSV file input that will be processed as a stream. Its output is then written to another file in JSON format.

file_to_file

Note: The File Connector automatically enables streaming.

    
            
         
<?xml version="1.0" encoding="UTF-8"?>

<mule
  xmlns:file="http://www.mulesoft.org/schema/mule/file"
  xmlns:dw="http://www.mulesoft.org/schema/mule/ee/dw"
  xmlns="http://www.mulesoft.org/schema/mule/core"
  xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
      http://www.mulesoft.org/schema/mule/file http://www.mulesoft.org/schema/mule/file/current/mule-file.xsd
      http://www.mulesoft.org/schema/mule/ee/dw http://www.mulesoft.org/schema/mule/ee/dw/current/dw.xsd">

  <file:connector name="File" autoDelete="true" streaming="true" validateConnections="true" doc:name="File" />

  <flow name="file-to-file">
    <file:inbound-endpoint path="C:\input" connector-ref="File" responseTimeout="10000" doc:name="Read CSV" />
    <dw:transform-message doc:name="Transform Message">
      <dw:set-payload>
        <![CDATA[
          %dw 1.0
          %output application/json
          ---
          {
            people: payload map ((payload01, indexOfPayload01) -> {
              id: payload01.id as :number,
              firstName: payload01.firstName,
              lastName: payload01.lastName
            })
          }
        ]]>
      </dw:set-payload>
    </dw:transform-message>
    <file:outbound-endpoint responseTimeout="10000" doc:name="Write as JSON" connector-ref="File" path="C:\output" />
  </flow>

</mule>

Output Streams and The WeaveOutputHandler

After transformation, DataWeave returns a WeaveOutputHandler. This handler is capable of deferring writing the Mule Message’s payload until there is a stream available to write it to. This allows for the DataWeave output to remain outside of the heap as processing continues.

Keep in mind that this behavior changes what you see when you inspect your payload. Getting the message’s payload returns a WeaveOutputHandler object rather than a String representation. For example, consider a logger that logs your payload:

log DW output
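
A logger of this kind might look like the following minimal sketch. The message text is inferred from the "Result was:" line in the log output, and the level and doc:name values are assumptions chosen for illustration.

```xml
<!-- Sketch only: logs the raw payload. After the Transform Message component,
     the payload is a WeaveOutputHandler, so its object reference is printed. -->
<logger message="Result was: #[payload]" level="INFO" doc:name="Log DW output" />
```

In the file-to-file flow, this element would sit between the Transform Message component and the file outbound endpoint.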

The output of this logger appears as follows:


         
      
org.mule.transport.file.FileMessageReceiver: Lock obtained on file: /Users/mulesoft/inputCSV.csv
org.mule.api.processor.LoggerMessageProcessor: Result was:
com.mulesoft.weave.mule.WeaveMessageProcessor$WeaveOutputHandler@3528770d
org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'File.dispatcher.657964187'. Object is: FileMessageDispatcher
org.mule.lifecycle.AbstractLifecycleManager: Starting: 'File.dispatcher.657964187'. Object is: FileMessageDispatcher
org.mule.transport.file.FileConnector: Writing file to: /Users/mulesoft/output/inputCSV.csv

To log the payload as a String, you need to request it in that form. You can do so with the expression #[message.payloadAs(java.lang.String)].

log DW output as String
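
A logger that uses this expression might look like the following minimal sketch. The expression comes from the text above; the "Result was:" prefix is inferred from the log output, and the level and doc:name values are assumptions chosen for illustration.

```xml
<!-- Sketch only: requests the payload as a String, which consumes the
     DataWeave output stream and loads the full result into memory. -->
<logger message="Result was: #[message.payloadAs(java.lang.String)]" level="INFO" doc:name="Log DW output as String" />
```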

This expression is the equivalent of consuming the DataWeave output and transforming it into a String. Even when the expression is used only in a logger, the payload reaches the next processor as a String. It’s also important to note that once the output is consumed this way, the entire payload exists in memory.

The output of this logger appears as follows:


         
      
org.mule.transport.file.FileMessageReceiver: Lock obtained on file: /Users/josh/inputCSV.csv
org.mule.api.processor.LoggerMessageProcessor: Result was:
{
  "people": [
    {
      "id": 1,
      "firstName": "Max",
      "lastName": "Mule"
    },
    {
      "id": 2,
      "firstName": "Sally",
      "lastName": "Mule"
    }
  ]
}
org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'File.dispatcher.2036619369'. Object is: FileMessageDispatcher
org.mule.lifecycle.AbstractLifecycleManager: Starting: 'File.dispatcher.2036619369'. Object is: FileMessageDispatcher
org.mule.transport.file.FileConnector: Writing file to: /Users/josh/output/inputCSV.csv