Wednesday, May 3, 2017

MuleSoft Polling Scope and Watermark

1.0 Overview

As an Integration developer we are always faced with the need to poll a legacy resource to retrieve new data and to sync it over to another destination endpoint. MuleSoft allows developers to do this via "Polling Scope". The polling scope allows the developer to poll a particular source endpoint based on a timed interval, the polling scope also has this neat caching function known as "Watermark".
"Watermark" allows the polling scope to poll for new resources instead of getting the same resource over and over again. The following illustration shows how this is done.

Figure 1.0 is an abstraction of the Mule's "Polling Scope", it is always implemented in the receive scope, the illustration shows that it is polling a database table, the database table must be ordered so that the "Watermark functionality" can move effectively in the ordered list. Watermark stores the current/last picked up record id. If the following mule application is shut down it will store the last picked up record id in Java Object store, the data will be persisted into files, this operation is transparent to the users and that is the value of having the "Watermark Functionality". Developers do not need to create code to handle caching it is all configurable.

2.0 Creating a Hypothetical Scenario

It is always easier to learn something by doing, so lets make up a hypothetical scenario. In this scenario the Integration developer is required to build a Mule App that is capable of syncing newly created employees from a legacy database to a new source system.
Let’s say our source system is SQL Server database. You can do this via any database the example shown in MuleSoft is a MySql database, and since they already have that example I figure I would do it differently so that developers have more sample codes to play around with.

2.1 Preparing the Legacy Database

You will need to have preinstalled SQL Server in your local machine. And then you need to create a database with the following name "MuleDemoDb", as per the following depiction.

