Batch Aggregation of files in BPEL process instances based on correlation

Remco is an interesting guy with unexpected ideas springing from a creative brain. He can make life interesting, challenging and puzzling. This time he had another interesting challenge – not all that weird to be honest. The challenge in short was:

Our invoicing system produces files that contain one or more invoice entries. Every entry describes an invoice for a certain company we do business with. There can be multiple invoice entries for the same company. The objective is to aggregate all invoice entries for a company, potentially from many different files. For each invoice entry, some special processing involving service calls is required. Once all entries for a company have been collected and aggregated, some additional action is required, for example recording the company invoice aggregate in a database or in a file and calling a web service to perform additional processing. The files with invoices are produced over a period of a couple of hours. It is important that the processes performing the aggregation are reliable: they should not lose any entries.

The specific question we investigated is: can we solve this puzzle using Oracle SOA Suite 11g? An early approach towards applying the SOA Suite’s capabilities to this challenge was based on BPEL’s correlation mechanism. In short: every company for which the batches contain invoice entries will have an instance of a composite called CompanyInvoiceAggregator. This instance is carried by a BPEL component that has correlation configured on CompanyId. In our test set up, we have the company instance expire after 5 minutes: it will cease aggregation when it has not received new messages for a period of 5 minutes.

Composite InvoiceProcessor contains a File Adapter that reads the invoice entries from files arriving in a specified directory. Each entry is passed to a Mediator that forwards it to a BPEL component. This BPEL component instantiates the company specific instance of CompanyInvoiceAggregator (if it does not already exist). Then it passes the invoice entry to that instance.

Composite InvoiceProcessor is stateless: after processing an individual invoice entry, it terminates (typically within a few hundred milliseconds). Composite CompanyInvoiceAggregator is around for much longer: in our set up at least for 5 minutes and typically longer when multiple invoice entries arrive for the company.

Image

Note: this layout is a simplification of the real challenge. It does, however, capture the essence of the correlation based interaction.

Stateful Composite CompanyInvoiceAggregator

The composite that runs as the stateful aggregator, processing incoming invoice entries for a specific company – CompanyInvoiceAggregator – consists of a single BPEL component that is wired to a simple outbound File Adapter component to write the final results to a file.

Image

The crucial machinery in this application is hidden away in the deceptively simple BPEL process CompanyInvoicesAggregator. This BPEL process has two ways in: one to instantiate the process instance for a particular company and another to receive an invoice entry for the company represented by the instance.

The Receive at the beginning of the process is associated with operation Process on partner link CompanyInvoiceAggregatorService. This activity initiates the BPEL process instance. The process continues with the initialization of a few variables and the Reply activity that sends a fairly meaningless response message. Why we have a Reply at all will become clear in a moment.

After the Reply – which concludes the first synchronous interaction with this process instance, the process continues. It enters a loop in which it executes a Pick – waiting for a message to arrive – which carries the next invoice entry for the company – or the time out to occur (currently set at 3 minutes for testing purposes).

Image

When a message with a new invoice entry arrives, the total amount for all company invoices is increased and another iteration of the loop is entered. However, when the time out occurs, the loop-flag is reset, the loop is terminated and the final invoice total is written to a file that aggregates all company aggregates. At that point, the process instance for a specific company is complete and will end.
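To make the control flow of that loop concrete, here is a minimal Python sketch of the same wait-or-time-out pattern. It is not BPEL and not SOA Suite code: the function name `aggregate_until_idle`, the in-memory queue standing in for the OnMessage branch and the very short timeout are all assumptions made purely for illustration.

```python
import queue


def aggregate_until_idle(inbox, idle_timeout_s):
    """Sum amounts taken from `inbox` until it stays empty for idle_timeout_s."""
    total = 0.0
    count = 0
    keep_waiting = True  # plays the role of the loop flag in the BPEL process
    while keep_waiting:
        try:
            # Comparable to the OnMessage branch: a new invoice amount arrived.
            amount = inbox.get(timeout=idle_timeout_s)
            total += amount
            count += 1
        except queue.Empty:
            # Comparable to the time out branch: reset the flag, leave the loop.
            keep_waiting = False
    return total, count


# Hypothetical invoice amounts for a single company.
inbox = queue.Queue()
for amount in (100.0, 250.5, 49.5):
    inbox.put(amount)

result = aggregate_until_idle(inbox, idle_timeout_s=0.05)
print(result)  # (400.0, 3)
```

The timer restarts on every received message, which matches the idea that an instance only ends after a quiet period without new entries.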

The one really special element in this BPEL process is the onMessage branch in the Pick activity. That is where the running process instance receives an incoming message. Frequently, BPEL processes either receive a message at the beginning of the process, to kick off the process, or as the callback response to an invoke of an asynchronous service. This is something different: asynchronously receiving unsolicited messages into a running instance. BPEL is capable of this feat thanks to its feature of correlation. In short: the BPEL engine can assign a unique identifier to each process instance and it is capable of handing incoming messages to a particular instance, based on that unique identifier, or correlation id as the BPEL terminology has it.

Three elements are required to get correlation working:

  • define the correlation set (the composite unique key) for the BPEL process, in terms of properties (note: there can be more than one correlation set in a process)
  • specify on a Receive, Invoke or OnMessage activity that the instance should be identified and made available for correlation; at this point we also need to specify what the values are for the properties that constitute the correlation set; these values need to be derived from the incoming or outgoing message
  • configure Receive and OnMessage activities to accept correlated messages; we need to specify how the values for the properties of the correlation set are found in the incoming messages that we want to correlate on
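The interplay of these three elements can be imitated in a few lines of Python. The sketch below is only a toy model of what an engine does with a correlation set. The `Engine` class, its method names and the message type names `startRequest` and `nextRecord` are hypothetical and do not correspond to any SOA Suite API.

```python
class CorrelationConflict(Exception):
    """Raised when a second instance tries to initiate an existing key."""


class Engine:
    def __init__(self, properties, aliases):
        self.properties = properties  # element 1: the correlation set
        self.aliases = aliases        # message type -> {property: extractor}
        self.instances = {}           # correlation key -> messages received

    def _key(self, message_type, message):
        # Apply the property aliases to read the key values from a message.
        alias = self.aliases[message_type]
        return tuple(alias[prop](message) for prop in self.properties)

    def initiate(self, message_type, message):
        # Element 2: fix the instance identity and publish it for correlation.
        key = self._key(message_type, message)
        if key in self.instances:
            raise CorrelationConflict(key)
        self.instances[key] = [message]
        return key

    def correlate(self, message_type, message):
        # Element 3: hand a later message to the instance with the same key.
        key = self._key(message_type, message)
        self.instances[key].append(message)  # KeyError if no such instance
        return key


engine = Engine(
    properties=("companyId",),
    aliases={
        "startRequest": {"companyId": lambda m: m["invoiceRecord"]["companyId"]},
        "nextRecord": {"companyId": lambda m: m["payload"]["companyId"]},
    },
)
engine.initiate("startRequest", {"invoiceRecord": {"companyId": 4}})
routed_to = engine.correlate("nextRecord", {"payload": {"companyId": 4, "amount": 75}})
print(routed_to, len(engine.instances[routed_to]))  # (4,) 2
```

Note that each message type has its own alias: the key is the same, but where it lives in the message may differ.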

Let’s see how these three steps are implemented in the CompanyInvoiceAggregator composite application.

1. Define the correlation set

Image

The correlation set has a name and one or more properties. Each property has a name and a type. In this case, the correlation set is called CompanyAndBatchNameCorrelationSet and it has a single property: companyId of type integer.

2. Specify the start of correlation

This BPEL process is identified by the companyId. As soon as the request for the process operation arrives, handled by the Receive activity, the correlation set should be initiated and the process instance should be registered with the engine for correlation. This is set up in the Receive activity, on its correlation tab:

Image

The correlation set is added in the correlation tab, to indicate that this Receive activity is related to that set. By setting the value of Initiate to yes we further indicate that this activity is responsible for setting the value of the correlation identifier (the unique key) for this process instance. When this Receive is complete, this instance is published for correlation. We also need to specify what the value is for the correlation identifier. We do this through a so-called Property Alias: a mapping of an element in the incoming message that this Receive activity handles to a property in the Correlation Set. In this case, we map the companyId element in the invoiceRecord that is passed to the BPEL process to the property companyId in the correlation set.

3. Configure correlation on incoming messages for a running instance

Our BPEL process needs to receive additional invoice entry messages for this company. We use a Pick activity with an OnMessage branch. This OnMessage activity needs to participate in correlation too. It does not set the identity of the process instance, as that is already done in the Receive activity, but it needs to specify that it wants to correlate and how the matching value can be read from the message it processes.

Image

It is important that initiate is set to no for this activity – only one activity can initiate a correlation set. The property companyId was set by the Receive activity, so the OnMessage knows what the companyId is in which it is interested. It should specify how that companyId is found on the CompanyInvoicesAggregatorRequestMessage that it is intent on receiving. A property alias is configured for property companyId, mapping it to the companyId element in the payload of the request message. This instructs the BPEL engine to pass the message to a BPEL process instance identified with the value for companyId found in the message.

With this configuration in place, we can deploy the composite and test it.

Stateless Composite InvoiceProcessor

Our second composite is not stateful. It reads a single Invoice Entry from a file and hands it over to a Mediator that forwards it to a BPEL process. This BPEL process has a simple responsibility: make sure that the invoice entry is delivered to the instance of CompanyInvoiceAggregator that handles all invoice entries for the company specified in this invoice entry.

Image

Neither this composite nor the BPEL process inside it is aware of correlation. This is important to realize. Even though the BPEL process benefits from the correlation functionality built into the CompanyInvoiceAggregator composite, it is not actively aware of it and does not hook into it. The InvoiceRecordDistributor simply invokes the Process operation on the CompanyInvoiceAggregator composite’s web service (1) to instantiate the company specific instance and then the operation processNextInvoiceRecord (2) to pass the InvoiceEntry itself.

Image

However, the first call, the invoke of the process operation, will fail on many occasions! The companyId is like a unique key for instances of the composite CompanyInvoiceAggregator and there can be only one instance of that composite for a specific company. Only for the very first invoice entry for a specific company will the call to process on the CompanyInvoiceAggregator succeed. For all subsequent invoice entries, this call will fail. The BPEL engine will throw a fault because it cannot execute the call to process successfully: that would result in a second instance with the same unique key value.

However, we do not really mind that the call fails. The only reason for making the call to operation process is to ensure that an instance for the companyId is created. And the only reason for this call to fail (well, the main reason) is that the instance already exists. Which is what we tried to establish, so that is okay.

The scope that makes the call to the process operation has a fault handler that will handle the fault that occurs when an instance for the companyId already exists. It needs to do absolutely nothing, as it is perfectly alright for the company instance to already exist.

After either a successful or a failed call to process on CompanyInvoiceAggregator, the operation processNextInvoiceRecord is invoked. Thanks to correlation, the invoiceEntry record is passed to the BPEL instance that is identified by the companyId value that is contained in the record.
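The "try to create, ignore the duplicate fault, then deliver" sequence can be sketched in Python as follows. This is only a simulation under stated assumptions: `AggregatorService`, `InstanceAlreadyExists` and `distribute` are hypothetical names standing in for the CompanyInvoiceAggregator service, the fault thrown by the engine and the InvoiceRecordDistributor logic, and the records are made-up sample data.

```python
class InstanceAlreadyExists(Exception):
    """Stand-in for the fault raised on a duplicate correlation key."""


class AggregatorService:
    """Toy replacement for the two operations on the aggregator composite."""

    def __init__(self):
        self.totals = {}  # companyId -> running invoice total

    def process(self, company_id):
        # Comparable to operation process: create the company instance.
        if company_id in self.totals:
            raise InstanceAlreadyExists(company_id)
        self.totals[company_id] = 0.0

    def process_next_invoice_record(self, record):
        # Comparable to processNextInvoiceRecord: routed on companyId.
        self.totals[record["companyId"]] += record["amount"]


def distribute(service, record):
    try:
        service.process(record["companyId"])
    except InstanceAlreadyExists:
        pass  # The instance is already there, which is all we wanted.
    service.process_next_invoice_record(record)


service = AggregatorService()
for record in [
    {"companyId": 4, "amount": 100.0},
    {"companyId": 7, "amount": 20.0},
    {"companyId": 4, "amount": 50.0},
]:
    distribute(service, record)

print(service.totals)  # {4: 150.0, 7: 20.0}
```

Only the specific "already exists" fault is swallowed here; any other failure of the first call would still surface, which seems the safer choice for a process that must not lose entries.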

And… Action

The composite InvoiceProcessor is configured to read files with InvoiceEntries from a directory on the SOA Suite server’s file system. For example:

Image

This file is copied to the configured incoming directory:

Image

After a few seconds, it is removed from that directory to be processed. A copy of the file is written to an archive directory:

Image

Processing is underway meanwhile. No outcomes are produced – all company specific instances are waiting for at least three minutes before they terminate.

After a few minutes (well, three to be exact) the following output is produced, with the aggregated invoice amount total per company:

Image

When we check the instances for the CompanyInvoiceAggregator, we will find more than the 8 instances that produced a line in the output file:

Image

There are many instances that have faulted: one for every second or subsequent invoice entry for a certain company. Because every second or subsequent invoice entry for a company tries to instantiate a correlation id that already exists, it will produce a fault. That fault is caught and the calling InvoiceProcessor instance continues. However, the SOA Suite registers the faulted CompanyInvoiceAggregator as well.
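The number of faulted instances follows directly from the input: every entry beyond the first one for a company adds one fault. A small Python sketch of that bookkeeping, using a made-up list of company ids rather than the actual test file, could look like this (the function name `instance_outcomes` is hypothetical):

```python
from collections import Counter


def instance_outcomes(company_ids):
    """Count started versus faulted aggregator instances for a list of entries."""
    seen = set()
    outcomes = Counter()
    for company_id in company_ids:
        if company_id in seen:
            # A second or later entry: the instantiating call faults.
            outcomes["faulted"] += 1
        else:
            # The first entry for this company: an instance is started.
            seen.add(company_id)
            outcomes["started"] += 1
    return outcomes


# Hypothetical company ids, one per invoice entry, in arrival order.
entries = [4, 7, 4, 4, 9, 7]
outcomes = instance_outcomes(entries)
print(outcomes["started"], outcomes["faulted"])  # 3 3
```

In other words, the faulted count equals the number of entries minus the number of distinct companies.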

Note that this composite has been adorned with a composite sensor for the company id. This means we can search instances on the company id, for example for company id equals 4.

The message flow trace for the successfully completed instances looks like this:

Image

The BPEL flow for an instance of CompanyInvoicesAggregator is shown here. After the receive, assign and reply is the start of the scope UntilTimeOutWaitForMoreInvoices. When the invoiceRecord message is received, it can be correlated on the companyId (4) that it contains, to the already running instance.

Image

The fault registered for the failed instances – tried to initiate with a correlation id value that is not unique – can be inspected too:

Image

Drilling into a failed instance provides some insight into what went wrong:

Image

The fault is described as “Conflicting Receive. A similar receive activity is being declared in the same process.” The wording is not entirely clear, but it is evident that a uniqueness violation on the correlation identifier has occurred.

Image

Resources

JDeveloper/SOA Suite 11gR1 PS 2 Projects: CompanyInvoiceProcessorAndAggregator.zip

About Author

Lucas Jellema, active in IT (and with Oracle) since 1994. Oracle ACE Director for Fusion Middleware. Consultant, trainer and instructor on diverse areas including Oracle Database (SQL & PLSQL), Service Oriented Architecture, BPM, ADF, Java in various shapes and forms and many other things. Author of the Oracle Press book: Oracle SOA Suite 11g Handbook. Frequent presenter on conferences such as JavaOne, Oracle OpenWorld, ODTUG Kaleidoscope, Devoxx and OBUG. Presenter for Oracle University Celebrity specials.

3 Comments

  1. Well written article as always Lucas, and I love the intro :).
    So, any idea what happens if you add a delay in CompanyInvoiceAggregator, after AddInvoiceSum? This would simulate the processing taking non-trivial time, and the process might not be waiting for a new message when a new message arrives.
     
