CyclicBarrier原理、实战和源码分析

一 原理

    CyclicBarrier(循环屏障),也即可以循环利用的屏障。内部基于ReentrantLock和Condition实现。

关于ReentrantLock和Condition。CyclicBarrier内部维护了整型变量count(拦截线程计数器),每次调用await方法,

count就会减1,如果count还有剩余,则线程调用Condition的await等待,如果count为0了,则打破屏障,

调用Condition的signalAll唤醒等待的线程执行。关于CyclicBarrier中的parties,Generation在源码中分析。

    CyclicBarrier原理可以简单理解为等待/通知模型,count不为0,线程等待,count为0,则唤醒等待线程执行。

    屏障用于控制一组线程等待就绪,然后打破屏障,线程继续执行。比如,学校操场运动会5条跑道,起跑线好比屏障,

5个运动员先后到达起跑线就绪后,鸣枪开跑。

二 实战

例如,操场5条跑道,起跑线相当于屏障,5个运动员先后就绪后,鸣枪,开跑!

AthleteThread

package com.lanhuigu.demo10.CyclicBarrier;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * 运动员跑步全部就绪-->鸣枪-->开跑!
 * @author yihonglei
 * @date 2018/9/30 19:27
 */
public class AthleteThread extends Thread{
    /**
     * 操场上有5条跑道,当运动员全部就绪-->鸣枪-->开跑!
     * 这里new Runnable作为barrierAction,也就是屏障打破条后触发的动作,比如运行员就绪后,鸣枪!
     */
    private static final CyclicBarrier playGround = new CyclicBarrier(5, new Runnable() {
        @Override
        public void run() {
            System.out.println("鸣枪!!!!!!!!!!!!!!!!");
        }
    });

    // 运动员编号
    private Integer num;

    /**
     * 构造器
     * @param num 运动员编号
     */
    public AthleteThread(Integer num) {
        this.num = num;
    }

    @Override
    public void run() {
        try {
            System.out.println("运动员序号:" + num + "就绪...................");
            playGround.await(); // 拦截线程
            System.out.println("运行员序号:" + num + "开跑>>>>>>>>>>>>>>>>>>>");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

CyclicBarrierDemo

package com.lanhuigu.demo10.CyclicBarrier;

import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * CyclicBarrier实例演示
 * @author yihonglei
 * @date 2018/7/24 23:39
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        Executor executor = Executors.newFixedThreadPool(5);

        // 5个运动员参与运动
        for (int i = 1; i <= 5; i++) {
            executor.execute(new AthleteThread(i));
        }
    }
}

运行结果

CyclicBarrier原理、实战和源码分析

程序分析

1)AthleteThread中通过CyclicBarrier构造器创建拦截数为5,以及打破屏障时执行什么线程的CyclicBarrier;

2)每一个线程在开跑前都要等待其它线程就绪,所有运行员就绪后,才打破屏障,开跑!

下面结合原理和实例通过源码分析CyclicBarrier的实现原理,看源码才是最佳的学习方式,

一切吹牛逼在源码面前都是纸老虎!

三 源码分析

1、构造器

CyclicBarrier原理、实战和源码分析

1)传入线程拦截数,调用2,创建CyclicBarrier;

2)基于parties,barrierAction创建CyclicBarrier;

CyclicBarrier原理、实战和源码分析

1)parties为CyclicBarrier拦截线程数量,变量被定义为final,表示不可以改变,主要用于CyclicBarrier重置时

创建CyclicBarrier对象;

2)barrierCommand为打破屏障时执行的线程,这个线程可选,并不是必须定义的;

3)count为线程拦截计数器,每一次await都会减少1,初始化时count数量等于parties大小;

2、await逻辑

await中包含线程等待、唤醒以及重置CyclicBarrier逻辑,await有能够指定超时等的重载方法,

这里只分析默认方法。

CyclicBarrier原理、实战和源码分析

具体实现在dowait中,dowait方法内容比较多,源码如下:

CyclicBarrier原理、实战和源码分析

CyclicBarrier原理、实战和源码分析

在进行源码分析前,需要先说明CyclicBarrier中的锁和Condition变量定义:

CyclicBarrier原理、实战和源码分析

ReentrantLock为重入锁,Condition用于实现一组线程的等待/唤醒机制,也就CyclicBarrier能实现等待/唤醒的底层实现。

1)每一个线程进入dowait的时候,需要上锁,避免并发;

2)这里有个Generation,下一代的意思,源码:

private static class Generation {
    boolean broken = false;
}

Generation维护了一个broken变量,表示是否打破屏障。

如果屏障已经打破或线程已经停止则都会抛出异常。

3)count减少1,如果index为0,表示已经达到拦截线程已经数已经用完。

4)如果barrierCommand不为空,则执行run方法。

5)重点在nextGeneration()里面,源码:

private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

打破屏障的时候,会调trip.signalAll()唤醒所有等待的线程,count重置为parties,generation也重置。

相当于把CyclicBarrier的count重置,也就CyclicBarrier重置了,因为parties,barrierCommand都没有变。

6)这个一般情况下不会触发。

7)如果index不为0,程序会执行到for循环,默认timed为false,调用trip.await()线程会进入等待状态。

8)这个是显示指定超时时间,线程等待超时时间自动唤醒执行,在使用CyclicBarrier的非默认await

方法,指定超时时间时使用,看一眼CyclicBarrier的await重载方法就可以理解。

9)如果已经打破屏障,直接抛异常。

10)如果generation没有被重置,说明还没有打破,返回剩余线程拦截数。

11)如果调用的是超时的await,传入参数时间错误,屏障恢复,同时会抛超时异常,

并且唤醒其他等待线程。