After you create the database you need to open a query editor and point to the newly created database and execute the following DB Scripts.
CREATE TABLE employees (
no INT NOT NULL,
dob DATE NOT NULL,
first_name VARCHAR(14) NOT NULL,
last_name VARCHAR(16) NOT NULL,
gender VARCHAR(1) NOT NULL,
hire_date DATE NOT NULL,
PRIMARY KEY (no)
);
CREATE TABLE roles (
id INT  NOT NULL IDENTITY(1,1),
emp_no INT,
role varchar(255) default NULL,
PRIMARY KEY (id)
) ;
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1011,'1985-09-02','Chava','Puckett','F','2008-10-12');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1012,'1971-12-03','Christopher','Tillman','M','2006-11-01');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1013,'1975-07-31','Judith','David','F','10-11-20');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1014,'1957-08-03','Neil','Ford','F','08-09-04');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1015,'1977-01-09','Daryl','Wolfe','M','07-09-14');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1016,'1986-03-08','Maryam','Burt','M','09-09-16');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1017,'1980-08-21','Marny','Alvarez','M','11-01-27');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1018,'1965-04-06','Wanda','Fowler','M','08-02-09');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1019,'1950-02-14','Lillian','Hancock','F','05-11-22');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1020,'1965-11-17','Tatyana','Lucas','M','09-02-16');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1021,'1973-03-13','Rooney','Sears','M','05-09-07');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1022,'1974-11-23','Ezekiel','Harding','M','10-07-02');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1023,'1961-01-26','Willa','Swanson','F','12-10-24');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1024,'1948-01-24','Eden','Mcclure','F','09-02-13');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1025,'1951-10-31','Maris','Serrano','F','11-10-04');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1026,'1972-11-11','Kyle','Jordan','M','12-10-22');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1027,'1953-10-06','Jolie','Burton','M','06-06-11');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1028,'1970-11-22','Alyssa','Black','M','11-11-10');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1029,'1952-05-23','Rahim','Noel','F','10-08-13');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1030,'1979-03-07','Roth','May','M','12-06-04');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1031,'1961-08-07','Mira','Harding','M','08-02-04');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1032,'1957-04-07','Helen','Pacheco','F','07-11-17');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1033,'1960-08-11','Evangeline','Mullen','M','13-01-25');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1034,'1971-08-07','Isadora','Walsh','F','09-07-02');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1035,'1979-02-25','Sybil','Mccarty','F','10-06-15');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1036,'1989-08-23','Emma','Cardenas','M','10-01-16');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1037,'1965-03-18','Seth','Monroe','M','06-10-16');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1038,'1954-03-21','Herrod','Noel','M','10-07-07');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1039,'1963-09-06','Devin','Howard','M','11-12-18');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1040,'1989-05-25','Kaden','Ellis','F','10-12-07');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1041,'1966-02-21','Emery','Walters','M','07-05-07');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1042,'1957-11-15','Tyrone','Gill','F','12-07-24');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1043,'1957-06-20','Uriah','Morse','M','12-04-22');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1044,'1976-11-15','Ross','Bradford','M','08-11-14');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1045,'1964-05-04','Elton','Wilkins','F','10-12-21');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1046,'1948-06-07','Lillith','Estes','M','08-04-12');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1047,'1960-04-04','Hayfa','Burch','F','06-09-25');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1048,'1966-02-26','Erin','Lane','M','05-03-01');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1049,'1985-08-23','Ella','Robinson','F','06-03-11');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1050,'1967-04-19','Wayne','Fischer','M','07-05-24');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1051,'1970-11-07','Channing','Mccoy','M','06-05-27');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1052,'1993-07-07','Rhonda','Kirby','M','06-05-19');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1053,'1978-06-04','Brenda','Hodge','M','06-05-09');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1054,'1959-10-27','Barbara','Dixon','M','12-12-05');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1055,'1949-04-28','Zephr','Lindsey','M','09-02-16');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1056,'1977-08-30','Joan','Campbell','M','12-10-14');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1057,'1957-04-14','Breanna','Leblanc','F','07-12-29');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1058,'1983-01-15','Hanna','Shaffer','M','11-04-12');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1059,'1966-01-15','Felicia','Burt','F','11-11-16');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1060,'1963-10-16','Nevada','Blackburn','M','07-08-10');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1061,'1961-12-26','Germane','Duncan','F','09-05-31');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1062,'1974-03-18','Vladimir','Becker','M','09-12-10');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1063,'1965-03-04','Stephen','Clarke','F','09-06-25');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1064,'1968-10-18','Jackson','Edwards','F','11-03-02');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1065,'1959-05-16','Brent','Dunn','M','08-01-26');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1066,'1971-10-21','Quentin','Puckett','F','08-09-15');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1067,'1950-09-26','Mona','Sosa','M','07-11-27');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1068,'1977-10-01','Nola','Dillard','F','06-10-17');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1069,'1956-08-04','Destiny','Maldonado','M','11-05-07');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1070,'1974-07-03','Levi','Dunn','M','11-12-13');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1071,'1987-09-15','Colleen','Mcpherson','M','05-02-05');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1072,'1952-12-11','Igor','Macias','M','11-10-11');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1073,'1984-07-04','Brooke','Hodge','F','06-06-22');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1074,'1969-08-30','Dillon','Stone','F','06-06-07');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1075,'1975-12-29','Marshall','Acevedo','M','11-12-22');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1076,'1965-03-29','Kylan','Richards','F','10-07-21');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1077,'1991-01-23','Luke','Howard','F','09-07-17');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1078,'1951-01-23','Chelsea','Chan','F','07-03-09');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1079,'1978-02-21','Linus','Hobbs','F','12-04-28');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1080,'1977-01-28','Burke','Ashley','F','08-07-09');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1081,'1990-11-23','Pearl','Dennis','M','10-10-10');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1082,'1981-04-27','Lyle','Myers','F','06-03-02');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1083,'1966-05-04','Kennan','Roman','M','07-07-20');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1084,'1947-12-28','Marcia','Bell','M','05-07-29');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1085,'1987-01-25','Aaron','Parrish','M','12-02-18');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1086,'1960-08-05','Madeline','Elliott','M','08-05-13');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1087,'1951-09-03','Zahir','Stevenson','M','12-06-23');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1088,'1973-01-31','Colette','Berger','F','12-01-22');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1089,'1987-11-09','Molly','Nieves','M','12-04-02');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1090,'1978-10-03','Nicole','Salas','M','07-11-08');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1091,'1955-05-08','Zane','Madden','M','09-07-01');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1092,'1949-03-26','Sydnee','Chen','F','09-11-11');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1093,'1969-02-24','Francesca','Patel','F','08-05-11');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1094,'1949-05-17','Clark','Glenn','F','08-09-25');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1095,'1984-12-07','William','Glover','F','09-12-28');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1096,'1967-10-30','Noble','Wiggins','F','08-04-08');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1097,'1977-10-15','Dai','Weeks','F','10-02-01');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1098,'1955-03-13','Ciara','Chavez','F','11-04-05');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1099,'1977-11-29','Francis','Singleton','M','10-12-07');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1100,'1993-03-25','TaShya','Mack','M','11-01-12');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1101,'1973-08-28','Jameson','Lopez','F','11-12-19');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1102,'1981-08-12','Dora','Hinton','F','07-05-26');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1103,'1948-11-13','Pascale','Ray','F','06-11-27');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1104,'1984-03-15','Abigail','Weiss','F','10-07-09');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1105,'1987-06-10','Fletcher','Underwood','M','13-01-15');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1106,'1947-12-24','Geoffrey','Meyers','M','08-04-15');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1107,'1989-01-09','Mara','Smith','M','05-07-18');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1108,'1963-05-07','Rhoda','Beard','M','10-12-02');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1109,'1964-01-22','Ali','Hanson','M','05-01-26');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1110,'1973-01-25','Vaughan','English','F','11-03-04');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1111,'1961-10-13','Marah','Pollard','M','07-10-28');
INSERT INTO employees (no,dob,first_name,last_name,gender,hire_date) VALUES (1112,'1975-08-18','Tatum','Adams','F','11-03-24');
INSERT INTO roles (emp_no,role) VALUES (1011,'Sr. Developer');
INSERT INTO roles (emp_no,role) VALUES (1012,'Office Manager');
INSERT INTO roles (emp_no,role) VALUES (1013,'Secretary');
INSERT INTO roles (emp_no,role) VALUES (1014,'Engineer');
INSERT INTO roles (emp_no,role) VALUES (1015,'CEO');
INSERT INTO roles (emp_no,role) VALUES (1016,'Office Assistant');
INSERT INTO roles (emp_no,role) VALUES (1017,'Sr. Developer');
INSERT INTO roles (emp_no,role) VALUES (1018,'Developer');
INSERT INTO roles (emp_no,role) VALUES (1019,'Office Manager');
INSERT INTO roles (emp_no,role) VALUES (1020,'Office Assistant');
INSERT INTO roles (emp_no,role) VALUES (1021,'Sr. Manager');
INSERT INTO roles (emp_no,role) VALUES (1022,'Sr. Developer');
INSERT INTO roles (emp_no,role) VALUES (1023,'Manager');
INSERT INTO roles (emp_no,role) VALUES (1024,'Secretary');
INSERT INTO roles (emp_no,role) VALUES (1025,'Office Assistant');
INSERT INTO roles (emp_no,role) VALUES (1026,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1027,'Sr. Developer');
INSERT INTO roles (emp_no,role) VALUES (1028,'CEO');
INSERT INTO roles (emp_no,role) VALUES (1029,'CEO');
INSERT INTO roles (emp_no,role) VALUES (1030,'Secretary');
INSERT INTO roles (emp_no,role) VALUES (1031,'Engineer');
INSERT INTO roles (emp_no,role) VALUES (1032,'Office Manager');
INSERT INTO roles (emp_no,role) VALUES (1033,'Secretary');
INSERT INTO roles (emp_no,role) VALUES (1034,'Secretary');
INSERT INTO roles (emp_no,role) VALUES (1035,'Secretary');
INSERT INTO roles (emp_no,role) VALUES (1036,'Engineer');
INSERT INTO roles (emp_no,role) VALUES (1037,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1038,'Office Assistant');
INSERT INTO roles (emp_no,role) VALUES (1039,'Developer');
INSERT INTO roles (emp_no,role) VALUES (1040,'CEO');
INSERT INTO roles (emp_no,role) VALUES (1041,'Office Manager');
INSERT INTO roles (emp_no,role) VALUES (1042,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1043,'Operations Manager');
INSERT INTO roles (emp_no,role) VALUES (1044,'Software Architect');
INSERT INTO roles (emp_no,role) VALUES (1045,'CEO');
INSERT INTO roles (emp_no,role) VALUES (1046,'Software Architect');
INSERT INTO roles (emp_no,role) VALUES (1047,'Manager');
INSERT INTO roles (emp_no,role) VALUES (1048,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1049,'Operations Manager');
INSERT INTO roles (emp_no,role) VALUES (1050,'Sr. Developer');
INSERT INTO roles (emp_no,role) VALUES (1051,'Software Architect');
INSERT INTO roles (emp_no,role) VALUES (1052,'Software Architect');
INSERT INTO roles (emp_no,role) VALUES (1053,'Sr. Manager');
INSERT INTO roles (emp_no,role) VALUES (1054,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1055,'Secretary');
INSERT INTO roles (emp_no,role) VALUES (1056,'Software Architect');
INSERT INTO roles (emp_no,role) VALUES (1057,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1058,'Engineer');
INSERT INTO roles (emp_no,role) VALUES (1059,'Software Architect');
INSERT INTO roles (emp_no,role) VALUES (1060,'Operations Manager');
INSERT INTO roles (emp_no,role) VALUES (1061,'Sr. Developer');
INSERT INTO roles (emp_no,role) VALUES (1062,'CEO');
INSERT INTO roles (emp_no,role) VALUES (1063,'Engineer');
INSERT INTO roles (emp_no,role) VALUES (1064,'CEO');
INSERT INTO roles (emp_no,role) VALUES (1065,'Sr. Manager');
INSERT INTO roles (emp_no,role) VALUES (1066,'Developer');
INSERT INTO roles (emp_no,role) VALUES (1067,'Office Assistant');
INSERT INTO roles (emp_no,role) VALUES (1068,'Office Manager');
INSERT INTO roles (emp_no,role) VALUES (1069,'Office Manager');
INSERT INTO roles (emp_no,role) VALUES (1070,'Office Manager');
INSERT INTO roles (emp_no,role) VALUES (1071,'Sr. Developer');
INSERT INTO roles (emp_no,role) VALUES (1072,'Sr. Manager');
INSERT INTO roles (emp_no,role) VALUES (1073,'Secretary');
INSERT INTO roles (emp_no,role) VALUES (1074,'Office Assistant');
INSERT INTO roles (emp_no,role) VALUES (1075,'Engineer');
INSERT INTO roles (emp_no,role) VALUES (1076,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1077,'Sr. Developer');
INSERT INTO roles (emp_no,role) VALUES (1078,'Sr. Manager');
INSERT INTO roles (emp_no,role) VALUES (1079,'Secretary');
INSERT INTO roles (emp_no,role) VALUES (1080,'Developer');
INSERT INTO roles (emp_no,role) VALUES (1081,'Operations Manager');
INSERT INTO roles (emp_no,role) VALUES (1082,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1083,'Secretary');
INSERT INTO roles (emp_no,role) VALUES (1084,'Office Manager');
INSERT INTO roles (emp_no,role) VALUES (1085,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1086,'Engineer');
INSERT INTO roles (emp_no,role) VALUES (1087,'Operations Manager');
INSERT INTO roles (emp_no,role) VALUES (1088,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1089,'Sr. Developer');
INSERT INTO roles (emp_no,role) VALUES (1090,'Office Assistant');
INSERT INTO roles (emp_no,role) VALUES (1091,'Developer');
INSERT INTO roles (emp_no,role) VALUES (1092,'Sr. Developer');
INSERT INTO roles (emp_no,role) VALUES (1093,'CEO');
INSERT INTO roles (emp_no,role) VALUES (1094,'Office Assistant');
INSERT INTO roles (emp_no,role) VALUES (1095,'Sr. Developer');
INSERT INTO roles (emp_no,role) VALUES (1096,'Operations Manager');
INSERT INTO roles (emp_no,role) VALUES (1097,'Developer');
INSERT INTO roles (emp_no,role) VALUES (1098,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1099,'Engineer');
INSERT INTO roles (emp_no,role) VALUES (1100,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1101,'Developer');
INSERT INTO roles (emp_no,role) VALUES (1102,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1103,'Operations Manager');
INSERT INTO roles (emp_no,role) VALUES (1104,'Office Assistant');
INSERT INTO roles (emp_no,role) VALUES (1105,'Intern');
INSERT INTO roles (emp_no,role) VALUES (1106,'Developer');
INSERT INTO roles (emp_no,role) VALUES (1107,'Secretary');
INSERT INTO roles (emp_no,role) VALUES (1108,'Sr. Manager');
INSERT INTO roles (emp_no,role) VALUES (1109,'Operations Manager');
INSERT INTO roles (emp_no,role) VALUES (1110,'Software Architect');

