java多线程知识点 - AQS - CyclicBarrier,基于源码深度分析!详细!赞
CyclicBarrier
有这样一个场景:当你希望创建一组任务,并且并行的执行他们,然后在下一个步骤之前进行停顿等待,直到所有的任务都执行完成,然后继续执行下一项任务。
以前怎么实现:
定义一个主线程,然后定义子线程,让子线程去join到主线程,这时候主线程被阻塞。当所有的子线程都执行完毕后,主线程**开始执行。这样的方法如果遇到较多线程会很难实现。
如何解决:
我们选用CyclicBarrier 和 countdownlatch都可以起到,多线程执行完毕后统一进行下一步的操作。CountDownLatch请参见我另一篇文章,后面也会做出两者的区别分析。
如何使用:
demo1:
//首先定义一个CyclicBarrier对象,new的时候参数表示等待的线程数 private static CyclicBarrier barrier = new CyclicBarrier(5); // 主方法 public static void main(String[] args) throws Exception { // 定义一个线程池 ExecutorService executor = Executors.newCachedThreadPool(); // 循环10次 for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000);//每次循环创建线程前先睡一秒为了方便观察 executor.execute(() -> { try { race(threadNum);//执行race方法 } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown();// 关闭线程池。如果想关闭线程池后能种植所有线程 //情调用shutdownnow方法 } private static void race(int threadNum) throws Exception { log.info("{} is ready", threadNum); //执行完毕后进入等待状态 等待定义的CyclicBarrier(5)五个等待线程执行完毕 barrier.await(); //当五个线程都执行完毕,统一执行continue方法。然后继续执行for循环 log.info("{} continue", threadNum); }
我们来查看一下结果
结果很显然,定义的五个线程都执行完毕后才继续执行接下来的任务。
下面我们来看另一种使用方法:
demo2:
//首先定义五个等待线程,但是额外加了一个参数,他表示当五个线程执行完后先执行这个方法,再去continue private static CyclicBarrier barrier = new CyclicBarrier(5, () -> { log.info("callback is running"); }); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(400); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { log.info("{} is ready", threadNum); // 设定超时时间,超过2000毫秒就抛出异常 barrier.await(2000, TimeUnit.MILLISECONDS); log.info("{} continue", threadNum); }
结果:
当我把超时时间改一下,缩短到执行五个线程的睡眠总时间长于超时时间,我们来看一下日志输出
下面步入正题!
看完demo 是不是觉得很简单呢,那么CyclicBarrier和CountDownLatch有什么区别呢。
首先最明显的区别就是 CountDownLatch采用计数器,不断减一,减到零就停止。
CyclicBarrier可以一直执行规划好的线程数,一直执行下去。比如有100条请求要执行,它可以一次执行五条然后统一做操作,并且不断循环直到所有请求都处理完。
CyclicBarrier类有两个常用的构造方法:
1. CyclicBarrier(int parties)这里的parties也是一个计数器,例如,初始化时parties里的计数是3,于是拥有该CyclicBarrier对象的线程当parties的计数为3时就唤醒,注:这里parties里的计数在运行时当调用CyclicBarrier:await()时,计数就加1,一直加到初始的值
2. CyclicBarrier(int parties, Runnable barrierAction)
这里的parties与上一个构造方法的解释是一样的,这里需要解释的是第二个入参(Runnable barrierAction),这个参数是一个实现Runnable接口的类的对象,也就是说当parties加到初始值时就出发barrierAction的内容。
基于源码观赏CyclicBarrier
首先是参数:
private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock();//可重入锁 /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); //一个condition队列,AQS中数据结构中单向链表 /** The number of parties */ private final int parties;//一次并行线程数 /* The command to run when tripped */ private final Runnable barrierCommand; //额外的构造器需要实现的方法,在多线程等待后执行任务之后、下一批线程执行任务前执行。详见demo2构造器 /** The current generation */ private Generation generation = new Generation();//下一批线程循环等待标记,false /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ private int count;,//等待线程数,用于--count操作
其次登场的是构造器:
public CyclicBarrier(int parties, Runnable barrierAction) { //请注意是个Runnable的方法 if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } public CyclicBarrier(int parties) { this(parties, null); }
接下来是著名的await方法,可以看到调用的dowait方法有两个参数分别是false和0L 标识不设置超时,0毫秒。
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
接下来来分析dowait方法:我们可以看里面定义了ReenTrantLock,可重入锁。开始锁和结束锁包含整个dowait方法。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation; //栅栏可重用的标志,初始值false if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; //正在等待的线程数,初始count是构造器中的一次执行线程数 if (index == 0) { // tripped 当构造器指定的线程数为0时 boolean ranAction = false; try { final Runnable command = barrierCommand; //处理构造器中额外的任务,转换成runnable方法 if (command != null) command.run(); ranAction = true; nextGeneration();//释放所有的等待线程,将broken设置为true。栅栏破碎,线程跑出来了,开始执行啦!同时初始化下一批线程 return 0; } finally { if (!ranAction)//如果try失败,则ranAction 为false,执行下面的代码。 breakBarrier();//重置count,唤醒所有等待线程,将broken设置为true。栅栏破碎,线程跑出来了,开始执行啦! } } // 死循环,退出条件就是超时。如果超时false则trip.await , 否则按照超时时间计算,超时则执行trip.signalAll(); for (;;) { try { if (!timed) //该线程执行await操作,释放该线程占有的锁,唤醒CLH队列中的后继节点 trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { //其实说白了这步就是判断过程中线程中断当前的generation是不是最新的,并且broken是不是初始false breakBarrier(); //如果是就执行等待线程放行 throw ie; } else {//否则就中断当前线程 // We're about to finish waiting even if we had not //执行interrupt // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation)// 如果次轮等待已结束 return index; if (timed && nanos <= 0L) { breakBarrier(); // 1、generation.broken = true; 2、count = parties; 3、trip.signalAll(); throw new TimeoutException(); } } } finally { lock.unlock(); } }