Java并发工具CyclicBarrier的用法及实现原理
CyclicBarrier的基本用法
我们用Java编写多线程应用程序,有多种JDK同步工具可以选择,常用的一般是ReentrantLock锁和Condition条件队列等。然而,在某些特殊应用场景中,我们应该使用一些特殊的同步工具。例如,有时候我们需要创建N个线程,让他们各自执行,当它们运行到某个特定位置时暂定,并且等待另外N-1线程都到达它们各自的暂停位置再同时继续运行。或者,换一种说法,我们希望N个线程达到一种互相等待状态之后再一起往下执行,如下图所示。
让5个线程形成相互等待状态,然后再继续执行,这种类型的同步工作用CyclicBarrier来实现可以更简单,如下图所示。
CyclicBarrier的高级用法
除了上面讲到的让N个线程形成互相等待这种基本用法外,CyclicBarrier还可以有一种高级用法。这种用法类似于分布式事务——让N个线程中的操作要么都执行,要么都不执行。JDK称这种特性为"all-or-none breakage model",即要么全部都成功要么全都抛异常。例如,假如我们已经创建了一个CyclicBarrier,
b = new CyclicBarrier(5);
前4个线程都执行到了
b.await();
这里,在这里进行等待。此时,第5个线程却没有调用b.await(),而是遇到问题,转而去调用了
b.reset();
那么,前面4个阻塞在await()方法上的线程将抛出BrokenBarrierException异常(如果在处理这个异常的代码中编写执行回滚动作的代码,那么一个单机多线的分布式事务就可以实现出来了)。
CyclicBarrier的基本实现原理
CyclicBarrier根据一个倒数计数来判断应该阻塞还是唤醒,类似于CountDownLatch。每次调用await,都让计数值减1,若计数器不等于0就让调用await的当前线程阻塞;当计数值减至0后恢复运行。CyclicBarrier让N个线程达到互相等待状态,然后同时继续执行,这个过程如下图所示。
注意,CyclicBarrier的构造函数还可以传入一个Runnable barrierAction,用来执行一些结果合并工作,上图未画出。barrierAction的run()方法在trip之后,在等待线程唤醒之前执行,并且是最后一个到达的线程中运行。
CyclicBarrier的内部利用ReentrantLock锁和关联的Condition条件队列来实现等待和唤醒的,并没有利用AbstractQueuedSynchronizer。它的await()操作对倒数的计数变量进行减1,如果变为0,则执行barrierAction中的run(),然后唤醒条件队列中的其他线程(调用Condition.signalAll()实现)。
可以看到,CyclicBarrier的主要操作都用到了ReentrantLock(因为要用Condition,必须先得到它所关联的ReentrantLock),并发性并不高。