从用法上来说,CyclicBarrier可能看出是CountDownLatch的高级版本,增加了重置的功能,对于多个线程的中断提供了通知的功能。
具体的用法通过api就有比较详细的介绍。
首先CyclicBarrier内部有一个内部静态类Generation。当然在每个CyclicBarrier实例中也有一个Generation域
这个类只有一个内部域broken用来表示当前的屏障是否被打破了。
PRivate static class Generation { boolean broken = false; }Generation只在线程不中断的情况下用来判断CyclicBarrier的状态的。 是由于有count个线程调用了await来正常中断的——即所谓的开闸状态。 还是由于其他特殊原因打破了CyclicBarrier(也就是当前CyclicBarrier无效了)——即所谓的打破状态。
而如果需要重置也就是讲CyclicBarrier实例中的域来重新构建一个新的Generation就可以了。
锁、条件队列、状态变量、条件谓词之间的关系。
最主要的就是await()方法。
实现的功能:
调用await()的线程会等待直到有足够数量的线程调用await——也就是开闸状态,
当最后一个线程到达或者出现下面的情况——也就是打破状态。
有其他线程中断当前线程。则抛出interruptException指定了限时操作,并到达线程,则抛出TimeoutException如果barrier被重置,或者屏障处于打破状态,则抛出BrokenBarrierException什么样的情况会出现打破状态?当任意等待线程抛出BrokenBarrierException的时候会使得当前屏障处于打破状态。
await方法是通过一个内部方法dowait来实现的。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; if (g.broken)//如果当前Generation是处于打破状态则传播这个BrokenBarrierExcption throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier();//如果当前线程被中断则使得当前generation处于打破状态,重置剩余count。并且唤醒状态变量。这时候其他线程会传播BrokenBarrierException. throw new InterruptedException(); } int index = --count;//尝试降低当前count if (index == 0) { // tripped//如果当前状态将为0,则Generation处于开闸状态。运行可能存在的command,设置下一个Generation。相当于每次开闸之后都进行了一次reset。 boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally { if (!ranAction)//如果运行command失败也会导致当前屏障被打破。 breakBarrier(); } } // loop until tripped, broken, interrupted, or timed out for (;;) { try { if (!timed)//阻塞在当前的状态变量。 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) {//如果当前线程被中断了则使得屏障被打破。并抛出异常。 breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt();//这种捕获了InterruptException之后调用Thread.currentThread().interrupt()是一种通用的方式。但是之前源码中好像都没有体现。我第一次见这个好像是java并发实践中。这样做的目的是什么?其实就是为了保存中断状态,从而让其他更高层次的代码注意到这个中断。但是需要注意的是这里需要其他代码予以配合才行否则这样做其实是比较危险的一种方式,因为这相当于吞了这个异常。 } } //从阻塞恢复之后,需要重新判断当前的状态。 if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }此外再看下两个小过程:
这两个小过程当然是需要锁的,但是由于这两个方法只是通过其他方法调用,所以依然是在持有锁的范围内运行的。这两个方法都是对域进行操作。
nextGeneration实际上在屏障开闸之后重置状态。以待下一次调用。 breakBarrier实际上是在屏障打破之后设定打破状态,以唤醒其他线程并通知。
private void nextGeneration() { // signal completion of last generation trip.signalAll(); // set up next generation count = parties; generation = new Generation(); } private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }reset reset方法比较简单。但是这里还是要注意一下要先打破当前屏蔽,然后再重建一个新的屏蔽。否则的话可能会导致信号丢失。
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }新闻热点
疑难解答