CyclicBarrier原理、实战和源码分析
一 原理
CyclicBarrier(循环屏障),也即可以循环利用的屏障。内部基于ReentrantLock和Condition实现。
关于ReentrantLock和Condition。CyclicBarrier内部维护了整型变量count(拦截线程计数器),每次调用await方法,
count就会减1,如果count还有剩余,则线程调用Condition的await等待,如果count为0了,则打破屏障,
调用Condition的signalAll唤醒等待的线程执行。关于CyclicBarrier中的parties,Generation在源码中分析。
CyclicBarrier原理可以简单理解为等待/通知模型,count不为0,线程等待,count为0,则唤醒等待线程执行。
屏障用于控制一组线程等待就绪,然后打破屏障,线程继续执行。比如,学校操场运动会5条跑道,起跑线好比屏障,
5个运动员先后到达起跑线就绪后,鸣枪开跑。
二 实战
例如,操场5条跑道,起跑线相当于屏障,5个运动员先后就绪后,鸣枪,开跑!
AthleteThread
package com.lanhuigu.demo10.CyclicBarrier;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* 运动员跑步全部就绪-->鸣枪-->开跑!
* @author yihonglei
* @date 2018/9/30 19:27
*/
public class AthleteThread extends Thread{
/**
* 操场上有5条跑道,当运动员全部就绪-->鸣枪-->开跑!
* 这里new Runnable作为barrierAction,也就是屏障打破条后触发的动作,比如运行员就绪后,鸣枪!
*/
private static final CyclicBarrier playGround = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("鸣枪!!!!!!!!!!!!!!!!");
}
});
// 运动员编号
private Integer num;
/**
* 构造器
* @param num 运动员编号
*/
public AthleteThread(Integer num) {
this.num = num;
}
@Override
public void run() {
try {
System.out.println("运动员序号:" + num + "就绪...................");
playGround.await(); // 拦截线程
System.out.println("运行员序号:" + num + "开跑>>>>>>>>>>>>>>>>>>>");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
CyclicBarrierDemo
package com.lanhuigu.demo10.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
* CyclicBarrier实例演示
* @author yihonglei
* @date 2018/7/24 23:39
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
Executor executor = Executors.newFixedThreadPool(5);
// 5个运动员参与运动
for (int i = 1; i <= 5; i++) {
executor.execute(new AthleteThread(i));
}
}
}
运行结果
程序分析
1)AthleteThread中通过CyclicBarrier构造器创建拦截数为5,以及打破屏障时执行什么线程的CyclicBarrier;
2)每一个线程在开跑前都要等待其它线程就绪,所有运行员就绪后,才打破屏障,开跑!
下面结合原理和实例通过源码分析CyclicBarrier的实现原理,看源码才是最佳的学习方式,
一切吹牛逼在源码面前都是纸老虎!
三 源码分析
1、构造器
1)传入线程拦截数,调用2,创建CyclicBarrier;
2)基于parties,barrierAction创建CyclicBarrier;
1)parties为CyclicBarrier拦截线程数量,变量被定义为final,表示不可以改变,主要用于CyclicBarrier重置时
创建CyclicBarrier对象;
2)barrierCommand为打破屏障时执行的线程,这个线程可选,并不是必须定义的;
3)count为线程拦截计数器,每一次await都会减少1,初始化时count数量等于parties大小;
2、await逻辑
await中包含线程等待、唤醒以及重置CyclicBarrier逻辑,await有能够指定超时等的重载方法,
这里只分析默认方法。
具体实现在dowait中,dowait方法内容比较多,源码如下:
在进行源码分析前,需要先说明CyclicBarrier中的锁和Condition变量定义:
ReentrantLock为重入锁,Condition用于实现一组线程的等待/唤醒机制,也就CyclicBarrier能实现等待/唤醒的底层实现。
1)每一个线程进入dowait的时候,需要上锁,避免并发;
2)这里有个Generation,下一代的意思,源码:
private static class Generation {
boolean broken = false;
}
Generation维护了一个broken变量,表示是否打破屏障。
如果屏障已经打破或线程已经停止则都会抛出异常。
3)count减少1,如果index为0,表示已经达到拦截线程已经数已经用完。
4)如果barrierCommand不为空,则执行run方法。
5)重点在nextGeneration()里面,源码:
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
打破屏障的时候,会调trip.signalAll()唤醒所有等待的线程,count重置为parties,generation也重置。
相当于把CyclicBarrier的count重置,也就CyclicBarrier重置了,因为parties,barrierCommand都没有变。
6)这个一般情况下不会触发。
7)如果index不为0,程序会执行到for循环,默认timed为false,调用trip.await()线程会进入等待状态。
8)这个是显示指定超时时间,线程等待超时时间自动唤醒执行,在使用CyclicBarrier的非默认await
方法,指定超时时间时使用,看一眼CyclicBarrier的await重载方法就可以理解。
9)如果已经打破屏障,直接抛异常。
10)如果generation没有被重置,说明还没有打破,返回剩余线程拦截数。
11)如果调用的是超时的await,传入参数时间错误,屏障恢复,同时会抛超时异常,
并且唤醒其他等待线程。