lock体系

之前本来在写完多线程之后就写lock体系的,可以作以对比,但之后又写了其他知识的博客,一直拖到今天才写lock相关的知识,大家在看lock锁的时候,可以对比着看内建锁,对比对比lock锁和内建锁的优缺点。
这是我那篇关于内建锁的博客链接:https://blog.****.net/huaijiu123/article/details/85242821

lock在Java的util包下

lock体系下的线程的阻塞与唤醒,不再用内建锁和monitor机制了,而是用lock和unlock。

1. lock锁和内建锁最大的区别就是:内建锁是隐式的加减锁,而lock是显示的加减锁

public class Test {
    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        try{
            lock.lock();
            //以下代码被锁住,同一时刻只能一个线程可以运行
        }finally {
            lock.unlock();
        }
    }
}

解锁操作必须放在finally中,保证无论是否发生异常,都会释放锁。

下来我们看一个例子:还是卖票,这次是用lock加锁

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class MyThread implements Runnable{
    private int ticket = 500;
    private Lock lock = new ReentrantLock();
    @Override
    public void run() {
        for(int i = 0; i < 500; i++){
            try{
                lock.lock();
                if(ticket > 0){
                    System.out.println(Thread.currentThread().getName()+"还剩"+ticket--+"票");
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }finally {
                lock.unlock();
            }
        }
    }
}

public class Test {
    public static void main(String[] args) {
        MyThread mt = new MyThread();
        Thread th1 = new Thread(mt,"A");
        Thread th2 = new Thread(mt,"B");
        Thread th3 = new Thread(mt,"C");
        th1.start();
        th2.start();
        th3.start();
    }
}

lock体系
内建锁与lock锁的区别如果将lock换成synchronized的话,run()方法,同一时刻是可以多个线程同时进去的,这就是同一时刻多个线程竞争同一把锁,是重量级锁,其他线程进入阻塞状态,而lock锁,会使其他线程进入自旋状态。

lock锁除了用于内建锁的所有特性外,还拥有内建锁不具备的特性,拥有可中断的获取锁以及超时获取锁以及共享锁。

2. lock常用的接口

void lock();  //获取锁
void lockInerruptibly() throws InterruptedException();  //响应中断锁
boolean tryLock();     //获取到锁返回true,反之返回false
boolean tryLock(long time, TimeUnit unit);   //超时获取锁,在规定时间内未获取到锁返回false
Condition newCondition();    //获取与lock绑定的等待通知组件
void unlock();   //释放锁

通过观察ReentrantLock源码我们可以发现,ReentrantLock中所有的方法实际上都是调用了其静态内部类Sync中的方法,而Sync继承了AbstractQueuedSynchronizer(AQS–简称同步器)

那,AQS到底是什么呢
同步器是用来构建锁以及其他同步组件的基础框架,它的实现主要是依赖一个int状态变量以及通过一个FIFO队列共同构成同步队列。

子类必须重写AQS的用protected修饰的用来改变同步状态的方法,其他方法主要是实现了排队与阻塞机制。int状态的更新使用getState()、setState()以及compareAndSetState()。

子类推荐使用静态内部类来继承AQS实现自己的同步语义。同步器既支持独占锁,也支持共享锁。

3. 那lock与AQS到底什么关系呢?

lock—面向使用者,定义了使用者与锁交互的接口。

AQS—面向锁的实现者,屏蔽了同步状态的管理、线程排队、线程等待与唤醒等底层操作。

4. AQS的模板模式

AQS使用模板方法模式,将一些与状态有关的核心方法开放给子类重写,而后AQS会使用子类重写的关于状态的方法进行线程的排队、阻塞以及唤醒等操作。

5. AQS详解

在同步组件(锁)中,AQS是最核心的部分,同步组件的实现依赖AQS提供的模板方法来实现同步组件语义(也就是说什么时候获取锁了)。

AQS实现了对同步状态的管理,以及对阻塞线程进行排队、等待通知等等底层操作。

AQS核心组成:同步队列、独占锁的获取与释放、共享锁的获取与释放、可中断锁、超时锁。这一系列功能的实现依赖于AQS提供的模板方法。

独占锁

1. void acquire(int arg)  : 独占式获取同步状态,如果获取失败插入同步队列进行等待
2. void acquireInterruptibly(int arg)  :在1的基础上,此方法可以在同步队列中响应中断
3. boolean tryAcquireNanos(int arg, long nanosTimeOut) :在2的基础上增加了超时等待功能,到了预计时间还未获得锁直接返回
4. boolean tryAcquire(int arg) :获取锁成功返回true,否则返回false
5. boolean release(int arg) :释放同步状态,该方法会唤醒在同步队列的下一个节点

共享式锁

1. void acquireShared(int arg) :共享获取同步状态,同一时刻多个线程获取同步状态
2. void acquireSharedInterruptibly(int arg) :在1的基础上增加响应中断
3. boolean tryAcquireSharedNanos(int arg, long nanosTimeOut) :在2的基础上增加超时等待
4. boolean releaseShared(int arg) :共享式释放同步状态

同步队列
我们可以通过debug lock锁的多线程会发现同步队列实际上是一个带有头尾节点的双向链表。

AQS源码
lock体系
通过观察源码我们也可以发现在AQS内部有一个静态内部类Node,这是同步队列中每个具体的节点

节点属性有:

int waitStatus:节点状态
Node prev:同步队列中的前驱节点
Node next:同步队列中的后继节点
Thread thread:当前节点包装的线程对象
Node nextWaiter:等待队列中的下一个节点

节点状态有:

int INITIAL = 0;//初始状态
int CANCELLED = 1;   //当前节点从同步队列中取消
int SIGNAL = -1; //当前节点的后继节点处于阻塞(wait)状态。如果当前节点释放同步状态会通知后继节点,使后继节点继续运行。
int CONDITION = -2;   //节点处于等待队列中。当其他线程对Condition调用signal()方法后,该节点会从等待队列移到同步队列中
int PROPAGATE = -3;  //共享式同步状态会无条件的传播

AQS是由一个int型状态变量以及同步队列组成的

独占锁的获取:acquire(int arg)

获取锁失败后调用AQS提供的acquire(int arg) 模版方法

acquire()源码

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
tryAcquire(arg):再次尝试获取同步状态,成功直接方法退出,失败调用addWaiter(Node.EXCLUSIVE, arg)
addWaiter(Node.EXCLUSIVE, arg):将当前线程以指定模式(独占式、共享式)封装为Node节点后置入同步队列

addWaiter源码

   private Node addWaiter(Node mode) {
   		//将线程以指定模式封装为Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // 获取当前队列的尾节点
        Node pred = tail;
        //若尾节点不为空
        if (pred != null) {
            node.prev = pred;
            //使用CAS将当前节点尾插到同步队列中
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                //CAS尾插成功,返回当前Node节点
                return node;
            }
        }
        //尾节点为空或CAS尾插失败
        enq(node);
        return node;
    }
