Keeping your process clean: Hiding technology complexity behind a service

This blog will explain how you could abstract technology behind a service so your main process will be kept clean of all kind of technology pollution like exception handling, technology adapters and correlation.

In this case you need to interact with a backend database. First you need to start some processing by calling a PL/SQL routine. After some time you will receive a callback containing output values.  Message queueing is used as preferred transport for the callback. You could use a web service callout from the database but this is not the best thing to do.  Error handling at the database side tends to be pretty tedious and could result in hanging database sessions.
Keeping your process clean: Hiding technology complexity behind a service image12
The service to hide the complexity is implemented as a composite with an asynchronous interface/WSDL. The main process will only interact with this composite. To make things a little more complicated the composite has been split in two composites. The receiving of the messages from the queue is implemented as a separate composite called the dispatcher. It is responsible for receiving the messages and sending them to the right place. In the future this dispatcher could process new message types.
Keeping your process clean: Hiding technology complexity behind a service image23
Zooming in and adding some more details the implementation looks like:
Keeping your process clean: Hiding technology complexity behind a service image33
The dispatcher uses a mediator to do some on the fly transformations and to decide to whom to post the message based on the message content.  Correlation is used between the asynchronous service composite and the dispatcher to send the message to the correct instance so it will eventually be returned to the right main process.

Technical error handling like RemoteFaults because of unavailability of the database are handled inside the asyn service. If you expect the response message from the database within a certain amount of time you can add an onAlarm or use a Pick instead of a Receive.

There is one error situation more complicated to handle.  Suppose the database post a message to the queue and no async service process instance could be found to correlate with. As by design the BPEL engine will create an empty instance for you hoping you will actually create the real instance yourself in the near future that can continue processing the response message. That is not what you want. So you want to be sure before you actually call the async service from the dispatcher an instance exists it can correlate with avoiding ‘ghost’ instances.

A solution is to do a pre check inside the dispatcher. You can uniquely identify an async service instance by adding a sensor (value).  The soa management API provides a locator with which you can search composite and component instances based on filters. A filter can contain a sensor (value).  Here is some sample code:

String flowId = (String)getVariableData("flowId");
String PROVIDER_URL = (String)getVariableData("PROVIDER_URL");
String SECURITY_PRINCIPAL = (String)getVariableData("SECURITY_PRINCIPAL");
String SECURITY_CREDENTIALS = (String)getVariableData("SECURITY_CREDENTIALS");

java.util.Hashtable jndiProps = new java.util.Hashtable();

jndiProps.put(javax.naming.Context.PROVIDER_URL,PROVIDER_URL);
jndiProps.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY,"weblogic.jndi.WLInitialContextFactory");
jndiProps.put(javax.naming.Context.SECURITY_PRINCIPAL, SECURITY_PRINCIPAL);
jndiProps.put(javax.naming.Context.SECURITY_CREDENTIALS, SECURITY_CREDENTIALS);
jndiProps.put("dedicated.connection", "true");

boolean flowFound = false;
oracle.soa.management.facade.Locator locator = null;

try {
locator = oracle.soa.management.facade.LocatorFactory.createLocator(jndiProps);
java.util.List sFilterList = new java.util.ArrayList();
oracle.soa.management.util.SensorFilter sFilter = new oracle.soa.management.util.SensorFilter("flowId",
oracle.soa.management.facade.Sensor.SensorDataType.STRING, oracle.soa.management.util.Operator.EQUALS,
flowId);
sFilterList.add(sFilter);

oracle.soa.management.util.CompositeInstanceFilter filter = new oracle.soa.management.util.CompositeInstanceFilter();
filter.setSensorFilter(sFilterList);
filter.setState(0);
filter.setCompositeName("DocGenService");
java.util.List obInstances = locator.getCompositeInstances(filter);

if (obInstances != null && obInstances.size()==1) {
addAuditTrailEntry("CompositeInstance found.");
oracle.soa.management.facade.CompositeInstance compositeInstance = obInstances.get(0);
oracle.soa.management.util.ComponentInstanceFilter componentInstanceFilter = new oracle.soa.management.util.ComponentInstanceFilter();
java.util.List cis = compositeInstance.getChildComponentInstances(componentInstanceFilter);
if (cis != null && cis.size()==1) {
if (cis.get(0).getNormalizedState() == 0) {
addAuditTrailEntry("DocGenService flow found with flowId: "+flowId);
flowFound = true;
}
}
}
} catch (java.rmi.RemoteException re) {
} catch (Exception e) {
} finally {
locator.close();
}
setVariableData("found",(flowFound? "J":"N")) ;
if (flowFound) {
addAuditTrailEntry("DocGenService flow found with flowId: "+flowId);
else
addAuditTrailEntry("DocGenService flow **not** found with flowId: "+flowId);
}

In this sample code the flowId is the unique identifier/sensor. This code is implemented inside a BPEL Java embedded activity.

Depending on the message load this approach could be a little expensive. So instead of using the soa management API you can directly query the SOAINFRA database.Â

FUNCTION PROCESS_EXISTS(
P_FLW_IDÂ IN NUMBER,
P_BPEL_NAME IN VARCHAR2)
RETURN INT
IS
BEGIN
DECLARE
CURSOR C_PCS_I (B_FLOW_ID IN VARCHAR2, B_BPEL_NAME IN VARCHAR2)
IS
SELECT CXCI.CIKEY
FROM FUSION_SOAINFRA.CUBE_INSTANCE CXCI,
FUSION_SOAINFRA.COMPOSITE_SENSOR_VALUE CSV
WHERE CXCI.CMPST_IDÂ = CSV.COMPOSITE_INSTANCE_ID
AND CSV.SENSOR_NAMEÂ = 'FLOWID'
AND CSV.STRING_VALUEÂ = B_FLOW_ID
AND CXCI.COMPONENT_NAME = B_BPEL_NAME
AND CXCI.STATEÂ = 1;

R_PCS_I C_PCS_I%ROWTYPE;

BEGIN
OPEN C_PCS_I(B_FLOW_ID => P_FLW_ID, B_BPEL_NAME => P_BPEL_NAME);
FETCH C_PCS_I INTO R_PCS_I;
IF C_PCS_I%FOUND THEN
CLOSE C_PCS_I;
RETURN 1;
END IF;
CLOSE C_PCS_I;
RETURN 0;
END;
End Process_Exists;

If you wonder why I return an integer instead of a boolean. This has to do with the fact the database adapter cannot handle Boolean return values and will wrap them as integers anyways.