java进阶-7-D -多线程-Lock专题- AQS 之 acquire / acquireQueued

1.这个AQS里面最基础也是使用频率最高的一个方法 : acquire  - 独占式获取锁

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

在锁已经被占用的情况下会先执行addWaiter方法,插入一个Node到队尾

 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

然后执行AcquireQueued方法,有2个目的 --- 1.通过一个简单的 && 拦截所有不是Head-Next的节点,保证在队列中的线程获取锁顺序的公平性     ---2.阻塞等待队列中的线程,让所有加入到等待队列中的线程进入到Park状态

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {    //满足上面说的第二点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire   和 parkAndCheckInterrupt  这2个东西要放到一起来看

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            //从这里我们就知道: NodeA.status=SIGNAL表示 下一个 NodeA.next 是要被 PARK 的
            return true;    //true : 对就是你赶紧下车!!!
        if (ws > 0) {
            //就从不断循环,不断将 node.prev指向更前面没有Canceled的node,我们知道,这个地方是为 
           //了删除掉那些已经canceled的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //能走到这 ,pred =0/ <-1 ,要是等于0 说明这是一个新加入的节点,那在给一次tryAcquire 
            //的机会, 要是<-1 那说明这个节点是 await进来 or 这是个共享传播锁,那必须在给一 
            //次  tryAcquire  机会
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;    //false : 表示本次不Park 
    }

 

No Picture no BB

java进阶-7-D -多线程-Lock专题- AQS 之 acquire / acquireQueued

通过一个真实的的案例来带我们理解这个acquire的工作:

  1.  实例化一个ReentrantLock的锁之后,开启10个线程,每个线程加一把锁并且执行的工作是休眠2分钟,这里还是要强调一下,同一把锁每次只能有一个线程占有,开10个线程,每60s开启一个,第一次启动,线程直接通过tryAcquire()获取到锁,执行休眠  ,再然后其他的线程会每隔60s进来,此时线程会尝试去拿锁,失败之后会进入到等待队列中,等待被唤醒!
  2. 上一步最后面说的尝试去拿锁是 通过tryAcquire方法,这个方法是一个在继承AQS抽象类之后自己主动去实现的类。在尝试拿锁失败之后就会去执行 acquireQueued(addWaiter(null),arg) 
  3. 执行 acquireQueued(addWaiter(null),arg) 先要去执行addWaiter(null)  表示在线程获取锁失败之后要将Thread 包装成Node,然后存放到 等待队列的尾巴上,(不用担心等待队列是否为空,这个队列是个双向队列,保证了获取锁的顺序也就是公平性) 此时nodeA就会进入到acquireQueued() 方法中
  4. 此时acquireQueued 拿到了已经放到尾巴上的nodeA 然后进行自旋{  先去把这个nodeA 的前驱节点nodePre拿到 ,看一下前驱节点是头节点吗,是 -- 那就再次尝试获取锁。【拿到了那就释放掉这个nodeA,over /没拿到那就继续往下看】 ,不是那就继续执行下面的判断 shouldParkAfterFailedAcquire(nodePre,nodeA), && ParkAndCheckInterrupt() .
  5. 进入到shouldParkAfterFailedAcquire(nodePre,nodeA) 方法,此时nodePre是head ,waitStatus  int 类型 默认值是0。0的情况会进入到if的第三个判断,里面的操作是将nodePre的waitStatus 设置成 -1,返回false,这样在acquireQueued 的自旋进行第二次的时候,判断nodePre的waitStatus = -1,返回true,会调用 ParkAndCheckInterrupt() , 可以防止一直运行自旋,消耗资源。此时就算是已经将任务提交到等待队列中了。
  6. 要是这个时候又有新的线程进入, 就是去重复 2~5 的动作。     一个Lock使用完之后会调用unlock方法,这就是将等待队列中的Park状态给打断 ,此时队列中Head -Next ->nodeB 就可能tryAcquire 成功,然后nodeB-Next->nodeC ,nodeC 就成为Head-Next->nodeC ,就可以在下一次锁竞争中 tryAcquire 了。   基本上就是这个样子了

 

 

主要的东西都已经说明了,在理解的过程中碰到的几个问题

  • Node的waitStatus没有设置初始值吗?原来时int类型 默认未0
  • 在AcquireQueued方法的自旋中除了前置Node被cancel掉,否者每次检查其实都是在检查自身和前一个,这个动作不会移动

 

2.讲了acquire 那可以顺提一下release 方法

在理解了acquire的实现方式之后,在理解release 就非常简单了:

在调用unLock之后当前占用的线程表示占用结束,下一个线程可以去竞争锁了,在公平锁的情况下,锁的释放过程 就是一个将等待队列的Head-Next的nodeA进行 unPark,waitStatus=0(unparkSuccessor),并且将Head-Next 指向 nodeA-Next  = nodeB  ,这个时候 要是nodeA 执行tryAcquire成功,就会执行nodeA的线程,然后将nodeA这个节点的和 prev=null,thread=null (acquireQueued) ,简单吧,共享式的有细微的差别,唤醒动作是共享传播式