Processing large amounts of data using MuleSoft
- August 31, 2021
In this article, we’ll talk about how to process large amounts of data using MuleSoft. There are many scenarios where we need to process hundreds of thousands or millions of records.
Let’s consider a use case where we get data from an Oracle database and load it into Salesforce Einstein Analytics. Here, we’re moving data from one system to another. The source is the Oracle database, and the destination is Salesforce Einstein Analytics.
In this scenario we need to process a large number of records, say 500,000. We also need to perform intermediary steps before the data is loaded into the target system. In our case, we’ll transform these 500,000 records before loading them into Einstein Analytics.
The first way to process this data is to break it into chunks using the Batch Job component. Processing too much data at once might result in out-of-memory issues, and since we generally have a limited number of vCores, memory management must be optimized.
Processing using batch job
There are various configurations we can have with the batch job itself.
In the above diagram, we first transform all of our data outside the batch job, pass it to the batch job to be processed in chunks, and finally aggregate it. We then load the data using the Einstein Analytics component.
The drawback of this method is that all the records are transformed at once, outside the batch job. This takes up a lot of space in memory, and the Mule runtime may run out of memory, causing issues.
In the above diagram, the transform component is inside the batch job. The transformation now happens in chunks rather than all at once, which consumes less memory at any given step. There are a few properties we can customize here (see the configuration sketch after this list):
- Batch block size: The number of records handed to each thread for execution. The larger this number, the fewer input/output (I/O) operations each thread performs, but the more memory is needed to hold each block.
- Max concurrency: The total number of threads used for batch processing. By default, this is twice the number of processor cores. Each thread processes its blocks in parallel.
- Batch aggregator: The component that collects processed records so they can be sent to the target system in bulk. Here we have two options:
- Aggregator size: A numeric value indicating how many records are aggregated and sent together.
- Streaming: This aggregates all records of the job as a stream.
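Below is a minimal sketch of what this second configuration might look like in Mule 4 XML. The job name, block size, aggregator size, the field names in the DataWeave script, and the logger standing in for the Einstein Analytics load operation are all placeholders, so treat the values as a starting point rather than a recommendation.

```xml
<!-- Sketch: transform inside a batch step, then aggregate and load in bulk.
     blockSize, maxConcurrency, and size are illustrative values. -->
<batch:job jobName="loadRecordsToEinstein" blockSize="200" maxConcurrency="4">
    <batch:process-records>
        <batch:step name="transformAndLoadStep">
            <!-- The transformation runs per record block, not on all 500,000 records at once -->
            <ee:transform doc:name="Map record">
                <ee:message>
                    <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
{
    Id:   payload.ID,
    Name: payload.NAME
}]]></ee:set-payload>
                </ee:message>
            </ee:transform>
            <!-- Collect records and push them to the target system in bulk -->
            <batch:aggregator doc:name="Batch Aggregator" size="2000">
                <!-- Placeholder for the Einstein Analytics load operation -->
                <logger level="INFO" message="#['Loading $(sizeOf(payload)) records']"/>
            </batch:aggregator>
        </batch:step>
    </batch:process-records>
    <batch:on-complete>
        <logger level="INFO" message="Batch job finished"/>
    </batch:on-complete>
</batch:job>
```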
By tuning these properties, we can control the performance and memory usage of the batch job. We can try different values to get the desired result. This should work for most cases, but if it doesn’t, we can use another way of processing.
Processing using For Each
With For Each, we can also have two configurations.
In the above diagram, we transform all of our data outside the For Each component and pass it in chunks to the Einstein Analytics component to load the data.
This has the same drawback as the batch job scenario: because we transform all the data at once, the application may run out of memory.
In the above diagram, we now have the transform component inside the For Each. This uses memory better, as the data is broken into smaller pieces and then processed. Here we have one useful property:
- Batch size: The number of records processed together in one iteration of the For Each.
For example, if the batch size is 20,000, the first 20,000 of the 500,000 records are processed in one iteration: first we transform them, then we load them into Einstein Analytics. At any given time, only the memory for 20,000 records is occupied. A sketch of this configuration follows.
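Here is a minimal sketch of that For Each configuration in Mule 4 XML, assuming the Oracle query result is already in the payload. The field names in the DataWeave script and the logger standing in for the Einstein Analytics operation are placeholders.

```xml
<!-- Sketch: For Each with batchSize, transforming and loading one chunk of 20,000 records per iteration -->
<foreach doc:name="For Each" collection="#[payload]" batchSize="20000">
    <!-- Inside the scope, payload is the current chunk of up to 20,000 records -->
    <ee:transform doc:name="Map chunk">
        <ee:message>
            <ee:set-payload><![CDATA[%dw 2.0
output application/java
---
payload map (record) -> {
    Id:   record.ID,
    Name: record.NAME
}]]></ee:set-payload>
        </ee:message>
    </ee:transform>
    <!-- Placeholder for the Einstein Analytics load operation -->
    <logger level="INFO" message="#['Loading $(sizeOf(payload)) records']"/>
</foreach>
```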
We can try varying the batch size to get our desired result in both performance and memory usage.
For Each can be useful in scenarios where a batch job doesn’t get the work done. Since it has less overhead, it can be the better choice for certain use cases.
Memory optimization
One common error that can be caused by memory getting full is:
- The application is not responding to the Mule system health monitor.
There are many reasons for this error, but memory overflow is one of the major ones. When this happens, the application on CloudHub restarts.
The amount of memory available is directly proportional to how much data we can process. On on-premises systems we can have a large amount of memory, so it’s easier to process millions of records. On CloudHub we have a limited number of vCores, so optimizing memory becomes especially important. A few tips for memory optimization:
- Process data in smaller chunks instead of processing all data at once
- Sometimes, unused variables take up too much memory. We can use the ‘Remove Variable’ component from the Core module to free that memory once the variable has served its purpose (see the sketch after this list).
- Use DataWeave for processing large amounts of data, as it’s the most efficient option compared to other methods.
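As a small illustration, here is how a large variable might be dropped once it is no longer needed; the variable name rawRecords is hypothetical.

```xml
<!-- Keep the raw query result in a variable while it's still needed -->
<set-variable variableName="rawRecords" value="#[payload]" doc:name="Set rawRecords"/>
<!-- ... processing steps that read vars.rawRecords ... -->
<!-- Free the memory held by the variable once it has served its purpose -->
<remove-variable variableName="rawRecords" doc:name="Remove rawRecords"/>
```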
Another important technique is to reduce the input data. For example, instead of fetching 500,000 records with a single SQL query and processing them in one execution, we can split the query into two queries of 250,000 records each and process them in two executions.
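A sketch of what one half of that split query could look like with the Database connector is shown below; the table, column, and config names are placeholders, and the OFFSET/FETCH syntax assumes Oracle 12c or later. The second execution would simply use an offset of 250,000.

```xml
<!-- Fetch only the first 250,000 rows; a second execution fetches the rest -->
<db:select config-ref="Oracle_Database_Config" doc:name="Select first half">
    <db:sql><![CDATA[
        SELECT *
        FROM   CUSTOMER_RECORDS
        ORDER  BY ID
        OFFSET :offset ROWS FETCH NEXT :pageSize ROWS ONLY
    ]]></db:sql>
    <db:input-parameters><![CDATA[#[{ offset: 0, pageSize: 250000 }]]]></db:input-parameters>
</db:select>
```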
The core takeaway from this article is that we should process data in smaller pieces, not all at once.
— By Ashish Yadav