2.2 The Hypothetical Requirement

You are to build a Mule Application that will pool this employee database for newly created employees and send it across to be handled by different message processors, and in the event of an execution failure/message processor exception, the Mule App must be capable of resuming database polling from and employee ID that is larger than the previous successful watermark value. The following flow chart illustrate this requirement.

3.0 Developing the Mule Application

Figure 3.0 depics how the end solution looks like, the main flow that depicts the solution is "muledbpollingflow", the second flow "resetObjectStore" is for us the developers to reset the watermark value, this is so that we do not need to keep creating new records.

I will start to talk about the internal configuration of the main flow ("muledbpollingflow") from left to right so that you could understand what is happening under the hood.

3.1 Polling Scope Settings


First we in the receive scope we have the Polling Scope, the following print screen is the settings we have employed into the polling scope (Figure 3.1).

I have set the frequency for polling to happen every 5 seconds by entering 5000 milliseconds to the text box, there will be a zero start delay, and I have put in the time measurement as milliseconds. If you look at Figure 3.0 you will see an alternate option to set up a Cron Scheduler.
Next up you have the Watermark pane, here is where you key in all the configurations for the Watermark functionality. The first field would be the flow variable name that you want Watermark operation to serialize. Second field is the default value that you want your flow variable to store on it's first run. Here I have used the "Update Expression" instead of the "Selector Expression", because that is the only thing that will work for the mentioned requirement in section 2.2 (and besides "Selector Expression" is already demonstrated in MuleSoft documentation, I chose "Update Expression" because its usage is not demonstrated).
Next we have the selector expression, this is important as it tells the watermark function on which field on the database to focus on. Here you can see that it is implied where 1 record from the database is equivalent to one payload, in other words when a record's data is retrieved from the database on to a Mule flow, it is then represented as a payload, the field name of the records are the same whether it be from the table (in the database) or from the message payload in mule (during runtime).
Next field is the "Object Store", developers has an option to write their own Java Object Store (for more convoluted use cases) or to use the default object store implemented in Mule. I have made a reference to the default/implied object store in mule "_defaultUserObjectStore". 

 3.2 The Database Connector

