Streaming in Mule
- June 07, 2022
Introduction
This tutorial will help you understand the concept of streaming in Mule apps:
- How to stream huge volumes of data using different streaming strategies
- How to stream in DataWeave
What is streaming in Mule apps?
Streaming supports the efficient processing of large data objects as they move through a Mule app, rather than reading the whole payload into memory. These objects can include files, documents and records.
The advantages of streaming include:
- Allowing flows to consume very large messages efficiently
- Not reading message payloads into memory
- Using simple routing rules based on message metadata
- Combining streaming and non-streaming endpoints
Consuming data streams
Mule 3 consumes data streams in the traditional way: a data stream cannot be consumed more than once, nor can it be consumed by two components at the same time.
Consider the following two examples:
- In the first example, a flow receives a POST request through an HTTP Listener and writes the body payload to two files. The flow writes the first file correctly, but the second file is created with empty content, because each component that consumes a stream expects to receive a new stream.
- In the second example, a flow uses a Scatter-Gather router to split a data stream and simultaneously log the payload and write it to a file. The application gets some parts of the stream in the file and the rest in the log, because different processor chains cannot process the content of a data stream simultaneously.
After the first File Write operation consumes the stream, the second File Write operation receives an empty stream. So, the second operation has no content to write to a file.
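For illustration, here is a minimal Mule 4-style sketch that reproduces this behavior by disabling repeatable streaming (HTTP_Listener_config and File_Config are placeholder names):
<flow name="write-twice-example">
    <http:listener config-ref="HTTP_Listener_config" path="/upload">
        <non-repeatable-stream/>
    </http:listener>
    <!-- The first write consumes the non-repeatable stream -->
    <file:write path="first.txt" config-ref="File_Config"/>
    <!-- The second write receives an already-consumed, empty stream -->
    <file:write path="second.txt" config-ref="File_Config"/>
</flow>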
Streaming transformers and filters
Many transformers and filters can read input streams, process the contents and then send them on. Most, however, do not process the stream in real time. Instead, they read the stream, load it into memory, process it and then send it on. So, transformers and filters can become a bottleneck in your application if you regularly stream large files.
The following transformers and filters support true streaming and process the data as streams without loading it into memory first:
- XSLT transformer
- XmlToXMLStreamReader transformer
- DomToOutputHandler transformer (if the incoming XML format is a SAXSource or XMLStreamReader)
- SXC filter
Streams versus iterables
People often confuse these terms. To understand the concept of streaming, it’s important to understand the difference between streams and iterables.
Content from the Database and Salesforce connectors, for example, is considered an iterable because it can be divided into units: records (Database) or objects (Salesforce). File, FTP, HTTP and Sockets data, by contrast, cannot be divided into units, so each payload is treated as a single stream of bytes and needs a buffer. Each buffer acts as one transmitted stream.
Repeatable streams
Mule 4 introduced repeatable streams as the default framework for handling streams. With repeatable streams, you can:
- Read a stream more than once
- Access the stream concurrently
As a component consumes the stream, Mule saves its content into a temporary buffer. The runtime then feeds the component from the temporary buffer, ensuring each component receives the full stream. It does so regardless of how much of the stream any prior component has consumed. This process happens automatically; no special configuration is required, so there’s no need to find workarounds to save the stream elsewhere to access it again.
This configuration automatically fixes the first two Mule 3 examples described above.
All repeatable streams support parallel access. This means you don’t need to worry about whether two components are trying to read the same stream when each component is running on a different thread. Mule automatically ensures that when component A reads the stream it doesn’t generate any side effects in component B.
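With repeatable streaming enabled (the default in Mule 4), the two-write flow from the earlier example needs no special configuration; both writes receive the full content (the names below are placeholders):
<flow name="write-twice-repeatable">
    <http:listener config-ref="HTTP_Listener_config" path="/upload"/>
    <!-- Both writes read the full stream from the temporary buffer -->
    <file:write path="first.txt" config-ref="File_Config"/>
    <file:write path="second.txt" config-ref="File_Config"/>
</flow>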
Streaming strategies
Mule offers three types of streaming strategies:
- Repeatable file-stored stream (this is the default strategy)
- Repeatable in-memory stream
- Non-repeatable streams
Repeatable file-stored stream
File storage is the default streaming strategy in Mule 4 Enterprise Edition. It initially uses an in-memory buffer of 512 KB. For larger streams, the strategy creates a temporary file on disk to store the contents, so the stream doesn't overflow your memory.
Depending on whether you need to handle large or small files, you can change the buffer size (inMemorySize) to optimize performance:
- Configuring a larger buffer size increases performance by reducing the number of times the runtime needs to write the buffer to disk. However, it also limits the number of concurrent requests your application can process.
- Configuring a smaller buffer size reduces memory load.
You can also set the buffer’s unit of measurement (bufferUnit).
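For example, a sketch that tunes the file-stored strategy on a File read operation (the path and config name are placeholders):
<file:read path="bigFile.json" config-ref="File_Config">
    <repeatable-file-store-stream inMemorySize="1" bufferUnit="MB"/>
</file:read>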
Repeatable in-memory stream
The in-memory strategy is the default configuration in the Mule Kernel (formerly the Mule Runtime Community Edition). It defaults to a buffer size of 512 KB. For larger streams, the buffer expands by a default increment of 512 KB until it reaches the configured maximum buffer size. If the stream exceeds this limit, the application fails.
You can customize this behavior by setting:
- The initial size of the buffer (initialBufferSize)
- The rate at which the buffer increases (bufferSizeIncrement)
- The maximum buffer size (maxBufferSize)
- The unit of measurement for the buffer size value (bufferUnit)
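For example, a sketch that customizes the in-memory strategy on a File read operation (the path and config name are placeholders):
<file:read path="bigFile.json" config-ref="File_Config">
    <repeatable-in-memory-stream initialBufferSize="512" bufferSizeIncrement="256"
        maxBufferSize="2000" bufferUnit="KB"/>
</file:read>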
Every component in Mule 4 that returns an InputStream or a Streamable collection supports repeatable streams. These components include:
- File connector
- FTP connector
- Database connector
- HTTP connector
- Sockets connector
- Salesforce connector
Repeatable file-stored (iterable)
This configuration is the Mule Enterprise Edition default. The strategy uses an in-memory buffer with a default size of 500 objects. If a query returns more results than the buffer size, Mule serializes those objects and writes them to disk.
You can configure the number of objects Mule stores in the in-memory buffer. The more objects you save in memory, the better performance you get from avoiding writes to disk.
For example, you can set a buffer size of 100 objects in memory for a query from the Salesforce Connector:
Repeatable File Store (Iterable):
<sfdc:query query="dsql:…">
    <ee:repeatable-file-store-iterable inMemoryObjects="100"/>
</sfdc:query>
Repeatable in-memory (iterable)
This configuration, which is the Mule Kernel default, configures a default buffer size of 500 objects. If the query result is larger than that, the buffer expands to a default increment size of 100 objects until it reaches the configured maximum buffer size. If the stream exceeds this limit, the app fails.
You can customize the following:
- The initial size of the buffer (initialBufferSize)
- The rate at which the buffer increases (bufferSizeIncrement)
- The maximum buffer size (maxBufferSize)
For example, this configuration sets an in-memory buffer of 100 objects that increments at 100 objects per increment and allows a maximum buffer size of 500 objects.
Repeatable In-Memory (Iterable):
<sfdc:query query="dsql:…">
    <repeatable-in-memory-iterable
        initialBufferSize="100"
        bufferSizeIncrement="100"
        maxBufferSize="500"/>
</sfdc:query>
Disabling repeatable streaming
You can disable repeatable streaming through the non-repeatable-stream and non-repeatable-iterable strategies. The strategy to use depends on the type of stream.
Use this option only if you’re certain there’s no need to consume the stream several times and only if you need a very tight optimization for performance and resource consumption.
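For example, a sketch that disables repeatable streaming on a File read operation (the path and config name are placeholders):
<file:read path="bigFile.json" config-ref="File_Config">
    <non-repeatable-stream/>
</file:read>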
Streaming in DataWeave
DataWeave supports end-to-end streaming through a flow in a Mule application. Streaming speeds the processing of large documents without overloading memory.
DataWeave processes streamed data as its bytes arrive instead of scanning the entire document to index it.
When in deferred mode, DataWeave can also pass streamed output data directly to a message processor without saving it to the disk. This behavior enables DataWeave and Mule to process data faster while consuming fewer resources than the default processes for reading and writing data.
To stream successfully, it’s important to understand that:
- The basic unit of the stream is specific to the data format
- The unit is:
  - A record in a CSV document
  - An element of an array in a JSON document
  - A collection in an XML document
Streaming accesses each unit of the stream sequentially. Streaming does not support random access to a document.
Enabling streaming
Streaming is not enabled by default. You can use two configuration properties to stream data in a supported data format:
- Streaming property, for reading source data as a stream
- Deferred writer property, for passing an output stream directly to the next message processor in a flow
To enable streaming when reading source data, you must set the streaming reader property to true on the data source. You can enable streaming on a data source through the MIME Type setting outputMimeType. You can set the property in a Mule component that accepts source data, such as an HTTP Listener or an On New or Updated File operation.
To pass the streamed output to the next message processor, you can use the output directive in a DataWeave script; for example: output application/json deferred=true
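For example, a minimal sketch that enables the streaming reader property on an HTTP Listener source (the config name and path are placeholders):
<http:listener doc:name="Listener" config-ref="HTTP_Listener_config"
    path="/input" outputMimeType="application/csv; streaming=true"/>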
Streaming CSV
Because of its structure, CSV is the simplest streaming format. Each row below the CSV header is a streamable record.
The following CSV example consists of records that contain values for name, last name and age:
name,lastName,age
mariano,achaval,37
leandro,shokida,30
To stream this CSV example, the following script selects values from each record. It uses the map function to iterate over each record in the document.
%dw 2.0
output application/json
---
payload map (record) -> {
    FullName: record.lastName ++ ", " ++ record.name,
    Age: record.age
}
Although streaming does not support random access to the entire document, a DataWeave script can access data randomly within each record because each record is loaded into memory.
For example, the expression record.lastName ++ ", " ++ record.name can access a lastName value before it accesses a name value, even though the order of those values is reversed in the input.
However, streaming does not work in the following script because it requires random access to the entire document to return the elements in a different order than they’re given in the input:
[payload[-2], payload[-1], payload[3]]
Streaming JSON
The unit of a JSON stream is each element in an array.
NOTE: DataWeave 2.2.0 support for JSON streaming in Mule 4.2 was limited by the requirement that the root be an array. DataWeave support in Mule 4.3 adds streaming of arrays that are not at the root of the input.
Consider the following JSON input:
{
    "name": "Mariano",
    "lastName": "Achaval",
    "family": [
        { "name": "Sara", "age": 2 },
        { "name": "Pedro", "age": 4 },
        { "name": "Matias", "age": 8 }
    ],
    "age": 37
}
In this example, DataWeave can stream payload.family and perform random access within each element of that array. However, DataWeave cannot randomly access the container object. For example, it isn’t possible to stream { a: payload.age , b: payload.family} because age follows family and DataWeave cannot go backwards.
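For illustration, a minimal script that streams this input, assuming it arrives with streaming enabled; it accesses only payload.family and does so sequentially:
%dw 2.0
output application/json
---
payload.family map (member) -> { name: member.name, age: member.age }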
Streaming XML
XML is more complicated than JSON because there are no arrays in the document.
To enable XML streaming, DataWeave provides the following reader property to define the location in the document to stream:
collectionPath
For example, assume the following XML input:
<order>
    <header>
        <date>Wed Nov 15 13:45:28 EST 2006</date>
        <customer number="123123">Joe</customer>
    </header>
    <order-items>
        <order-item id="31">
            <product>111</product>
            <quantity>2</quantity>
            <price>8.90</price>
        </order-item>
        <order-item id="31">
            <product>222</product>
            <quantity>7</quantity>
            <price>5.20</price>
        </order-item>
    </order-items>
</order>
Given this XML source data, you can set the unit of the stream to <order-item> by setting collectionPath=order.order-items in the outputMimeType value:
<flow name="dw-streaming-example">
    <http:listener doc:name="Listener"
        outputMimeType="application/xml; collectionpath=order.order-items; streaming=true"
        config-ref="HTTP_Listener_config" path="/input"/>
</flow>
NOTE: You need to set both streaming=true and the collectionPath value. If either is missing, DataWeave will not stream the content.
The following DataWeave script streams the XML input, using each <order-item> element as the streamable unit:
%dw 2.0
output application/xml
---
{
    salesorder: {
        itemList: payload.order."order-items".*"order-item" map {
            ("i_" ++ $$): {
                id: $.@id,
                productId: $.product,
                quantity: $.quantity,
                price: $.price
            }
        }
    }
}
Validate a script for streamed data (experimental feature)
To check that your code can process an input stream successfully, DataWeave provides the following advanced, experimental annotation and a related directive:
@StreamCapable()
Use this annotation to validate whether the script can sequentially access a variable (typically, the payload variable).
The @StreamCapable() annotation requires the use of an input directive in the DataWeave script that identifies the MIME type of the data source; for example: input payload application/xml
The DataWeave validator (which the @StreamCapable annotation in the script triggers) checks a script against the following criteria:
- The variable is referenced only once
- No index selector is set for negative access, such as [-1]
- No reference to the variable is found in a nested lambda
If all criteria are met, the selected data is streamable.
The following example validates successfully. The script is designed to act on the JSON input from the JSON streaming section:
%dw 2.0
@StreamCapable()
input payload application/json
output application/json
---
payload.family filter (member) -> member.age > 3
The script successfully validates and returns the following output:
[
    {
        "name": "Pedro",
        "age": 4
    },
    {
        "name": "Matias",
        "age": 8
    }
]
Validation failures
If any of the criteria that the validator checks is false, the validation fails.
Note that validation can fail in some cases where streaming nevertheless works. If you write a script that accesses the input variable of a given data source sequentially, streaming works for that data source, but the same script might not work in all cases.
For example, JSON does not place a restriction on the order of the keys in an object. If the keys in some JSON documents arrive in a different order than the script expects, streaming will fail. The annotation processor follows the rules of the format and cannot assume that the keys will always arrive in the same order.
Error: Variable is referenced more than once
Validation fails if a script attempts to reference the same variable more than once.
A script that acts on the JSON input from the JSON streaming section fails validation if it references the payload variable more than once.
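A minimal sketch of such a failing script (both selectors reference payload):
%dw 2.0
@StreamCapable()
input payload application/json
output application/json
---
{
    names: payload.family.name, // first reference to payload
    ages: payload.family.age // second reference fails validation
}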
Error: Wrong scope reference
Validation fails if a script attempts to reference a variable from a scope that’s different from the scope in which the variable is defined.
Validation fails if the payload variable is referenced from within a nested lambda, such as [1,2,3] map ((item, index) -> payload). Even a single-element version, [1] map ((item, index) -> payload), fails because payload is referenced in the wrong scope.
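A minimal sketch of a script that fails for this reason:
%dw 2.0
@StreamCapable()
input payload application/json
output application/json
---
[1, 2, 3] map ((item, index) -> payload) // payload is referenced inside a nested lambda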
Negative index selector isn’t allowed
Validation fails if a script references the variable with a negative index selector, such as [-1].
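A minimal sketch of a script that fails for this reason:
%dw 2.0
@StreamCapable()
input payload application/json
output application/json
---
payload.family[-1] // negative index selector prevents sequential access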
Streaming output
After processing the streamed data, you can stream the output directly to another message processor. To facilitate this behavior, use the deferred writer property in the output directive of the DataWeave script; for example, output application/json deferred=true.
NOTE: Exceptions are not handled when you set deferred=true. You can see this behavior in Studio when a flow throws an exception: if you run an application in Studio debug mode and an exception occurs in a Transform Message component when deferred=true, the console logs the error, but the flow does not stop at the Transform Message component.
Building on the example in the JSON streaming section above, the following flow uses a DataWeave script in a Transform Message component to convert the JSON data to XML and write the result to a file.
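A minimal sketch of such a flow, with placeholder config names and an illustrative script:
<flow name="dw-deferred-output-example">
    <http:listener config-ref="HTTP_Listener_config" path="/orders"/>
    <ee:transform doc:name="Transform Message">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/xml deferred=true
---
{
    family: {
        (payload.family map (m) -> { member: { name: m.name, age: m.age } })
    }
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
    <file:write path="output.xml" config-ref="File_Config"/>
</flow>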
If the script does not define a root tag for the XML output, the error is displayed on the console, but the flow keeps working: it writes empty content to the file and returns a 200 status code to the caller.
— By Rohit Singh and Mitali Chaudhary