java锁里的AQS源码解析

AQS是current包下面的java.util.concurrent.locks.AbstractQueuedSynchronizer(抽象队列同步器),ReentrantLock(重入锁)、ReentrantReadWriteLock(读写锁)都是基于这个类来实现的。作用就是将对共享资源的访问进行控制。
这个类里的锁有两种:共享锁和独占锁。
共享锁之间可以共享资源,比如读写锁里,读与读之间是不互斥的
独占锁是只能独自占有资源,比如读写锁里的写锁,写与写互斥,写与读也互斥。
对应的主要操作就是 共享锁的获取与释放、独占锁的获取和释放。所以这个类将具体的竞争和释放逻辑由子类去实现,将公用的逻辑抽象了出来。包括:1、竞争失败后的放入队列并线程阻塞的逻辑;2、释放锁后的唤起下一个竞争线程的逻辑

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }  

独占锁释放:

public final boolean release(int arg) {
       if (tryRelease(arg)) {
           Node h = head;
           if (h != null && h.waitStatus != 0)
               unparkSuccessor(h);
           return true;
       }
       return false;
   }

共享锁获取:

public final void acquireShared(int arg) {
       if (tryAcquireShared(arg) < 0)
           doAcquireShared(arg);
   }

共享锁释放:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

下面根据源码一个一个解析:
首先要先了解下这个类的实现思路,多个线程访问一个共享资源时,需要保存这些线程的访问顺序,而FIFO队列可以满足这个特性,每个线程来获取锁时,需要知道当前资源的占用的情况,比如被多少共享锁占用,有没有独占所在占用、是否空闲。那么就需要有个标志属性,那么用一个实例变量state标识,这个state会被多线程改动,需要保证并发安全,于是用到volatile修饰,并且修改操作用cas实现,那么大体的思路就如下图,一个双向链表+state标识:
java锁里的AQS源码解析

解析独占锁获取源码
public final void acquire(int arg) {
		//先尝试竞争一次独占锁,这个逻辑由具体的子类去实现,  
		//比如ReentrantLock里有公平锁和非公平锁两个子类,各自都实现了这个方法
		//公平锁和非公平锁里的逻辑区别就是 在竞争前查看下是否已经有等待的线程  
		//非公平锁:不管有没有已经在等待的线程,直接尝试获取  
		//公平锁:如果有已经等待的线程,则放弃竞争
        if (!tryAcquire(arg) &&       //1
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            //2
            selfInterrupt();  //3
    }  

第二行解析: 这个是核心方法,线程的阻塞逻辑也在这个方法里。

final boolean acquireQueued(final Node node, int arg) {
       boolean failed = true;
       try {
           boolean interrupted = false;
           for (;;) {//循环
           //先获取当前节点的前一个节点
               final Node p = node.predecessor();
               //如果前一个节点就是头节点(当前资源的占有者),则直接尝试竞争,  
               //tryAcquire是子类实现的逻辑,这里不做描述,就是尝试竞争一次锁
               if (p == head && tryAcquire(arg)) {
               //竞争成功,设置当前节点为头节点
                   setHead(node);
                   p.next = null; // help GC
                   failed = false;
                   return interrupted;
               }
               //shouldParkAfterFailedAcquire主要作用有:
               //1、修改node节点的state(标识当前节点的状态,包括是否已经取消竞争、等待竞争、无效等)
               //2、整理链表,如果发现是已放弃竞争的节点,则需要将这些节点从链表中剔除
               if (shouldParkAfterFailedAcquire(p, node) &&
                   parkAndCheckInterrupt())  //通过LockSupport.park方法阻塞当前线程,对应的唤醒方法是LockSupport.unpark
                   interrupted = true;
           }
       } finally {
           if (failed)
               cancelAcquire(node);
       }
   }
独占锁释放:
public final boolean release(int arg) {
//该方法由子类实现
       if (tryRelease(arg)) {
           Node h = head;
           if (h != null && h.waitStatus != 0)//0表示节点已失效
           //
               unparkSuccessor(h);
           return true;
       }
       return false;
   }

unparkSuccessor方法单独讲解:

private void unparkSuccessor(Node node) {
       /*
        * If status is negative (i.e., possibly needing signal) try
        * to clear in anticipation of signalling.  It is OK if this
        * fails or if status is changed by waiting thread.
        */
       int ws = node.waitStatus;
       if (ws < 0)
           compareAndSetWaitStatus(node, ws, 0);

       /*
        * Thread to unpark is held in successor, which is normally
        * just the next node.  But if cancelled or apparently null,
        * traverse backwards from tail to find the actual
        * non-cancelled successor.
        */
       Node s = node.next;
       if (s == null || s.waitStatus > 0) {//大于0表示当前节点是放弃竞争的节点,需要向下遍历到有效节点
           s = null;
           //这个地方为什么要从尾部往前遍历?
           //原因是放弃竞争的方法是cancelAcquire,这个方法里为了便于系统GC,  
           //将放弃竞争的节点的next指针指向了自己。node.next = node; // help GC
           for (Node t = tail; t != null && t != node; t = t.prev)
               if (t.waitStatus <= 0)
                   s = t;
       }
       if (s != null)
           LockSupport.unpark(s.thread);//唤起线程
   }
共享锁获取:
public final void acquireShared(int arg) {
//获取共享锁,如果失败则加入到队列中并阻塞
       if (tryAcquireShared(arg) < 0)
           doAcquireShared(arg);
   }

doAcquireShared方法解析:作用是将当前线程放到阻塞队列的尾部并阻塞

private void doAcquireShared(int arg) {
//将当前线程包装成一个共享节点,放到等待队列的尾部中
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
            //获取前一个节点,如果前一个节点就是头节点,则直接竞争一次
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                    //这个方法单独拿出来解析
                        setHeadAndPropagate(node, r);   //2
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //这个方法上面已经解析,主要是整理队列及修改节点状态
			   if (shouldParkAfterFailedAcquire(p, node) &&    // 1
                    parkAndCheckInterrupt())//阻塞线程
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())//如果后续节点仍然是共享节点,则都唤醒
                doReleaseShared();//共享锁释放逻辑,下面会解析
        }
    }
共享锁释放:
private void doReleaseShared() {
       /*
        * Ensure that a release propagates, even if there are other
        * in-progress acquires/releases.  This proceeds in the usual
        * way of trying to unparkSuccessor of head if it needs
        * signal. But if it does not, status is set to PROPAGATE to
        * ensure that upon release, propagation continues.
        * Additionally, we must loop in case a new node is added
        * while we are doing this. Also, unlike other uses of
        * unparkSuccessor, we need to know if CAS to reset status
        * fails, if so rechecking.
        */
       for (;;) {
           Node h = head;
           if (h != null && h != tail) {
               int ws = h.waitStatus;
               if (ws == Node.SIGNAL) {
                   if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//设置节点状态为无效
                       continue;            // loop to recheck cases
   				////唤醒节点
   				// 线程被唤醒后,会从doAcquireShared方法的代码1处继续执行,
   				//然后会执行到代码2处,会依次将所有的共享锁节点给唤醒
                   unparkSuccessor(h); 
               }
               else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                   continue;                // loop on failed CAS
           }
           if (h == head)                   // loop if head changed
               break;
       }
   }

以上是AQS四个重要方法的源码解析