Inside the polling scope we have the database connector, you can obtain the details of the databse connector configuration from the sourcode that I have checked into GIT Hub.

We will be executing a select operation, in the select operation you have to key in the SQL statement that will be executed by the poll, here you see that I am using the watermark flow variable called "lastRecordID", and notice the order by section of the query, that is really important because we want an ordered result to be able to apply the watermark function. Now if you look at the advanced tab, depicted by Figure 3.2.2 below. You will see that I have configured the database connector to retrieve 5 records at a time.

3.3 The "For Each" Scope
Before we delve into "For Each" scope notice that we have set a message processor to initialize a flow variable name "previousRecordID".
<set-variable variableName="previousRecordID" value="#[flowVars['lastRecordID']]" doc:name="Initialize previousRecordID" />
This is so that we could use this flow variable later in the For Each Scope.
Next we move on to the "For Each Scope" (at Figure 3.3), here you notice that we need not configure anything and the scope intuitively knows how to split the 5 records payload, neat aye :).

This would mean that the steps we employ in side "For Each Scope" is to process 1 record at a time, because the "For Each scope" splits the records implicitly for us. The first opertaion of the for each scope is to set a flow variable called "currentRecord" with the value of "#[payload['no']]".
<set-variable variableName="currentRecord" value="#[payload['no']]" doc:name="currentRecord" />
This is so that we could keep track of the current record being processed, and at the end of the "For Each Scope" we will set the value of "currentRecord" into a new flow variable called "previousRecordID".
<set-variable variableName="previousRecordID" value="#[flowVars['currentRecord']]" doc:name="previousRecordID" />
"previousRecordID" is the same flow variable that is used in section 3.1 for the "Watermark's" update expression.
Now let’s look at the groovy expression in between the flow variable setting message processors.
flowVars.dividend = Math.abs(new Random().nextInt() % 3 - 1);
flowVars.divisor = Math.abs(new Random().nextInt() % 3 - 1);
System.out.println("Curent Record ID:" + flowVars['currentRecord']);
System.out.println("Payload: " + payload );
System.out.println("Executing " + flowVars.dividend + " / " + flowVars.divisor );
def ans = flowVars.dividend / flowVars.divisor ;

