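Fork/Join splits tasks and merges results in the same way, which can be expressed with the following pseudocode:

```
solve(problem):
    if problem is small enough:
        // If the task is small enough, execute it directly
        solve problem directly (sequential algorithm)
    else:
        // Split the task
        for part in subdivide(problem)
            fork subtask to solve(part)
        // Merge the results
        join all subtasks spawned in previous loop
        return combined results
```

So, to understand the Fork/Join model and the ForkJoinPool thread pool, you first need to understand the purpose and idea of the algorithm behind them, because the ForkJoinPool described in detail below is just one implementation and application of that algorithm.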
```java
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

public class TheKingRecursiveSumTask extends RecursiveTask<Long> {
    // Counts how many times a task has been split
    private static final AtomicInteger taskCount = new AtomicInteger();
    private final int sumBegin;
    private final int sumEnd;
    /**
     * Split threshold: a task whose size exceeds this value is split
     */
    private final int threshold;

    public TheKingRecursiveSumTask(int sumBegin, int sumEnd, int threshold) {
        this.sumBegin = sumBegin;
        this.sumEnd = sumEnd;
        this.threshold = threshold;
    }

    @Override
    protected Long compute() {
        if ((sumEnd - sumBegin) > threshold) {
            // The difference between the two bounds exceeds the threshold: split the task
            TheKingRecursiveSumTask subTask1 = new TheKingRecursiveSumTask(sumBegin, (sumBegin + sumEnd) / 2, threshold);
            TheKingRecursiveSumTask subTask2 = new TheKingRecursiveSumTask((sumBegin + sumEnd) / 2, sumEnd, threshold);
            subTask1.fork();
            subTask2.fork();
            taskCount.incrementAndGet();
            return subTask1.join() + subTask2.join();
        }
        // Small enough: compute the result directly
        long result = 0L;
        for (int i = sumBegin; i < sumEnd; i++) {
            result += i;
        }
        return result;
    }

    public static AtomicInteger getTaskCount() {
        return taskCount;
    }
}
```

In the code below, the range to sum is 0 to 10000000, a task is split whenever it covers more than 100 numbers, and the maximum parallelism is set to 16.
```java
public static void main(String[] args) {
    int sumBegin = 0, sumEnd = 10000000;
    computeByForkJoin(sumBegin, sumEnd);
    computeBySingleThread(sumBegin, sumEnd);
}

private static void computeByForkJoin(int sumBegin, int sumEnd) {
    ForkJoinPool forkJoinPool = new ForkJoinPool(16);
    long forkJoinStartTime = System.nanoTime();
    TheKingRecursiveSumTask theKingRecursiveSumTask = new TheKingRecursiveSumTask(sumBegin, sumEnd, 100);
    long forkJoinResult = forkJoinPool.invoke(theKingRecursiveSumTask);
    System.out.println("======");
    System.out.println("ForkJoin task splits: " + TheKingRecursiveSumTask.getTaskCount());
    System.out.println("ForkJoin result: " + forkJoinResult);
    // Elapsed time converted from nanoseconds to milliseconds
    System.out.println("ForkJoin elapsed time: " + (System.nanoTime() - forkJoinStartTime) / 1000000);
}

private static void computeBySingleThread(int sumBegin, int sumEnd) {
    long computeResult = 0L;
    long startTime = System.nanoTime();
    for (int i = sumBegin; i < sumEnd; i++) {
        computeResult += i;
    }
    System.out.println("======");
    System.out.println("Single-thread result: " + computeResult);
    System.out.println("Single-thread elapsed time: " + (System.nanoTime() - startTime) / 1000000);
}
```

The output is as follows:

```
======
ForkJoin task splits: 131071
ForkJoin result: 49999995000000
ForkJoin elapsed time: 207
======
Single-thread result: 49999995000000
Single-thread elapsed time: 40
```

The output shows that ForkJoinPool performed 131071 task splits in total, the final result is 49999995000000, and the run took 207 ms. However, if you look closely you may have noticed that the parallel ForkJoin computation is actually slower than the single-threaded one, by roughly a factor of 5! Don't panic: ForkJoin performance is discussed later in this article.
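The split count reported above can be checked without running the pool at all. The sketch below repeats the split condition and midpoint used in `TheKingRecursiveSumTask.compute()` and simply counts how many ranges get split. The class and method names (`SplitCount`, `countSplits`) are hypothetical and exist only for this illustration.

```java
class SplitCount {
    /**
     * Counts how many ranges [begin, end) would be split, using the same
     * condition and midpoint as TheKingRecursiveSumTask.compute().
     */
    static long countSplits(int begin, int end, int threshold) {
        if ((end - begin) <= threshold) {
            return 0; // small enough: summed directly, no split
        }
        int mid = (begin + end) / 2;
        // One split at this level, plus the splits performed by both halves
        return 1 + countSplits(begin, mid, threshold) + countSplits(mid, end, threshold);
    }

    public static void main(String[] args) {
        // Same range and threshold as the run above
        System.out.println(countSplits(0, 10_000_000, 100)); // prints 131071
    }
}
```

Every split creates two new task objects, and each leaf task here sums at most 100 numbers. That is a lot of task bookkeeping for very little arithmetic per task, which may be one contributor to the timing gap seen above.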
```java
public ForkJoinPool() {
    this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
         defaultForkJoinWorkerThreadFactory, null, false);
}
```

(2) Option 2: construct with a parallelism level
```java
public ForkJoinPool(int parallelism) {
    this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
```

(3) Option 3: construct with all parameters customized
```java
public ForkJoinPool(int parallelism,
                    ForkJoinWorkerThreadFactory factory,
                    UncaughtExceptionHandler handler,
                    boolean asyncMode) {
    this(checkParallelism(parallelism),
         checkFactory(factory),
         handler,
         asyncMode ? FIFO_QUEUE : LIFO_QUEUE,
         "ForkJoinPool-" + nextPoolId() + "-worker-");
    checkPermission();
}
```

2. Submitting different tasks by type
| | Called from a non-fork/join thread | Called from within fork/join |
|---|---|---|
| Submit for asynchronous execution | execute(ForkJoinTask) | ForkJoinTask.fork() |
| Wait for and obtain the result | invoke(ForkJoinTask) | ForkJoinTask.invoke() |
| Submit for execution and obtain a Future | submit(ForkJoinTask) | ForkJoinTask.fork() (ForkJoinTasks are Futures) |
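As a rough illustration of the left-hand column of the table, the sketch below submits the same kind of task from an ordinary (non-fork/join) thread in the three ways listed. `SubmitStylesDemo` and `SumTo` are hypothetical names made up for this example, and the task is deliberately trivial.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class SubmitStylesDemo {
    /** Hypothetical task used only for illustration: sums 1..n sequentially. */
    static class SumTo extends RecursiveTask<Long> {
        private final int n;
        SumTo(int n) { this.n = n; }

        @Override
        protected Long compute() {
            long sum = 0;
            for (int i = 1; i <= n; i++) {
                sum += i;
            }
            return sum;
        }
    }

    /** invoke: submit the task and block until its result is available. */
    static long viaInvoke(ForkJoinPool pool, int n) {
        return pool.invoke(new SumTo(n));
    }

    /** execute: fire-and-forget submission; we keep our own reference to read the result. */
    static long viaExecute(ForkJoinPool pool, int n) {
        SumTo task = new SumTo(n);
        pool.execute(task);
        return task.join();
    }

    /** submit: returns a handle immediately; the caller decides when to wait. */
    static long viaSubmit(ForkJoinPool pool, int n) {
        ForkJoinTask<Long> handle = pool.submit(new SumTo(n));
        return handle.join();
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println(viaInvoke(pool, 100));  // 5050
            System.out.println(viaExecute(pool, 100)); // 5050
            System.out.println(viaSubmit(pool, 100));  // 5050
        } finally {
            pool.shutdown();
        }
    }
}
```

All three paths produce the same result. They differ only in whether the caller blocks at submission time (`invoke`) or later, when it asks for the result (`execute` and `submit`).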
```java
public <T> T invoke(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task.join();
}
```

(2) Second core method: execute
```java
public void execute(ForkJoinTask<?> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
}

public void execute(Runnable task) {
    if (task == null)
        throw new NullPointerException();
    ForkJoinTask<?> job;
    if (task instanceof ForkJoinTask<?>) // avoid re-wrap
        job = (ForkJoinTask<?>) task;
    else
        job = new ForkJoinTask.RunnableExecuteAction(task);
    externalPush(job);
}
```

(3) Third core method: submit
```java
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}

public <T> ForkJoinTask<T> submit(Callable<T> task) {
    ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
    externalPush(job);
    return job;
}

public <T> ForkJoinTask<T> submit(Runnable task, T result) {
    ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
    externalPush(job);
    return job;
}

public ForkJoinTask<?> submit(Runnable task) {
    if (task == null)
        throw new NullPointerException();
    ForkJoinTask<?> job;
    if (task instanceof ForkJoinTask<?>) // avoid re-wrap
        job = (ForkJoinTask<?>) task;
    else
        job = new ForkJoinTask.AdaptedRunnableAction(task);
    externalPush(job);
    return job;
}
```

3. ForkJoinTask
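In terms of class design, ForkJoinTask implements the Future interface, so it can also be regarded as a lightweight Future. The relationship between them is shown in the figure below.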
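Because ForkJoinTask implements Future, a submitted task can be consumed purely through the `java.util.concurrent.Future` interface. The sketch below illustrates this. `FutureViewDemo` and `SquareTask` are hypothetical names, and the 5-second timeout is an arbitrary choice for the example.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class FutureViewDemo {
    /** Hypothetical task used only for illustration: squares a number. */
    static class SquareTask extends RecursiveTask<Integer> {
        private final int value;
        SquareTask(int value) { this.value = value; }

        @Override
        protected Integer compute() {
            return value * value;
        }
    }

    /** Submits the task and reads its result through the Future interface only. */
    static int squareViaFuture(ForkJoinPool pool, int value) {
        // submit returns a ForkJoinTask<Integer>, which is assignable to Future<Integer>
        Future<Integer> future = pool.submit(new SquareTask(value));
        try {
            return future.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            System.out.println(squareViaFuture(pool, 7)); // 49
        } finally {
            pool.shutdown();
        }
    }
}
```

Code that already works with `Future` (timeouts, `isDone()`, cancellation) can therefore accept fork/join tasks without knowing about ForkJoinTask itself.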
```java
public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}
```

join: get the task's execution result
```java
public final V join() {
    int s;
    // If doJoin returns a status other than NORMAL, report the exception
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    // Completed normally: return the raw result
    return getRawResult();
}

private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    // If the task has already completed, return its status
    return (s = status) < 0 ? s :
        // If it has not completed and the current thread is a ForkJoinWorkerThread,
        // take that thread's workQueue and try to pop this task and run it.
        // If it completes, return the status; otherwise wait via the pool's awaitJoin
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        // The current thread is not a ForkJoinWorkerThread: wait via externalAwaitDone
        externalAwaitDone();
}

final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        // After successful execution, set the status to NORMAL
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}
```

(2) RecursiveAction and RecursiveTask
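ForkJoinPool commonly works with two kinds of tasks: those that return a result and those that do not. This mirrors thread pools such as ThreadPoolExecutor. The corresponding classes are RecursiveTask (returns a result) and RecursiveAction (returns no result). As the class diagram shows, both extend ForkJoinTask.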
```java
static class SortTask extends RecursiveAction {
    final long[] array;
    final int lo, hi;

    SortTask(long[] array, int lo, int hi) {
        this.array = array;
        this.lo = lo;
        this.hi = hi;
    }

    SortTask(long[] array) {
        this(array, 0, array.length);
    }

    // Core computation method
    @Override
    protected void compute() {
        if (hi - lo < THRESHOLD)
            // Small enough: sort directly
            sortSequentially(lo, hi);
        else {
            // Split the task
            int mid = (lo + hi) >>> 1;
            invokeAll(new SortTask(array, lo, mid),
                      new SortTask(array, mid, hi));
            merge(lo, mid, hi);
        }
    }

    // implementation details follow:
    static final int THRESHOLD = 1000;

    void sortSequentially(int lo, int hi) {
        Arrays.sort(array, lo, hi);
    }

    void merge(int lo, int mid, int hi) {
        long[] buf = Arrays.copyOfRange(array, lo, mid);
        for (int i = 0, j = lo, k = mid; i < buf.length; j++)
            array[j] = (k == hi || buf[i] < array[k]) ? buf[i++] : array[k++];
    }
}
```

RecursiveTask: returns a result
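```java
class Fibonacci extends RecursiveTask<Integer> {
    final int n;

    Fibonacci(int n) {
        this.n = n;
    }

    // compute() is protected in RecursiveTask, so the override must not narrow its visibility
    @Override
    protected Integer compute() {
        if (n <= 1)
            return n;
        Fibonacci f1 = new Fibonacci(n - 1);
        f1.fork();
        Fibonacci f2 = new Fibonacci(n - 2);
        return f2.compute() + f1.join();
    }
}
```

(3) Restrictions on using ForkJoinTask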
```java
public int getRunningThreadCount() {
    int rc = 0;
    WorkQueue[] ws; WorkQueue w;
    if ((ws = workQueues) != null) {
        for (int i = 1; i < ws.length; i += 2) {
            if ((w = ws[i]) != null && w.isApparentlyUnblocked())
                ++rc;
        }
    }
    return rc;
}
```

(2) Get the number of active threads
```java
public int getActiveThreadCount() {
    int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
    return (r <= 0) ? 0 : r; // suppress momentarily negative values
}
```

(3) Check whether the ForkJoinPool is idle
```java
public boolean isQuiescent() {
    return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
}
```

(4) Get the number of stolen tasks
```java
public long getStealCount() {
    AtomicLong sc = stealCounter;
    long count = (sc == null) ? 0L : sc.get();
    WorkQueue[] ws; WorkQueue w;
    if ((ws = workQueues) != null) {
        for (int i = 1; i < ws.length; i += 2) {
            if ((w = ws[i]) != null)
                count += w.nsteals;
        }
    }
    return count;
}
```

(5) Get the number of queued tasks
```java
public long getQueuedTaskCount() {
    long count = 0;
    WorkQueue[] ws; WorkQueue w;
    if ((ws = workQueues) != null) {
        for (int i = 1; i < ws.length; i += 2) {
            if ((w = ws[i]) != null)
                count += w.queueSize();
        }
    }
    return count;
}
```

(6) Get the number of queued submissions
```java
public int getQueuedSubmissionCount() {
    int count = 0;
    WorkQueue[] ws; WorkQueue w;
    if ((ws = workQueues) != null) {
        for (int i = 0; i < ws.length; i += 2) {
            if ((w = ws[i]) != null)
                count += w.queueSize();
        }
    }
    return count;
}
```

IV. Beware of ForkJoinPool#commonPool
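```
======
ForkJoin task splits: 16383
ForkJoin result: 499999999500000000
ForkJoin elapsed time: 143
======
Single-thread result: 499999999500000000
Single-thread elapsed time: 410
```

So here comes the next question: which factors affect Fork/Join performance?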
Work stealing: by stealing tasks, idle threads are put to full use and contention is reduced.
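To see work stealing in a running pool, one option is to run a divide-and-conquer task and then read `getStealCount()`. The sketch below does that. `StealCountDemo`, `RangeSum`, the threshold of 10,000 and the parallelism of 4 are all assumptions chosen for illustration, and the reported steal count is only an estimate that varies from run to run.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class StealCountDemo {
    /** Hypothetical divide-and-conquer task: sums the integers in [lo, hi). */
    static class RangeSum extends RecursiveTask<Long> {
        private static final int THRESHOLD = 10_000; // assumed leaf size
        private final int lo;
        private final int hi;

        RangeSum(int lo, int hi) {
            this.lo = lo;
            this.hi = hi;
        }

        @Override
        protected Long compute() {
            if (hi - lo <= THRESHOLD) {
                long sum = 0;
                for (int i = lo; i < hi; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = (lo + hi) >>> 1;
            RangeSum left = new RangeSum(lo, mid);
            left.fork(); // queued on this worker's own queue, where idle workers may steal it
            long right = new RangeSum(mid, hi).compute(); // run the other half in the current thread
            return right + left.join();
        }
    }

    static long sum(ForkJoinPool pool, int lo, int hi) {
        return pool.invoke(new RangeSum(lo, hi));
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            System.out.println("sum = " + sum(pool, 0, 10_000_000));
            System.out.println("steals (estimate) = " + pool.getStealCount());
        } finally {
            pool.shutdown();
        }
    }
}
```

A non-zero steal count indicates that some forked subtasks were picked up by a worker other than the one that forked them.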