enq(Node node):当前队列为空或者CAS尾插失败调用此方法来初始化队列或不断尝试尾插入队列中

enq源码

 private Node enq(final Node node) {
 		//死循环,直到将当前节点插入同步队列成功为止
        for (;;) {
            Node t = tail;
            //初始化同步队列
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
            	//不断CAS将当前节点为插入同步队列中
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
acquireQueued:
1. 如果当前节点前驱节点为头节点并且再次获取同步状态成功,当前线程成功获得锁,方法执行退出
2. 如果获取锁失败,先不断自旋将前驱节点状态置为SIGNAL,之后调用LockSupport.park()方法将当前线程阻塞

acquireQueued源码:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //自旋
            for (;;) {
            	//获取当前节点的前驱节点
                final Node p = node.predecessor();
                //获取同步状态成功条件
                //当前节点前驱节点为头结点并且再次获取同步状态成功
                if (p == head && tryAcquire(arg)) {
                	//将当前节点设置为头节点
                    setHead(node);
                    //删除原来的头节点
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
        	//获取失败将当前节点取消
            if (failed)
                cancelAcquire(node);
        }
    }
setHead方法:将当前节点设置为头节点,删除原有头节点

setHead源码:

    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

如果获取同步状态失败,则调用shouldParkAfterFailedAcquire方法

shouldParkAfterFailedAcquire:此方法主要逻辑是使用CAS将前驱节点状态置为SIGNAL,表示需要将当前节点阻塞。如果CAS失败,不断自旋直到前驱节点状态置为SIGNAL为止,表示当前节点需要阻塞。

shouldParkAfterFailedAcquire源码:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    	//获取前驱节点的节点状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)     //ws的状态为-1----也就是说当前节点的后继节点处于阻塞状态
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
         //前驱节点已被取消
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
             //一直向前走,直到找到一个前驱节点状态不为取消状态
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             //前驱节点状态不是取消状态时,将前驱节点状态置为-1,表示后继节点(当前节点)应该处于阻塞状态
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
parkAndCheckInterrupt:调用LockSupport.park(this)将当前线程阻塞

parkAndCheckInterrupt源码:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

上边这些就是独占锁的获取流程。
我们可以画一下它的流程图,这样看起来更清晰些,再对应具体的源码剖析就更容易理解些了。
lock体系
独占锁的释放:release()
unlock()方法实际上是调用AQS提供的release()模板方法

release方法源码:

    public final boolean release(int arg) {
    	//释放状态成功后
        if (tryRelease(arg)) {
        	//获取当前同步队列的头节点
            Node h = head;
            if (h != null && h.waitStatus != 0)
            	//唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

release方法是unlock方法的具体实现。首先获取头节点的后继节点,当后继节点不为null时,会调用LockSupport.unpark()方法唤醒后继节点包装的线程。因此,每一次锁释放后就会唤醒队列中该节点的后继节点所包装的线程。

独占锁的获取与释放总结

  1. 线程获取锁失败,将线程调用addWaiter()封装成Node节点进行入队操作。addWaiter()中方法enq()完成对同步队列的头节点初始化以及CAS尾插失败后的重试处理。
  2. 入队之后排队获取锁的核心方法acquireQueued(),节点排队获取锁是一个自旋过程。 当且仅当当前节点的前驱节点为头节点并且成功获取同步状态时,节点出队并且该节点引用的线程获取到锁。
    否则,不满足条件时会不断自旋将前驱节点的状态置为SIGNAL而后调用LockSupport.park()将当前线程阻塞。
  3. 释放锁时会唤醒后继节点(后继节点不为null)

独占锁的特性

获取锁时响应中断
获取锁响应中断原理与acquire()几乎一样,唯一区别在于当parkAndCheckInterrupt()返回true时也就是说线程阻塞时被中断,抛出中断异常后线程退出。
而acquire()方法在阻塞时中断只会返回true,不会做出任何响应。

超时等待获取锁
tryAcquireNanos(),该方法在三种情况下会返回:

  1. 在超时时间内,当前线程成功获取到锁
  2. 当前线程在超时时间内被中断
  3. 超时时间结束,仍未获得锁返回false

超时获取锁逻辑与可中断获取锁基本一致,唯一区别在于获取锁失败后,增加了一个时间处理,如果当前时间超过截至时间,线程不再等待,直接退出,返回false。否则将线程阻塞置为等待状态排队获取锁。