Processors are not going to get much faster. No higher clockspeeds are foreseen. The speed of processing will be further increasing through parallellization, engaging multiple CPU cores for handling all tasks rather than a single faster core.
This is but one reason for taking a closer look at the threading model in Java and the way we can do asynchronous and parallel processing as of Java 5. Another reason for my interest in asynchronous processing has to do with (perceived) performance. If an application performs a task on behalf of a user, it may block until the task is completed. The user cannot do anything until the task completes – watching the hourglass or whatever busy cursor is used. With asynchronous processing, a task which the user does not immediately require the results from can be processed in a separate thread. The perception of the user therefore is that the task is performed (or at least processed) much faster than in the synchronous case. And even though it is only perception – perception is usually all that counts!
Furthermore, if the task can be broken in smaller pieces that can be executed in parallel, we really can speed up the task – provided processing power is available. Many tasks involve IO-processing, database access or web service calls – all operations that do not burden the CPU very much and leave room for parallel activities in other threads.
In this article I will tell about my first explorations of the world of Futures, ExecutorServices, CompletionService, Callback interfaces and ThreadPools.
We will look at some very simple classes – to isolate the essence.
Let’s start with the class SlowWorker. It is like an employee who can do work for us. It has a method doWork() that performs some crucial, long-running task. Well, in this case the task is sleeping for 2 seconds. But you get the idea.
package future; public class SlowWorker { public SlowWorker() { } public void doWork() { try { System.out.println("==== working, working, working ====== "); Thread.sleep(2000); System.out.println("==== ready! ======"); } catch (InterruptedException e) { } } public static void main(String[] args) { SlowWorker worker = new SlowWorker(); System.out.println("Start Work" + new java.util.Date()); worker.doWork(); System.out.println("... try to do something while the work is being done...."); System.out.println("End work" + new java.util.Date()); System.exit(0); } }
In the main method, a SlowWorker instance is created and the doWork() is invoked. Then the main method tries to perform some other important task – printing to the system output – while (!) the doWork() churns away on its task. However, since this is a synchronous call, this attempt at parallel activity fails. The output of running this class is:
Start WorkWed Feb 18 07:06:41 CET 2009 ==== working, working, working ====== ==== ready! ====== ... try to do something while the work is being done.... End workWed Feb 18 07:06:43 CET 2009
This tells us – no surprise – that first doWork() completed and only then the “try to do something while…” is processed and sent to the output.
First stab at asynchronous, parallel execution
In Java 5, organizing work in parallel executing tasks has become much easier. The low level thread manipulation of Java 1.4 and before is no longer required or desired. An ExecutorService – almost like a central business unit in an organization where we can submit tasks assignments – takes our task (a Callable object) and has it executed. The ExecutorService returns a Future, an object that has a reference to the task we handed over to the ExecutorService. We can use that ‘claim slip’ to later learn about the progress of our task. Just like we would ask our business unit, using some task identifier they returned to us when we submitted the task, whether the task is complete.
And just like the central business unit would have one or more staff members that can work on a task assignment when the previous one was finished – the ExecutorService has a ThreadPool with one or multiple threads. When a thread is idle, it can take on a Callable object that was submitted to the ExecutorService. When the thread completes the task, it will notify the ExecutorService that in turn updates the Future object.
Anyone with a reference to the Future object can inspect the task’s progress using for example the isDone() method on the Future. With a call to get() on the Future, we can get the result of the executing the task. Note however that this call will block until the result is available! It is like asking the business unit for the progress of the task and being forced to wait until the task is complete and an answer is given. As soon as future.get() is called, the parallellism vanishes as the calling thread is blocked until the task executed on the parallel thread completes.
A code example of this:
package future; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class AsynchronousWorker { public AsynchronousWorker() { } public static void main(String[] args) { System.out.println("Start Work" + new java.util.Date()); ExecutorService es = Executors.newFixedThreadPool(3); final Future future = es.submit(new Callable() { public Object call() throws Exception { new SlowWorker().doWork(); return null; } }); System.out.println("... try to do something while the work is being done...."); System.out.println("... and more ...."); try { future.get(); // blocking call - the main thread blocks until task is done } catch (InterruptedException e) { } catch (ExecutionException e) { } System.out.println("End work" + new java.util.Date()); System.exit(0); } }
We instantiate an ExecutorService with a thread pool consisting of three threads. We submit a Callable object to this service – that does nothing more than calling a SlowWorker object to perform doWork(). The ExecutorService hands us the claim slip – the Future object. We can then continue processing – “try to do something while…” – and leave it to the ExecutorService to find an available thread to handle the Callable object.
When we run this AsynchronousWorker, the output is like this:
Start WorkWed Feb 18 07:28:09 CET 2009 ... try to do something while the work is being done.... ... and more .... ==== working, working, working ====== (Worker Id = 1) ==== ready! ====== End workWed Feb 18 07:28:12 CET 2009
This tells us that after we started the main thread did the “something while” (after sending the Callable task to the ExecutorService) and “… and more …”. At that point we see the first sign of life from the SlowWorker – an indication that the ExecutorService has found a thread that is now busy processing our task. At some point the SlowWorker is done (ready!) and the main thread ends as well. Note that the call future.get() has the main thread blocked until the SlowWorker is done.
Executing multiple tasks – true parallel activity
Having one task processed asynchronously is only mildly useful – especially if you do not really have much useful to do yourself. If you hand your task to the central business unit – say Office Management – for ‘parallel processing’ then go take a cup of coffee yourself while you wait for the task to be done does not seem overly useful.
We will now look at the situation where multiple tasks have to be performed. Engaging multiple parallel threads for handling those tasks should speed up the over all process.
First the sequential situation:
package future; import java.util.Date; public class SequentialWorker { public SequentialWorker() { } private static int numberOfJobs = 5; public static void main(String[] args) { Date startTime = new java.util.Date(); System.out.println("Start Work" + startTime); for(int i=0;i <numberOfJobs;i++) { System.out.println("* Start worker "+i); SlowWorker worker = new SlowWorker(i); worker.doWork(); } System.out.println("... try to do something while the work is being done...."); Date endTime = new java.util.Date(); System.out.println("End work at " + endTime); System.out.println("Job took " + new Double(0.001*(endTime.getTime() - startTime.getTime()))+ " seconds"); System.exit(0); } }[/code] Here we have the normal situation: five jobs are performed - five calls to SlowWorker's doWork() method. And since we do not engage parallel processing, we get sequential processing. Since a job takes 2 seconds, the entire program will run for at least 10 seconds: [code language="java"]Start WorkWed Feb 18 07:51:07 CET 2009 * Start worker 0 ==== working, working, working ====== (Worker Id = 0) ==== ready! ====== * Start worker 1 ==== working, working, working ====== (Worker Id = 1) ==== ready! ====== * Start worker 2 ==== working, working, working ====== (Worker Id = 2) ==== ready! ====== * Start worker 3 ==== working, working, working ====== (Worker Id = 3) ==== ready! ====== * Start worker 4 ==== working, working, working ====== (Worker Id = 4) ==== ready! ====== ... try to do something while the work is being done.... End work at Wed Feb 18 07:51:17 CET 2009 Job took 10.046 seconds[/code] We see that the overall job takes 10 seconds and a bit and the 'do something while' is done only after all jobs have been processed. Very sequentially all of this. Now we will parallellize that same workload, using the ExecutorService: [code language="java"]package future; ... imports public class SequentialAsynchronousWorker { public SequentialAsynchronousWorker() { } private static int numberOfJobs = 5; public static void main(String[] args) { Date startTime = new java.util.Date(); System.out.println("Start Work" + startTime); ExecutorService es = Executors.newFixedThreadPool(3); List<Future> futures = new ArrayList<Future>(); for(int i=0;i<numberOfJobs;i++) { System.out.println("* Start worker "+i); futures.add(es.submit(new Callable() { public Object call() throws Exception { new SlowWorker().doWork(); return null; } })); } System.out.println("... try to do something while the work is being done...."); System.out.println("... and more ...."); int ctr=0; for (Future future:futures) try { future.get(); // blocking call, explicitly waiting for the response from a specific task, not necessarily the first task that is completed System.out.println("** response worker "+ ++ctr +" is in"); } catch (InterruptedException e) { } catch (ExecutionException e) { } Date endTime = new java.util.Date(); System.out.println("End work at " + endTime); System.out.println("Job took " + new Double(0.001*(endTime.getTime() - startTime.getTime()))+ " seconds"); System.exit(0); } }[/code] When we run this - the throughput time is decreased to little over 4 seconds. [code language="java"]Start WorkWed Feb 18 08:08:47 CET 2009 * Start worker 0 * Start worker 1 * Start worker 2 * Start worker 3 * Start worker 4 ... try to do something while the work is being done.... ... and more .... ==== working, working, working ====== (Worker Id = 1) ==== working, working, working ====== (Worker Id = 1) ==== working, working, working ====== (Worker Id = 1) ==== ready! ====== ==== ready! ====== ==== working, working, working ====== (Worker Id = 1) ** response worker 1 is in ** response worker 2 is in ==== working, working, working ====== (Worker Id = 1) ==== ready! ====== ** response worker 3 is in ==== ready! ====== ** response worker 4 is in ==== ready! ====== ** response worker 5 is in End work at Wed Feb 18 08:08:51 CET 2009 Job took 4.078 seconds[/code] This is explained from the size of the ThreadPool: with 3 threads at its disposal, the ExecutorService can have three tasks executed in parallel. Since we submitted five tasks, it can start processing the last two tasks only when the first two threads are done processing their task - after about two seconds. Processing the second batch of tasks takes another two seconds, hence the overall time of about 4 seconds. Note that the loop over the futures checks the completion of the futures in the same order as the tasks were submitted. Each future.get() call is blocking. If the first task would take much longer to complete than the second, we would be waiting for the result of the first task while we could already proceed with the result of the second task, if only we had asked for it. One solution is to first call future.isDone() and only call future.get() when future.isDone() returns true. Another is use of a CompletionService - as wel will see shortly. When we increase the size of the ThreadPool, we make more threads (workers) available to the ExecutorService - so all tasks can processed in parallel and the overall processing time goes down to about two seconds. [code language="java"]Start WorkWed Feb 18 08:08:07 CET 2009 * Start worker 0 * Start worker 1 * Start worker 2 * Start worker 3 * Start worker 4 ... try to do something while the work is being done.... ... and more .... ==== working, working, working ====== (Worker Id = 1) ==== working, working, working ====== (Worker Id = 1) ==== working, working, working ====== (Worker Id = 1) ==== working, working, working ====== (Worker Id = 1) ==== working, working, working ====== (Worker Id = 1) ==== ready! ====== ==== ready! ====== ==== ready! ====== ==== ready! ====== ** response worker 1 is in ** response worker 2 is in ** response worker 3 is in ** response worker 4 is in ==== ready! ====== ** response worker 5 is in End work at Wed Feb 18 08:08:09 CET 2009 Job took 2.093 seconds[/code] When we use a CompletionService on top of the ExecutorService, we provide ourselves with a intermediate that we can consult to learn whether any of the submitted tasks has completed. For the first completed task that we have not handled before, we get the future returned, that we then can process in the same way as before. So instead of checking the futures blindly hoping that the one we inspect has completed, we ask the CompletionService to do that for us and have it return the task that is done. Now we may have the responses returned in a slightly different order. [code language="java"]package future; import java.util.Date; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ParallelWorker { public ParallelWorker() { } private static int numberOfJobs = 5; public static int workerId; public static void main(String[] args) { Date startTime = new java.util.Date(); System.out.println("Start Work" + startTime); ExecutorService es = Executors.newFixedThreadPool(3); CompletionService<Object> cs = new ExecutorCompletionService<Object>(es); for (int i=0;i<numberOfJobs;i++) { workerId = i; cs.submit(new Callable<Object>() { public Object call() throws Exception { new SlowWorker( ParallelWorker.workerId).doWork(); return null; }}); } System.out.println("... try to do something while the work is being done...."); System.out.println("... and more ...."); for (int i = 0; i < numberOfJobs; i++) { Object x; try { x = cs.take().get(); // find the first completed task } catch (InterruptedException e) { } catch (ExecutionException e) { } } Date endTime = new java.util.Date(); System.out.println("End work at " + endTime); System.out.println("Job took " + new Double(0.001*(endTime.getTime() - startTime.getTime()))+ " seconds"); System.exit(0); } }[/code] <h3>Please call us as we will not call you: the call back interface</h3> Instead of having to ask whether a task has been done, we could prefer to have the workers inform us of the fact they have completed a job. That is an approach we can take with the asynchronous processing in Java too. We will not call future.get() or some other method to ask if hopefully our task has been completed. We instruct the aysynchronous 'slave' to come back to us to tell us when it is done. Well, to be more precise: we make it part of the job we submit to call us at the end of it. There is no special magic to it, no special infrastructure in the Java language for this call back structure. It is a simple Design Pattern that we apply. First of all, the task itself is more formally specified, not using a Callable object that is created on the fly but using a formal Class definition: [code language="java"]package future; import java.util.concurrent.Callable; public class CallingBackWorker implements Callable { private CallbackInterface employer; public CallingBackWorker() { } public Object call() { new SlowWorker().doWork(); employer.returnResult("Task Completed!"); return null; } public void setEmployer(CallbackInterface employer) { this.employer = employer; } public CallbackInterface getEmployer() { return employer; } }
You will notice that this class expects to have a CallBackInterface set, an employer it will call when the work is done. So in order to make use of this CallingBackWorker – that in turn invokes the SlowWorker again – we need to inject it with an implementation of the CallBackInterface.
This interface is as simple as you would expect:
package future; public interface CallbackInterface { public void returnResult(Object result); }
And one implementation of it is class CalledBack. This class submits five tasks and then sits and waits to be called by each asynchronous CallingBackWorker when the task is done.
package future; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class CalledBack implements CallbackInterface{ Object result; public CalledBack() { } public void returnResult(Object result) { System.out.println("Result Received "+result); this.result = result; } public void andAction() { ExecutorService es = Executors.newFixedThreadPool(3); CallingBackWorker worker = new CallingBackWorker(); worker.setEmployer(this); final Future future = es.submit( worker); System.out.println("... try to do something while the work is being done...."); System.out.println("... and more ...."); System.out.println("End work" + new java.util.Date()); } public static void main(String[] args) { new CalledBack().andAction(); } }
The output from this process is not spectacular:
... try to do something while the work is being done.... ... and more .... ==== working, working, working ====== (Worker Id = 1) End workWed Feb 18 11:05:11 CET 2009 ==== ready! ====== Result Received Task Completed!
but we did not have to ask for the result ourselves, and that is good news!
Resources
Download sources for this article: AsynchProcessingfuture.zip
Fantastic article. I have been getting stumped on async execution and always wondered how best to do it. This will atelast give me a very good guide on how to implement it.
Excellent, well written article on Executors . Thanks & Thanks.
Clear and educationalist. Thanks a lot for sharing this !
Very interesting and very well written article. Thanks a lot!
Very well drafted article…..
Very well explained …
Superb article!!
Very good explaination..Very well written article..
It’s a pretty clear article. I especially appreciate the example code to show callbacks vs ExecutorCompletionService – I had assumed I must be missing something since callbacks are obviously part of a fully asynchronous request/response, but are not discussed in java.util.concurrent. Future is ok, but with get() and take() it seems to be only 1-way async. The JSR 166 group apparently considered the issue but did not include callbacks (http://osdir.com/ml/java.jsr.166-concurrency/2002-09/msg00068.html).
Another way to use util.concurrent 2-way async without callbacks would be to have an executor run a “request service” and also run a second queue back to a “response handling service.” That way you get the benefit of queueing, and you can free up the “request service” threads a little sooner. But callbacks will work well in many situations where response handling is fast (setting values or using in-memory data structures). The javadocs for javax.xml.ws.Service hint that Service may use this strategy.
One suggestion: change the formatting a little – there are some long lines with “2 lines of code” on them. I was initially confused because I was reading with a large font size, and the 2nd line (very important code) was invisible off to the right of the text box.
This is the line that clued me in to the problem, since I originally did not see “cs” defined:
ExecutorService es = Executors.newFixedThreadPool(3); CompletionService cs = new ExecutorCompletionService(es);
Very well done article. One of the best I have seen explaining the concepts. Thanks.