What I am trying to do here is to intermittently create a division by zero exception/error. The reason I am doing this is to simulate a real world example an exception/error can happen anytime while the records are being processed.
When an exception happens we want the Watermark function to be able to revert back to the previous successful record and start from there. So in other words the sole purpose of the Groovy message processor is to simulate unexpected and intermittent errors while we are processing the records in runtime.
4.0 Testing the Mule App
Its time to put the rubber on the road and see what really happens during run time.
4.1 Resetting the Watermark during runtime
You could reset watermark value by using postman or any web browser, just pass in the Employee ID as the URL parameter, and the Mule App will pick up the next biggest employee ID from the table (refer to the following depiction).

If you click send on postman, you will see the follwing log being printed in your console.
org.mule.api.processor.LoggerMessageProcessor: Reset WaterMark to : 1000
When you set the watermark value to 1000, the first record in the employee table actually begins with the ID 1011, the polling scope can then start from the first record and work its way thourough to the last record.

When Polling scope has finished picking up all records you will see the following being logged into the console window.
Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
Foreach$CollectionMapSplitter: Splitter returned no results. If this is not expected, please check your split expression
4.2 Polling Starts
When polling starts you will see the following logs being printed in the cosole window.
Curent Record ID:1011
Payload: [gender:F, no:1011, dob:1985-09-02, last_name:Puckett, hire_date:2008-10-12, first_name:Chava]
Executing 2 / 1
Curent Record ID:1012
Payload: [gender:M, no:1012, dob:1971-12-03, last_name:Tillman, hire_date:2006-11-01, first_name:Christopher]
Executing 1 / 3
Curent Record ID:1013
Payload: [gender:F, no:1013, dob:1975-07-31, last_name:David, hire_date:2020-10-11, first_name:Judith]
Executing 1 / 1
Curent Record ID:1014
Payload: [gender:F, no:1014, dob:1957-08-03, last_name:Ford, hire_date:2004-08-09, first_name:Neil]
Executing 3 / 2
Curent Record ID:1015
Payload: [gender:M, no:1015, dob:1977-01-09, last_name:Wolfe, hire_date:2014-07-09, first_name:Daryl]
Executing 0 / 3
Curent Record ID:1016
Payload: [gender:M, no:1016, dob:1986-03-08, last_name:Burt, hire_date:2016-09-09, first_name:Maryam]
Executing 0 / 1
Curent Record ID:1017
Payload: [gender:M, no:1017, dob:1980-08-21, last_name:Alvarez, hire_date:2027-11-01, first_name:Marny]
Executing 1 / 0
Exception Encoutered ...
Resetting Back to Last Successful Record ID:1016
Curent Record ID:1017
Payload: [gender:M, no:1017, dob:1980-08-21, last_name:Alvarez, hire_date:2027-11-01, first_name:Marny]
Executing 2 / 2
Curent Record ID:1018
Payload: [gender:M, no:1018, dob:1965-04-06, last_name:Fowler, hire_date:2009-08-02, first_name:Wanda]
Executing 1 / 2
Curent Record ID:1019
Payload: [gender:F, no:1019, dob:1950-02-14, last_name:Hancock, hire_date:2022-05-11, first_name:Lillian]
Executing 3 / 1
Curent Record ID:1020
Payload: [gender:M, no:1020, dob:1965-11-17, last_name:Lucas, hire_date:2016-09-02, first_name:Tatyana]
Executing 0 / 1
Curent Record ID:1021
Payload: [gender:M, no:1021, dob:1973-03-13, last_name:Sears, hire_date:2007-05-09, first_name:Rooney]
Executing 3 / 3
Curent Record ID:1022
Payload: [gender:M, no:1022, dob:1974-11-23, last_name:Harding, hire_date:2002-10-07, first_name:Ezekiel]
Executing 0 / 2
Curent Record ID:1023
Payload: [gender:F, no:1023, dob:1961-01-26, last_name:Swanson, hire_date:2024-12-10, first_name:Willa]
Executing 2 / 0
Exception Encoutered ...
Resetting Back to Last Successful Record ID:1022
Curent Record ID:1023
Payload: [gender:F, no:1023, dob:1961-01-26, last_name:Swanson, hire_date:2024-12-10, first_name:Willa]
Executing 3 / 0
Exception Encoutered ...
Resetting Back to Last Successful Record ID:1022
Curent Record ID:1023
Payload: [gender:F, no:1023, dob:1961-01-26, last_name:Swanson, hire_date:2024-12-10, first_name:Willa]
Executing 1 / 0
Exception Encoutered ...
Resetting Back to Last Successful Record ID:1022
Curent Record ID:1023
Payload: [gender:F, no:1023, dob:1961-01-26, last_name:Swanson, hire_date:2024-12-10, first_name:Willa]
Executing 2 / 1
Curent Record ID:1024
Payload: [gender:F, no:1024, dob:1948-01-24, last_name:Mcclure, hire_date:2013-09-02, first_name:Eden]
Executing 1 / 2
Curent Record ID:1025
Payload: [gender:F, no:1025, dob:1951-10-31, last_name:Serrano, hire_date:2004-11-10, first_name:Maris]
Executing 1 / 1
Curent Record ID:1026
Payload: [gender:M, no:1026, dob:1972-11-11, last_name:Jordan, hire_date:2022-12-10, first_name:Kyle]
Executing 2 / 1
Curent Record ID:1027
Payload: [gender:M, no:1027, dob:1953-10-06, last_name:Burton, hire_date:2011-06-06, first_name:Jolie]
Executing 1 / 3
Curent Record ID:1028
Payload: [gender:M, no:1028, dob:1970-11-22, last_name:Black, hire_date:2010-11-11, first_name:Alyssa]
Executing 1 / 1
Curent Record ID:1029
Payload: [gender:F, no:1029, dob:1952-05-23, last_name:Noel, hire_date:2013-10-08, first_name:Rahim]
Executing 2 / 3
Curent Record ID:1030
Payload: [gender:M, no:1030, dob:1979-03-07, last_name:May, hire_date:2004-12-06, first_name:Roth]
Executing 0 / 1
Curent Record ID:1031
Payload: [gender:M, no:1031, dob:1961-08-07, last_name:Harding, hire_date:2004-08-02, first_name:Mira]
Executing 1 / 1
Curent Record ID:1032
Payload: [gender:F, no:1032, dob:1957-04-07, last_name:Pacheco, hire_date:2017-07-11, first_name:Helen]
Executing 1 / 2
Curent Record ID:1033
Payload: [gender:M, no:1033, dob:1948-01-24, last_name:Ting, hire_date:2004-11-10, first_name:Kian]
Executing 1 / 3
Curent Record ID:1034
Payload: [gender:F, no:1034, dob:1951-10-31, last_name:Lee, hire_date:2004-11-10, first_name:Kay]
Executing 1 / 1
Curent Record ID:1035
Payload: [gender:M, no:1035, dob:1948-01-24, last_name:Ting, hire_date:2004-11-10, first_name:Noah]
Executing 0 / 2
Curent Record ID:1036
Payload: [gender:F, no:1036, dob:1951-10-31, last_name:Lee, hire_date:2004-11-10, first_name:Kay]
Executing 0 / 1
Curent Record ID:1037
Payload: [gender:M, no:1037, dob:1948-01-24, last_name:Ting, hire_date:2004-11-10, first_name:Noah]
Executing 1 / 3
Curent Record ID:1038
Payload: [gender:F, no:1038, dob:1951-10-31, last_name:Lee, hire_date:2004-11-10, first_name:Kay]
Executing 2 / 1
Curent Record ID:1039
Payload: [gender:F, no:1039, dob:1951-10-31, last_name:Lee, hire_date:2004-11-10, first_name:Kay]
Executing 1 / 0
Exception Encoutered ...
Resetting Back to Last Successful Record ID:1038
Curent Record ID:1039
Payload: [gender:F, no:1039, dob:1951-10-31, last_name:Lee, hire_date:2004-11-10, first_name:Kay]
Executing 2 / 2
Curent Record ID:1040
Payload: [gender:M, no:1040, dob:1948-01-24, last_name:Ting, hire_date:2004-11-10, first_name:Noah]
Executing 1 / 3
When you look at the logs you notice that there are intemitent errors cuased by division by zero, when this happens, the polling scope will resume polling back to the failed record ID, and yes we have achieved our objective of creating a mule app that will fufill the requirements specified in section 2.2.
5.0 Full Source code
Full working source code is as per the following link.
6.0 Conclusion
There are so many other ways you can test the watermark functionality, one of the way my son have helped me test is that, while I was away on a coffee break, he took hold of my laptop and pressed the shutdown button while the polling was half way running (my son as about 3 years old at the time of this writing).

I thought I would need to reset the watermark to have it run from scratch all over again, but when I turned back up my lap top, started Anypoint Studio and started this mule app, it just started where it left off, amazing isn't it :). This watermark functionality will really come in handy with it's implied serialization to disk when we shut down the app or the server. My son has discovered this feature for me :P. You could download the app and do more testing yourself, the only way to learn something is to experience it and play around with it, so happy exploring !!!

8 comments: