Sunday 2 August 2015

Tech Talk... Java fork/join pool

What is Java fork/join pool?
Sometimes an application can use multiple processors to increase its computational efficiency. It generally does this by breaking a big chunk of work, which it might otherwise process serially, into subtasks that are spread across the available processors and processed in parallel. The Java fork/join pool (java.util.concurrent.ForkJoinPool, added in Java 7) provides a framework for exactly this. It hides the complexity of thread creation, pool management and parallel execution, leaving the user responsible only for the core computation logic and for deciding when and how to split the work.
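To see the "pool management" part concretely, the sketch below creates a pool with the no-argument constructor and reads its parallelism, which by default matches the number of available processors. The application never creates a worker thread itself. The class PoolParallelism and its method defaultParallelism are hypothetical names used only for illustration.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical helper class: shows that the pool, not the application, manages worker threads.
class PoolParallelism {

    // Parallelism is the pool's target number of active worker threads.
    static int defaultParallelism() {
        // The no-argument constructor sizes the pool from Runtime.availableProcessors().
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown(); // let the pool's worker threads terminate
        }
    }

    public static void main(String[] args) {
        System.out.println("Available processors:     " + Runtime.getRuntime().availableProcessors());
        System.out.println("Default pool parallelism: " + defaultParallelism());
    }
}
```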
How to use Java fork/join pool?
It is simple: extend a subclass of ForkJoinTask, usually RecursiveAction (for tasks that return no result) or RecursiveTask (for tasks that return one), and hand an instance to a ForkJoinPool, which executes it. When the pool's invoke method is used, the calling thread waits until the task completes and then regains control.
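As a minimal sketch of the RecursiveTask variant, assuming a hypothetical SumTask that adds up a long[] array and an arbitrary threshold of 1,000 elements, the pool's invoke method hands back the computed value once the whole task tree has finished:

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: a RecursiveTask returns a value, unlike a RecursiveAction.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed cut-off, tune for real workloads
    private final long[] numbers;
    private final int from, to;                 // half-open range [from, to)

    SumTask(long[] numbers, int from, int to) {
        this.numbers = numbers;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: add the range directly.
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += numbers[i];
            }
            return sum;
        }
        // Too big: split into two halves and process them as subtasks.
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(numbers, from, mid);
        SumTask right = new SumTask(numbers, mid, to);
        invokeAll(left, right);             // returns once both halves are done
        return left.join() + right.join();  // join() on a finished task just returns its result
    }

    static long parallelSum(long[] numbers) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(numbers, 0, numbers.length)); // blocks until done
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] numbers = new long[10_000];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1;
        }
        System.out.println(parallelSum(numbers)); // prints 50005000
    }
}
```

The threshold is an assumption: if it is too small, the overhead of creating and scheduling many tiny tasks can outweigh the benefit of running them in parallel.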
The fork/join concept is recursion based: a task's compute method creates new instances of the same task for smaller pieces of the work, recursively, until each piece is small enough to be processed directly. Calling invokeAll on the subtasks forks them, lets them run in parallel, and returns only when all of them have completed, so to the caller it looks as if the subtasks ran in a blocking fashion.
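invokeAll is the convenient form; the same split can also be written with explicit fork() and join() calls. A common pattern, shown in the hypothetical MaxTask below, is to fork one half, compute the other half in the current thread, and then join the forked half. The threshold of 4 is an assumption chosen only so that the small example actually splits.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: finds the maximum of an int array using explicit fork()/join().
class MaxTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 4; // assumed small cut-off so the example splits
    private final int[] values;
    private final int from, to;             // half-open range [from, to), always non-empty

    MaxTask(int[] values, int from, int to) {
        this.values = values;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Integer compute() {
        if (to - from <= THRESHOLD) {
            int max = values[from];
            for (int i = from + 1; i < to; i++) {
                max = Math.max(max, values[i]);
            }
            return max;
        }
        int mid = (from + to) >>> 1;
        MaxTask left = new MaxTask(values, from, mid);
        MaxTask right = new MaxTask(values, mid, to);
        left.fork();                    // schedule the left half asynchronously
        int rightMax = right.compute(); // work on the right half in the current thread
        int leftMax = left.join();      // wait for the left half to finish
        return Math.max(leftMax, rightMax);
    }

    static int parallelMax(int[] values) {
        if (values.length == 0) {
            throw new IllegalArgumentException("empty array has no maximum");
        }
        return ForkJoinPool.commonPool().invoke(new MaxTask(values, 0, values.length));
    }

    public static void main(String[] args) {
        System.out.println(parallelMax(new int[]{3, 41, 7, 19, 2, 88, 5, 60, 13})); // prints 88
    }
}
```

Computing one half directly, rather than forking both, keeps the current worker thread busy instead of leaving it idle while it waits.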
Sample program
Below is a sample program that shows the use of fork/join. What it does is simple: print the elements of an ArrayList, along with the name of the task that prints each one. Whenever the list is larger than a threshold, it is split in two and each half is handed to a new instance of the ForkJoinTask, so the halves can be printed in parallel and the elements are not necessarily printed in order. The parallel execution can be observed by running the program several times: the lines printed for the two sub-lists interleave differently, and the order can change from run to run.
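Note that the names printed by the sample (Thread1, Thread2, ...) are task names, not the names of the pool's worker threads. To see which worker thread actually handled each element, a task can record Thread.currentThread().getName(). The following is a hypothetical variant (the names WorkerTracking and track, and the threshold of 2, are assumptions) that splits by position and stores the results in a ConcurrentHashMap, since several workers write to it at the same time:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical variant of the sample task that records the worker thread per element.
// Elements are assumed to be distinct, since they are used as map keys.
class WorkerTracking extends RecursiveAction {
    private final List<Integer> items;
    private final int threshold;                  // assumed >= 1, otherwise splitting never stops
    private final Map<Integer, String> handledBy; // element -> name of the worker thread

    WorkerTracking(List<Integer> items, int threshold, Map<Integer, String> handledBy) {
        this.items = items;
        this.threshold = threshold;
        this.handledBy = handledBy;
    }

    @Override
    protected void compute() {
        if (items.size() <= threshold) {
            String worker = Thread.currentThread().getName();
            for (int item : items) {
                handledBy.put(item, worker);
            }
            return;
        }
        int split = items.size() / 2; // split by position, not by value
        invokeAll(new WorkerTracking(items.subList(0, split), threshold, handledBy),
                  new WorkerTracking(items.subList(split, items.size()), threshold, handledBy));
    }

    static Map<Integer, String> track(List<Integer> items, int threshold) {
        Map<Integer, String> handledBy = new ConcurrentHashMap<>();
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new WorkerTracking(items, threshold, handledBy));
        } finally {
            pool.shutdown();
        }
        return handledBy;
    }

    public static void main(String[] args) {
        track(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 2)
                .forEach((item, worker) -> System.out.println(item + " handled by " + worker));
    }
}
```

The exact worker names depend on the pool's thread factory (the default produces names like ForkJoinPool-1-worker-1), and which worker handles which part can change from run to run.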
So, below are the steps for writing a program that uses the fork/join framework:
Step 1: Subclass ForkJoinTask (here through its subclass RecursiveAction) and put your implementation inside it:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;

public class ForkJoinExample extends RecursiveAction {
    //the numbers to be printed
    private final List<Integer> events;
    //if the number of elements in the list is > this threshold, divide the task (must be >= 1)
    private final int computationThreshold;
    //name of this task instance; printing it helps show how the work was split
    private final String name;
    //shared counter appended to the names of subtasks; an AtomicInteger because
    //subtasks are created concurrently on different worker threads
    private static final AtomicInteger count = new AtomicInteger();

    public ForkJoinExample(List<Integer> events, int computationThreshold, String name) {
        this.events = events;
        this.computationThreshold = computationThreshold;
        this.name = name;
    }

    //invoked for direct computation once the work has been broken down far enough
    private void computeDirect() {
        for (int event : events) {
            System.out.println("Printing from :" + name + ":  " + event);
        }
    }

    //this is the method invoked by the ForkJoinPool;
    //inside it, the work can be subdivided recursively
    @Override
    protected void compute() {
        //if the threshold is not exceeded, compute directly
        if (events.size() <= computationThreshold) {
            computeDirect();
            return;
        }
        //otherwise split the list by position (not by element value) into two halves
        int split = events.size() / 2;
        List<Integer> firstHalf = new ArrayList<Integer>(events.subList(0, split));
        List<Integer> secondHalf = new ArrayList<Integer>(events.subList(split, events.size()));
        //run both halves as subtasks and return only when both have completed
        invokeAll(new ForkJoinExample(firstHalf, computationThreshold, name + count.incrementAndGet()),
                  new ForkJoinExample(secondHalf, computationThreshold, name + count.incrementAndGet()));
    }
}
Step 2: Create a ForkJoinPool and let it invoke the ForkJoinTask:
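//needs imports of java.util.ArrayList, java.util.Arrays and java.util.concurrent.ForkJoinPool
ForkJoinExample forkJoinExample = new ForkJoinExample(
        new ArrayList<Integer>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)), 4, "Thread");
ForkJoinPool pool = new ForkJoinPool();
//invoke() blocks the calling thread until the whole task tree has completed
pool.invoke(forkJoinExample);
//release the pool's worker threads once no more work will be submitted
pool.shutdown();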

Sample outputs on running this program:

Output Sample 1:
Printing from :Thread1:  1
Printing from :Thread1:  2
Printing from :Thread1:  3
Printing from :Thread1:  4
Printing from :Thread2:  5
Printing from :Thread2:  6
Printing from :Thread2:  7
Printing from :Thread2:  8

Output Sample 2:
Printing from :Thread1:  1
Printing from :Thread2:  5
Printing from :Thread1:  2
Printing from :Thread2:  6
Printing from :Thread1:  3
Printing from :Thread2:  7
Printing from :Thread1:  4
Printing from :Thread2:  8
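Both samples print the same eight lines; only the interleaving differs, because the two subtasks run concurrently and share System.out. If the output order matters, one common approach is to let the subtasks return their results and combine them in a fixed order after joining. The sketch below is a hypothetical variant (the class OrderedFormat, the method format and the "Printing:" text are illustrative assumptions) that uses RecursiveTask instead of the post's RecursiveAction:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example: formats elements in parallel but keeps their original order.
class OrderedFormat extends RecursiveTask<List<String>> {
    private final List<Integer> items;
    private final int threshold; // assumed >= 1, otherwise splitting never stops

    OrderedFormat(List<Integer> items, int threshold) {
        this.items = items;
        this.threshold = threshold;
    }

    @Override
    protected List<String> compute() {
        if (items.size() <= threshold) {
            List<String> lines = new ArrayList<>();
            for (int item : items) {
                lines.add("Printing: " + item);
            }
            return lines;
        }
        int split = items.size() / 2;
        OrderedFormat left = new OrderedFormat(items.subList(0, split), threshold);
        OrderedFormat right = new OrderedFormat(items.subList(split, items.size()), threshold);
        invokeAll(left, right);                               // halves may run in parallel
        List<String> combined = new ArrayList<>(left.join()); // left half first...
        combined.addAll(right.join());                        // ...then the right half
        return combined;
    }

    static List<String> format(List<Integer> items, int threshold) {
        return ForkJoinPool.commonPool().invoke(new OrderedFormat(items, threshold));
    }

    public static void main(String[] args) {
        // Printing happens only after all subtasks finish, so the order is always 1..8.
        format(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 4).forEach(System.out::println);
    }
}
```

The trade-off is that nothing is printed until the whole computation has finished, and the partial results are held in memory until they are combined.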
