Not too long ago, one of my customers had the following requirement: a file with invoice entries has to be processed each night; for all invoice entries for the same customer, we would like to start a single BPEL process instance that aggregates the entries and creates a single invoice. To process the entire file, one BPEL process instance needs to be created for every unique customer who has invoice entries in that file. Note, however, that the Inbound File Adapter knows nothing about the customers or about previously started process instances; it will simply invoke a BPEL process ‘service’ for each line it processes.
The figure illustrates the situation. Note, however, that the invoice entries need not be sorted by customer and could well look like this:
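To make the target outcome concrete, here is a plain Python sketch (not BPEL) of what the per-customer process instances should compute between them: one line count and one total per customer, regardless of the order in which entries appear. The `(customer, amount)` tuples are hypothetical sample data, not the file format of the original project.

```python
from collections import defaultdict

def aggregate_by_customer(entries):
    """Return {customer: (line_count, total_amount)} for entries in any order."""
    totals = defaultdict(lambda: [0, 0])  # customer -> [count, sum]
    for customer, amount in entries:
        totals[customer][0] += 1
        totals[customer][1] += amount
    return {c: (count, total) for c, (count, total) in totals.items()}

# Hypothetical, deliberately unsorted entries: customers are interleaved.
entries = [("A", 100), ("B", 250), ("A", 50), ("C", 75), ("B", 10)]
print(aggregate_by_customer(entries))  # {'A': (2, 150), 'B': (2, 260), 'C': (1, 75)}
```

The difficulty in BPEL is that no single component sees the whole file like this loop does: each entry arrives as a separate invocation, and the engine has to route it to the right per-customer instance.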
BPEL of course offers correlation – the mechanism that allows us to feed messages into already running BPEL process instances. However, before SOA Suite 11g PS5, an inbound operation in a BPEL process (Receive, OnMessage) could either initiate an instance (and possibly a correlation set) or correlate the inbound message into a running instance, but it could not do both. So for the customer requirement at hand, correlation as it was then is not good enough. See this article for the problems we ran into before the PS5 release: https://technology.amis.nl/2011/02/24/batch-aggregation-of-files-in-bpel-process-instances-based-on-correlation/.
The File Adapter processing the file with invoice entries is not aware of any BPEL process instances that may already be running. So it will simply try to invoke some operation, something like processInvoiceEntry, on a BPEL process. That operation is configured either to receive into a running instance, based on correlation on the customer identifier, or to receive and start a new instance. With the first configuration, a call fails (to correlate) whenever there is not already a running instance for the specific customer. With the second configuration, every call after the first one for a customer fails, because a correlation set for that customer can be initiated only once. Note that a failure to correlate does not become evident very quickly, as is clear from a mail on this topic that I sent in March 2011 to the SOA Suite product development team:
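The dilemma can be captured in a small toy model, a Python sketch rather than anything from the BPEL engine: each inbound operation is either initiating or correlating, never both. `ConflictingReceive` borrows the fault name mentioned in the mail quoted next; `CorrelationError`, `PrePS5Engine` and the method names are hypothetical.

```python
class ConflictingReceive(Exception):
    """A correlation set with this key has already been initiated."""

class CorrelationError(Exception):
    """Hypothetical: no running instance matches the correlation key."""

class PrePS5Engine:
    """Toy model: an operation either initiates or correlates, never both."""

    def __init__(self):
        self.instances = {}  # correlation key -> messages received by that instance

    def receive_initiating(self, key, msg):
        # Starts an instance and initiates its correlation set; only once per key.
        if key in self.instances:
            raise ConflictingReceive(key)
        self.instances[key] = [msg]

    def receive_correlating(self, key, msg):
        # Routes into a running instance; fails if there is none yet.
        # (In the author's setup this surfaced only as a time-out, not a quick error.)
        if key not in self.instances:
            raise CorrelationError(key)
        self.instances[key].append(msg)

engine = PrePS5Engine()
engine.receive_initiating("A", 100)        # first line for customer A: fine
try:
    engine.receive_initiating("A", 50)     # second line for A: conflict
except ConflictingReceive as err:
    print("ConflictingReceive for", err)
try:
    engine.receive_correlating("B", 75)    # first line for B: nothing to correlate with
except CorrelationError as err:
    print("CorrelationError for", err)
```

Whichever of the two operations the File Adapter calls, some invoice lines fail, which is why a caller that knows nothing about running instances cannot use either one on its own.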
“[…] We only find out that our message did not correlate into a running instance because of the time out that occurs on the synchronous reply. In order to have a fast answer to the question whether or not an instance is running – we currently always assume that an instance does not already exist, catching the ConflictingReceive fault as an indication that an instance does exist, then correlating into that one. Apart from the fact that far too much knowledge about the CompanyInvoiceAggregator composite is now assumed in the InvoiceProcessor, it is also quite horrible for the administrator to see a faulted instance as a result of this workaround. It would be very convenient if we could very rapidly find out – on the first call for a company [id] from InvoiceProcessor to CompanyInvoiceAggregator that there is not yet a running instance (preferably without even a fault) – rather than waiting for 45 secs (or whatever systemwide value we set). Even being able to specify a sub-second time out on a specific synchronous Invoke activity would be useful.”
The short-term response from the team helped us to understand our situation and find a workaround. The longer-term reply was a new feature in the BPEL engine, introduced in PS5, that allows us to configure a Receive activity to have the conditional correlation behavior we were looking for:
- if the consumer invokes the BPEL process via the operation using a correlation identifier (in this case the customer) for which there is not a running instance, then a new instance is initiated
- if the operation is invoked with a correlation identifier matching an existing running instance, then correlation takes place into that instance
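The two rules above amount to an "initiate or correlate" decision made per message. A minimal Python sketch of that decision, as a toy model and not the engine's actual implementation; `AggregatingEngine` and its method are hypothetical names, and the model never completes an instance:

```python
class AggregatingEngine:
    """Toy model of conditional correlation on a single operation."""

    def __init__(self):
        self.instances = {}  # correlation key -> messages received by that instance

    def process_invoice_line(self, key, msg):
        if key in self.instances:
            # A running instance holds this key: correlate into it.
            self.instances[key].append(msg)
            return "correlated"
        # No running instance for this key: initiate instance and correlation set.
        self.instances[key] = [msg]
        return "initiated"

engine = AggregatingEngine()
for customer, amount in [("X", 10), ("Y", 20), ("X", 30), ("X", 40)]:
    print(customer, engine.process_invoice_line(customer, amount))
# X initiated
# Y initiated
# X correlated
# X correlated
```

The caller (here the loop, in the article the File Adapter) uses one operation for every line and never needs to know whether an instance already exists; the sample customers and amounts are hypothetical.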
This means that our customer-by-customer aggregation can now be implemented: the first invoice entry for a customer will cause a BPEL instance to be initiated for that customer and every subsequent call to the same operation by the file adapter will result in correlation into that instance.
This article demonstrates the steps required to implement a BPEL process that makes use of this new correlation facility.
BPEL Process to process InvoiceEntries
The SOA Composite to demonstrate the functionality in its simplest form is nothing special at all: just a BPEL process exposed as a SOAP Web Service (with a single operation processInvoiceLine):
The WSDL for this process is trivial:
a single, one-way operation with a request message with one part. This part is described in the XSD document:
Still nothing special. Now comes the BPEL process. The BPEL process that we can create to process invoice entries is relatively simple, and only a few steps in the development process are specific to the aggregation pattern.
It kicks off with a Receive activity. This activity is associated with the single operation in the WSDL definition. It is configured to initiate the BPEL process instance and to instantiate a correlation set:
The correlation settings for the Receive activity – note that a Correlation Set called CustomerCorrelationSet is initiated by the Receive activity:
The CustomerCorrelationSet is defined with a single property called CustomerIdentifier of type String:
We need one more link to complete the correlation setup: the mapping between the request message sent to the operation processInvoiceLine and the property CustomerIdentifier in the correlation set. When you take a look at the XSD that describes this request message, it is not hard to come up with this mapping:
the element customerIdentifier in the request message contains the value used to identify the BPEL process instance, through the correlation set, which is really nothing more than the unique key for the BPEL process [instances].
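Conceptually, the property mapping is just "evaluate a path on the incoming message and use the result as the key". The Python sketch below mimics that with `xml.etree.ElementTree`. The payload is hypothetical: only the element names customerIdentifier and invoiceAmount come from this article, while the root element name is made up and the namespace from the real XSD is omitted.

```python
import xml.etree.ElementTree as ET

# Hypothetical request payload (root element name and values are made up).
REQUEST = """<invoiceLine>
  <customerIdentifier>CUST-042</customerIdentifier>
  <invoiceAmount>125.50</invoiceAmount>
</invoiceLine>"""

def correlation_key(xml_text):
    """Mimic the property mapping: take the text of customerIdentifier."""
    root = ET.fromstring(xml_text)
    key = root.findtext("customerIdentifier")  # None if the element is absent
    if key is None:
        raise ValueError("message carries no customerIdentifier")
    return key.strip()

print(correlation_key(REQUEST))  # CUST-042
```

With a namespaced payload, `findtext` would need the namespace as well, for example a `{namespace}customerIdentifier` path.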
The functionality in the BPEL process for now will be limited: count all invoice lines for the customer and sum the amounts in all lines to a grand total. Two variables are created in the BPEL process to hold these values:
These variables are called LineCount and TotalInvoiceAmount.
When the first invoice line message is received for a customer – and a new instance of the process is initiated – these variables are also initialized, in the first Assign activity in the BPEL process:
Note how LineCount is set to 1 and the TotalInvoiceAmount variable is set to the amount in the first invoice line being processed (copied from the inputVariable that holds this first invoice line).
You also see a new variable called continueProcessing. This is a flag that indicates whether the process should wait for additional invoice line messages for the customer for which the current BPEL instance is created. This flag is referred to in the While loop that waits for more messages to arrive into the instance.
The question is, of course, how the process knows whether there are more invoice lines to be processed. The entries in the file could contain a ‘last message’ indicator; that would be one strategy. Or each message could indicate the total number of lines for the customer. Or we could use a timer in the process that interprets a wait of X seconds after the last message has been processed as an indication that no more messages are to be processed for the customer. Which strategy fits best depends on the actual situation and is outside the scope of this article; let’s go for the timer-based approach, which by the way does not necessarily need the flag variable…
Below is the relevant section in the BPEL process: the While loop that starts after the variables have been initialized based on the initial message, and that keeps running for as long as the variable continueProcessing has the value Y (the value it was assigned in the initialization step). Inside the While loop you see the Receive activity ProcessNextInvoiceLine. This activity is configured exactly the same as the initial Receive activity, except for two important differences: this mid-process Receive activity has the Create Instance checkbox unchecked, and it does not initiate the correlation set.
This receive activity will consume messages that were also sent to the processInvoiceLine operation – but with a value for the customer identifier that has already been used for creating a running BPEL instance.
The essence of the new functionality is right here: two Receive activities associated with the same operation – one that will instantiate the instance and the correlation set and the other one that will correlate into a running instance. The engine will decide for itself if it has to do one or the other.
Note: in order for this behavior to work, we need to set a (deployment) property on the BPEL component – in the composite.xml file.
The name of this property is bpel.config.reenableAggregationOnComplete. Its default value is false; to make the BPEL engine display the behavior described here, we need to set the property to true. Note: the documentation (see the resources at the end of this article) describes in detail the situations you may encounter with this setting; it also makes clear that this behavior is the exception.
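Because the property sits on the BPEL component element in composite.xml, a small pre-deployment check can confirm it is set. This is a sketch under assumptions: the composite.xml excerpt is trimmed and hypothetical (component and file names are made up, and the namespace shown is assumed), and the check ignores namespaces so it does not depend on that. A missing property is treated as false, matching the default stated above.

```python
import xml.etree.ElementTree as ET

# Hypothetical, trimmed composite.xml excerpt; real files contain much more.
COMPOSITE = """<composite name="AggregationOfInvoiceLines" xmlns="http://xmlns.oracle.com/sca/1.0">
  <component name="InvoiceLineAggregator">
    <implementation.bpel src="InvoiceLineAggregator.bpel"/>
    <property name="bpel.config.reenableAggregationOnComplete">true</property>
  </component>
</composite>"""

PROPERTY = "bpel.config.reenableAggregationOnComplete"

def local(tag):
    """Drop the '{namespace}' prefix that ElementTree puts on tag names."""
    return tag.rsplit("}", 1)[-1]

def aggregation_enabled(xml_text):
    """Map each component name to whether the property is set to true (default false)."""
    root = ET.fromstring(xml_text)
    result = {}
    for comp in root.iter():
        if local(comp.tag) != "component":
            continue
        value = "false"
        for child in comp:
            if local(child.tag) == "property" and child.get("name") == PROPERTY:
                value = (child.text or "").strip()
        result[comp.get("name")] = value.lower() == "true"
    return result

print(aggregation_enabled(COMPOSITE))  # {'InvoiceLineAggregator': True}
```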
The further processing required in the BPEL process is a way to conclude the instance when we determine that no more messages are forthcoming for the customer. The somewhat roundabout approach I have adopted here goes as follows:
the Scope inside the While loop has an onAlarm event handler that is triggered after 1 minute; that means that if no message has been received in this instance within 60 seconds after processing the previous message, the event is triggered. The handler throws a fault that causes the While loop to be aborted. The scope that contains the While loop has a fault handler that catches the fault thrown when the alarm is triggered and does nothing. This means that normal processing of the process beyond this scope can resume. This processing could consist of taking the aggregated data and producing an invoice. For now, there is no further processing, as it is not required to make the point of this article.
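The control flow of this alarm/throw/catch construction can be sketched in Python, with a `queue.Queue` standing in for messages correlated into the instance. This is an analogy, not BPEL: `AlarmFault` and `run_instance` are hypothetical names, and `idle_timeout` plays the role of the 1-minute alarm. Because each loop iteration waits afresh, the timer effectively restarts after every received message, as in the process described above.

```python
import queue

class AlarmFault(Exception):
    """Hypothetical stand-in for the fault thrown by the onAlarm handler."""

def run_instance(first_line, mailbox, idle_timeout):
    """Receive further lines until none arrives within idle_timeout seconds."""
    received = [first_line]
    try:                                        # outer scope; its fault handler is below
        while True:                             # While loop (continueProcessing stays 'Y')
            try:                                # inner scope with the onAlarm handler
                line = mailbox.get(timeout=idle_timeout)  # mid-process Receive
            except queue.Empty:
                raise AlarmFault() from None    # onAlarm fires -> Throw
            received.append(line)
    except AlarmFault:
        pass                                    # Catch does nothing; processing resumes
    # Producing the invoice from `received` would happen here.
    return received

mailbox = queue.Queue()
for line in (200, 300):
    mailbox.put(line)
print(run_instance(100, mailbox, idle_timeout=0.1))  # [100, 200, 300]
```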
The detailed configuration for the Throw and Catch activities is shown here:
The final activity in the BPEL process is the processing of the Invoice Line. In this case, the processing is nothing more than adding one to the LineCount and adding the incoming invoiceAmount to the TotalInvoiceAmount variable.
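Together with the initial Assign described earlier, the bookkeeping is a simple fold over the lines. The Python sketch below mirrors the variable names LineCount, TotalInvoiceAmount and continueProcessing; the dict-based messages and sample amounts are hypothetical, and using `Decimal` for the amounts is my choice rather than something the article prescribes (the amounts arrive as XML text, and `Decimal` avoids binary rounding when summing money).

```python
from decimal import Decimal

def initialize(first_line):
    """Initial Assign: LineCount = 1, total = first amount, continueProcessing = 'Y'."""
    return {"LineCount": 1,
            "TotalInvoiceAmount": Decimal(first_line["invoiceAmount"]),
            "continueProcessing": "Y"}

def process_line(state, line):
    """Per-line processing: add one to LineCount and add invoiceAmount to the total."""
    state["LineCount"] += 1
    state["TotalInvoiceAmount"] += Decimal(line["invoiceAmount"])
    return state

# Hypothetical lines for one customer, amounts as they would appear in the XML.
lines = [{"invoiceAmount": "125.50"}, {"invoiceAmount": "74.50"}, {"invoiceAmount": "10"}]
state = initialize(lines[0])
for line in lines[1:]:
    state = process_line(state, line)
print(state["LineCount"], state["TotalInvoiceAmount"])  # 3 210.00
```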
Running and testing the BPEL process
The SOA Composite that has now been created can easily be tested – even without the file adapter. By invoking the service and operation exposed by the SOA Composite with the following values for CustomerIdentifier and Amount we can see the effect of the aggregation support:
After deploying the SOA Composite application, we can test the service from the EM FMW Console:
Here are the message flow trace and the audit trail/flow for Customer D:
After the initial instance creation, two additional invoice line messages are received and processed in that instance. The audit trail for the BPEL process instance looks like this and ends with an interrupted scope (mid-While Loop).
The aggregate over all invoice lines for Customer D is 3700 – as determined by the BPEL process.
The audit flow for the interruption:
This test run demonstrated that the BPEL engine is capable of aggregating multiple messages into the same BPEL process instance, using the same operation for both the initial receive and the mid-process receive.
Simplify process using Pick
The process as is, with the OnAlarm event handler, the Throw activity and the Catch fault handler, is overly complex. Can’t the same functionality be implemented using a Pick activity that either waits for one minute or receives a message into a running instance? The documentation does not mention the onMessage branch of the Pick activity as being capable of this same feat, so let’s just try the following process and see if it works too.
Here the Pick has an OnMessage branch that is configured similarly to the mid-process Receive in the previous process definition, and an OnAlarm branch that is configured just like the OnAlarm Scope Event Handler we used previously.
The good news is: this behaves exactly the same as the previous process definition. In other words: the capability to aggregate multiple messages into the same running instance applies also to Pick + On Message and not just to Receive. Here is the audit trail from testing the revised and simplified process:
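Python's `queue.Queue.get(timeout=...)` happens to have the same either/or shape as a Pick: it returns a message or reports that the timeout expired. The sketch below uses that to show why the Pick version is simpler. Letting the onAlarm branch set the continueProcessing flag is my assumption for illustration; the article's OnAlarm branch is configured like the earlier handler. `run_instance_with_pick` is a hypothetical name, and `queue.Empty` is only the library's way of signalling the timeout, not a BPEL fault.

```python
import queue

def run_instance_with_pick(first_line, mailbox, idle_timeout):
    """Each iteration is a Pick: onMessage receives a line, onAlarm ends the loop."""
    received = [first_line]
    continue_processing = "Y"
    while continue_processing == "Y":
        try:
            received.append(mailbox.get(timeout=idle_timeout))  # onMessage branch
        except queue.Empty:
            continue_processing = "N"                             # onAlarm branch
    # No custom fault and no enclosing fault handler are needed to leave the loop.
    return received

mailbox = queue.Queue()
for line in (200, 300):
    mailbox.put(line)
print(run_instance_with_pick(100, mailbox, idle_timeout=0.1))  # [100, 200, 300]
```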
Download the JDeveloper application with the SOA Composite containing the BPEL process: AggregationOfInvoiceLines.zip.
SOA Suite 11g Developer Guide – Using Correlation Sets and Message Aggregation http://docs.oracle.com/cd/E23943_01/dev.1111/e10224/bp_correlate.htm#CHDEIGCG