Java并发学习之CountDownLatch实现原理及使用姿势
在并发编程的场景中,最常见的一个case是某个任务的执行,需要等到多个线程都执行完毕之后才可以进行,CountDownLatch可以很好解决这个问题
下面将主要从使用和实现原理两个方面进行说明,围绕点如下
CountDownLatch 是个什么鬼
怎么用(结合case说明)
底层实现原理(及如何保障功能的正常性)
I. 使用说明
同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待
比较有意思的是,CountDownLatch并未继承自其他的类or接口,在jdk中这样的类并不多见(多半是我孤陋寡闻)
0. 接口定义
在使用之前,得先了解下其定义的几个方法
也就几个接口,基本上都是比较常见的了,需要注意的是不要把await()和Object#wait()方法弄混了,否则就gg思密达了...
1. Demo演示
依然以讲解ReentrantLock中的例子来说明,多线程实现累加
实现如下
输出
看了上面的定义和Demo之后,使用就会简单一点了,一般流程如
-
首先是创建实例
CountDownLatch countDown = new CountDownLatch(2)
-
需要同步的线程执行完之后,计数-1;
countDown.countDown()
需要等待其他线程执行完毕之后,再运行的线程,调用countDown.await()实现阻塞同步
注意
在创建实例是,必须指定初始的计数值,且应大于0
必须有线程中显示的调用了countDown()计数-1方法;必须有线程显示调用了await()方法(没有这个就没有必要使用CountDownLatch了)
由于await()方法会阻塞到计数为0,如果在代码逻辑中某个线程漏掉了计数-1,导致最终计数一直大于0,直接导致死锁了
鉴于上面一点,更多的推荐await(long, TimeUnit)来替代直接使用await()方法,至少不会造成阻塞死只能重启的情况
有兴趣的小伙伴可以对比下这个实现与《Java并发学习之ReentrantLock的工作原理及使用姿势》中的demo,明显感觉使用CountDownLatch优雅得多(后面有机会介绍用更有意思的Fork/Join来实现累加)
2. 应用场景
前面给了一个demo演示如何用,那这个东西在实际的业务场景中是否会用到呢?
因为确实在一个业务场景中使用到了,不然也就不会单独捞出这一节...
电商的详情页,由众多的数据拼装组成,如可以分成一下几个模块
交易的收发货地址,销量
商品的基本信息(标题,图文详情之类的)
推荐的商品列表
评价的内容
....
上面的几个模块信息,都是从不同的服务获取信息,且彼此没啥关联;所以为了提高响应,完全可以做成并发获取数据,如
线程1获取交易相关数据
线程2获取商品基本信息
线程3获取推荐的信息
线程4获取评价信息
....
但是最终拼装数据并返回给前端,需要等到上面的所有信息都获取完毕之后,才能返回,这个场景就非常的适合CountDownLatch来做了
在拼装完整数据的线程中调用CountDownLatch#await(long, TimeUnit)等待所有的模块信息返回
每个模块信息的获取,由一个独立的线程执行;执行完毕之后调用CountDownLatch#countDown()进行计数-1
II. 实现原理
同ReentrantLock一样,依然是借助AQS的双端队列,来实现原子的计数-1,线程阻塞和唤醒
前面《Java并发学习之ReentrantLock的工作原理及使用姿势》介绍了AQS的结构,方便查看,下面直接贴出
0. AbstractQueuedSynchronizer (简称AQS)
AQS是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题
AQS使用一个FIFO的队列表示排队等待锁的线程,队列头节点称作“哨兵节点”或者“哑节点”,它不与任何线程关联。其他的节点与等待线程关联,每个节点维护一个等待状态waitStatus
1. 计数器的初始化
CountDownLatch内部实现了AQS,并覆盖了tryAcquireShared()和tryReleaseShared()两个方法,下面说明干嘛用的
通过前面的使用,清楚了计数器的构造必须指定计数值,这个直接初始化了 AQS内部的state变量
后续的计数-1/判断是否可用都是基于sate进行的
2. countDown() 计数-1的实现
上面截出计数减1的完整调用链
尝试释放锁tryReleaseShared,实现计数-1
若计数已经小于0,则直接返回false
否则执行计数(AQS的state)减一
若减完之后,state==0,表示没有线程占用锁,即释放成功,然后就需要唤醒被阻塞的线程了
释放并唤醒阻塞线程doReleaseShared
如果队列为空,即表示没有线程被阻塞(也就是说没有线程调用了 CountDownLatch#wait()方法),直接退出
头结点如果为SIGNAL, 则依次唤醒头结点下个节点上关联的线程,并出队
疑问一:看到这个实现,是不是只要countDownLatch的计数为0了,所有被阻塞的线程都会被执行?
改下上面的demo,新增线程4,实现线程2的结果-线程1的结果
输出如下
上面的实现中,线程3中sleep一段时间,确保线程4的计算会优先执行;线程4计算完成之后的sleep时间,以保证线程3计算完成并输出结果,然后线程4才输出结果;结合输出,这个期望是准确的,也就是说,线程3和线程4被唤醒后是并发执行的,没有先后阻塞顺序
即CountDownLatch计数为0之后,所有被阻塞的线程都会被唤醒,且彼此相对独立,不会出现独占锁阻塞的问题
3. await() 阻塞等待计数为0
阻塞的逻辑相对简单
判断state计数是否为0,不是,则直接放过执行后面的代码
大于0,则表示需要阻塞等待计数为0
当前线程封装Node对象,进入阻塞队列
然后就是循环尝试获取锁,直到成功(即state为0)后出队,继续执行线程后续代码
III. 小结
1. 使用注意
在创建实例时,必须指定初始的计数值,且应大于0
必须有线程中显示的调用了countDown()计数-1方法;必须有线程显示调用了await()方法(没有这个就没有必要使用CountDownLatch了)
由于await()方法会阻塞到计数为0,如果在代码逻辑中某个线程漏掉了计数-1,导致最终计数一直大于0,直接导致死锁了;
鉴于上面一点,更多的推荐await(long, TimeUnit)来替代直接使用await()方法,至少不会造成阻塞死只能重启的情况
允许多个线程调用await方法,当计数为0后,所有被阻塞的线程都会被唤醒
2. 实现原理
await内部实现流程:
判断state计数是否为0,不是,则直接放过执行后面的代码
大于0,则表示需要阻塞等待计数为0
当前线程封装Node对象,进入阻塞队列
然后就是循环尝试获取锁,直到成功(即state为0)后出队,继续执行线程后续代码
countDown内部实现流程:
尝试释放锁tryReleaseShared,实现计数-1
若计数已经小于0,则直接返回false
否则执行计数(AQS的state)减一
若减完之后,state==0,表示没有线程占用锁,即释放成功,然后就需要唤醒被阻塞的线程了
释放并唤醒阻塞线程doReleaseShared
如果队列为空,即表示没有线程被阻塞(也就是说没有线程调用了 CountDownLatch#wait()方法),直接退出
头结点如果为SIGNAL, 则依次唤醒头结点下个节点上关联的线程,并出队