ReentrantLock源码
Lock的关系图,用的比较多的是ReentrantLock,下面介绍ReentrantLock
ReentrantLock含有三个内部类:Sync,NonfairSync,FairSync. Sync继承自AbstractQueuedSynchronizer,内部具有ConditionObject类(实现了Condition接口)
默认使用NonfairSync。可以通过构造器传参boolean,决定使用哪一个。
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } /** * Sync object for fair locks */ static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }公平锁和非公平锁只是在lock()和tryAcquire()不一样。
下面是ReentrantLock获取锁的的方法:
lock():
tryLock():可轮询的锁请求
tryLock(long timeout,TimeUnit unit);可定时的锁请求
lockInterruptibly():可中断的锁请求
public void lock() { sync.lock(); }默认是非公平锁,所以调用下面这个方法
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
看看compareAndSetState()方法,调用了一个native方法,
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
其中,stateOffset 是AbstractQueuedSynchronizer内部定义的一个状态偏移量,用来得到内存值,AbstractQueuedSynchronizer是线程的竞态条件,所以只要某一个线程CAS改变状态成功,同时在没有释放的情况下,其他线程必然失败.
private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long stateOffset; private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } }对于竞争成功的线程会调用 setExclusiveOwnerThread方法:
protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; }设置当前线程是拥有独占访问权限的线程.
竞争失败的线程,会调用acquire(1)方法:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();//没有获取到锁,而且 }tryAquire(arg)调用的是NonfairSync的tryAquire()方法:
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }nonfairTryAcquire方法主要是做重入锁的实现,synchronized本身支持锁的重入,而ReentrantLock则是通过此处实现。在锁状态c为0时,重新尝试获取锁。如果已经被占用,那么做一次是否当前线程为占用锁的线程的判断,如果是一样的那么进行计数,当然在锁的relase过程中会进行递减,保证锁的正常释放。
如果没有重新获取到锁或者锁的占用线程和当前线程是一个线程,方法返回false。那么把线程添加到等待队列中,调用addWaiter:
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }队列尾节点不为null,使用CAS更新尾节点,如果更新不成功,进入enq方法的无限循环.
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize,可能队列已经为空了 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //node的前一个节点 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //线程挂起等待中断 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
private static boolean shouldParkAfterFailedAcquire(AbstractQueuedSynchronizer.Node var0, AbstractQueuedSynchronizer.Node var1) { int var2 = var0.waitStatus; if(var2 == -1) { //-1代表这个节点的继任节点被阻塞了,需要通知 return true; } else { if(var2 > 0) { do { var1.prev = var0 = var0.prev; } while(var0.waitStatus > 0); var0.next = var1; } else { compareAndSetWaitStatus(var0, var2, -1); } return false; } }
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); //在许可可用之前,禁用当前线程. return Thread.interrupted();//返回线程是否被中断.假如当前的中断标志为true,则调完后会将中断标志位设置成false }
static void selfInterrupt() { Thread.currentThread().interrupt(); }