首页 > 学院 > 开发设计 > 正文

java 阻塞队列

2019-11-14 10:22:51
字体:
来源:转载
供稿:网友

阻塞队列会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素。当队列中有元素后,被阻塞的线程会自动被唤醒,不需要notify。这样提供了极大的方便性。

一.几种主要的阻塞队列

  自从java 1.5之后,在java.util.concurrent包下提供了若干个阻塞队列,主要有以下几个:

ArrayBlockingQueue:一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。    这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。   此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。

LinkedBlockingQueue:一个基于已链接节点的、范围任意的 blocking queue。此队列按FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。   可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。    此类及其迭代器实现 Collection 和 Iterator 接口的所有可选 方法。

PRiorityBlockingQueue:以上2种队列都是先进先出队列,而PriorityBlockingQueue却不是,一个无界阻塞队列,它使用与类 PriorityQueue 相同的顺序规则,并且提供了阻塞获取操作。虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add操作也将失败(导致 OutOfMemoryError)。此类不允许使用 null元素。依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException)。   此类及其迭代器可以实现 Collection 和 Iterator 接口的所有可选 方法。iterator() 方法中提供的迭代器并不保证以特定的顺序遍历 PriorityBlockingQueue 的元素。如果需要有序地进行遍历,则应考虑使用 Arrays.sort(pq.toArray())。此外,可以使用方法 drainTo 按优先级顺序移除全部或部分元素,并将它们放在另一个 collection 中。   在此类上进行的操作不保证具有同等优先级的元素的顺序。如果需要实施某一排序,那么可以定义自定义类或者比较器,比较器可使用修改键断开主优先级值之间的联系。

DelayQueue:Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。   此类及其迭代器实现了 Collection 和 Iterator 接口的所有可选方法。

二.阻塞队列的实现原理

如果队列是空的,消费者会一直等待,当生产者添加元素时候,消费者是如何知道当前队列有元素的呢?JDK使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。

三.示例对比

示例一:使用阻塞队列实现的生产者消费者模式package com.bh.block;import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.BlockingQueue;/**使用阻塞队列实现的生产者消费者模式 * @author bh * */public class Test { private int queueSize = 10; /*一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。 队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。 */ private BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } /**消费者 * @author bh * */ class Consumer extends Thread{ @Override public void run() { consume(); } /** * 消费 */ private void consume() { while(true){ try { queue.take(); System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素"); } catch (InterruptedException e) { e.printStackTrace(); } } } } /**生产者 * @author bh * */ class Producer extends Thread{ @Override public void run() { produce(); } /** * 生产 */ private void produce() { while(true){ try { queue.put(1); System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size())); } catch (InterruptedException e) { e.printStackTrace(); } } } }}示例二:使用非阻塞队列实现的生产者消费者模式package com.bh.unblock;import java.util.PriorityQueue;import java.util.Queue;/**非阻塞队列通过使用Object.wait()和Object.notify()方法实现生产者与消费者模式 * @author bh * */public class Test { private int queueSize = 10; //基于优先级堆的无界优先级队列 private Queue<Integer> queue = new PriorityQueue<Integer>(queueSize); public static void main(String[] args) { Test test = new Test(); Producer producer = test.new Producer(); Consumer consumer = test.new Consumer(); producer.start(); consumer.start(); } /**消费者 * @author bh * */ class Consumer extends Thread{ @Override public void run() { consume(); } /**消费 */ private void consume() { while(true){ synchronized (queue) { while(queue.size() == 0){ try { System.out.println("队列空,等待数据"); //在其他线程调用此对象的 notify() 方法或 notifyAll() 方法前,导致当前线程等待 queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.poll(); //每次移走队首元素 queue.notify(); //唤醒在此对象监视器上等待的单个线程 System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素"); } } } } /**生产者 * @author bh * */ class Producer extends Thread{ @Override public void run() { produce(); } /**生产 */ private void produce() { while(true){ synchronized (queue) { while(queue.size() == queueSize){ try { System.out.println("队列满,等待有空余空间"); queue.wait(); } catch (InterruptedException e) { e.printStackTrace(); queue.notify(); } } queue.offer(1); //每次插入一个元素 queue.notify(); System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size())); } } } }}
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表