从ReentrantLock来理解AbstractQueuedSynchronizer
目录
2.AbstractQueuedSynchronizer在ReenTrantLock实现可重入锁
3.通过AbstractQueuedSynchronizer创建简单的同步锁
1.基本概念
AbstractQueuedSynchronizer
AQS是一种提供了原子式管理同步状态、阻塞和唤醒线程功能以及队列模型的简单框架。
简单来说,作用就是维护锁的当前状态和线程等待列表,维护的关键则是对字段state以及双端双向队列的使用
ReenTrantLock
ReentrantLock意思为可重入锁,指的是一个线程能够对一个临界资源重复加锁,即当前线程获取该锁再次获取不会被阻塞。
2.AbstractQueuedSynchronizer在ReenTrantLock实现可重入锁
2.1 基本特性
2.1.1 重入
// 可重入测试 public class ReentrantTest { public static void main(String[] args) { // main --> firstAction --> secondAction --> lastAction SyncAction.doAction(ReentrantTest::firstAction); } private static void firstAction() { SyncAction.doAction(ReentrantTest::secondAction); } private static void secondAction() { SyncAction.doAction(ReentrantTest::lastAction); } private static void lastAction() { System.out.println("Hello World, last Action."); } } // 同步方法 public class SyncAction { private static ReentrantLock lock = new ReentrantLock(); // 自定义锁 // private static CustomLock lock = new CustomLock(); /** * 功能描述:同步方法 * * @param runnable * @author lvchengyi * @date 10:43 下午 2020/9/1 */ public static void doAction(Runnable runnable) { lock.lock(); try { runnable.run(); } catch (Exception e) { System.out.println(e); } finally { lock.unlock(); } } } |
在同一个线程中,代码中三次lock操作,都进入了锁,体现锁的重入性。
2.1.2 同步
// 同步测试 public class SyncTest { public static void main(String[] args) { // main --> firstAction 终止,死锁发生 // main线程拥有ReentrantLock的锁,firstAction线程等待main线程的锁释放 // main线程等待firstAction方法执行完毕,也在等待,因此发生死锁 ReentrantTest.syncAction(SyncTest::firstAction); } public static void firstAction() { System.out.println(Thread.currentThread().getName()); CompletableFuture.runAsync(() -> ReentrantTest.syncAction(SyncTest::secondAction)).join(); } public static void secondAction() { System.out.println(Thread.currentThread().getName()); CompletableFuture.runAsync(() -> ReentrantTest.syncAction(SyncTest::lastAction)).join(); } private static void lastAction() { System.out.println(Thread.currentThread().getName()); System.out.println("Hello World, last Action."); } } |
在不同线程中,第二次lock发生了阻塞,拿不到锁,体现锁的同步性。
2.1.3 实际场景
public class NormalLockTest { private static int sum = 0; public static void main(String[] args) { List<CompletableFuture<Void>> completableFutureList = new ArrayList<>(); // 开启10个线程都做递增方法 for (int i = 0; i < 10; i++) { // 加锁后进行递增 // CompletableFuture<Void> future = CompletableFuture.runAsync(() -> SyncAction.doAction(NormalLockTest::addSum)); // 未加锁进行递增 CompletableFuture<Void> future = CompletableFuture.runAsync(NormalLockTest::addSum); completableFutureList.add(future); } // 等待10个线程执行完毕 CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[0])).join(); System.out.println(sum); } private static void addSum() { // 给sum变量加1000 for (int i = 0; i < 1000; i++) { sum++; } } } |
在并发场景下,假设有T1、T2、T3竞争同一个ReentrantLock锁
若T1拿到锁后,T1(lock),T2(park),T3(park)
Waited Queue → Head → T2 next → T3
T1(unlock) → T2.unpark
T2(free),T3(park) 可能会存在竞争锁
Waited Queue → Head(T2) → T3
假设T2(lock),T3(park)
......
具体如何实现见下文
2.2 AQS重要方法与ReentrantLock的关联
方法名 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 独占方式。arg为获取锁的次数,尝试获取资源,成功则返回True,失败则返回False。 |
protected boolean tryRelease(int arg) | 独占方式。arg为释放锁的次数,尝试释放资源,成功则返回True,失败则返回False。 |
一般来说,自定义同步器要么是独占方式,要么是共享方式,它们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
ReentrantLock是独占锁,所以实现了tryAcquire和tryRelease。
ReentrantLock使用state字段实现同步和可重入:
-
State初始化的时候为0,表示没有任何线程持有锁。
-
当有线程持有该锁时,值就会在原来的基础上+1,同一个线程多次获得锁是,就会多次+1,这里就是可重入的概念。
-
解锁也是对这个字段-1,一直到0,此线程对锁释放。
2.3 ReentrantLock和AQS交互过程
2.3.1 交互图
2.3.2 流程图-加锁
2.3.3 解析
加锁:
-
通过ReentrantLock的加锁方法Lock进行加锁操作。
-
会调用到内部类Sync的Lock方法,由于Sync#lock是抽象方法,根据ReentrantLock初始化选择的公平锁和非公平锁,执行相关内部类的Lock方法,本质上都会执行AQS的Acquire方法。
-
AQS的Acquire方法会执行tryAcquire方法,但是由于tryAcquire需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁和非公平锁内部类实现的tryAcquire方法,因此会根据锁类型不同,执行不同的tryAcquire。
-
tryAcquire是获取锁逻辑,获取失败后,会执行框架AQS的后续逻辑,跟ReentrantLock自定义同步器无关。
解锁:
-
通过ReentrantLock的解锁方法Unlock进行解锁。
-
Unlock会调用内部类Sync的Release方法,该方法继承于AQS。
-
Release中会调用tryRelease方法,tryRelease需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁。
-
释放成功后,所有处理由AQS框架完成,与自定义同步器无关。
2.4 AQS的框架作用如何体现
这里以ReentrantLock的非公平锁为例进行分析
2.4.1 加锁
在非公平锁中,有一段这样的代码:
static final class NonfairSync extends Sync { ... final void lock() { // 如果当前state为0,则拿到锁 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // 尝试获取锁 acquire(1); } ... } |
看一下AQS的Acquire是怎么写的:
public final void acquire(int arg) { // 尝试获取锁,如果获取锁失败,则将当前线程信息放入到内部队列中 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 手动中断线程 selfInterrupt(); } |
再看一下tryAcquire方法:
// AQS的tryAcquire,具体由下层实现,实现主要是对AQS中state变量以及OwnerThread一些操作 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } |
可以看出,这里只是AQS的简单实现,具体获取锁的实现方法是由各自的公平锁和非公平锁单独实现的(以ReentrantLock为例)。如果该方法返回了True,则说明当前线程获取锁成功,就不用往后执行了;如果获取失败,就需要加入到等待队列中。
2.4.2 解锁
在ReentrantLock中,解锁的代码:
public class ReentrantLock implements Lock, java.io.Serializable { ... public void unlock() { sync.release(1); } ... } |
非公平锁直接调AQS的release(int arg)方法
public final boolean release(int arg) { // 尝试释放锁 if (tryRelease(arg)) { Node h = head; // 判断头部节点是否处于被挂起状态 if (h != null && h.waitStatus != 0) // 唤醒线程 unparkSuccessor(h); return true; } return false; } |
再看一下tryRelease方法:
// AQS的tryRelease,具体由下层实现,实现主要是对AQS中state变量以及OwnerThread一些操作 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } |
2.4.3 总结
由上可得,继承AQS实现同步锁,只需要在下层实现方法中维护好锁的状态,锁的双端双向队列、唤醒线程等逻辑AQS都已经在框架中实现了。
3.通过AbstractQueuedSynchronizer创建简单的同步锁
public class CustomLock implements Lock { private Sync sync = new Sync(); @Override public void lock() { System.out.println(Thread.currentThread().getName() + "===尝试获取锁==="); sync.acquire(1); System.out.println(Thread.currentThread().getName() + "===已经获取锁==="); } @Override public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } @Override public boolean tryLock() { return sync.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { System.out.println(Thread.currentThread().getName() + "===尝试释放锁==="); sync.release(1); System.out.println(Thread.currentThread().getName() + "====已经释放锁==="); } @Override public Condition newCondition() { return sync.newCondition(); } private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire(int arg) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int arg) { if (getState() == 0) { throw new IllegalMonitorStateException(); } setExclusiveOwnerThread(null); setState(0); return true; } @Override protected boolean isHeldExclusively() { return getState() == 1; } Condition newCondition() { return new ConditionObject(); } } } |
4.AQS其他的应用场景
除了上边ReentrantLock的可重入性的应用,AQS作为并发编程的框架,为很多其他同步工具提供了良好的解决方案。下面列出了JUC中的几种同步工具,大体介绍一下AQS的应用场景:
同步工具 | 同步工具与AQS的关联 |
---|---|
ReentrantLock | 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理 |
Semaphore | 使用AQS同步状态来保存信号量的当前计数。tryReleaseShared会增加计数,acquireShared会减少计数 |
CountDownLatch | 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过 |
ReentrantReadWriteLock | 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数 |
ThreadPoolExecutor | Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease) |