java进阶-7-D -多线程-Lock专题- AQS 之 acquire / acquireQueued
1.这个AQS里面最基础也是使用频率最高的一个方法 : acquire - 独占式获取锁
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
在锁已经被占用的情况下会先执行addWaiter方法,插入一个Node到队尾
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
然后执行AcquireQueued方法,有2个目的 --- 1.通过一个简单的 && 拦截所有不是Head-Next的节点,保证在队列中的线程获取锁顺序的公平性 ---2.阻塞等待队列中的线程,让所有加入到等待队列中的线程进入到Park状态
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //满足上面说的第二点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire 和 parkAndCheckInterrupt 这2个东西要放到一起来看
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//从这里我们就知道: NodeA.status=SIGNAL表示 下一个 NodeA.next 是要被 PARK 的
return true; //true : 对就是你赶紧下车!!!
if (ws > 0) {
//就从不断循环,不断将 node.prev指向更前面没有Canceled的node,我们知道,这个地方是为
//了删除掉那些已经canceled的节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//能走到这 ,pred =0/ <-1 ,要是等于0 说明这是一个新加入的节点,那在给一次tryAcquire
//的机会, 要是<-1 那说明这个节点是 await进来 or 这是个共享传播锁,那必须在给一
//次 tryAcquire 机会
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false; //false : 表示本次不Park
}
No Picture no BB
通过一个真实的的案例来带我们理解这个acquire的工作:
- 实例化一个ReentrantLock的锁之后,开启10个线程,每个线程加一把锁并且执行的工作是休眠2分钟,这里还是要强调一下,同一把锁每次只能有一个线程占有,开10个线程,每60s开启一个,第一次启动,线程直接通过tryAcquire()获取到锁,执行休眠 ,再然后其他的线程会每隔60s进来,此时线程会尝试去拿锁,失败之后会进入到等待队列中,等待被唤醒!
- 上一步最后面说的尝试去拿锁是 通过tryAcquire方法,这个方法是一个在继承AQS抽象类之后自己主动去实现的类。在尝试拿锁失败之后就会去执行 acquireQueued(addWaiter(null),arg)
- 执行 acquireQueued(addWaiter(null),arg) 先要去执行addWaiter(null) 表示在线程获取锁失败之后要将Thread 包装成Node,然后存放到 等待队列的尾巴上,(不用担心等待队列是否为空,这个队列是个双向队列,保证了获取锁的顺序也就是公平性) 此时nodeA就会进入到acquireQueued() 方法中
- 此时acquireQueued 拿到了已经放到尾巴上的nodeA 然后进行自旋{ 先去把这个nodeA 的前驱节点nodePre拿到 ,看一下前驱节点是头节点吗,是 -- 那就再次尝试获取锁。【拿到了那就释放掉这个nodeA,over /没拿到那就继续往下看】 ,不是那就继续执行下面的判断 shouldParkAfterFailedAcquire(nodePre,nodeA), && ParkAndCheckInterrupt() .
- 进入到shouldParkAfterFailedAcquire(nodePre,nodeA) 方法,此时nodePre是head ,waitStatus int 类型 默认值是0。0的情况会进入到if的第三个判断,里面的操作是将nodePre的waitStatus 设置成 -1,返回false,这样在acquireQueued 的自旋进行第二次的时候,判断nodePre的waitStatus = -1,返回true,会调用 ParkAndCheckInterrupt() , 可以防止一直运行自旋,消耗资源。此时就算是已经将任务提交到等待队列中了。
- 要是这个时候又有新的线程进入, 就是去重复 2~5 的动作。 一个Lock使用完之后会调用unlock方法,这就是将等待队列中的Park状态给打断 ,此时队列中Head -Next ->nodeB 就可能tryAcquire 成功,然后nodeB-Next->nodeC ,nodeC 就成为Head-Next->nodeC ,就可以在下一次锁竞争中 tryAcquire 了。 基本上就是这个样子了
主要的东西都已经说明了,在理解的过程中碰到的几个问题
- Node的waitStatus没有设置初始值吗?原来时int类型 默认未0
- 在AcquireQueued方法的自旋中除了前置Node被cancel掉,否者每次检查其实都是在检查自身和前一个,这个动作不会移动
2.讲了acquire 那可以顺提一下release 方法
在理解了acquire的实现方式之后,在理解release 就非常简单了:
在调用unLock之后当前占用的线程表示占用结束,下一个线程可以去竞争锁了,在公平锁的情况下,锁的释放过程 就是一个将等待队列的Head-Next的nodeA进行 unPark,waitStatus=0(unparkSuccessor),并且将Head-Next 指向 nodeA-Next = nodeB ,这个时候 要是nodeA 执行tryAcquire成功,就会执行nodeA的线程,然后将nodeA这个节点的和 prev=null,thread=null (acquireQueued) ,简单吧,共享式的有细微的差别,唤醒动作是共享传播式