首页 > 学院 > 开发设计 > 正文

Java多线程之并发协作生产者消费者设计模式

2019-11-14 08:58:30
字体:
来源:转载
供稿:网友

学习java的同学注意了!!! 学习过程中遇到什么问题或者想获取学习资源的话,欢迎加入Java学习交流群,群号码:183993990  我们一起学Java!

两个线程一个生产者个一个消费者

需求情景

两个线程,一个负责生产,一个负责消费,生产者生产一个,消费者消费一个

涉及问题

同步问题:如何保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用标记或加锁机制wait() / nofity() 方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

代码实现(共三个类和一个main方法的测试类)

Resource.java

复制代码
/** * Created by yuandl on 2016-10-11./** * 资源 */public class Resource {    /*资源序号*/    PRivate int number = 0;    /*资源标记*/    private boolean flag = false;    /**     * 生产资源     */    public synchronized void create() {        if (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;            try {                wait();//让生产线程等待            } catch (InterruptedException e) {                e.printStackTrace();            }        }        number++;//生产一个        System.out.println(Thread.currentThread().getName() + "生产者------------" + number);        flag = true;//将资源标记为已经生产        notify();//唤醒在等待操作资源的线程(队列)    }    /**     * 消费资源     */    public synchronized void destroy() {        if (!flag) {            try {                wait();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        System.out.println(Thread.currentThread().getName() + "消费者****" + number);        flag = false;        notify();    }}复制代码

Producer.java

复制代码
/** * Created by yuandl on 2016-10-11. * /** * 生产者   http://www.manongjc.com */public class Producer implements Runnable {    private Resource resource;    public Producer(Resource resource) {        this.resource = resource;    }    @Override    public void run() {        while (true) {            try {                Thread.sleep(10);            } catch (InterruptedException e) {                e.printStackTrace();            }            resource.create();        }    }}复制代码

Consumer.java

复制代码
/** * 消费者 */public class Consumer implements Runnable {    private Resource resource;    public Consumer(Resource resource) {        this.resource = resource;    }    @Override    public void run() {        while (true) {            try {                Thread.sleep(10);            } catch (InterruptedException e) {                e.printStackTrace();            }            resource.destroy();        }    }}复制代码

ProducerConsumerTest.java

复制代码
/** * Created by yuandl on 2016-10-11. */public class ProducerConsumerTest {    public static void main(String args[]) {        Resource resource = new Resource();        new Thread(new Producer(resource)).start();//生产者线程        new Thread(new Consumer(resource)).start();//消费者线程    }}复制代码

打印结果:

复制代码
Thread-0生产者------------1Thread-1消费者****1Thread-0生产者------------2Thread-1消费者****2Thread-0生产者------------3Thread-1消费者****3Thread-0生产者------------4Thread-1消费者****4Thread-0生产者------------5Thread-1消费者****5Thread-0生产者------------6Thread-1消费者****6Thread-0生产者------------7Thread-1消费者****7Thread-0生产者------------8Thread-1消费者****8Thread-0生产者------------9Thread-1消费者****9Thread-0生产者------------10Thread-1消费者****10复制代码

以上打印结果可以看出没有任何问题

 

多个线程,多个生产者和多个消费者的问题

需求情景

四个线程,两个个负责生产,两个个负责消费,生产者生产一个,消费者消费一个

涉及问题

notifyAll()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的所有线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

再次测试代码

ProducerConsumerTest.java

复制代码
** * Created by yuandl on 2016-10-11. */public class ProducerConsumerTest {    public static void main(String args[]) {        Resource resource = new Resource();        new Thread(new Consumer(resource)).start();//生产者线程        new Thread(new Consumer(resource)).start();//生产者线程        new Thread(new Producer(resource)).start();//消费者线程        new Thread(new Producer(resource)).start();//消费者线程    }}复制代码

运行结果:

复制代码
Thread-0生产者------------100Thread-3消费者****100Thread-0生产者------------101Thread-3消费者****101Thread-2消费者****101Thread-1生产者------------102Thread-3消费者****102Thread-0生产者------------103Thread-2消费者****103Thread-1生产者------------104Thread-3消费者****104Thread-1生产者------------105Thread-0生产者------------106Thread-2消费者****106Thread-1生产者------------107Thread-3消费者****107Thread-0生产者------------108Thread-2消费者****108Thread-0生产者------------109Thread-2消费者****109Thread-1生产者------------110Thread-3消费者****110复制代码

通过以上打印结果发现问题

101生产了一次,消费了两次105生产了,而没有消费

原因分析

当两个线程同时操作生产者生产或者消费者消费时,如果有生产者或者的两个线程都wait()时,再次notify(),由于其中一个线程已经改变了标记而另外一个线程再次往下直接执行的时候没有判断标记而导致的。if判断标记,只有一次,会导致不该运行的线程运行了。出现了数据错误的情况。

解决方案

while判断标记,解决了线程获取执行权后,是否要运行!也就是每次wait()后再notify()时先再次判断标记

 

代码改进(Resource中的if->while) 

Resource.java

复制代码
/** * Created by yuandl on 2016-10-11./** * 资源 */public class Resource {    /*资源序号*/    private int number = 0;    /*资源标记*/    private boolean flag = false;    /**     * 生产资源     */    public synchronized void create() {        while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;            try {                wait();//让生产线程等待            } catch (InterruptedException e) {                e.printStackTrace();            }        }        number++;//生产一个        System.out.println(Thread.currentThread().getName() + "生产者------------" + number);        flag = true;//将资源标记为已经生产        notify();//唤醒在等待操作资源的线程(队列)    }    /**     * 消费资源     */    public synchronized void destroy() {        while (!flag) {            try {                wait();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        System.out.println(Thread.currentThread().getName() + "消费者****" + number);        flag = false;        notify();    }}复制代码

再次发现问题

打印到某个值比如生产完74,程序运行卡死了,好像锁死了一样。

原因分析

notify:只能唤醒一个线程,如果本方唤醒了本方,没有意义。而且while判断标记+notify会导致”死锁”。

解决方案

notifyAll解决了本方线程一定会唤醒对方线程的问题。

 

最后代码改进(Resource中的notify()->notifyAll()) 

Resource.java

复制代码
/** * Created by yuandl on 2016-10-11./** * 资源 */public class Resource {    /*资源序号*/    private int number = 0;    /*资源标记*/    private boolean flag = false;    /**     * 生产资源     */    public synchronized void create() {        while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;            try {                wait();//让生产线程等待            } catch (InterruptedException e) {                e.printStackTrace();            }        }        number++;//生产一个        System.out.println(Thread.currentThread().getName() + "生产者------------" + number);        flag = true;//将资源标记为已经生产        notifyAll();//唤醒在等待操作资源的线程(队列)    }    /**     * 消费资源     */    public synchronized void destroy() {        while (!flag) {            try {                wait();            } catch (InterruptedException e) {                e.printStackTrace();            }        }        System.out.println(Thread.currentThread().getName() + "消费者****" + number);        flag = false;        notifyAll();    }}复制代码

运行结果:

Thread-0生产者------------412Thread-2消费者****412Thread-0生产者------------413Thread-3消费者****413Thread-1生产者------------414Thread-2消费者****414Thread-1生产者------------415Thread-2消费者****415Thread-0生产者------------416Thread-3消费者****416Thread-1生产者------------417Thread-3消费者****417Thread-0生产者------------418Thread-2消费者****418Thread-0生产者------------419Thread-3消费者****419Thread-1生产者------------420Thread-2消费者****420

以上就大功告成了,没有任何问题。

学习Java的同学注意了!!! 学习过程中遇到什么问题或者想获取学习资源的话,欢迎加入Java学习交流群,群号码:183993990  我们一起学Java!


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表