`

第六章 - 使用Fork/Join 框架

阅读更多

Java 7 引入了一个特殊的 executor 使用 Fork/Join 框架。Fork/Join 框架用来解决那些能使用分治算法解决的问题

 

Fork/Join 框架介绍

为了使用分治算法,你必须把一个问题分解成小问题。使用递归方法来重复相同的操作直到问题被细分为能直接被解决的足够小的问题。这些小问题可以用 executor 来解决,但是为了更有效地解决这些小问题,Java 7 引入了 Fork/Join 框架。

这个框架基于 ForkJoinPool 类,它是一个特殊的 executor,它包含有两个操作 fork() 和 join()  (以及它们的变种形式),以及一个名为工作窃取算法的内部算法。

 

Fork/Join 框架的基本特征

使用此框架时,你的主方法代码类似于:

if ( problem.size() > DEFAULT_SIZE) {
       divideTasks();
       executeTask();
       taskResults=joinTasksResult();
       return taskResults;
   } else {
       taskResults=solveBasicProblem();
       return taskResults;
}

 代码中最重要的部分是允许你有效地分解并执行子任务并从这些子任务中获得运行结果用来计算父任务的结果。这个功能由 ForkJoinTask 的以下两个方法提供:

  • fork() 方法:这个方法允许你发送一个子任务给 Fork/Join executor
  • join() 方法:该方法允许你等待一个子任务的完成并返回其运行结果

这两个方法还有其它不同的变形。Fork/Join 框架还有一个关键的特性:决定哪些任务被执行的工作窃取算法。当一个任务使用 join() 方法等待一个子任务完成时,执行此任务的线程会从任务池中取不同的任务运行。通过此方法,Fork/Join executor 的线程总是在执行任务,从而提高了应用的性能。

 

Java 8 提供了 Fork/Join 框架的一个新特性。现在每个 Java 应用程序都有一个默认的名为 ForkJoinPool 的通用池。你可以通过调用静态方法 ForkJoinPool.commonPool() 来获得。默认情况下这个默认的 Fork/Join executor 会被使用,executor 里线程数由运行计算机的可用处理器数来决定。你可以通过改变操作系统值 java.util.concurrent.ForkJoinPool.common.parallelism 来改变其默认行为。

 

一些 Java API 本身也使用 Fork/Join 框架来实现并行操作。例如用并发方式来对数组排序的 Arrays 类的 parallelSort() 方法,以及 Java 8 提供的 parallel streams。

 

Fork/Join 框架的局限性

Fork/Join 框架具有以下局限性:

  • 你不会再细分的基本问题不能太小也不能太大。根据 Java API 文档,它必须具有 100 到 10,000 个基本运算步骤
  • 你不能使用能阻塞的 I/O 操作,例如用户输入或从网络中获取数据。这些操作会造成CPU处理器空闲,从而降低并发处理速度
  • 你不能在任务里抛出checked exceptions。你必须在代码中获取这些 checked exceptions (例如把它们封装成 unchecked RuntimeException),Unchecked exceptions会被特殊处理。

Fork/Join 框架的组件

Fork/Join 框架有以下五个基础类:

  • ForkJoinPool类:该类实现了 Executor 和 ExecutorService 接口,它用来执行 Fork/Join 任务。Java 提供了一个默认的 ForkJoinPool 对象 (通用池),但你可以使用一些构造函数来创建你自己的对象。你必须制定并发级别 (最大的运行并发线程数)。默认情况下,它使用可用的处理器数作为并发级别。
  • ForkJoinTask类:该类是所有的 Fork/Join 任务的基础抽象类。它提供了 fork() 和 join() 方法以及一些其他的变种。同时它也实现了 Future 接口并提供了一些方法来判断一个任务是否正常结束,被取消或是否抛出了 unchecked 异常。RecursiveTask, RecursiveAction 以及 CountedCompleter 类提供了 compute() 这个抽象犯法,你必须在子类中实现它来做一些实际的计算操作。
  • RecursiveTask类:该类继承了 ForkJoinTask 类。它也是一个抽象类,你可以使用它如果你要实现有返回值的 Fork/Join 任务。
  • RecursiveAction类:该类继承了 ForkJoinTask类。它也是一个抽象类,你可以使用它如果你要实现没有返回值的 Fork/Join 任务。
  • CountedCompleter类:该类继承了 ForkJoinTask 类。它是 Java 8 API 的一个新特性。如果任务结束后需要触发其它任务,你可以使用它。

 

public class ForkJoinDemo {
    public static void main(String[] args) {
        int[] ints = IntStream.range(1, 5).toArray();

        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        SumTask sumTask = new SumTask(ints, 0, ints.length, 1);
        long startTime = System.currentTimeMillis();
        int sum = forkJoinPool.invoke(sumTask);
        long endTime = System.currentTimeMillis();
        System.out.println("Fork/join sum: " + sum + " in " + (endTime - startTime) + " ms.");
        forkJoinPool.shutdown();
        try {
            forkJoinPool.awaitTermination(20, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class SumTask extends RecursiveTask<Integer> {

    private int[] dataList;
    private int from;
    private int end;
    private int divideFactor;

    public SumTask(int[] dataList, int from, int end, int divideFactor) {
        this.dataList = dataList;
        this.from = from;
        this.end = end;
        this.divideFactor = divideFactor;
    }

    @Override
    protected Integer compute() {

        if ((end - from) <= divideFactor) {
            int sum = 0;
            for (int i = from; i < end; i++) {
                sum += dataList[i];
            }

            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            System.out.println(String.format("%s : compute %d~%d = %d", Thread.currentThread().getName(), from, end, sum));

            return sum;
        } else {
            int middle = (end + from) / 2;
            System.out.println(String.format("%s : split %d~%d ==> %d~%d, %d~%d", Thread.currentThread().getName(), from, end, from, middle, middle, end));
            SumTask task1 = new SumTask(dataList, from, middle, divideFactor);
            SumTask task2 = new SumTask(dataList, middle, end, divideFactor);

            // 这里我们使用 invokeAll() 方法,如果使用 task1.fork() 和 task2.fork() 方法
            // 会导致当前线程变成监工,不参与子任务的执行,从而降低性能
            invokeAll(task1, task2);

            int sum1 = task1.join();
            int sum2 = task2.join();

            int sum = sum1 + sum2;
            System.out.println(Thread.currentThread().getName() + " : " + "result = " + sum1 + " + " + sum2 + " ==> " + sum);

            return sum1 + sum2;
        }
    }
}

 

Fork/Join 框架

ForkJoinPool 类提供了 execute(),invoke(),submit() 方法来向线程池中提交任务。它们之间的区别是:

  • execute():发送任务给 ForkJoinPool 并立刻返回一个 void 值 (即提交无返回值的任务)
  • invoke():发送任务给 ForkJoinPool,当任务运行结束才返回
  • submit():发送任务给 ForkJoinPool,立刻返回一个 Future 对象用来控制任务的状态并获取结果

我们知道 ForkJoinPool 处理基于 ForkJoinTask 类的任务,但是也能执行基于 Runnable 和 Callable 接口的任务。你可以使用 submit() 方法,该方法可以接收一个 Runnable 对象,一个有返回值的 Runnable 对象,以及一个 Callable 对象。

 

ForkJoinTask:

  • get (long timeout, TimeUnit unit) 来获取任务返回的结果。这个方法在指定时间内等待任务执行完并返回结果。如果指定时间之内任务无法完成,则抛出 TimeoutException 异常。
  • invoke() 方法从语法上类似于 fork(); join() 但它总是尝试在当前线程执行。
  • 名字以 “quiet" 开始的方法没有返回运行结果也不报告异常。这些方法当有多个任务在执行时,结果或异常在运行结束后才被处理是很有用。
  • 一对 fork-join 就像并行递归方法里的 call (fork) 和 return (join)。因此 join 必须从最里面开始,例如, a.fork();b.form();b.join();a.join() 比 a 在 b 之前 join 更有效率。
  • 大小: 127.4 KB
分享到:
评论

相关推荐

    并发编程笔记20190526.docx

    6、Fork/Join框架的实现原理 26 二、闭锁CountDownLatch 28 1、应用场景 28 2、CyclicBarrier 28 3、Semaphore 29 4、Callable、Future和FutureTask 30 5、原子操作CAS (compare atomic swap) 32 三、显式锁和AQS 34...

    精通lambda表达式Java多核编程 中文完整pdf扫描版[66MB]

    ● 分割迭代器、fork/join框架与异常 ● 使用微基准测试检查流的性能 ● 使用默认方法演化API 目录 第1章 走进新生代的Java 1 第2章 Java lambda表达式的基础知识 23 第3章 流与管道介绍 55 第4章 终止流:收集...

    Java 7并发编程实战手册

    全书分为9章,涵盖了线程管理、线程同步、线程执行器、Fork/Join框架、并发集合、定制并发类、测试并发应用等内容。全书通过60多个简单而非常有效的实例,帮助读者快速掌握Java 7多线程应用程序的开发技术。学习完...

    java7源码-LearnJava7ConcurrencyCook:Java7ConcurrencyCook这本书源代码的学习和注解

    java 7 源码 LearnJava7ConcurrencyCook Java7ConcurrencyCook 这本书源代码的学习和注解 第一章 线程的管理 第二章 线程同步基础 synchronized关键字 ...第六章 并发容器 第七章 定制并发类 第八章 测试并发应用

    龙果 java并发编程原理实战

    第45节Fork/Join框架详解00:28:09分钟 | 第46节同步容器与并发容器00:18:44分钟 | 第47节并发容器CopyOnWriteArrayList原理与使用00:15:52分钟 | 第48节并发容器ConcurrentLinkedQueue原理与使用00:31:03分钟 | ...

    Java 并发编程原理与实战视频

    第45节Fork/Join框架详解00:28:09分钟 | 第46节同步容器与并发容器00:18:44分钟 | 第47节并发容器CopyOnWriteArrayList原理与使用00:15:52分钟 | 第48节并发容器ConcurrentLinkedQueue原理与使用00:31:03分钟 | ...

    龙果java并发编程完整视频

    第45节Fork/Join框架详解00:28:09分钟 | 第46节同步容器与并发容器00:18:44分钟 | 第47节并发容器CopyOnWriteArrayList原理与使用00:15:52分钟 | 第48节并发容器ConcurrentLinkedQueue原理与使用00:31:03分钟 | ...

    java并发编程

    第45节Fork/Join框架详解00:28:09分钟 | 第46节同步容器与并发容器00:18:44分钟 | 第47节并发容器CopyOnWriteArrayList原理与使用00:15:52分钟 | 第48节并发容器ConcurrentLinkedQueue原理与使用00:31:03分钟 | ...

    Java学习指南第四版下册

    《Java学习指南(第4版)(上、下册)》加入了从Java 6和Java 7发布以后的变化,包括新的语言功能、并发工具(Fork-Join框架)、新的NIO Files API、Java Servlet(3.0)等新主题,作者通过精心挑选的、富有实用性和趣味性...

    linux网络编程-宋敬彬-part1

    第6章 应用层网络服务程序简介 167 6.1 HTTP协议和服务 167 6.1.1 HTTP协议概述 167 6.1.2 HTTP协议的基本过程 168 6.2 FTP协议和服务 170 6.2.1 FTP协议概述 170 6.2.2 FTP协议的工作模式 172 6.2.3 ...

    JBPM4工作流应用开始指南.rar

    71 5.7 任务服务API 72 5.8 历史服务API 75 5.9 管理服务API 76 5.10 查询服务API 77 5.11 例程:利用jBPM Service API完成流程实例 78 5.12 小结 80 第6章 掌握jBPM流程定义语言 81 6.1 process(流程) 82 6.2 ...

    汪文君高并发编程实战视频资源全集

     高并发编程第三阶段34讲 ForkJoin框架之RecursiveAction_.mp4  高并发编程第三阶段35讲 Phaser工具的实战案例使用第一部分_.mp4  高并发编程第三阶段36讲 Phaser工具的实战案例使用第二部分_.mp4  高并发编程第...

    汪文君高并发编程实战视频资源下载.txt

     高并发编程第三阶段34讲 ForkJoin框架之RecursiveAction_.mp4  高并发编程第三阶段35讲 Phaser工具的实战案例使用第一部分_.mp4  高并发编程第三阶段36讲 Phaser工具的实战案例使用第二部分_.mp4  高并发编程第...

    linux网路编程 中文 23M 版

    第1 章Linux操作系统概述................... .......................................................................... 2 1.1 Linux发展历史........................................................ 2 ...

Global site tag (gtag.js) - Google Analytics