首页 > 学院 > 开发设计 > 正文

callable和CompletionService接口试用

2019-11-15 00:32:36
字体:
来源:转载
供稿:网友
callable和CompletionService接口试用

CompletionService接口定义为Interface CompletionService<V>接口定它在java7中只有一个实现ExecutorCompletionService,这个接口内部集成了一个BlockingQueue,因此可以实现对多线程运行结果的收集工作。为了更好的测试该接口,我使用了两个测试,第一个测试是自己定义一个外部BlockingQueue来接收callable返回的数据。第二个测试是用CompletionService对executor进行装饰,使得返回的CompletionService对象能直接submit任务。

但是我发现它submit的后并没有马上调用executor的submit,而是对它进行了封装,因此出现了一点点延迟。如果在submit之后使用shutdown()命令结束的话,实际上该task可能还没有 放到executor的taskpool中。所以这一点值得注意。

import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.Callable;import java.util.concurrent.CompletionService;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorCompletionService;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import java.util.concurrent.LinkedBlockingDeque;public class testCallable {    public static void main(String[] args) {        try {            futureCount();            completionServiceCount();        } catch (InterruptedException e) {            e.PRintStackTrace();        } catch (ExecutionException e) {            e.printStackTrace();        }    }    /**     * 使用自定义阻塞队列得到任务执行结果     *      * @throws InterruptedException     * @throws ExecutionException     */    public static void futureCount() throws InterruptedException,            ExecutionException {        BlockingQueue<Future<Integer>> queue = new LinkedBlockingDeque<Future<Integer>>();        ExecutorService executorService = Executors.newCachedThreadPool();        int threadNum = 5;        for (int i = 0; i < threadNum; i++) {            Future<Integer> future = executorService.submit(getTask());            queue.put(future);        }        int sum = 0;        int temp = 0;        while(!queue.isEmpty()){            temp = queue.take().get();            sum += temp;            System.out.print(temp + "/t");        }        System.out.println("BlockingQueue all is : " + sum);        executorService.shutdown();    }    /**     * 使用completionService收集callable结果     * @throws ExecutionException      * @throws InterruptedException      */    public static void completionServiceCount() throws InterruptedException, ExecutionException {        ExecutorService executorService = Executors.newCachedThreadPool();        CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(                executorService);        int threadNum = 5;        for (int i = 0; i < threadNum; i++) {            completionService.submit(getTask());        }        int sum = 0;        int temp = 0;        for(int i=0;i<threadNum;i++){            temp = completionService.take().get();            sum += temp;            System.out.print(temp + "/t");        }        System.out.println("CompletionService all is : " + sum);        executorService.shutdown();    }    public static Callable<Integer> getTask() {        final Random rand = new Random();        Callable<Integer> task = new Callable<Integer>() {            @Override            public Integer call() throws Exception {                int num = 0;                for (int i = 0; i < 10; i++) {                    num = num + rand.nextInt(10);                }                return num;            }        };        return task;    }}

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表