Sure, here's the multi-threading utility code:
/*
* Created on 24-Jan-2006 Copyright (C) Eurobios, 2006.
*/
package eurobios.threading;
import java.util.Iterator;
import java.util.List;
/**
* Utility class which handles all aspects of parallelising a set of
operations.
* See {@link #run(int, Iterator, IRunItem)} for details of usage. An
example
* calling pattern is:
*
* <pre>
* final Iterator dcIter = new SynchronizedIterator(locations,
interleavedDCs);
*
* ParalleliseOperations.run(parallelThreads, dcIter,
* new ParalleliseOperations.IRunItem() {
* public void run(Object object) {
* ((DeliveryCollection) object).sequenceAndCalculate(tTypes,
* pointTopology, params, optimizationParams, roadParams);
* }
* });
*
* </pre>
*
* Note that the reason you can't wrap up the actual processing inside
the
* 'next()' call of the provided iterator is that that would lock up
the
* synchronized next() method, causing other threads to wait.
* <p>
* Clients of this code include the PostDK-New project and
* eurobios.routeoptimiser.
*
* @author VinceD
*/
public class ParalleliseOperations {
/**
* An implementation of this must be provided by our callers. It is
used to
* trigger an operation on each object returned by the iterator.
*
* @author VinceD
*/
public interface IRunItem {
public void run(Object object) throws InterruptedException;
}
public interface IRunnable {
public void run() throws InterruptedException;
}
/**
* Class to manage all aspects of blocking the current thread until
* calculations are done.
* <p>
* Basic usage:
* <li>Create an instance, specifying the number of threads it will
manage
* <li>Create new threads and set them running, in such a fashion
that
* {@link #threadFinished()} will be called when no more tasks
remain, or when
* {@link #isAborted()} is true.
* <li>Call {@link #waitUntilDone()} in the main thread. Your code
will now
* block until all the managed threads are done.
*
* @author VinceD
*/
private static class ThreadMonitor {
public int runningThreads = 0;
ThreadMonitor(int runningThreads) {
this.runningThreads = runningThreads;
}
RuntimeException abortReason = null;
InterruptedException interruptedReason = null;
synchronized void waitUntilDone() throws InterruptedException {
if (runningThreads > 0) {
wait();
}
if (interruptedReason != null) {
throw interruptedReason;
}
if (abortReason != null) {
throw abortReason;
}
}
synchronized boolean isAborted() {
return abortReason != null;
}
synchronized public void threadFinished() {
runningThreads--;
if (runningThreads == 0) notify();
}
synchronized public void threadFinished(RuntimeException interrupt)
{
if (interrupt != null && abortReason == null) {
abortReason = interrupt;
}
threadFinished();
}
synchronized public void threadFinished(InterruptedException
interrupt) {
if (interrupt != null && interruptedReason == null) {
interruptedReason = interrupt;
}
threadFinished();
}
}
/**
* Run the given list of operations in parallel
*
* @param parallelThreads
* @param operations
* @throws InterruptedException
*/
public static void run(int parallelThreads, final List<IRunnable>
operations)
throws InterruptedException {
run(parallelThreads, operations.iterator(), new IRunItem() {
public void run(Object object) throws InterruptedException {
((IRunnable) object).run();
}
});
}
/**
* For all objects provided by operationIterator's 'next()' method,
call
* 'runner.run(object)'. These operations will be farmed out to a
maximum of
* 'parallelThreads' threads, and this method will only return once
all of the
* operations are complete. The order with which the operations are
carried
* out is not defined, and therefore most code will want to ensure
that
* results of interest are not dependent on that sequence.
* <p>
* It is assumed that 'operationIterator' is thread-safe, which means
that its
* 'hasNext()' and 'next()' methods are synchronized.
* <p>
* If parallelThreads is less than 2, no new threads are created and
a basic
* iteration in the main thread is used instead.
*
* @param parallelThreads
* @param operationIterator
* @param runner
* @throws InterruptedException
*/
public static void run(int parallelThreads, final Iterator
operationIterator,
final IRunItem runner) throws InterruptedException {
if (parallelThreads <= 1) {
while (operationIterator.hasNext()) {
runner.run(operationIterator.next());
}
} else {
final ThreadMonitor groupOfThreads = new
ThreadMonitor(parallelThreads);
class LocalThread implements Runnable {
int threadCounter;
LocalThread(int threadCounter) {
this.threadCounter = threadCounter;
}
public void run() {
try {
// Each thread keeps asking for more operations until there
are
// no more to provide.
while (!groupOfThreads.isAborted() &&
operationIterator.hasNext()) {
Object operation = null;
// Lock the operationIterator while we query and remove
the next
// operation.
synchronized (operationIterator) {
if (operationIterator.hasNext())
operation = operationIterator.next();
}
if (operation != null) {
// System.out.println("Thread " + threadCounter + "
computing");
runner.run(operation);
}
}
// and then notifies that it is done
groupOfThreads.threadFinished();
} catch (InterruptedException ie) {
// or that an error was thrown
groupOfThreads.threadFinished(ie);
} catch (RuntimeException ie) {
// or that an error was thrown
groupOfThreads.threadFinished(ie);
}
}
}
for (int thread = 0; thread < parallelThreads; thread++) {
Thread thisThread = new Thread(new LocalThread(thread+1));
// thisThread.setPriority(java.lang.Thread.MAX_PRIORITY);
thisThread.start();
}
groupOfThreads.waitUntilDone();
}
}
}
--
We use this for a variety of different operations, all fairly
computationally expensive, taking between 1/10sec and several seconds
apiece.
Vince.
> vince.dar...@gmail.com wrote:
> > Our application, at one stage of its processing, does some intensive
[quoted text clipped - 20 lines]
>
> > Vince.Could you show us the code, or, an SSCCE?