Streaming in Mule Apps
Mule 4 introduces a new framework to work with streamed data. To understand the changes introduced in Mule 4, it is necessary to understand how traditional data streams are consumed:
Data streams cannot be consumed more than once.
For example, consider the following flow:
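The flow itself is not reproduced in this text. A minimal sketch of such a flow, assuming the File connector operations used elsewhere in this document, could look like the following. The flow name and file names are hypothetical, and namespace declarations are omitted as in the other snippets:

```xml
<!-- Hypothetical flow: one read followed by two writes of the same payload -->
<flow name="copyFileTwice">
  <!-- Produces the payload as a stream -->
  <file:read path="exampleFile.json"/>
  <!-- First consumer: reads the stream to its end -->
  <file:write path="output1.json"/>
  <!-- Second consumer: receives the same, already consumed stream -->
  <file:write path="output2.json"/>
</flow>
```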
In a Mule 3 app, this flow results in writing the first file correctly, while the second is created with empty content. This happens because each component that consumes a stream expects to receive a new stream. After the stream is consumed by the first Write operation, the second Write operation receives an empty stream, so it has no content to write to a file.
Something similar happens when you try to log the payload before and after a DataWeave transformation. Consider this example:
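The example is not reproduced in this text. A minimal sketch under the same assumptions (Mule 4 syntax, hypothetical flow and file names, a pass-through DataWeave script chosen only for illustration) might be:

```xml
<!-- Hypothetical flow: log, transform, then log again -->
<flow name="logAroundTransform">
  <file:read path="exampleFile.json"/>
  <!-- First Logger: consumes the stream to print the payload -->
  <logger message="#[payload]"/>
  <!-- Transform Message processor with an illustrative pass-through script -->
  <ee:transform>
    <ee:message>
      <ee:set-payload><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
    </ee:message>
  </ee:transform>
  <!-- Second Logger: reads the payload after the transformation -->
  <logger message="#[payload]"/>
</flow>
```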
This app logs the payload before the Transform Message processor but does not log the resulting payload after it. The first Logger consumes the stream and loads its content into memory, so the content is still available for the Transform Message processor to consume. After that, however, the second Logger receives an empty stream.
Data streams cannot be consumed at the same time.
Consider a Mule 3 app that uses a Scatter-Gather router to split a data stream and simultaneously log and write the payload to a file.
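As a rough illustration, such an app could be sketched as follows. The snippet uses Mule 4 element names for consistency with the rest of this document, and the flow and file names are hypothetical:

```xml
<!-- Hypothetical flow: two routes process the same streamed payload in parallel -->
<flow name="logAndWriteInParallel">
  <file:read path="exampleFile.json"/>
  <scatter-gather>
    <route>
      <!-- Route 1: log the payload -->
      <logger message="#[payload]"/>
    </route>
    <route>
      <!-- Route 2: write the payload to a file -->
      <file:write path="copy.json"/>
    </route>
  </scatter-gather>
</flow>
```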
This app fails because the streamed content cannot be processed by different processor chains simultaneously.
Mule 4 provides Repeatable Streams as its default framework for handling streams.
Repeatable Streams enable you to:
Read a stream more than once.
Have concurrent access to the stream.
As a component consumes the stream, Mule saves its content into a temporary buffer. The runtime then feeds the component from the temporary buffer, ensuring that each component receives the full stream, regardless of how much of the stream was already consumed by any prior component. This happens automatically and requires no special configuration on your part, which saves you the trouble of finding workarounds to save the stream somewhere so you can access it again. This behavior automatically fixes the first two Mule 3 examples outlined above.
All repeatable streams support parallel access. This means that you don’t need to worry about whether two components are trying to read the same stream when each component is running on a different thread. Mule automatically makes sure that when component A reads the stream it doesn’t generate any side effects in component B. This enables you to perform tasks like the one described in the third example above.
You can configure how Mule handles the repeatable stream by using streaming strategies.
The file-stored repeatable stream strategy is the default streaming strategy in Mule Runtime Enterprise Edition, and it is available only in that edition.
This strategy initially uses an in-memory buffer size of 512 KB. If the stream is larger than that, it creates a temporary file on your disk to store the contents without overflowing your memory.
When you know you need to deal with large or small files, you can change the buffer size to optimize performance. Configuring a bigger buffer size increases performance by reducing the number of times the runtime needs to write the buffer to your disk, but because each request then holds more memory, it also limits the number of concurrent requests your application can process.
Conversely, configuring a smaller buffer size reduces memory consumption.
You can also set the buffer’s unit of measurement, so you don’t have to do unit conversions.
For example, if you know that you are going to read a file that is always about 1 MB in size, you can configure a 1 MB buffer:
<file:read path="bigFile.json"> <ee:repeatable-file-store-stream inMemorySize="1" bufferUnit="MB"/> </file:read>
Or if you know you are always processing a file no bigger than 10 KB, you can save memory:
<file:read path="smallFile.json"> <ee:repeatable-file-store-stream inMemorySize="10" bufferUnit="KB"/> </file:read>
Based on performance tests, the default 512 KB buffer size of this strategy does not significantly impact performance in most scenarios. Run your own tests to find the buffer size that fits your needs.
The in-memory repeatable stream strategy is the default for Mule Runtime Community Edition.
It uses a default buffer size of 512 KB. If the stream is larger than that, the buffer is expanded by a default increment of 512 KB until it reaches the configured maximum buffer size. If the stream exceeds this limit, the application fails.
You can customize this behavior by setting the initial size of the buffer, the rate at which the buffer increases, the maximum buffer size, and the measurement unit.
For example, these settings configure an in-memory repeatable stream with a 512 KB initial size, which grows in increments of 256 KB and allows up to 2000 KB (about 2 MB) of content in memory:
<file:read path="exampleFile.json"> <repeatable-in-memory-stream initialBufferSize="512" bufferSizeIncrement="256" maxBufferSize="2000" bufferUnit="KB"/> </file:read>
Based on performance tests, the default 512 KB buffer size and 512 KB increment size of this strategy do not significantly impact performance in most scenarios.
Run your own tests to find the buffer size and increment size that fit your needs.
The non-repeatable stream strategy disables repeatable streams, so you can read an input stream only once. Use it when your use case does not require the extra memory or performance overhead that comes with repeatable streams. Because the stream is not saved to memory, this is the most performant strategy that you can use.
<file:read path="exampleFile.json"> <non-repeatable-stream /> </file:read>
Having this kind of configuration allows your flows to fail promptly if there’s a component in the configuration that is trying to access a big input stream before the actual streaming component is executed.
Every component in Mule 4 that returns an InputStream or a Streamable collection supports repeatable streams. The file:read operation used in the examples above is one such component.
A similar scenario happens when an Anypoint Connector is configured to use auto-paging. Mule 4 automatically handles the paged output of the connector using Repeatable Auto Paging.
This framework is similar to repeatable streams: as the connector receives the objects, Mule saves them in a configurable in-memory buffer.
However, while repeatable streams measure the buffer size in bytes, the runtime measures the buffer size for streamed objects in instance counts.
When calculating the in-memory buffer size for repeatable auto-paging, you need to estimate how much memory each instance takes to avoid running out of memory.
As with repeatable streams, you can use different strategies to configure how Mule handles repeatable auto-paging:
The repeatable file store iterable strategy is the default for Mule Runtime Enterprise Edition.
It uses a default in-memory buffer of 500 objects. If your query returns more results than the buffer size, Mule serializes those objects and writes them to your disk.
You can configure the number of objects Mule stores in the in-memory buffer. The more objects you save in memory, the better performance you get by avoiding writing to disk.
For example, you can set a buffer size of 100 objects in memory for a query from the Salesforce Connector:
<sfdc:query query="dsql:..."> <ee:repeatable-file-store-iterable inMemoryObjects="100"/> </sfdc:query>
This strategy uses the Kryo framework to serialize objects so it can write them to your disk.
Plain Java serialization fails if the object does not implement the Serializable interface. However, when a serializable object contains another object that does not implement the Serializable interface, Kryo is likely (but not guaranteed) to succeed. An example is a POJO containing an org.apache.xerces.jaxp.datatype.XMLGregorianCalendarImpl. Although the Kryo serializer allows Mule to serialize objects that the JVM cannot serialize by default, some things can’t be serialized, so it is recommended to keep your objects simple.
The repeatable file store iterable strategy is only available in Mule Runtime Enterprise Edition.
The repeatable in-memory iterable strategy is the default for Mule Runtime Community Edition.
It uses a default buffer size of 500 objects. If the query result is larger than that, the buffer is expanded by a default increment of 100 objects until it reaches the configured maximum buffer size. If the query result exceeds this limit, the application fails.
You can customize the initial size of the buffer, the rate at which the buffer increases, and the maximum buffer size.
For example, this configuration sets an in-memory buffer of 100 objects that grows in increments of 100 objects and allows a maximum size of 500 objects:
<sfdc:query query="dsql:..."> <repeatable-in-memory-iterable initialBufferSize="100" bufferSizeIncrement="100" maxBufferSize="500" /> </sfdc:query>