首页 > 学院 > 开发设计 > 正文

生产者消费者模式之工作窃取算法

2019-11-08 19:42:26
字体:
来源:转载
供稿:网友

生产者消费者模式之工作窃取算法 1、如果一个通道只有一个队列,多个消费者就要共享同一个队列实例,从而导致锁的竞争;如果一个通道拥有多个队列,则每个消费者可以优先从通道中属于自己的队列获取数据,自己的队列为空时再从其他队列的队尾“窃取”数据。 2、如果服务有高性能和可靠性的要求,Producer-Consumer 模式请使用 Kafka 等开源工具。

public interface WorkStealingEnableChannel<P> extends Chanel<P> { P take(BlockingDeque<P> preferredQueue) throws InterruptedException;}public class WorkStealingChannel<P> implements WorkStealingEnableChannel<P> { //双端队列,可以从2端插入值或获取值,继承了BlockingQueue private final BlockingDeque<P>[] managedQueues; public WorkStealingChannel(BlockingDeque<P>[] managedQueues) { super(); this.managedQueues = managedQueues; } @Override public P take() throws InterruptedException { return take(null); } @Override public void put(P product) throws InterruptedException { int targetIndex = (product.hashCode() % managedQueues.length); BlockingQueue<P> targetQueue = managedQueues[targetIndex]; targetQueue.put(product); } @Override public P take(BlockingDeque<P> preferredQueue) throws InterruptedException { BlockingDeque<P> targetQueue = preferredQueue; P product = null; //优先从指定的队列获取值 if(null != targetQueue){ product = targetQueue.poll(); } int queueIndex = -1; while(null != product){ queueIndex = (queueIndex +1) % managedQueues.length; targetQueue = managedQueues[queueIndex]; //试图从其他受管队列的队尾“窃取”“产品” product = targetQueue.pollLast(); if(preferredQueue == targetQueue){ break; } } if(null == product){ //随机窃取 其他受管队列的产品 queueIndex = (int) (System.currentTimeMillis() % managedQueues.length); targetQueue = managedQueues[queueIndex]; product = targetQueue.pollLast(); System.out.println("stealed from " + queueIndex + ": " + product); } return product; }}public class WorkStealingExample { private final WorkStealingEnableChannel<String> channel; private final TerminationToken token = new TerminationToken(); public static void main(String[] args) throws InterruptedException { WorkStealingExample wse = new WorkStealingExample(); //Thread.sleep(3500); } public WorkStealingExample(){ int nCPU = Runtime.getRuntime().availableProcessors(); int consumerCount = nCPU/2 + 1; BlockingDeque<String>[] managedQueues = new LinkedBlockingDeque[consumerCount]; channel = new WorkStealingChannel<String>(managedQueues); 
Consumer[] consumers = new Consumer[consumerCount]; for(int i=0; i<consumerCount; i++){ managedQueues[i] = new LinkedBlockingDeque<String>(); consumers[i] = new Consumer(token, managedQueues[i]); } for(int i=0; i<nCPU; i++){ new Producer().start(); } for(int i=0; i<consumerCount; i++){ consumers[i].start(); } } private class Producer extends AbstractTerminatableThread{ private int i = 0; @Override protected void doRun() throws Exception { channel.put(String.valueOf(i++)); Thread.sleep(10); token.reservations.incrementAndGet(); } } private class Consumer extends AbstractTerminatableThread{ private final BlockingDeque<String> workQueue; public Consumer(TerminationToken token, BlockingDeque<String> workQueue) { super(token); this.workQueue = workQueue; } @Override protected void doRun() throws Exception { /** * 实现了工作窃取算法 */ String product = channel.take(); if(product != null){ } System.out.println("Processing product:" + product); try { Thread.sleep(new Random().nextInt(50)); } catch (Exception e) { }finally{ token.reservations.decrementAndGet(); } } }}
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表