Tuesday, September 12, 2017

Mule Batch Jobs and Their Variables


1.0 Overview (the case for flow variables)

I came across an exam question about the relationship between flow variables and batch jobs in Mule. You need to pay attention to questions like this if you are to pass the Mule certification exams. When I tried to google the answer, I could not find any article about it, neither in the Mule documentation nor anywhere else on the web. So read on to understand how each type of Mule variable behaves in the context of a batch job (a Mule construct).

The question goes something like this. Given the following batch job:
Figure 1.0a

And given the following input payload, what would flowVars.aCounter be at the end of On Complete?
[
 {
   "ID":"001",
   "OriginNum":"1"
 },
 {
   "ID":"002",
   "OriginNum":"2"  
 }
]

Your thinking process would go like this: the payload is an array of two elements, which means one instance of the batch job would run, and both Batch_Step_1 and Batch_Step_2 would be executed twice (once for each record).

If flowVars.aCounter starts at one (1) at the beginning of the batch, then in theory each of the four step executions (two records times two steps) adds one, so flowVars.aCounter should total 5 at the end of the “On Complete” phase of the batch job, right? Wrong! That notion is totally wrong. The following is the Mule XML configuration of the batch job.

   <batch:job name="batchjob_withnoInput">
       <batch:input>
           <logger message="Batch Job Input: #[payload]" level="INFO" doc:name="Logger"/>
           <set-variable variableName="aCounter" value="#[1]" doc:name="flowVars.aCounter = 1"/>
       </batch:input>
       <batch:process-records>
           <batch:step name="Batch_Step_1">
               <scripting:component doc:name="flowVars.aCounter add 1">
                   <scripting:script engine="Groovy"><![CDATA[flowVars.aCounter = flowVars.aCounter + 1]]></scripting:script>
               </scripting:component>
               <logger message="Batch Step 1 flowVars.aCounter: #[flowVars.aCounter]" level="INFO" doc:name="Logger"/>
           </batch:step>
           <batch:step name="Batch_Step_2" >
               <scripting:component doc:name="flowVars.aCounter add 1">
                   <scripting:script engine="Groovy"><![CDATA[flowVars.aCounter = flowVars.aCounter + 1]]></scripting:script>
               </scripting:component>
               <logger message="Batch Step 2 flowVars.aCounter: #[flowVars.aCounter]" level="INFO" doc:name="Logger"/>
           </batch:step>
       </batch:process-records>
       <batch:on-complete>
           <logger message="oncomplete flowVars.aCounter : #[flowVars.aCounter]" level="INFO" doc:name="Logger"/>
       </batch:on-complete>
   </batch:job>


A batch job is not a flow, but it can reference and execute a flow. Each step in the batch job executes as an independent flow, so the scope of a flow variable is really constrained to the individual step: once execution leaves a batch step, any change made to flowVars in that step is gone.

2.0 The Real Answer

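The following logs were collected by running the batch job in Figure 1.0a. Note that this run was made with a single input record, as the “1 records were loaded” line shows.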
[[batchplayground].HTTP_Listener_Configuration.worker.01] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting input phase
[[batchplayground].HTTP_Listener_Configuration.worker.01] org.mule.api.processor.LoggerMessageProcessor: Batch Job Input: [{OriginNum=1, ID=001}]
[[batchplayground].HTTP_Listener_Configuration.worker.01] com.mulesoft.module.batch.engine.DefaultBatchEngine: Input phase completed
[[batchplayground].HTTP_Listener_Configuration.worker.01] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Starting loading phase for instance '67b3a840-974e-11e7-956a-b62d20524153' of job 'batchjob_withnoInput'
[[batchplayground].HTTP_Listener_Configuration.worker.01] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Finished loading phase for instance 67b3a840-974e-11e7-956a-b62d20524153 of job batchjob_withnoInput. 1 records were loaded
[[batchplayground].HTTP_Listener_Configuration.worker.01] com.mulesoft.module.batch.engine.DefaultBatchEngine: Started execution of instance '67b3a840-974e-11e7-956a-b62d20524153' of job 'batchjob_withnoInput'
[batch-job-batchjob_withnoInput-work-manager.01] org.mule.api.processor.LoggerMessageProcessor: Batch Step 1 flowVars.aCounter: 2
[batch-job-batchjob_withnoInput-work-manager.01] com.mulesoft.module.batch.DefaultBatchStep: Step Batch_Step_1 finished processing all records for instance 67b3a840-974e-11e7-956a-b62d20524153 of job batchjob_withnoInput
[batch-job-batchjob_withnoInput-work-manager.03] org.mule.api.processor.LoggerMessageProcessor: Batch Step 2 flowVars.aCounter: 2
[batch-job-batchjob_withnoInput-work-manager.03] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting execution of onComplete phase for instance 67b3a840-974e-11e7-956a-b62d20524153 of job batchjob_withnoInput
[batch-job-batchjob_withnoInput-work-manager.03] org.mule.api.processor.LoggerMessageProcessor: oncomplete flowVars.aCounter : 1
[batch-job-batchjob_withnoInput-work-manager.03] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution of onComplete phase for instance 67b3a840-974e-11e7-956a-b62d20524153 of job batchjob_withnoInput
[batch-job-batchjob_withnoInput-work-manager.03] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution for instance '67b3a840-974e-11e7-956a-b62d20524153' of job 'batchjob_withnoInput'.
                                                                                                     Total Records processed: 1. Successful records: 1. Failed Records: 0
[batch-job-batchjob_withnoInput-work-manager.03] com.mulesoft.module.batch.engine.DefaultBatchEngine:
[batch-job-batchjob_withnoInput-work-manager.03] com.mulesoft.module.batch.DefaultBatchStep: Step Batch_Step_2 finished processing all records for instance 67b3a840-974e-11e7-956a-b62d20524153 of job batchjob_withnoInput

From the logs you can see that changes to flowVars.aCounter are not persisted as execution traverses the batch steps; instead, the state the flow variable had when it was first initialized is carried into each phase and batch step. In short, flowVars.aCounter always starts at 1 in each batch step, regardless of whether an earlier batch step incremented it. And finally, the “On Complete” phase still shows flowVars.aCounter with the value 1, which is what the variable was initialized with in the Input phase.

So if you ever get a question like this in any of your MuleSoft exams, and the question asks you to state the value of a flow variable at the “On Complete” phase, the answer is 1 if the flow variable was initialized with 1 (or zero if its initial value is zero).
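
If what you actually want at On Complete is a tally of records, a counter variable is not needed. The following is a minimal sketch, assuming Mule 3.x, where the payload in the On Complete phase is a BatchJobResult object that exposes the record counts. The logger wording is only illustrative.

```xml
<batch:on-complete>
    <!-- Assumption: Mule 3.x, where the On Complete payload is a BatchJobResult -->
    <logger message="Loaded: #[payload.loadedRecords], successful: #[payload.successfulRecords], failed: #[payload.failedRecords]" level="INFO" doc:name="Logger"/>
</batch:on-complete>
```

These are the same figures the batch engine prints in its own “Finished execution” log line.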

3.0 The Case for Record Variables

Figure 3.0a
A record variable is definitely inaccessible in the On Complete phase. You also cannot declare a record variable in the Input phase, because records only exist after the implicit Load and Dispatch phase. Figure 3.0a summarizes the availability of record variables, but it is a bit fuzzy about their actual scope and lifetime.

To illustrate the lifetime of a record variable I have created the following flow. You can only start setting a record variable in a batch step; I have done it in Batch_Step_1 (as per Figure 3.0b).
 
Figure 3.0b

Notice that I have used the expression message processor instead of the Groovy message processor to set recordVars.rACounter. The reason is that the Groovy shell in Mule does not recognize recordVars (take note, so that you don't have to spend hours trying to figure out why).

You cannot access a record variable in the On Complete phase; doing so gives you a runtime exception. The following is the full batch job XML showing how to correctly use record variables.
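
As an alternative sketch, the increment can also be written with the same batch:set-record-variable element that declares the variable, using a MEL expression for the new value. This assumes the record variable was already set earlier in the same step or in a previous step; the doc:name label is only illustrative.

```xml
<!-- Re-assign the record variable from its current value using MEL.
     Assumes recordVars.rACounter has already been set for this record. -->
<batch:set-record-variable variableName="rACounter" value="#[recordVars.rACounter + 1]" doc:name="recordVars.rACounter add 1"/>
```

Either form stays within MEL, which is where recordVars is known to resolve.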

   <batch:job name="batchjob_withnoInput">
       <batch:input>
           <logger message="Batch Job Input: #[payload]" level="INFO" doc:name="Logger"/>
           <set-variable variableName="aCounter" value="#[1]" doc:name="flowVars.aCounter = 1"/>
       </batch:input>
       <batch:process-records>
           <batch:step name="Batch_Step_1">
               <batch:set-record-variable variableName="rACounter" value="#[1]" doc:name="recordVars.rACounter = 1"/>
               <scripting:component doc:name="flowVars.aCounter add 1">
                   <scripting:script engine="Groovy"><![CDATA[flowVars.aCounter = flowVars.aCounter + 1]]></scripting:script>
               </scripting:component>
               <expression-component doc:name="recordVars.rACounter add 1"><![CDATA[recordVars.rACounter = recordVars.rACounter + 1]]></expression-component>
               <logger message="Batch Step 1 flowVars.aCounter: #[flowVars.aCounter + &quot;\n&quot;] Batch Step 1 recordVars.rACounter: #[recordVars.rACounter + &quot;\n&quot;]" level="INFO" doc:name="Logger"/>
           </batch:step>
           <batch:step name="Batch_Step_2" >
               <scripting:component doc:name="flowVars.aCounter add 1">
                   <scripting:script engine="Groovy"><![CDATA[flowVars.aCounter = flowVars.aCounter + 1]]></scripting:script>
               </scripting:component>
               <expression-component doc:name="recordVars.rACounter add 1"><![CDATA[recordVars.rACounter = recordVars.rACounter + 1]]></expression-component>
               <logger message="Batch Step 2 flowVars.aCounter: #[flowVars.aCounter + &quot;\n&quot;] Batch Step 2 recordVars.rACounter: #[recordVars.rACounter + &quot;\n&quot;]" level="INFO" doc:name="Logger"/>
           </batch:step>
       </batch:process-records>
       <batch:on-complete>
           <logger message="oncomplete flowVars.aCounter : #[flowVars.aCounter + &quot;\n&quot;]" level="INFO" doc:name="Logger"/>
       </batch:on-complete>
   </batch:job>
Listing 3.0a

The code snippet in Listing 3.0a generates the following output:
[[batchplayground].HTTP_Listener_Configuration.worker.01] com.mulesoft.module.batch.engine.DefaultBatchEngine: Created instance a3e26950-9758-11e7-956a-b62d20524153 for batch job batchjob_withnoInput
[[batchplayground].HTTP_Listener_Configuration.worker.01] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting input phase
[[batchplayground].HTTP_Listener_Configuration.worker.01] org.mule.api.processor.LoggerMessageProcessor: Batch Job Input: [{OriginNum=1, ID=001}]
[[batchplayground].HTTP_Listener_Configuration.worker.01] com.mulesoft.module.batch.engine.DefaultBatchEngine: Input phase completed
[[batchplayground].HTTP_Listener_Configuration.worker.01] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Starting loading phase for instance 'a3e26950-9758-11e7-956a-b62d20524153' of job 'batchjob_withnoInput'
[[batchplayground].HTTP_Listener_Configuration.worker.01] com.mulesoft.module.batch.engine.queue.BatchQueueLoader: Finished loading phase for instance a3e26950-9758-11e7-956a-b62d20524153 of job batchjob_withnoInput. 1 records were loaded
[[batchplayground].HTTP_Listener_Configuration.worker.01] com.mulesoft.module.batch.engine.DefaultBatchEngine: Started execution of instance 'a3e26950-9758-11e7-956a-b62d20524153' of job 'batchjob_withnoInput'
[batch-job-batchjob_withnoInput-work-manager.02] org.mule.api.processor.LoggerMessageProcessor: Batch Step 1 flowVars.aCounter: 2
                                                                                               Batch Step 1 recordVars.rACounter: 2
[batch-job-batchjob_withnoInput-work-manager.02] com.mulesoft.module.batch.DefaultBatchStep: Step Batch_Step_1 finished processing all records for instance a3e26950-9758-11e7-956a-b62d20524153 of job batchjob_withnoInput
[batch-job-batchjob_withnoInput-work-manager.01] org.mule.api.processor.LoggerMessageProcessor: Batch Step 2 flowVars.aCounter: 2
                                                                                               Batch Step 2 recordVars.rACounter: 3
[batch-job-batchjob_withnoInput-work-manager.01] com.mulesoft.module.batch.engine.DefaultBatchEngine: Starting execution of onComplete phase for instance a3e26950-9758-11e7-956a-b62d20524153 of job batchjob_withnoInput
[batch-job-batchjob_withnoInput-work-manager.01] org.mule.api.processor.LoggerMessageProcessor: oncomplete flowVars.aCounter : 1
[batch-job-batchjob_withnoInput-work-manager.01] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution of onComplete phase for instance a3e26950-9758-11e7-956a-b62d20524153 of job batchjob_withnoInput
[batch-job-batchjob_withnoInput-work-manager.01] com.mulesoft.module.batch.engine.DefaultBatchEngine: Finished execution for instance 'a3e26950-9758-11e7-956a-b62d20524153' of job 'batchjob_withnoInput'. Total Records processed: 1. Successful records: 1. Failed Records: 0
[batch-job-batchjob_withnoInput-work-manager.01] com.mulesoft.module.batch.engine.DefaultBatchEngine:
[batch-job-batchjob_withnoInput-work-manager.01] com.mulesoft.module.batch.DefaultBatchStep: Step Batch_Step_2 finished processing all records for instance a3e26950-9758-11e7-956a-b62d20524153 of job batchjob_withnoInput

If you compare the values of “flowVars.aCounter” and “recordVars.rACounter”, you will see that only the record variable carries its state across batch steps (2 in Batch_Step_1, then 3 in Batch_Step_2), while the flow variable shows 2 in both. But once execution is outside the batch process area (where all the batch steps are defined), the record variable is no longer accessible to the execution context.

4.0 How about Session Variables in Batch Jobs?


Figure 4.0a
Figure 4.0a shows that I have included a session variable with the fully qualified name “sessionVars.SaCounter”; its behaviour is the same as that of a flow variable. In a batch job, the usual notion of scope for both flow variables and session variables does not work the way it does in a flow.

If you observe the log file closely, the clues to why it behaves that way are really obvious.
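
A minimal sketch of how such a session variable can be wired in, assuming it is initialized in the Input phase and incremented in each batch step in the same way as the flow variable. The placement and doc:name labels here are illustrative and may differ from Figure 4.0a.

```xml
<!-- Inside <batch:input>: initialize the session variable next to flowVars.aCounter -->
<set-session-variable variableName="SaCounter" value="#[1]" doc:name="sessionVars.SaCounter = 1"/>

<!-- Inside each <batch:step>: attempt to increment it -->
<set-session-variable variableName="SaCounter" value="#[sessionVars.SaCounter + 1]" doc:name="sessionVars.SaCounter add 1"/>
```

Logging #[sessionVars.SaCounter] in each step and in On Complete then lets you compare it side by side with flowVars.aCounter.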
Figure 4.0b

From the logs you can see that there are essentially 3 threads running. The flow and session variables are set on a thread named “[batchplayground].HTTP_Listener_Configuration.worker.01”. This is the receiver thread from the flow “batchplaygroundFlow”; because this flow has an HTTP inbound endpoint, it automatically uses the synchronous processing strategy. The other two threads, “batch-job-batchjob_withnoInput-work-manager.01” and “batch-job-batchjob_withnoInput-work-manager.02”, are separate threads allocated by the Mule runtime to run batch step 1 and batch step 2. In this multithreaded environment, flow variables and session variables are not synchronized; only record variables are.

5.0 Conclusion


A batch job always runs asynchronously, and it always executes its batch steps in multithreaded mode. Remember, batch jobs are built for high-throughput processing. If you want the behaviour of your batch job to be predictable, use only record variables, because they are synchronized between batch steps.
