`

第四章 - Callable 和 Future 接口

 
阅读更多

在executor里,你可以运行以下两种任务:

  • 基于Runnable接口的任务:该任务实现run()方法,但该方法不具返回值
  • 基于Callable接口:该任务实现了call()接口并返回一个对象作为结果。call()接口返回的对象类型由Callable接口里的泛型参数决定。executor会返回一个Future接口实现类的对象。

Callable接口

Callable接口具有以下特性:

  • 它有一个类型参数来决定call()方法返回对象的类型
  • 声明了call()方法。当任务被executor执行时该方法会被调用。它必须具有返回值
  • call()方法能够抛出任何的checked异常。你可以通过实现自定义的executor并重载 afterExecute() 方法来处理这个异常

Future接口

当你发送一个Callable任务给executor时,executor会返回一个Future接口实现类的对象,该对象允许你控制任务的执行,任务的状态以及获取任务返回的结果。该接口的主要特性有:

  • 可以调用cancel()方法来取消任务。这个方法接收一个boolean参数来指定你是否中断运行中的任务。
  • 你可以检验一个任务是否已经被取消了(使用isCancelled()方法)或是否已经完成(使用isDone()方法)
  • 你可以通过get()方法来获取任务的返回值。该方法有两种形式:1. 无参数形式,当任务结束时返回运行结果。如果任务没有结束,运行任务的线程被暂停直至任务结束。2. 有参数形式,传入的参数为timeout时间。如果timeout时间之内任务不能完成,则抛出TimeoutException异常。

示范例子 - 最佳单词匹配算法

该算法的目的是为了找出和传入的单词最接近的单词。在这个例子中,我们实现了两个方法:

  • 第一个方法使用Levenshtein距离来找出所有最接近输入的字符串的单词
  • 第二个方法使用Levenshtein距离来确认一个字符串是否存在于我们的单词集里(当然使用equals()方法更快,但是我们这个例子只是为了演示多线程)

我们将会实现一个串行和两个并行两个版本来验证并行能否帮助我们提高效率。以下代码分为串行和并行版本的公共类,以及它们各自特有的类。

 

公共类

我们将需要以下三个基础类:

  • WordsLoader类:读取单词集到到内存中 (我们忽略该类的代码,它只是单纯的从文本文档中读取字符串)
  • LevenshteinDistance类:计算两个字符串之间的Levenshtein距离
  • BestMatchingData类:保存单词匹配结果。它保存了一组相似单词以及它们和输入字符串之间的距离

 

/**
 * 该类的calculate()方法接收两个字符串参数,计算并返回这两个字符串之间的Levenshtein距离
 */
public class LevenshteinDistance {
    
    public static int calculate (String string1, String string2) {
        int[][] distances=new int[string1.length()+1][string2.length()+1];
        for (int i=1; i<=string1.length();i++) {
            distances[i][0]=i;
        }
        for (int j=1; j<=string2.length(); j++) {
            distances[0][j]=j;
        }
        for(int i=1; i<=string1.length(); i++) {
            for (int j=1; j<=string2.length(); j++) {
                if (string1.charAt(i-1)==string2.charAt(j-1)) {
                    distances[i][j]=distances[i-1][j-1];
                } else {
                    distances[i][j]=minimum(distances[i-1][j],
                            distances[i][j-1],distances[i-1][j-1])+1;
                }
            } }
        return distances[string1.length()][string2.length()];
    }

    private static int minimum(int i, int j, int k) {
        return Math.min(i,Math.min(j, k));
    }
}
 

 

 

public class BestMatchingData {
    private List<String> words;
    private int distance;

    public void setWords(List<String> words) {
        this.words = words;
    }

    public void setDistance(int distance) {
        this.distance = distance;
    }

    public List<String> getWords() {
        return words;
    }

    public int getDistance() {
        return distance;
    }
}
 

 

 

串行版本

以下两个类实现串行版本:

  • BestMatchingSerialCalculation类,计算得出和输入字符串最相似的一组单词。
  • BestMatchingSerialMain,main()方法运行算法,记录算法运行时间以及打印出结果

 

public class BestMatchingSerialCalculation {
    public static BestMatchingData getBestMatchingWords(String
                         word, List<String> dictionary) {
        List<String> results=new ArrayList<>();
        int minDistance=Integer.MAX_VALUE;
        int distance;
        for (String str: dictionary) {
            distance=LevenshteinDistance.calculate(word,str);
            if (distance<minDistance) {
                results.clear();
                minDistance=distance;
                results.add(str);
            } else if (distance==minDistance) {
                results.add(str);
            } }
        BestMatchingData result=new BestMatchingData();
        result.setWords(results);
        result.setDistance(minDistance);
        return result;
    }
}

public class BestMatchingSerialMain {
    public static void main(String[] args) {
        Date startTime, endTime;
        List<String> dictionary=WordsLoader.load("data/UK Advanced" +
                "Cryptics Dictionary.txt");
                System.out.println("Dictionary Size: "+dictionary.size());
        startTime=new Date();
        BestMatchingData result=
                BestMatchingSerialCalculation.getBestMatchingWords
                        (args[0], dictionary);
        List<String> results=result.getWords();
        endTime=new Date();
        System.out.println("Word: "+args[0]);
        System.out.println("Minimum distance: "
                +result.getDistance());
        System.out.println("List of best matching words: "
                +results.size());
        results.forEach(System.out::println);
        System.out.println("Execution Time: "+(endTime.getTime()-
                startTime.getTime()));
    }
}
 

 

第一个并行版本

这个版本是基于Callable接口和AbstractExecutorService接口定义的submit()方法,我们用以下三个类来实现:

  • BestMatchingBasicTask类:该类实现Callable接口
  • BestMatchingBasicConcurrentCalculation类:该类创建executor和所需的任务并发送给executor
  • BestMatchingConcurrentMain:主文件,运行算法并打印结果

该版本中,我们把几个任务返回的Future对象保存在List中,然后待任务全部结束后我们一次从各个Future对象获取返回的数据并寻找最接近的单词。

 

/**
 * 此类负责处理单词集的一部分单词
 * @param startIndex 任务所负责处理的单词集的开始位置(包含此位置的单词)
 * @param endIndex 任务所负责处理的单词集的结束位置(不包含次位置的单词)
 * @param dictionary 需要处理的单词集
 * @param word 输入的单词样本
 */
public class BestMatchingBasicTask implements Callable
        <BestMatchingData > {
    private int startIndex;
    private int endIndex;
    private List < String > dictionary;
    private String word;
    public BestMatchingBasicTask(int startIndex, int endIndex,
                                 List < String > dictionary, String word) {
        this.startIndex = startIndex;
        this.endIndex = endIndex;
        this.dictionary = dictionary;
        this.word = word;
    }

    @Override
    public BestMatchingData call() throws Exception {
        List<String> results=new ArrayList<>();
        int minDistance=Integer.MAX_VALUE;
        int distance;
        for (int i=startIndex; i<endIndex; i++) {
            distance = LevenshteinDistance.calculate
                    (word,dictionary.get(i));
            if (distance < minDistance) {
                results.clear();
                minDistance=distance;
                results.add(dictionary.get(i));
            } else if (distance==minDistance) {
                results.add(dictionary.get(i));
            } 
        }
        BestMatchingData result=new BestMatchingData();
        result.setWords(results);
        result.setDistance(minDistance);
        return result;
    }
}
 

 

 

public class BestMatchingBasicConcurrentCalculation {
    public static BestMatchingData getBestMatchingWords(
            String word, List<String> dictionary) 
            throws InterruptedException,ExecutionException {
        int numCores = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = (ThreadPoolExecutor)
                Executors.newFixedThreadPool(numCores);
        int size = dictionary.size();
        int step = size / numCores;
        int startIndex, endIndex;
        List<Future<BestMatchingData>> results = new
                ArrayList<>();
        for (int i = 0; i < numCores; i++) {
            startIndex = i * step;
            if (i == numCores - 1) {
                endIndex = dictionary.size();
            } else {
                endIndex = (i + 1) * step;
            }
            BestMatchingBasicTask task = new
                    BestMatchingBasicTask(startIndex, endIndex,
                    dictionary, word);
            Future<BestMatchingData> future = executor.submit(task);
            results.add(future);
        }

        executor.shutdown();
        List<String> words=new ArrayList<>();
        int minDistance=Integer.MAX_VALUE;
        for (Future<BestMatchingData> future: results) {
            BestMatchingData data=future.get();
            if (data.getDistance() < minDistance) {
                words.clear();
                minDistance=data.getDistance();
                words.addAll(data.getWords());
            } else if (data.getDistance() == minDistance) {
                words.addAll(data.getWords());
            } 
        }

        BestMatchingData result=new BestMatchingData();
        result.setDistance(minDistance);
        result.setWords(words);
        return result;
    }
}
 

 

 

public class BestMatchingConcurrentMain {
    public static void main(String[] args) {
        Date startTime, endTime;
        List<String> dictionary=WordsLoader.load("data/UK Advanced" +
                "Cryptics Dictionary.txt");
        System.out.println("Dictionary Size: "+dictionary.size());
        startTime=new Date();
        try {
            BestMatchingData result =
                    BestMatchingBasicConcurrentCalculation.getBestMatchingWords
                            (args[0], dictionary);
            List<String> results = result.getWords();
            endTime = new Date();
            System.out.println("Word: " + args[0]);
            System.out.println("Minimum distance: "
                    + result.getDistance());
            System.out.println("List of best matching words: "
                    + results.size());
            results.forEach(System.out::println);
            System.out.println("Execution Time: " + (endTime.getTime() -
                    startTime.getTime()));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
 

 

第二个并行版本

在这个版本中,我们使用AbstractExecutorService (在ThreadPoolExecutorClass类中实现) 的invokeAll() 方法。第一个版本的并行算法中,我们只用了submit()方法,接收一个Callable对象并返回Future对象。invokeAll() 方法接收一组Callable对象作为参数,并返回一组Future对象。队列中的第一个callable对象对应返回队列中的第一个Future对象,以此类推。submit()和invokeAll()方法另一个重要差异是:submit()方法马上返回,而invokeAll()方法则在所有的Callable任务完成后返回。这意味着调用invokeAll()方法后,如果调用返回回来Future对象的isDone()方法,将返回true。

 

/**
 * 此类负责处理单词集的一部分单词
 * @param startIndex 任务所负责处理的单词集的开始位置(包含此位置的单词)
 * @param endIndex 任务所负责处理的单词集的结束位置(不包含次位置的单词)
 * @param dictionary 需要处理的单词集
 * @param word 输入的单词样本
 */
public class BestMatchingBasicTask implements Callable
        <BestMatchingData > {
    private int startIndex;
    private int endIndex;
    private List < String > dictionary;
    private String word;
    public BestMatchingBasicTask(int startIndex, int endIndex,
                                 List < String > dictionary, String word) {
        this.startIndex = startIndex;
        this.endIndex = endIndex;
        this.dictionary = dictionary;
        this.word = word;
    }

    @Override
    public BestMatchingData call() throws Exception {
        List<String> results=new ArrayList<>();
        int minDistance=Integer.MAX_VALUE;
        int distance;
        for (int i=startIndex; i<endIndex; i++) {
            distance = LevenshteinDistance.calculate
                    (word,dictionary.get(i));
            if (distance < minDistance) {
                results.clear();
                minDistance=distance;
                results.add(dictionary.get(i));
            } else if (distance==minDistance) {
                results.add(dictionary.get(i));
            }
        }
        BestMatchingData result=new BestMatchingData();
        result.setWords(results);
        result.setDistance(minDistance);
        return result;
    }
}

public class BestMatchingAdvancedConcurrentCalculation {
    public static BestMatchingData getBestMatchingWords(
            String word, List<String> dictionary)
            throws InterruptedException, ExecutionException {
        int numCores = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = (ThreadPoolExecutor)
                Executors.newFixedThreadPool(numCores);
        int size = dictionary.size();
        int step = size / numCores;
        int startIndex, endIndex;
        List<BestMatchingBasicTask> tasks = new
                ArrayList<>();
        for (int i = 0; i < numCores; i++) {
            startIndex = i * step;
            if (i == numCores - 1) {
                endIndex = dictionary.size();
            } else {
                endIndex = (i + 1) * step;
            }
            BestMatchingBasicTask task = new
                    BestMatchingBasicTask(startIndex, endIndex,
                    dictionary, word);
            tasks.add(task);
        }

        // 这里我们使用invokeAll()方法传入一组任务而不是submit()方法传入单个任务
        List<Future<BestMatchingData>> results = executor.invokeAll(tasks);
        executor.shutdown();
        List<String> words = new ArrayList<>();
        int minDistance = Integer.MAX_VALUE;
        for (Future<BestMatchingData> future : results) {
            BestMatchingData data = future.get();
            if (data.getDistance() < minDistance) {
                words.clear();
                minDistance = data.getDistance();
                words.addAll(data.getWords());
            } else if (data.getDistance()== minDistance) {
                words.addAll(data.getWords());
            }
        }
        BestMatchingData result = new BestMatchingData();
        result.setDistance(minDistance);
        result.setWords(words);
        return result;
    }
}

public class BestMatchingConcurrentAdvancedMain {
    public static void main(String[] args) {
        Date startTime, endTime;
        List<String> dictionary=WordsLoader.load("data/UK Advanced" +
                "Cryptics Dictionary.txt");
        System.out.println("Dictionary Size: "+dictionary.size());
        startTime=new Date();
        try {
            BestMatchingData result =
                    BestMatchingAdvancedConcurrentCalculation.getBestMatchingWords
                            (args[0], dictionary);
            List<String> results = result.getWords();
            endTime = new Date();
            System.out.println("Word: " + args[0]);
            System.out.println("Minimum distance: "
                    + result.getDistance());
            System.out.println("List of best matching words: "
                    + results.size());
            results.forEach(System.out::println);
            System.out.println("Execution Time: " + (endTime.getTime() -
                    startTime.getTime()));
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

 

一些重要的方法

本章我们使用了来自AbstractExecutorService接口 (实现于ThreadPoolExecutor类)以及CompletionService接口 (实现于ExecutorCompletionService类) 的一些方法来管理 Callable 任务的返回值。同时它们还提供了以下有用的方法:

  • invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):这个方法在所有任务执行完成后或者指定的timeout时间结束后返回一组和 Callable 任务相关联的 Future对象 (如果timeout时间到了但有些任务未被完成,那么这些任务将被取消)
  • invokeAny (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):这个任务返回在指定的timeout时间内第一个顺利完成的Callable任务。如果没有一个任务在timeout时间前完成,那么将抛出TimeoutException异常。

关于CompletionService接口,提供了如下重要方法:

  • poll() 方法:这个方法返回并删除自从上一次调用poll()或take()方法之后的下一个完成的任务返回的Future对象。如果此时没有任务完成,那么则返回 null 值
  • take() 方法:这个方法和 poll() 方法类似,但是如果没有任务完成,那么执行任务的线程将进入休眠知道有任务完成
分享到:
评论

相关推荐

    Java2核心技术.part5

    1.8 Callable和Future 1.9 执行器 1.10 同步器 1.11 线程和Swing工作器 第2章 集合 2.1 集合接口 2.2 具体的集合 2.3 集合框架 2.4 算法 2.5 遗留下来的集合 第3章 网络 3.1 连接到服务器 3.2 实现...

    Java2核心技术.part4

    1.8 Callable和Future 1.9 执行器 1.10 同步器 1.11 线程和Swing工作器 第2章 集合 2.1 集合接口 2.2 具体的集合 2.3 集合框架 2.4 算法 2.5 遗留下来的集合 第3章 网络 3.1 连接到服务器 3.2 实现...

    Java2核心技术.part3

    1.8 Callable和Future 1.9 执行器 1.10 同步器 1.11 线程和Swing工作器 第2章 集合 2.1 集合接口 2.2 具体的集合 2.3 集合框架 2.4 算法 2.5 遗留下来的集合 第3章 网络 3.1 连接到服务器 3.2 实现...

    Java2核心技术.part1

    1.8 Callable和Future 1.9 执行器 1.10 同步器 1.11 线程和Swing工作器 第2章 集合 2.1 集合接口 2.2 具体的集合 2.3 集合框架 2.4 算法 2.5 遗留下来的集合 第3章 网络 3.1 连接到服务器 3.2 实现服务器 3.3 发送E-...

    Java2核心技术.part6

    1.8 Callable和Future 1.9 执行器 1.10 同步器 1.11 线程和Swing工作器 第2章 集合 2.1 集合接口 2.2 具体的集合 2.3 集合框架 2.4 算法 2.5 遗留下来的集合 第3章 网络 3.1 连接到服务器 3.2 实现...

    Java2核心技术.part2

    1.8 Callable和Future 1.9 执行器 1.10 同步器 1.11 线程和Swing工作器 第2章 集合 2.1 集合接口 2.2 具体的集合 2.3 集合框架 2.4 算法 2.5 遗留下来的集合 第3章 网络 3.1 连接到服务器 3.2 实现...

    一篇文章弄懂Java多线程基础和Java内存模型

    通过Callable和Future接口创建线程三、Java内存模型概念四、内存间的交互操作五、volatile和synchronized的区别 写在前面:提起多线程大部门同学可能都会皱起眉头不知道多线程到底是什么、什么时候可以用到、用的...

    Java SE实践教程 源代码 下载

    第4章 数据传送的管道——JAVA I/O 71 4.1 讲解 72 4.1.1 流——Java I/O的基础 72 4.1.2 Java I/O库 72 4.2 练习 74 4.2.1 数据传送的通道 74 4.2.2 管道的一端 76 4.2.3 文件处理 78 4.2.4 基于对象的读写...

    Java SE实践教程 pdf格式电子书 下载(四) 更新

    第4章 数据传送的管道——JAVA I/O 71 4.1 讲解 72 4.1.1 流——Java I/O的基础 72 4.1.2 Java I/O库 72 4.2 练习 74 4.2.1 数据传送的通道 74 4.2.2 管道的一端 76 4.2.3 文件处理 78 4.2.4 基于对象的读写...

    疯狂JAVA讲义

    第4章 流程控制和数组 71 4.1 顺序结构 72 4.2 分支结构 72 4.2.1 if条件语句 72 4.2.2 switch分支语句 76 4.3 循环结构 78 4.3.1 while循环语句 78 4.3.2 do while循环语句 79 4.3.3 for循环 80 4.3.4 ...

    Java SE实践教程 pdf格式电子书 下载(一) 更新

    第4章 数据传送的管道——JAVA I/O 71 4.1 讲解 72 4.1.1 流——Java I/O的基础 72 4.1.2 Java I/O库 72 4.2 练习 74 4.2.1 数据传送的通道 74 4.2.2 管道的一端 76 4.2.3 文件处理 78 4.2.4 基于对象的读写...

    javaSE代码实例

    第4章 流程控制——Java世界的航行舵手 42 4.1 if条件语句 42 4.1.1 简略形式 42 4.1.2 完全形式 43 4.1.3 语句的嵌套 43 4.2 switch多分支语句 45 4.2.1 基本语法 45 4.2.2 合法的判断表达式 46 ...

    Python核心编程(第二版).pdf (压缩包分2部分,第二部分)

     第4章 python对象   4.1 python 对象   4.2 标准类型   4.3 其他内建类型   4.3.1 类型对象和type类型对象   4.3.2 none--python的null对象   4.4 内部类型   4.4.1 代码对象   4.4.2 帧...

    Python核心编程第二版(ok)

     第4章 Python对象   4.1 Pythonc对象   4.2 标准类型   4.3 其他内建类型   4.3.1 类型对象和type类型对象   4.3.2 None--Python的Null对象   4.4 内部类型   4.4.1 代码对象   4.4.2 帧...

    Python核心编程第二版

     第4章 Python对象   4.1 Python 对象   4.2 标准类型   4.3 其他内建类型   4.3.1 类型对象和type类型对象   4.3.2 None--Python的Null对象   4.4 内部类型   4.4.1 代码对象   4.4.2 帧...

    Python核心编程(第二版).pdf (压缩包分2部分,第一部分)

     第4章 python对象   4.1 python 对象   4.2 标准类型   4.3 其他内建类型   4.3.1 类型对象和type类型对象   4.3.2 none--python的null对象   4.4 内部类型   4.4.1 代码对象   4.4.2 帧...

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第一阶段06讲、用Runnable接口将线程的逻辑执行单元从控制中抽取出来.mp4 │ 高并发编程第一阶段07讲、策略模式在Thread和Runnable中的应用分析.mp4 │ 高并发编程第一阶段08讲、构造Thread对象你...

Global site tag (gtag.js) - Google Analytics