Fork/Join splits tasks and merges their results in the same way, which can be expressed with the following pseudocode:
    solve(problem):
        if problem is small enough:
            // The task is small enough: execute it directly
            solve problem directly (sequential algorithm)
        else:
            // Split the task
            for part in subdivide(problem)
                fork subtask to solve(part)
            // Merge the results
            join all subtasks spawned in previous loop
            return combined results
So, to understand the Fork/Join model and the ForkJoinPool thread pool, you first need to understand the purpose and idea of the algorithm behind it, because the ForkJoinPool described in the rest of this article is just one implementation and application of that algorithm.
    import java.util.concurrent.RecursiveTask;
    import java.util.concurrent.atomic.AtomicInteger;

    public class TheKingRecursiveSumTask extends RecursiveTask<Long> {
        // Counts how many times a task has been split
        private static final AtomicInteger taskCount = new AtomicInteger();
        private final int sumBegin;
        private final int sumEnd;
        /**
         * Split threshold: a task is split when its size is greater than this value
         */
        private final int threshold;

        public TheKingRecursiveSumTask(int sumBegin, int sumEnd, int threshold) {
            this.sumBegin = sumBegin;
            this.sumEnd = sumEnd;
            this.threshold = threshold;
        }

        @Override
        protected Long compute() {
            if ((sumEnd - sumBegin) > threshold) {
                // The range is larger than the threshold: split the task in two
                int mid = (sumBegin + sumEnd) / 2;
                TheKingRecursiveSumTask subTask1 = new TheKingRecursiveSumTask(sumBegin, mid, threshold);
                TheKingRecursiveSumTask subTask2 = new TheKingRecursiveSumTask(mid, sumEnd, threshold);
                subTask1.fork();
                subTask2.fork();
                taskCount.incrementAndGet();
                return subTask1.join() + subTask2.join();
            }
            // Small enough: compute the result directly
            long result = 0L;
            for (int i = sumBegin; i < sumEnd; i++) {
                result += i;
            }
            return result;
        }

        public static AtomicInteger getTaskCount() {
            return taskCount;
        }
    }
In the code below, the range to sum is 0~10000000, a task is split whenever it covers more than 100 numbers, and the parallelism is set to 16.
    public static void main(String[] args) {
        int sumBegin = 0, sumEnd = 10000000;
        computeByForkJoin(sumBegin, sumEnd);
        computeBySingleThread(sumBegin, sumEnd);
    }

    private static void computeByForkJoin(int sumBegin, int sumEnd) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(16);
        long forkJoinStartTime = System.nanoTime();
        TheKingRecursiveSumTask theKingRecursiveSumTask = new TheKingRecursiveSumTask(sumBegin, sumEnd, 100);
        long forkJoinResult = forkJoinPool.invoke(theKingRecursiveSumTask);
        System.out.println("======");
        System.out.println("ForkJoin task splits: " + TheKingRecursiveSumTask.getTaskCount());
        System.out.println("ForkJoin result: " + forkJoinResult);
        // Elapsed time in milliseconds
        System.out.println("ForkJoin elapsed time: " + (System.nanoTime() - forkJoinStartTime) / 1000000);
    }

    private static void computeBySingleThread(int sumBegin, int sumEnd) {
        long computeResult = 0L;
        long startTime = System.nanoTime();
        for (int i = sumBegin; i < sumEnd; i++) {
            computeResult += i;
        }
        System.out.println("======");
        System.out.println("Single-thread result: " + computeResult);
        // Elapsed time in milliseconds
        System.out.println("Single-thread elapsed time: " + (System.nanoTime() - startTime) / 1000000);
    }
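The output is as follows:
    ======
    ForkJoin task splits: 131071
    ForkJoin result: 49999995000000
    ForkJoin elapsed time: 207
    ======
    Single-thread result: 49999995000000
    Single-thread elapsed time: 40
    Process finished with exit code 0
The output shows that ForkJoinPool split tasks 131071 times in total, that the final result is 49999995000000, and that the run took 207 ms.
However, you may have noticed that the parallel ForkJoin computation is actually slower than the single-threaded one, by roughly a factor of five. Don't panic: ForkJoin performance is explained later in this article.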
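The figure of 131071 splits can be reproduced without starting any threads. The sketch below replays only the splitting rule of the task (split while the range is larger than the threshold, cut at the midpoint) and counts the splits. The class and method names are hypothetical and exist only for this illustration.

```java
// Hypothetical helper for illustration; it is not part of the original example.
final class SplitCountSketch {

    /** Counts how many times a task covering [begin, end) would be split. */
    static long countSplits(int begin, int end, int threshold) {
        if (end - begin <= threshold) {
            return 0; // small enough: computed directly, no split
        }
        int mid = (begin + end) / 2; // same midpoint rule as the summing task
        return 1 + countSplits(begin, mid, threshold) + countSplits(mid, end, threshold);
    }

    public static void main(String[] args) {
        // Threshold 100 over ten million numbers: 2^17 - 1 splits
        System.out.println(countSplits(0, 10_000_000, 100));     // prints 131071
        // A much larger threshold yields far fewer, coarser tasks: 2^7 - 1 splits
        System.out.println(countSplits(0, 10_000_000, 100_000)); // prints 127
    }
}
```

With a threshold of 100, each leaf task sums fewer than 100 numbers, so the cost of creating, forking, and joining over a hundred thousand tasks can easily outweigh the arithmetic itself.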
(1) Approach 1: default no-argument constructor
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
(2) Approach 2: construct with a parallelism level
    public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }
(3) Approach 3: construct with all parameters specified
    public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism),
             checkFactory(factory),
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    }
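As a usage sketch of the three constructors, the following creates one pool with each of them and prints its settings. The class name, method name, and the parallelism value 4 are assumptions made for this illustration.

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class; names and the parallelism value are illustrative only.
final class PoolConstructionSketch {

    /** Builds a pool with every parameter given explicitly. */
    static ForkJoinPool newAsyncPool(int parallelism) {
        return new ForkJoinPool(
                parallelism,
                ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null,   // no UncaughtExceptionHandler
                true);  // asyncMode = true selects FIFO scheduling for forked tasks
    }

    public static void main(String[] args) {
        ForkJoinPool defaultPool = new ForkJoinPool();  // parallelism = available processors
        ForkJoinPool fixedPool = new ForkJoinPool(4);   // explicit parallelism
        ForkJoinPool asyncPool = newAsyncPool(4);       // all parameters

        System.out.println("default parallelism: " + defaultPool.getParallelism());
        System.out.println("fixed parallelism:   " + fixedPool.getParallelism());
        System.out.println("async mode:          " + asyncPool.getAsyncMode());

        // Pools created by hand should be shut down when no longer needed
        defaultPool.shutdown();
        fixedPool.shutdown();
        asyncPool.shutdown();
    }
}
```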
2. Submitting tasks by type

| | Called from a non-fork/join thread | Called from within fork/join |
|---|---|---|
| Submit for asynchronous execution | execute(ForkJoinTask) | ForkJoinTask.fork() |
| Wait for and obtain the result | invoke(ForkJoinTask) | ForkJoinTask.invoke() |
| Submit and obtain a Future | submit(ForkJoinTask) | ForkJoinTask.fork() (ForkJoinTasks are Futures) |
(1) First core method: invoke
    public <T> T invoke(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task.join();
    }
(2) Second core method: execute
    public void execute(ForkJoinTask<?> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
    }

    public void execute(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<?> job;
        if (task instanceof ForkJoinTask<?>) // avoid re-wrap
            job = (ForkJoinTask<?>) task;
        else
            job = new ForkJoinTask.RunnableExecuteAction(task);
        externalPush(job);
    }
(3) Third core method: submit
    public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        externalPush(task);
        return task;
    }

    public <T> ForkJoinTask<T> submit(Callable<T> task) {
        ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
        externalPush(job);
        return job;
    }

    public <T> ForkJoinTask<T> submit(Runnable task, T result) {
        ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
        externalPush(job);
        return job;
    }

    public ForkJoinTask<?> submit(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        ForkJoinTask<?> job;
        if (task instanceof ForkJoinTask<?>) // avoid re-wrap
            job = (ForkJoinTask<?>) task;
        else
            job = new ForkJoinTask.AdaptedRunnableAction(task);
        externalPush(job);
        return job;
    }
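From the caller's side, the three families of methods differ mainly in how the result is obtained. The sketch below submits the same small task in all three ways from a non-fork/join thread. The RangeSum task and the surrounding class are hypothetical and serve only as an illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; not taken from the original article.
final class SubmitStylesSketch {

    /** Hypothetical task: sums the integers in [from, to) without splitting. */
    static final class RangeSum extends RecursiveTask<Long> {
        private final int from;
        private final int to;

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            long sum = 0L;
            for (int i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
    }

    /** Runs the same task through invoke, execute, and submit. */
    static long[] runAll(ForkJoinPool pool) throws InterruptedException, ExecutionException {
        // invoke: blocks until the task completes and returns its result
        long viaInvoke = pool.invoke(new RangeSum(0, 1000));

        // execute: returns immediately; the caller keeps the task to read the result later
        RangeSum executed = new RangeSum(0, 1000);
        pool.execute(executed);
        long viaExecute = executed.join();

        // submit: returns the task as a Future-style handle
        ForkJoinTask<Long> future = pool.submit(new RangeSum(0, 1000));
        long viaSubmit = future.get();

        return new long[] {viaInvoke, viaExecute, viaSubmit};
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            for (long value : runAll(pool)) {
                System.out.println(value); // 499500 each time
            }
        } finally {
            pool.shutdown();
        }
    }
}
```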
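3. ForkJoinTask
In terms of class design, ForkJoinTask implements the Future interface, so it can also be regarded as a lightweight Future. The relationship between them is shown in the figure below.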
fork: submit the task
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }
join: obtain the result of the task
    public final V join() {
        int s;
        // If doJoin returns a status other than NORMAL, report the exception
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        // Completed normally: return the raw result
        return getRawResult();
    }

    private int doJoin() {
        int s;
        Thread t;
        ForkJoinWorkerThread wt;
        ForkJoinPool.WorkQueue w;
        // If the task is already done, return its status
        return (s = status) < 0 ? s :
            // If it is not done and the current thread is a ForkJoinWorkerThread,
            // take that thread's workQueue and try to pop this task and run it.
            // If it runs to completion, return the status; otherwise wait via
            // the pool's awaitJoin method
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread) t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            // The current thread is not a ForkJoinWorkerThread: wait via externalAwaitDone
            externalAwaitDone();
    }

    final int doExec() {
        int s;
        boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            // After successful execution, set the status to NORMAL
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }
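The two branches in fork() correspond to two calling situations, which the following sketch exercises: fork() called from an ordinary thread, where the task is pushed to the common pool, and fork() called inside a running task, where it goes to the worker's own queue. The Square task and the helper methods are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; not taken from the original article.
final class ForkJoinCallSketch {

    /** Hypothetical task: returns n squared. */
    static final class Square extends RecursiveTask<Integer> {
        private final int n;

        Square(int n) {
            this.n = n;
        }

        @Override
        protected Integer compute() {
            return n * n;
        }
    }

    /** Called from a non-worker thread, so fork() submits the task to the common pool. */
    static int forkFromPlainThread(int n) {
        return new Square(n).fork().join();
    }

    /** Inside a pool, fork one subtask, compute the other directly, then join. */
    static int forkInsidePool(ForkJoinPool pool, int a, int b) {
        return pool.invoke(new RecursiveTask<Integer>() {
            @Override
            protected Integer compute() {
                Square left = new Square(a);
                left.fork();                       // queued on this worker's own queue
                int right = new Square(b).compute(); // run in the current thread
                return left.join() + right;        // join may run 'left' itself if not stolen
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(forkFromPlainThread(7)); // 49
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            System.out.println(forkInsidePool(pool, 3, 4)); // 25
        } finally {
            pool.shutdown();
        }
    }
}
```

Forking one half and computing the other in the current thread keeps the worker busy instead of leaving it idle in join(), which is the pattern the Fibonacci example in the JDK documentation also follows.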
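(2) RecursiveAction and RecursiveTask
ForkJoinPool commonly works with two kinds of tasks: those that do not return a result and those that do, which is consistent with ThreadPoolExecutor and other thread pools. The two corresponding classes are RecursiveAction and RecursiveTask, respectively. As the class diagram shows, both extend ForkJoinTask.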
RecursiveAction: no result returned
    static class SortTask extends RecursiveAction {
        final long[] array;
        final int lo, hi;

        SortTask(long[] array, int lo, int hi) {
            this.array = array;
            this.lo = lo;
            this.hi = hi;
        }

        SortTask(long[] array) {
            this(array, 0, array.length);
        }

        // Core computation method
        protected void compute() {
            if (hi - lo < THRESHOLD)
                // Small enough: sort directly
                sortSequentially(lo, hi);
            else {
                // Split the task, sort both halves, then merge them
                int mid = (lo + hi) >>> 1;
                invokeAll(new SortTask(array, lo, mid),
                          new SortTask(array, mid, hi));
                merge(lo, mid, hi);
            }
        }

        // implementation details follow:
        static final int THRESHOLD = 1000;

        void sortSequentially(int lo, int hi) {
            Arrays.sort(array, lo, hi);
        }

        void merge(int lo, int mid, int hi) {
            long[] buf = Arrays.copyOfRange(array, lo, mid);
            for (int i = 0, j = lo, k = mid; i < buf.length; j++)
                array[j] = (k == hi || buf[i] < array[k]) ?
                    buf[i++] : array[k++];
        }
    }
RecursiveTask: returns a result
    class Fibonacci extends RecursiveTask<Integer> {
        final int n;

        Fibonacci(int n) { this.n = n; }

        // compute() is protected in RecursiveTask, so the override must not narrow its access
        @Override
        protected Integer compute() {
            if (n <= 1)
                return n;
            Fibonacci f1 = new Fibonacci(n - 1);
            f1.fork();
            Fibonacci f2 = new Fibonacci(n - 2);
            return f2.compute() + f1.join();
        }
    }
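To contrast the two task types side by side, the sketch below uses a RecursiveAction to modify an array in place and a RecursiveTask to compute a value from it, then runs both in one pool. All class names, the threshold of 1000, and the doubling operation are assumptions chosen for this illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

// Hypothetical demo class; not taken from the original article.
final class ActionVersusTaskSketch {
    static final int THRESHOLD = 1_000; // assumed cut-off for direct execution

    /** RecursiveAction: doubles every element in place and returns nothing. */
    static final class DoubleInPlace extends RecursiveAction {
        private final long[] data;
        private final int lo;
        private final int hi;

        DoubleInPlace(long[] data, int lo, int hi) {
            this.data = data;
            this.lo = lo;
            this.hi = hi;
        }

        @Override
        protected void compute() {
            if (hi - lo <= THRESHOLD) {
                for (int i = lo; i < hi; i++) {
                    data[i] *= 2;
                }
                return;
            }
            int mid = (lo + hi) >>> 1;
            // invokeAll forks the subtasks and waits for both to finish
            invokeAll(new DoubleInPlace(data, lo, mid), new DoubleInPlace(data, mid, hi));
        }
    }

    /** RecursiveTask: returns the sum of the elements in [lo, hi). */
    static final class SumRange extends RecursiveTask<Long> {
        private final long[] data;
        private final int lo;
        private final int hi;

        SumRange(long[] data, int lo, int hi) {
            this.data = data;
            this.lo = lo;
            this.hi = hi;
        }

        @Override
        protected Long compute() {
            if (hi - lo <= THRESHOLD) {
                long sum = 0L;
                for (int i = lo; i < hi; i++) {
                    sum += data[i];
                }
                return sum;
            }
            int mid = (lo + hi) >>> 1;
            SumRange left = new SumRange(data, lo, mid);
            left.fork();
            long right = new SumRange(data, mid, hi).compute();
            return right + left.join();
        }
    }

    /** Doubles the array in place, then returns the sum of the doubled values. */
    static long doubleThenSum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleInPlace(data, 0, data.length)); // result type is Void
            return pool.invoke(new SumRange(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }
}
```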
(3) ForkJoinTask usage restrictions
(1) Get the number of running threads
    public int getRunningThreadCount() {
        int rc = 0;
        WorkQueue[] ws;
        WorkQueue w;
        if ((ws = workQueues) != null) {
            for (int i = 1; i < ws.length; i += 2) {
                if ((w = ws[i]) != null && w.isApparentlyUnblocked())
                    ++rc;
            }
        }
        return rc;
    }
(2) Get the number of active threads
    public int getActiveThreadCount() {
        int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
        return (r <= 0) ? 0 : r; // suppress momentarily negative values
    }
(3) Check whether the ForkJoinPool is idle
    public boolean isQuiescent() {
        return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
    }
(4) Get the number of stolen tasks
    public long getStealCount() {
        AtomicLong sc = stealCounter;
        long count = (sc == null) ? 0L : sc.get();
        WorkQueue[] ws;
        WorkQueue w;
        if ((ws = workQueues) != null) {
            for (int i = 1; i < ws.length; i += 2) {
                if ((w = ws[i]) != null)
                    count += w.nsteals;
            }
        }
        return count;
    }
(5) Get the number of tasks in the worker queues
    public long getQueuedTaskCount() {
        long count = 0;
        WorkQueue[] ws;
        WorkQueue w;
        if ((ws = workQueues) != null) {
            for (int i = 1; i < ws.length; i += 2) {
                if ((w = ws[i]) != null)
                    count += w.queueSize();
            }
        }
        return count;
    }
(6) Get the number of submitted tasks still queued
    public int getQueuedSubmissionCount() {
        int count = 0;
        WorkQueue[] ws;
        WorkQueue w;
        if ((ws = workQueues) != null) {
            for (int i = 0; i < ws.length; i += 2) {
                if ((w = ws[i]) != null)
                    count += w.queueSize();
            }
        }
        return count;
    }
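The monitoring methods above are all public, so they can be read from application code. The sketch below runs a splitting sum task and then prints one snapshot of the pool. The class names, the threshold of 10000, and the one-second quiescence timeout are assumptions for this illustration, and the counters are estimates that vary from run to run.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not taken from the original article.
final class PoolMonitorSketch {
    static final int THRESHOLD = 10_000; // assumed cut-off for direct summation

    /** Hypothetical task: sums the integers in [from, to), splitting large ranges. */
    static final class RangeSum extends RecursiveTask<Long> {
        private final int from;
        private final int to;

        RangeSum(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0L;
                for (int i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = (from + to) >>> 1;
            RangeSum left = new RangeSum(from, mid);
            left.fork();
            long right = new RangeSum(mid, to).compute();
            return right + left.join();
        }
    }

    /** Collects the monitoring values into one line; all of them are estimates. */
    static String snapshot(ForkJoinPool pool) {
        return "running=" + pool.getRunningThreadCount()
                + " active=" + pool.getActiveThreadCount()
                + " quiescent=" + pool.isQuiescent()
                + " steals=" + pool.getStealCount()
                + " queuedTasks=" + pool.getQueuedTaskCount()
                + " queuedSubmissions=" + pool.getQueuedSubmissionCount();
    }

    /** Sums [0, end) in a fresh pool, prints a snapshot, and returns the sum. */
    static long sumWithReport(int parallelism, int end) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            long result = pool.invoke(new RangeSum(0, end));
            // Give the workers a moment to go idle before taking the snapshot
            pool.awaitQuiescence(1, TimeUnit.SECONDS);
            System.out.println(snapshot(pool));
            return result;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumWithReport(4, 10_000_000));
    }
}
```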
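IV. Beware of ForkJoinPool#commonPool
    ======
    ForkJoin task splits: 16383
    ForkJoin result: 499999999500000000
    ForkJoin elapsed time: 143
    ======
    Single-thread result: 499999999500000000
    Single-thread elapsed time: 410
So the next question is: which factors affect Fork/Join performance?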
Work stealing: work stealing makes full use of idle threads and reduces contention.