Mule 4 batch processing deep dive
- June 04, 2020
This article explains how Mule 4 batch processing works, using an example to illustrate its configuration parameters.
What is Mule 4 batch processing?
One of MuleSoft’s selling points and best features is batch processing. It helps process large volumes of data in chunks and in parallel.
Batch processing is only available in Mule Enterprise Edition (MuleSoft EE).
Overview of Batch Job and its configuration parameters:
General -> Name: Name of the Batch Job component in the Mule flow.
General -> Max Failed Records: Mule has three options for handling a record-level error: finish processing (stop the batch job when a record fails), continue processing regardless of failures, or continue processing until the batch job accumulates a maximum number of failed records. Max Failed Records controls which option applies.
- The default value, 0, corresponds to finish processing.
- The value -1 corresponds to continue processing, no matter how many records fail.
- A positive integer corresponds to continue processing until the batch job accumulates that maximum number of failed records.
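As a rough sketch of how this setting appears in the XML, the fragment below sets `maxFailedRecords` on a batch job. The flow, job, and step names are hypothetical, and the fragment assumes an enclosing `<mule>` root that declares the `batch` namespace, as the sample flow later in this article does.

```xml
<!-- Fragment only: assumes an enclosing <mule> root with the batch namespace declared. -->
<flow name="maxFailedRecordsExampleFlow">
    <!--
        maxFailedRecords="0"  : finish processing when a record fails (default)
        maxFailedRecords="-1" : continue processing regardless of failed records
        maxFailedRecords="5"  : continue until the job accumulates the configured
                                maximum of failed records (5 here)
    -->
    <batch:job jobName="maxFailedRecordsExample_Job" maxFailedRecords="5">
        <batch:process-records>
            <batch:step name="Example_Step">
                <logger level="INFO" message="#[payload]"/>
            </batch:step>
        </batch:process-records>
    </batch:job>
</flow>
```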
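General -> Scheduling Strategy: Scheduling Strategy decides how more than one batch job instance runs when another instance starts before the current one finishes.
- ORDERED_SEQUENTIAL (Default): job instances run one at a time, in the order in which they were created.
- ROUND_ROBIN: all available job instances run, and resources are shared among them in round-robin fashion.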
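A minimal sketch of selecting the strategy in XML follows. The names are hypothetical, and the fragment again assumes an enclosing `<mule>` root with the `batch` namespace declared.

```xml
<!-- Fragment only: assumes an enclosing <mule> root with the batch namespace declared. -->
<flow name="schedulingStrategyExampleFlow">
    <!-- Omit schedulingStrategy to get the default, ORDERED_SEQUENTIAL. -->
    <batch:job jobName="schedulingStrategyExample_Job" schedulingStrategy="ROUND_ROBIN">
        <batch:process-records>
            <batch:step name="Example_Step">
                <logger level="INFO" message="#[payload]"/>
            </batch:step>
        </batch:process-records>
    </batch:job>
</flow>
```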
General -> Job Instance ID: By default, Mule generates a UUID as the Batch Job Instance ID. This field can be used to override that default with your own value.
General -> Batch Block Size: Number of records grouped into a block; each block is processed by one thread. The default value is 100.
General -> Max Concurrency: Maximum number of threads that process different blocks in parallel. According to the Mule 4 documentation, the default is twice the number of available CPU cores, so the effective value depends on the host (for example, 16 on an 8-core machine).
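The three settings above can be sketched together as attributes on the batch job. The names, the `nightly-sync-` prefix, and the chosen numbers are illustrative assumptions, not recommendations. The instance ID expression appends a generated UUID so that each execution gets a distinct ID.

```xml
<!-- Fragment only: assumes an enclosing <mule> root with the batch namespace declared. -->
<flow name="blockSizeExampleFlow">
    <!--
        jobInstanceId  : custom ID built with DataWeave (hypothetical prefix plus uuid())
        blockSize      : records per block handed to a thread (default 100)
        maxConcurrency : upper bound on blocks processed in parallel
    -->
    <batch:job jobName="blockSizeExample_Job"
               jobInstanceId="#['nightly-sync-' ++ uuid()]"
               blockSize="50"
               maxConcurrency="4">
        <batch:process-records>
            <batch:step name="Example_Step">
                <logger level="INFO" message="#[payload]"/>
            </batch:step>
        </batch:process-records>
    </batch:job>
</flow>
```

A smaller block size means more, shorter units of work; a larger one means fewer I/O round trips per record but more memory held per thread.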
History -> Max Age and Time Unit: Mule retains the history of batch job instances in a temporary directory of the Mule runtime. By default, the retention policy is seven days, and a monitoring process removes the temporary data once it meets the expiration criteria. Max Age and Time Unit change this default.
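In XML, the retention policy is expressed with a `batch:history` element inside the job. The sketch below keeps instance history for 12 hours; the names and the 12-hour value are illustrative assumptions.

```xml
<!-- Fragment only: assumes an enclosing <mule> root with the batch namespace declared. -->
<flow name="historyExampleFlow">
    <batch:job jobName="historyExample_Job">
        <!-- Keep batch job instance history for 12 hours instead of the default 7 days. -->
        <batch:history>
            <batch:expiration maxAge="12" ageUnit="HOURS"/>
        </batch:history>
        <batch:process-records>
            <batch:step name="Example_Step">
                <logger level="INFO" message="#[payload]"/>
            </batch:step>
        </batch:process-records>
    </batch:job>
</flow>
```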
Overview of Batch Step and its configuration parameters:
General -> Name: Name of the Batch Step component in the Mule flow.
A batch step uses two attributes to filter records:
General -> Accept Expression: The batch step processes only the records for which the expression evaluates to true. If the expression evaluates to false for a record, the batch step skips that record and passes it to the next batch step. In other words, the records for which the accept expression resolves to false are the ones that the step filters out.
General -> Accept Policy: The batch step processes only the records that match the accept policy, which looks at whether a record failed in a preceding step. The available values are:
- NO_FAILURES (Default): the step processes only records that did not fail in any preceding step.
- ONLY_FAILURES: the step processes only records that failed in a preceding step.
- ALL: the step processes all records, whether or not they failed in a preceding step.
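The two filters can be sketched side by side. In this hypothetical fragment, the first step accepts only payloads below 10, written in the same form as the sample flow later in this article, and the second step acts as a failure handler by accepting only records that failed earlier. All names are illustrative.

```xml
<!-- Fragment only: assumes an enclosing <mule> root with the batch namespace declared. -->
<flow name="acceptFilterExampleFlow">
    <batch:job jobName="acceptFilterExample_Job" maxFailedRecords="-1">
        <batch:process-records>
            <!-- Accept Expression: only records with payload below 10 enter this step.
                 Inside an XML attribute, the less-than sign is written as &lt; -->
            <batch:step name="Small_Values_Step" acceptExpression="payload &lt; 10">
                <logger level="INFO" message="#[payload]"/>
            </batch:step>
            <!-- Accept Policy: only records that failed in a preceding step enter this step. -->
            <batch:step name="Failure_Handler_Step" acceptPolicy="ONLY_FAILURES">
                <logger level="WARN" message="#[payload]"/>
            </batch:step>
        </batch:process-records>
    </batch:job>
</flow>
```

With `maxFailedRecords="-1"` the job keeps running after failures, which is what gives the failure-handler step something to process.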
Now, let's run an example that covers the following points:
- Batch Block Size
- Max Failed Records
- Accept Expression
- Accept Policy
Sample Flow:
<?xml version="1.0" encoding="UTF-8"?>
<mule xmlns:http="http://www.mulesoft.org/schema/mule/http"
    xmlns:batch="http://www.mulesoft.org/schema/mule/batch"
    xmlns:ee="http://www.mulesoft.org/schema/mule/ee/core"
    xmlns="http://www.mulesoft.org/schema/mule/core" xmlns:doc="http://www.mulesoft.org/schema/mule/documentation"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="
http://www.mulesoft.org/schema/mule/http http://www.mulesoft.org/schema/mule/http/current/mule-http.xsd
http://www.mulesoft.org/schema/mule/batch http://www.mulesoft.org/schema/mule/batch/current/mule-batch.xsd
http://www.mulesoft.org/schema/mule/ee/core http://www.mulesoft.org/schema/mule/ee/core/current/mule-ee.xsd
http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd">
    <!-- Assumed listener configuration: the flow references HTTP_Listener_config,
         so it is defined here with placeholder host and port values. -->
    <http:listener-config name="HTTP_Listener_config" doc:name="HTTP Listener config">
        <http:listener-connection host="0.0.0.0" port="8081"/>
    </http:listener-config>
    <flow name="batchPOC" doc:id="011a4555-c6f8-439f-8fb9-df30137d8287" >
        <http:listener doc:name="Listener" doc:id="beaca31d-2d53-47db-b16e-1b0db8043425" config-ref="HTTP_Listener_config" path="/batch"/>
        <!-- Build the input: an array with the values 1 to 15 -->
        <ee:transform doc:name="Transform Message" doc:id="20b5aec3-9bed-44fe-88d0-9f18a91900e6" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
1 to 15]]></ee:set-payload>
            </ee:message>
        </ee:transform>
        <batch:job jobName="loggingBatch_Job" doc:id="5d7833ae-480a-409a-baa9-7ccb3f4ba6c5" blockSize="3" maxFailedRecords="-1" schedulingStrategy="ROUND_ROBIN">
            <batch:process-records >
                <!-- Step 1: accepts only records below 10 and adds 100 to each of them -->
                <batch:step name="Batch_Step_01" doc:id="113e7380-2559-49d3-b3c7-2e62b0cd0cbf" acceptExpression="payload &lt; 10">
                    <logger level="INFO" doc:name="Logger" doc:id="9392037b-16cb-4bf0-9853-af533a09c641" message='#["Batch_Step_01: " ++ payload as String]'/>
                    <ee:transform doc:name="Transform Message" doc:id="94a20d33-415b-4b7c-ae4c-10b803ac1df8" >
                        <ee:message >
                            <ee:set-payload ><![CDATA[%dw 2.0
output application/java
---
100 + (payload as Number)]]></ee:set-payload>
                        </ee:message>
                    </ee:transform>
                </batch:step>
                <!-- Step 2: no filter; fails with a division by zero when the payload is a multiple of 4 -->
                <batch:step name="Batch_Step_02" doc:id="b1db6a6c-2603-41bf-a232-455057b8bdf7">
                    <logger level="INFO" doc:name="Logger" doc:id="a8b6cfc9-323b-4065-998f-4b73d77cf9c4" message='#["Batch_Step_02: " ++ payload as String]'/>
                    <ee:transform doc:name="Transform Message" doc:id="d9ff8200-bc9a-4760-85f5-94d818d452f9" >
                        <ee:message >
                            <ee:set-payload ><![CDATA[%dw 2.0
output application/java
---
if ((payload mod 4) == 0)
1/0
else
payload]]></ee:set-payload>
                        </ee:message>
                    </ee:transform>
                </batch:step>
                <!-- Step 3: default accept policy (NO_FAILURES), so failed records are skipped -->
                <batch:step name="Batch_Step_03" doc:id="29d68595-8f9a-4236-82b8-ca8f31577c3e">
                    <logger level="INFO" doc:name="Logger" doc:id="b45e7ac7-5112-4eca-98b6-06f5dc8b4d96" message='#["Batch_Step_03: " ++ payload as String]'/>
                </batch:step>
                <!-- Step 4: accept policy ALL, so successful and failed records are both processed -->
                <batch:step name="Batch_Step_04" doc:id="d5a5f4ef-dc4a-4cb6-860d-557e1839aa27" acceptPolicy="ALL">
                    <logger level="INFO" doc:name="Logger" doc:id="54efa46a-327d-4b82-9429-f4500c638da2" message='#["Batch_Step_04: " ++ payload as String]'/>
                </batch:step>
            </batch:process-records>
            <!-- On Complete: the payload here is the batch job result summary -->
            <batch:on-complete >
                <ee:transform doc:name="Transform Message" doc:id="762f6eb3-d3a7-48c9-ba8a-7086a49cc82a" >
                    <ee:message >
                        <ee:set-payload ><![CDATA[%dw 2.0
output application/json
---
payload]]></ee:set-payload>
                    </ee:message>
                </ee:transform>
                <logger level="INFO" doc:name="Logger" doc:id="d6d1f7e8-b4bb-4bb8-beb6-e0f19606064c" message="#[payload]"/>
            </batch:on-complete>
        </batch:job>
        <!-- Response returned to the HTTP caller -->
        <ee:transform doc:name="Transform Message" doc:id="f5b5bf3c-b9b9-41b3-b6bf-3b8350fdfbcc" >
            <ee:message >
                <ee:set-payload ><![CDATA[%dw 2.0
output text/plain
---
"Success"]]></ee:set-payload>
            </ee:message>
        </ee:transform>
    </flow>
</mule>
Overview
- The payload is set to the array [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15] before the batch job starts.
- The batch job's Max Failed Records is set to -1, so it continues processing regardless of failed records.
- The batch job's Batch Block Size is set to 3, so each thread takes a block of three records and processes it.
- Batch_Step_01 has Accept Expression payload < 10 and the default Accept Policy, NO_FAILURES, so it processes only the 9 records with values 1-9 and skips the last 6 records (10-15). Its transformation adds 100 to each record it processes, so those records continue as 101-109.
- Batch_Step_02 has no Accept Expression and the default Accept Policy, NO_FAILURES, so it processes all 15 records, including the ones skipped by Batch_Step_01.
- Batch_Step_02 applies the DataWeave transformation shown below, which divides by zero whenever the payload is a multiple of 4. Three records fail this way (payloads 104, 108 and 12), so 12 records succeed and 3 fail.
%dw 2.0
output application/java
---
if ((payload mod 4) == 0)
1/0
else
payload
- Batch_Step_03 has no Accept Expression and the default Accept Policy, NO_FAILURES, so it processes the 12 successful records and ignores the 3 records that failed in Batch_Step_02.
- Batch_Step_04 has no Accept Expression and its Accept Policy is set to ALL, so it processes all 15 records.
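One way to check these expectations is to log the record counts in the On Complete phase, where the payload is the batch job result. The sketch below is a hypothetical replacement for the `batch:on-complete` block of the sample flow; it assumes the result exposes `totalRecords`, `successfulRecords` and `failedRecords`, and it fits inside a `batch:job` element rather than standing alone.

```xml
<!-- Fragment only: belongs inside a <batch:job>, after <batch:process-records>. -->
<batch:on-complete>
    <!-- Summarize the run using counters from the batch job result. -->
    <logger level="INFO"
            message='#["Total: $(payload.totalRecords), successful: $(payload.successfulRecords), failed: $(payload.failedRecords)"]'/>
</batch:on-complete>
```

If the walkthrough above holds, the sample flow should report 15 total records, 12 successful and 3 failed.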
Run the flow and collect the logs. I put the logs into sheet format, removed unnecessary content, and sorted them by step name. See below.
Observations
- Each thread processed exactly three records at a time, as defined by Batch Block Size.
- The three record failures did not stop the batch job from processing the remaining records, because Max Failed Records is set to -1.
- Batch_Step_01 processed only 9 records (1-9) and ignored 6 records (10-15); the Accept Expression filtered those 6 records out.
- Batch_Step_02 processed all 15 records (1-15), as it has no Accept Expression.
- Batch_Step_03 processed only 12 records, the ones that succeeded in the previous step, Batch_Step_02, because its Accept Policy is NO_FAILURES.
- Batch_Step_04 processed all 15 records, because its Accept Policy is set to ALL.
- Batch_Step_02 processed records 10-15 at almost the same time as Batch_Step_01 processed records 1-9.
- Each block of records can be processed by a different thread at each step, because thread switching is possible after each step completes.
- Some records may still be in one batch step while other records have already started the next batch step.
Note: Run the example flow with different combinations of these settings and observe how the behavior changes.
— By Mohammad Mazhar Ansari