
Queue-Based Multi-Task Processing

2019-11-06 06:23:34
Source: reposted
Contributed by: a site user

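Overview

I have recently been studying thread pools and queues. Building on both, I implemented a task handler in which several worker threads take tasks from a shared queue and pass each task to one or more processors.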

Implementation

The task queue interface

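import java.util.List;

public interface TaskQueue {

    /**
     * Sets the task queue capacity. Defaults to 2000; capped at 10000.
     *
     * @param size the requested queue capacity
     */
    void setQueueSize(int size);

    /**
     * Sets the number of core worker threads.
     *
     * @param coreThreadSize the requested number of worker threads
     */
    void setCoreThreadSize(int coreThreadSize);

    /**
     * Submits a task.
     *
     * @param task the task to enqueue
     */
    boolean submitTask(Task task);

    /**
     * Sets the task processors.
     *
     * @param taskProcessors the processors applied to each task
     */
    void setTaskProcessors(List<TaskProcessor> taskProcessors);
}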
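The boolean returned by submitTask suggests that a submission can be rejected. As a minimal sketch, assuming the queue is backed by a bounded LinkedBlockingQueue and that submission maps to its non-blocking offer() call (which is what the implementation class in this article does), the behavior on a full queue looks like this. The class name BoundedOfferDemo, the method acceptedCount, and the capacity of 2 are hypothetical and only serve the illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedOfferDemo {

    /** Offers `attempts` items to a queue of the given capacity and counts how many were accepted. */
    public static int acceptedCount(int capacity, int attempts) {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() never blocks: it returns false right away when the queue is full
            if (queue.offer("task-" + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Nothing consumes from the queue here, so only `capacity` offers can succeed
        System.out.println(acceptedCount(2, 5));
    }
}
```

Callers that cannot afford to lose a task would need to check the return value and retry, wait, or report the rejection.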

The Task class

public class Task<T> {

    private T data;
    private String key;
    private long submitTime;
    private long processTime;

    public T getData() {
        return data;
    }

    public void setData(T data) {
        this.data = data;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public long getSubmitTime() {
        return submitTime;
    }

    public void setSubmitTime(long submitTime) {
        this.submitTime = submitTime;
    }

    public long getProcessTime() {
        return processTime;
    }

    public void setProcessTime(long processTime) {
        this.processTime = processTime;
    }
}

TaskProcessor: the task processor

public interface TaskProcessor<T> {

    /**
     * Processes a task.
     *
     * @param task the task to process
     */
    void process(Task<T> task);
}
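Because the queue accepts a list of processors, every task is handed to each processor in turn, and one failing processor should not prevent the others from running. The sketch below illustrates that idea in isolation, using only the JDK. It uses java.util.function.Consumer as a stand-in for TaskProcessor, and the names FanOutSketch and dispatch are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class FanOutSketch {

    /** Passes one item to every handler; returns how many handlers finished without throwing. */
    public static <T> int dispatch(T item, List<Consumer<T>> handlers) {
        int succeeded = 0;
        for (Consumer<T> handler : handlers) {
            try {
                handler.accept(item);
                succeeded++;
            } catch (RuntimeException e) {
                // A failing handler is reported but does not stop the remaining ones
                System.err.println("handler failed: " + e.getMessage());
            }
        }
        return succeeded;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        List<Consumer<String>> handlers = new ArrayList<>();
        handlers.add(seen::add);
        handlers.add(s -> { throw new IllegalStateException("boom"); });
        handlers.add(s -> seen.add(s.toUpperCase()));

        int ok = dispatch("a", handlers);
        System.out.println(ok + " handlers succeeded, seen = " + seen);
    }
}
```

Catching the exception per handler rather than around the whole loop is what keeps the third handler running after the second one throws.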

The task queue implementation (created as a bean)

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;

public class LinkedBlockTaskQueue implements TaskQueue, InitializingBean, DisposableBean {

    private static final Logger logger = LoggerFactory.getLogger(LinkedBlockTaskQueue.class);

    /** Upper bound on the queue capacity. */
    private final int maxQueueSize = 10000;

    /** Upper bound on the number of worker threads. */
    private final int maxCoreThreadSize = 20;

    /** Queue capacity; defaults to 2000. */
    private int queueSize = 2000;

    /** Number of core worker threads; defaults to 10. */
    private int coreThreadSize = 10;

    private LinkedBlockingQueue<Task<?>> queue = null;
    private ExecutorService executorService = null;
    private List<TaskProcessor> taskProcessors;
    private volatile boolean breakLoop = false;

    @Override
    public void setQueueSize(int size) {
        // Non-positive values keep the current size; larger values are capped
        size = size <= 0 ? queueSize : size;
        this.queueSize = Math.min(size, this.maxQueueSize);
    }

    @Override
    public void setCoreThreadSize(int coreThreadSize) {
        // Non-positive values keep the current count; larger values are capped
        coreThreadSize = coreThreadSize <= 0 ? this.coreThreadSize : coreThreadSize;
        this.coreThreadSize = Math.min(coreThreadSize, this.maxCoreThreadSize);
    }

    @Override
    public boolean submitTask(Task task) {
        // offer() does not block; it returns false when the queue is full
        return queue.offer(task);
    }

    @Override
    public void setTaskProcessors(List<TaskProcessor> taskProcessors) {
        this.taskProcessors = taskProcessors;
    }

    @Override
    public void destroy() throws Exception {
        this.breakLoop = true;
        // Interrupt workers blocked in take() so they can leave their loop;
        // setting the flag alone would never wake them up
        if (executorService != null) {
            executorService.shutdownNow();
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        this.queue = new LinkedBlockingQueue<>(this.queueSize);
        this.executorService = Executors.newFixedThreadPool(this.coreThreadSize);
        start();
    }

    private void start() {
        if (taskProcessors == null || taskProcessors.isEmpty()) {
            logger.error("taskProcessors is null or empty");
            return;
        }
        // One long-running consumer per worker thread
        for (int i = 0; i < coreThreadSize; i++) {
            executorService.submit(new MultiConsumer());
        }
    }

    private class MultiConsumer implements Runnable {

        @Override
        public void run() {
            while (!breakLoop) {
                Task task;
                try {
                    task = queue.take();
                } catch (InterruptedException e) {
                    // Interrupted during shutdown: restore the flag and stop
                    Thread.currentThread().interrupt();
                    break;
                }
                // Hand the task to every processor; one failure must not block the rest
                for (TaskProcessor processor : taskProcessors) {
                    try {
                        processor.process(task);
                    } catch (Exception e) {
                        logger.error("process task exception", e);
                    }
                }
            }
        }
    }
}
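Stopping the consumers is the delicate part of this design: a worker blocked in take() only wakes up when a task arrives or when its thread is interrupted. The following self-contained sketch shows one way to stop such workers, assuming interruption through ExecutorService.shutdownNow() is acceptable. It is not the article's class; the names ShutdownSketch and runAndStop, the CountDownLatch used to wait for completion, and the five-second timeouts are all hypothetical choices made for the illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownSketch {

    /** Starts `workers` consumers, feeds them `tasks` items, stops them, and returns the processed count. */
    public static int runAndStop(int workers, int tasks) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(tasks);
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);

        for (int i = 0; i < workers; i++) {
            pool.execute(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        queue.take();                       // blocks until a task is available
                        processed.incrementAndGet();        // stand-in for real processing
                        done.countDown();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // restore the flag so the loop ends
                    }
                }
            });
        }

        for (int i = 0; i < tasks; i++) {
            queue.offer(i); // capacity equals `tasks`, so every offer is accepted
        }

        try {
            boolean finished = done.await(5, TimeUnit.SECONDS);
            pool.shutdownNow(); // interrupts workers that are blocked in take()
            boolean stopped = pool.awaitTermination(5, TimeUnit.SECONDS);
            if (!finished || !stopped) {
                throw new IllegalStateException("workers did not finish or stop in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed: " + runAndStop(4, 100));
    }
}
```

Waiting on the latch before calling shutdownNow() is what keeps queued tasks from being dropped here; a shutdown that interrupts immediately would discard whatever is still in the queue.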

Testing

I wrote a simple test class. To keep things short it does not use Spring as the container; the task queue is instantiated directly with new.

// StringTaskProcessor.java
public class StringTaskProcessor implements TaskProcessor<String> {

    @Override
    public void process(Task<String> task) {
        System.out.println("thread Id:" + Thread.currentThread().getId() + ",data:" + task.getData());
    }
}

// Test.java
import java.util.ArrayList;
import java.util.List;

public class Test {

    public static void main(String[] args) throws Exception {
        LinkedBlockTaskQueue linkedBlockTaskQueue = new LinkedBlockTaskQueue();

        List<TaskProcessor> taskProcessors = new ArrayList<>();
        taskProcessors.add(new StringTaskProcessor());
        linkedBlockTaskQueue.setTaskProcessors(taskProcessors);

        // Without Spring, the initialization callback has to be invoked by hand
        linkedBlockTaskQueue.afterPropertiesSet();

        for (int i = 0; i < 100; i++) {
            Task<String> task = new Task<>();
            task.setData(String.valueOf(i));
            linkedBlockTaskQueue.submitTask(task);
        }
        // Note: the worker threads are non-daemon, so the JVM keeps running
        // after main returns until the process is stopped
    }
}