org.metasyntactic.thread.concurrent
Class ForkJoinTaskRunnerGroup

java.lang.Object
  |
  +--org.metasyntactic.thread.concurrent.ForkJoinTaskRunnerGroup
All Implemented Interfaces:
Executor

public class ForkJoinTaskRunnerGroup
extends java.lang.Object
implements Executor

A stripped-down analog of a ThreadGroup used for establishing and managing ForkJoinTaskRunner threads. ForkJoinTaskRunnerGroups serve as the control boundary separating the general world of normal threads from the specialized world of ForkJoinTasks.

By intent, this class does not subclass java.lang.ThreadGroup, and does not support most methods found in ThreadGroups, since they would make no sense for ForkJoinTaskRunner threads. In fact, the class does not deal with ThreadGroups at all. If you want to restrict a ForkJoinTaskRunnerGroup to a particular ThreadGroup, you can create it from within that ThreadGroup.

The main contextual parameter for a ForkJoinTaskRunnerGroup is the group size, established in the constructor. Groups must be of a fixed size. There is no way to dynamically increase or decrease the number of threads in an existing group.

In general, the group size should be equal to the number of CPUs on the system. (This class does not choose a default for you, because it assumes no portable means of detecting the number of CPUs from within a JVM. Java 1.4 and later do offer one, Runtime.getRuntime().availableProcessors(), which callers can use to compute the size.) In principle, when ForkJoinTasks are used for computation-intensive tasks, having only as many threads as CPUs should minimize bookkeeping overhead and contention, and so maximize throughput. However, because ForkJoinTaskRunners lie atop Java threads, and in turn operating system thread support and scheduling policies, it is very possible that using more threads than CPUs will improve overall throughput even though it adds to overhead. This will always be so if ForkJoinTasks are I/O bound. So it may pay to experiment a bit when tuning on particular platforms. You can also use setRunPriorities to either increase or decrease the priorities of active threads, which may interact with group size choice.

In any case, overestimating group sizes never seriously degrades performance (at least within reasonable bounds). You can also use a value less than the number of CPUs in order to reserve processing for unrelated threads.
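
The sizing advice above can be sketched as a small helper. This is only an illustration, not part of this package: the class GroupSizing and the method suggestedGroupSize are hypothetical names, and the sketch assumes a JVM that provides Runtime.availableProcessors() (Java 1.4 and later). The result would be passed to the ForkJoinTaskRunnerGroup constructor.

```java
// Hypothetical helper (not part of this package): derives a starting
// group size from the CPU count reported by the JVM.
class GroupSizing {

    // Returns the reported CPU count minus the CPUs reserved for
    // unrelated threads, but never less than 1.
    static int suggestedGroupSize(int reservedCpus) {
        int cpus = Runtime.getRuntime().availableProcessors();
        int reserved = Math.max(0, reservedCpus); // ignore negative requests
        return Math.max(1, cpus - reserved);
    }

    public static void main(String[] args) {
        System.out.println("suggested group size: " + suggestedGroupSize(0));
    }
}
```

Treat the result as a starting point for tuning rather than a final answer, since I/O-bound tasks may benefit from a larger group.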

There are two general styles for using a ForkJoinTaskRunnerGroup. You can create one group per entire program execution, for example as a static singleton, and use it for all parallel tasks:

 class Tasks {
   static ForkJoinTaskRunnerGroup group;
   // Call once, before any task is run.
   public static void initialize(int groupSize) {
     group = new ForkJoinTaskRunnerGroup(groupSize);
   }
   // ...
 }
 
Alternatively, you can make new groups on the fly and use them only for particular task sets. This is more flexible, and leads to more controllable and deterministic execution patterns, but it encounters greater overhead on startup. Also, to reclaim system resources, you should call ForkJoinTaskRunnerGroup.interruptAll when you are done using one-shot groups. Otherwise, because ForkJoinTaskRunners set Thread.isDaemon status, they will not normally be reclaimed until program termination.

The main supported methods are execute, which starts a task processed by ForkJoinTaskRunner threads, and invoke, which starts one and waits for completion. For example, you might extend the above Tasks class to support a task-based computation, say, the Fib class from the ForkJoinTask documentation:

 class Tasks { // continued
   // ...
   static int fib(int n) {
     try {
       Fib f = new Fib(n);
       group.invoke(f);
       return f.getAnswer();
     }
     catch (InterruptedException ex) {
       throw new Error("Interrupted during computation");
     }
   }
 }
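
For readers who want to run the invoke-and-wait pattern without this package on the classpath, here is a self-contained sketch that swaps in the standard java.util.concurrent.ForkJoinPool and RecursiveTask as stand-ins. It is an analogy only: FibSketch and its fib(n, groupSize) method are hypothetical names, and ForkJoinPool is not the ForkJoinTaskRunnerGroup API. The pool is created on the fly and shut down in a finally block, mirroring the one-shot style described earlier.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Stand-in sketch using the standard library, not this package's classes.
class FibSketch extends RecursiveTask<Integer> {
    private final int n;

    FibSketch(int n) {
        this.n = n;
    }

    @Override
    protected Integer compute() {
        if (n <= 1) {
            return n;                      // base cases: fib(0) = 0, fib(1) = 1
        }
        FibSketch left = new FibSketch(n - 1);
        left.fork();                       // run fib(n - 1) asynchronously
        int right = new FibSketch(n - 2).compute(); // run fib(n - 2) in this thread
        return left.join() + right;        // wait for the forked half
    }

    // One-shot usage: create a pool of the given size, start the task and
    // wait for it, then release the pool's threads.
    static int fib(int n, int groupSize) {
        ForkJoinPool pool = new ForkJoinPool(groupSize);
        try {
            return pool.invoke(new FibSketch(n));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fib(20, 4));
    }
}
```

As with group.invoke above, the caller blocks until the whole task tree has completed.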
 

Method stats() can be used to monitor performance. Both ForkJoinTaskRunnerGroup and ForkJoinTaskRunner may be compiled with the compile-time constant COLLECT_STATS set to false. In this case, various simple counts reported in stats() are not collected. On platforms tested, this leads to such a tiny performance improvement that there is very little motivation to bother.

See Also:
ForkJoinTask, ForkJoinTaskRunner

Constructor Summary
ForkJoinTaskRunnerGroup(int groupSize)
          Create a ForkJoinTaskRunnerGroup with the indicated number of ForkJoinTaskRunner threads.
 
Method Summary
protected  void checkActive(ForkJoinTaskRunner t, long scans)
          Set active status of thread t to false, and then wait until: (a) there is a task in the entry queue, or (b) other threads are active, or (c) the current thread is interrupted.
 void execute(java.lang.Runnable r)
          Arrange for execution of the given task by placing it in a work queue.
 void executeTask(ForkJoinTask t)
          Specialized form of execute called only from within ForkJoinTasks.
 int getActiveCount()
          Return the number of threads that are not idly waiting for work.
protected  ForkJoinTaskRunner[] getArray()
          Return the array of threads in this group.
 void interruptAll()
          Try to shut down all ForkJoinTaskRunner threads in this group by interrupting them all.
 void invoke(java.lang.Runnable r)
          Start a task and wait it out.
protected  ForkJoinTask pollEntryQueue()
          Return a task from entry queue, or null if empty.
protected  void setActive(ForkJoinTaskRunner t)
          Set active status of thread t to true, and notify others that might be waiting for work.
protected  void setInactive(ForkJoinTaskRunner t)
          Set active status of thread t to false.
 void setRunPriorities(int pri)
          Set the priority to use while a ForkJoinTaskRunner is actively running tasks.
 void setScanPriorities(int pri)
          Set the priority to use while a ForkJoinTaskRunner is polling for new tasks to perform.
 int size()
          Return the number of ForkJoinTaskRunner threads in this group.
 void stats()
          Prints various snapshot statistics to System.out.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

ForkJoinTaskRunnerGroup

public ForkJoinTaskRunnerGroup(int groupSize)
Create a ForkJoinTaskRunnerGroup with the indicated number of ForkJoinTaskRunner threads. Normally, the best size to use is the number of CPUs on the system.

The threads in a ForkJoinTaskRunnerGroup are created with their isDaemon status set, so do not normally need to be shut down manually upon program termination.

Parameters:
groupSize - The group size

Method Detail

execute

public void execute(java.lang.Runnable r)
             throws java.lang.InterruptedException
Arrange for execution of the given task by placing it in a work queue. If the argument is not of type ForkJoinTask, it is embedded in a ForkJoinTask via ForkJoinTask.Wrap.
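
The wrapping rule can be pictured with the following sketch. It does not use or reproduce this package's classes: SketchTask stands in for ForkJoinTask, SketchWrap for ForkJoinTask.Wrap, and asTask is a hypothetical helper showing only the type check that decides whether wrapping is needed.

```java
// Hypothetical stand-in for ForkJoinTask: a Runnable with a task base type.
abstract class SketchTask implements Runnable {
}

// Hypothetical stand-in for ForkJoinTask.Wrap: adapts a plain Runnable.
final class SketchWrap extends SketchTask {
    private final Runnable body;

    SketchWrap(Runnable body) {
        this.body = body;
    }

    @Override
    public void run() {
        body.run(); // delegate to the wrapped Runnable
    }
}

class WrapOnExecute {

    // Returns r unchanged if it is already a task, otherwise embeds it
    // in a wrapper so the work queue only ever holds tasks.
    static SketchTask asTask(Runnable r) {
        if (r == null) {
            throw new NullPointerException("r");
        }
        return (r instanceof SketchTask) ? (SketchTask) r : new SketchWrap(r);
    }

    public static void main(String[] args) {
        SketchTask t = asTask(() -> System.out.println("wrapped runnable ran"));
        t.run();
    }
}
```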

Specified by:
execute in interface Executor
Parameters:
r - The runnable to execute
Throws:
java.lang.InterruptedException - if the current thread is interrupted

executeTask

public void executeTask(ForkJoinTask t)
Specialized form of execute called only from within ForkJoinTasks.

Parameters:
t - The task to execute

invoke

public void invoke(java.lang.Runnable r)
            throws java.lang.InterruptedException
Start a task and wait it out. Returns when the task completes.

Parameters:
r - The runnable to invoke
Throws:
java.lang.InterruptedException - if current Thread is interrupted before completion of the task.

interruptAll

public void interruptAll()
Try to shut down all ForkJoinTaskRunner threads in this group by interrupting them all. This method is designed to be used during cleanup when it is somehow known that all threads are idle. ForkJoinTaskRunners only check for interruption when they are not otherwise processing a task (and its generated subtasks, if any), so if any threads are active, shutdown may take a while, and may lead to unpredictable task processing.
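
The consequence of checking for interruption only while idle can be seen in a small, self-contained model. This is a sketch under stated assumptions, not the ForkJoinTaskRunner implementation: IdleInterruptSketch is a hypothetical single worker that takes tasks from a java.util.concurrent.LinkedBlockingQueue and notices an interrupt only when it goes back to the queue for more work.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of a worker that checks for interruption only
// between tasks, never while a task is running.
class IdleInterruptSketch extends Thread {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    final AtomicInteger completed = new AtomicInteger();

    IdleInterruptSketch() {
        setDaemon(true); // like the runner threads, do not block JVM exit
    }

    void submit(Runnable task) {
        queue.add(task);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Runnable task = queue.take(); // the only interruption check
                task.run();                   // runs to completion regardless
                completed.incrementAndGet();
            }
        } catch (InterruptedException stop) {
            // Interrupted while fetching work: exit, abandoning queued tasks.
        }
    }

    // Interrupts the worker while its first task is still active and
    // returns how many tasks it completed before exiting.
    static int demo() throws InterruptedException {
        IdleInterruptSketch worker = new IdleInterruptSketch();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean release = new AtomicBoolean(false);
        worker.submit(() -> {
            started.countDown();
            while (!release.get()) {
                Thread.yield(); // busy task that never looks at the interrupt flag
            }
        });
        worker.submit(() -> { }); // queued behind the busy task
        worker.start();
        started.await();
        worker.interrupt();       // arrives while the first task is active
        release.set(true);
        worker.join();
        return worker.completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("tasks completed before exit: " + demo());
    }
}
```

In this model the active task finishes normally, the worker then exits on its next fetch, and the second queued task never runs. That is the kind of unpredictable task processing the method description warns about, and why the method is best called only when all threads are known to be idle.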


setScanPriorities

public void setScanPriorities(int pri)
Set the priority to use while a ForkJoinTaskRunner is polling for new tasks to perform. Default is currently Thread.MIN_PRIORITY+1. The value set may not go into effect immediately, but will be used at least the next time a thread scans for work.

Parameters:
pri - The priority to set the scans to

setRunPriorities

public void setRunPriorities(int pri)
Set the priority to use while a ForkJoinTaskRunner is actively running tasks. Default is the priority that was in effect by the thread that constructed this ForkJoinTaskRunnerGroup. Setting this value while threads are running may momentarily result in them running at this priority even when idly waiting for work.

Parameters:
pri - The new run priority

size

public int size()
Return the number of ForkJoinTaskRunner threads in this group.

Returns:
the number of ForkJoinTaskRunner threads

getActiveCount

public int getActiveCount()
Return the number of threads that are not idly waiting for work. Beware that even active threads might not be doing any useful work, but just spinning waiting for other dependent tasks. Also, since this is just a snapshot value, some threads may be in the process of becoming idle.

Returns:
The active count

stats

public void stats()
Prints various snapshot statistics to System.out.

Cautions: Some statistics are updated and gathered without synchronization, so may not be accurate. However, reported counts may be considered as lower bounds of actual values. Some values may be zero if classes are compiled with COLLECT_STATS set to false. (ForkJoinTaskRunner and ForkJoinTaskRunnerGroup classes can be independently compiled with different values of COLLECT_STATS.) Also, the counts are maintained as ints so could overflow in exceptionally long-lived applications.

These statistics can be useful when tuning algorithms or diagnosing problems.


getArray

protected ForkJoinTaskRunner[] getArray()
Return the array of threads in this group. Called only by ForkJoinTaskRunner.scan().

Returns:
the threads in the group

pollEntryQueue

protected ForkJoinTask pollEntryQueue()
Return a task from entry queue, or null if empty. Called only by ForkJoinTaskRunner.scan().

Returns:
A Task from the queue, or null if empty

setActive

protected void setActive(ForkJoinTaskRunner t)
Set active status of thread t to true, and notify others that might be waiting for work.

Parameters:
t - The thread to set active

setInactive

protected void setInactive(ForkJoinTaskRunner t)
Set active status of thread t to false.

Parameters:
t - The thread to set inactive

checkActive

protected void checkActive(ForkJoinTaskRunner t,
                           long scans)
Set active status of thread t to false, and then wait until: (a) there is a task in the entry queue, or (b) other threads are active, or (c) the current thread is interrupted. Upon return, it is not certain that there will be work available. The thread must itself check.

The main underlying reason for these mechanics is that threads do not signal each other when they add elements to their own queues. (This would add to task overhead, reduce locality, and increase contention.) So we must rely on a tamed form of polling. However, tasks inserted into the entry queue do result in signals, so threads can wait on these if all of them are otherwise idle.
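
The split between signalled entry-queue inserts and bounded polling can be sketched as follows. This is a simplified model, not the code behind checkActive: EntryQueueSketch and its methods are hypothetical, and the bounded wait stands in for the tamed polling described above.

```java
import java.util.ArrayDeque;

// Hypothetical model of an entry queue whose inserts signal idle
// threads, combined with a bounded wait so that waiters wake up
// periodically to look for work elsewhere.
class EntryQueueSketch<T> {
    private final ArrayDeque<T> items = new ArrayDeque<>();

    // Inserting into the entry queue signals any waiting threads.
    synchronized void put(T item) {
        items.addLast(item);
        notifyAll();
    }

    // Non-blocking poll: returns null if the queue is empty.
    synchronized T poll() {
        return items.pollFirst();
    }

    // Waits at most maxWaitMillis for a signal, then polls once.
    // As with checkActive, a return does not guarantee that work is
    // available: the result may be null and the caller must check.
    synchronized T awaitWork(long maxWaitMillis) throws InterruptedException {
        if (maxWaitMillis <= 0) {
            throw new IllegalArgumentException("maxWaitMillis must be positive");
        }
        if (items.isEmpty()) {
            wait(maxWaitMillis); // woken by put(), by timeout, or spuriously
        }
        return items.pollFirst();
    }

    public static void main(String[] args) throws InterruptedException {
        EntryQueueSketch<String> queue = new EntryQueueSketch<>();
        queue.put("task");
        System.out.println(queue.awaitWork(100));
    }
}
```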