深入学习java源码之ReentrantLock.newCondition()与ReentrantLock.lock()
深入学习java源码之ReentrantLock.newCondition()与ReentrantLock.lock()
Java中通常实现锁有两种方式,一种是synchronized关键字,另一种是Lock。
对于使用者的直观体验上Lock是比较复杂的,需要lock和realse,如果忘记释放锁就会产生死锁的问题,所以,通常需要在finally中进行锁的释放。但是synchronized的使用十分简单,只需要对自己的方法或者关注的同步对象或类使用synchronized关键字即可。但是对于锁的粒度控制比较粗,同时对于实现一些锁的状态的转移比较困难。
在JDK1.5之后synchronized引入了偏向锁,轻量级锁和重量级锁,从而大大的提高了synchronized的性能,Lock的实现主要有ReentrantLock、ReadLock和WriteLock,后两者接触的不多。
重入锁(ReentrantLock)是一种递归无阻塞的同步机制。以前一直认为它是synchronized的简单替代,而且实现机制也不相差太远。不过最近实践过程中发现它们之间还是有着天壤之别。
以下是官方说明:一个可重入的互斥锁定 Lock,它具有与使用 synchronized 方法和语句所访问的隐式监视器锁定相同的一些基本行为和语义,但功能更强大。ReentrantLock 将由最近成功获得锁定,并且还没有释放该锁定的线程所拥有。当锁定没有被另一个线程所拥有时,调用 lock 的线程将成功获取该锁定并返回。如果当前线程已经拥有该锁定,此方法将立即返回。可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法来检查此情况是否发生。
AbstractQueuedSynchronizer 是一个抽象类,所以在使用这个同步器的时候,需要通过自己实现预期的逻辑,Sync、FairSync和NonfairSync都是ReentrantLock为了实现自己的需求而实现的内部类,之所以做成内部类,我认为是只在ReentrantLock使用上述几个类,在外部没有使用到。
它提供了lock()方法:
如果该锁定没有被另一个线程保持,则获取该锁定并立即返回,将锁定的保持计数设置为 1。
如果当前线程已经保持该锁定,则将保持计数加 1,并且该方法立即返回。
如果该锁定被另一个线程保持,则出于线程调度的目的,禁用当前线程,并且在获得锁定之前,该线程将一直处于休眠状态,此时锁定保持计数被设置为 1。
public void lock() {
sync.lock();
}
sync的实现是NonfairSync,所以调用的是NonfairSync的lock方法:
/**
* Sync object for non-fair locks
* tips:调用Lock的时候,尝试获取锁,这里采用的CAS去尝试获取锁,如果获取锁成功
* 那么,当前线程获取到锁,如果失败,调用acquire处理。
*
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
接下来看看compareAndSetState方法是怎么进行锁的获取操作的:
/**
* tips: 1.compareAndSetState的实现主要是通过Unsafe类实现的。
* 2.之所以命名为Unsafe,是因为这个类对于JVM来说是不安全的,我们平时也是使用不了这个类的。
* 3.Unsafe类内封装了一些可以直接操作指定内存位置的接口,是不是感觉和C有点像了?
* 4.Unsafe类封装了CAS操作,来达到乐观的锁的争抢的效果
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
stateOffset 是AbstractQueuedSynchronizer内部定义的一个状态量,AbstractQueuedSynchronizer是线程的竞态条件,所以只要某一个线程CAS改变状态成功,同时在没有释放的情况下,其他线程必然失败。
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
//这个方法很有意思,主要的意思是获取AbstractQueuedSynchronizer的state成员的偏移量
//通过这个偏移量来更新state成员,另外state是volatile的来保证可见性。
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
对于竞争成功的线程会调用 setExclusiveOwnerThread方法:
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
}
这个实现是比较简单的,只是获取当前线程的引用,令AbstractOwnableSynchronizer中的exclusiveOwnerThread引用到当前线程。竞争失败的线程,会调用acquire方法,这个方法也是ReentrantLock设计的精华之处:
/**
* tips:此处主要是处理没有获取到锁的线程
* tryAcquire:重新进行一次锁获取和进行锁重入的处理。
* addWaiter:将线程添加到等待队列中。
* acquireQueued:自旋获取锁。
* selfInterrupt:中断线程。
* 三个条件的关系为and,如果 acquireQueued返回true,那么线程被中断selfInterrupt会中断线程
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
AbstractQueuedSynchronizer为抽象方法,调用tryAcquire时,调用的为NonfairSync的tryAcquire。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
nonfairTryAcquire方法主要是做重入锁的实现,synchronized本身支持锁的重入,而ReentrantLock则是通过此处实现。在锁状态为0时,重新尝试获取锁。如果已经被占用,那么做一次是否当前线程为占用锁的线程的判断,如果是一样的那么进行计数,当然在锁的relase过程中会进行递减,保证锁的正常释放。
如果没有重新获取到锁或者锁的占用线程和当前线程是一个线程,方法返回false。那么把线程添加到等待队列中,调用addWaiter:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
这里主要是用当前线程构建一个Node的等待队列双向链表,这里addWaiter中和enq中的部分逻辑是重复的,可能是如果能一次成功就避免了enq中的死循环。因为tail节点是volatile的同时node也是不会发生竞争的所以node.prev = pred;是安全的。但是tail的next是不断竞争的,所以利用compareAndSetTail保证操作的串行化。
Node节点线程的自旋过程,自旋过程主要检查当前节点是不是head节点的next节点,如果是,则尝试获取锁,如果获取成功,那么释放当前节点,同时返回。至此一个非公平锁的锁获取过程结束。
如果这里一直不断的循环检查,其实是很耗费性能的,JDK的实现肯定不会这么“弱智”,所以有了shouldParkAfterFailedAcquire和parkAndCheckInterrupt,这两个方法就实现了线程的等待从而避免无限的轮询:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
首先,检查一下当前Node的前置节点pred是否是SIGNAL,如果是SIGNAL,那么证明前置Node的线程已经Park了,如果waitStatus>0,那么当前节点已经Concel或者中断。那么不断调整当前节点的前置节点,将已经Concel的和已经中断的线程移除队列。如果waitStatus<0,那么设置waitStatus为SIGNAL,因为调用shouldParkAfterFailedAcquire的方法为死循环调用,所以终将返回true。接下来看parkAndCheckInterrupt方法,当shouldParkAfterFailedAcquire返回True的时候执行parkAndCheckInterrupt方法:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
此方法比较简单,其实就是使当前的线程park,即暂停了线程的轮询。当Unlock时会做后续节点的Unpark唤醒线程继续争抢锁。
接下来看一下锁的释放过程,锁释放主要是通过unlock方法实现:
public void unlock() {
sync.release(1);
}
主要是调用AbstractQueuedSynchronizer同步器的release方法:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease方法为ReentrantLock中的Sync的tryRelease方法:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryRelease方法主要是做了一个释放锁的过程,将同步状态state -1,直到减到0为止,这主要是兼容重入锁设计的,同时setExclusiveOwnerThread(null)清除当前占用的线程。这些head节点后的线程和新进的线程就可以开始争抢。这里需要注意的是对于同步队列中的线程来说在setState(c),且c为0的时候,同步队列中的线程是没有竞争锁的,因为线程被park了还没有唤醒。但是此时对于新进入的线程是有机会获取到锁的。
下面代码是进行线程的唤醒:
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
因为在setState(c)释放了锁之后,是没有线程竞争的,所以head是当前的head节点,先检查当前的Node是否合法,如果合法则unpark it。开始锁的获取。就回到了上面的for循环执行获取锁逻辑:
至此锁的释放就结束了,可以看到ReentrantLock是一个不断的循环的状态模型,里面有很多东西值得我们学习和思考。
ReentrantLock具有公平和非公平两种模式,也各有优缺点:
公平锁是严格的以FIFO的方式进行锁的竞争,但是非公平锁是无序的锁竞争,刚释放锁的线程很大程度上能比较快的获取到锁,队列中的线程只能等待,所以非公平锁可能会有“饥饿”的问题。但是重复的锁获取能减小线程之间的切换,而公平锁则是严格的线程切换,这样对操作系统的影响是比较大的,所以非公平锁的吞吐量是大于公平锁的,这也是为什么JDK将非公平锁作为默认的实现。
CAS
在学习java.util.concurrent(简称JUC)包下的类时,了解到了CAS这个概念,整个JUC包的基础也是CAS,ReentrantLock也是基于它的。学习CAS,先从synchronized关键字说起,synchronized关键字能保证最基本的互斥同步。同步是指在多个线程并发访问共享数据时,保证共享数据在同一个时刻只被一个线程使用。而互斥是实现同步的一种手段。互斥同步4个字,互斥是因,同步是果;互斥是方法,同步是目的。互斥同步属于一种悲观的并发政策,认为如果不进行正确的同步措施,那就好出现问题,无论共享数据是否真会出现竞争,它都要进行加锁。
随着硬件指令集的发展,多了一个选择:基于冲突检测的乐观并发政策,先进性操作,如果没有其他线程共享数据,则操作成功;如果共享数据有争用,产生冲突,那就采取其他措施(不断重试),这种乐观的并发政策的许多实现不用把线程挂起。乐观并发政策需要操作和冲突检测两个步骤具有原子性,需要底层硬件完成。
x86中通过cmpxchg汇编指令来完成CAS操作。CAS(Compare-and-Swap 比较和交换)与平台相关,它有三个操作数,内存位置值(V),旧的预期值(A),新值(B)。CAS指令执行时,当V=A时,处理器用B的值跟新V的值,否则不执行。上述的过程是一个原子操作。JDK1.5引入的CAS,它在sun.misc.Unsafe类里面方法提供。里面调用了Native方法。
下面给一个JUC包下面的Atomic类的部分源代码,执行自增操作。用到CAS,里面用到了循环一直判断。里面没有进行加锁处理。但是也有逻辑漏洞,在111和222如果其他线程被执行,获得V(V运来是A),将他修改为B,后来又修改会A,则执行222代码的时候认为V没有改变过,这就是“ABA”问题。
//该方法实现了i++的非阻塞的原子操作
public final int getAndIncrement() {
for (;;) { //循环,使用CAS的经典方式,这是实现non-blocking方式的代价
int current = get();//得到现在的值 111
int next = current + 1;//通过计算得到要赋予的新值
if (compareAndSet(current, next)) //关键点,调用CAS原子更新, 222
return current;
}
}
二.锁优化
JDK1.6中高校并发是一个重要改进。里面的给出的各种锁都是为了线程间更高效的共享数据。优化的方法有下面几种。这里的锁优化主要是针对synchronized关键字来说,它产生的是一种重量级的锁定(重量级的锁定不是用CAS),会有互斥,效率较低。而在JDK1.6后,引入的自旋锁,轻量级锁,偏向锁对互斥同步进行了优化,它们三种锁默认都是开启的。
1.锁消除,锁粗化。锁消除是判断当堆上的数据不会被其他线程访问到时,该线程上的同步加锁就无需进行。
由于加锁和解锁的开销很大,如果不断的加锁和解锁操作都是对于同一个对象,虚拟机会把整个加锁同步的范围扩张到操作序列的外部,就是只加一次锁。
2.自旋锁:互斥同步对性能最大的影响是阻塞的实现,挂起线程和恢复线程都需要转入内核中完成。现将本该要阻塞的线程不去挂起,不放弃处理器的执行时间,而是在那做一个忙循环(自旋),看看持有锁的线程是否很快释放锁。自旋的次数(循环的次数)是有限度的,默认是10次,如果没有获得锁就采用传统的方式去挂起线程。
3.轻量级锁:在线程没有竞争的时候,采用CAS操作,避免使用互斥量的开销。这里涉及到对象头的概念。
4.偏向锁:它相对于轻量级锁,减少了锁重入的开销,对于第一个获得锁的线程,后面的执行如果该锁没有被其他线程获取,则该线程将不再进行同步(CAS操作)。
轻量级锁和偏向锁都是在没有竞争的情况下出现,一旦出现竞争就会升级为重量级锁。
对于synchronized,锁的升级情况可能是 偏向锁—>轻量锁—>自适应自旋锁—>重量锁
java源码
Modifier and Type | Method and Description |
---|---|
int |
getHoldCount()
查询当前线程对此锁的暂停数量。 |
protected Thread |
getOwner()
返回当前拥有此锁的线程,如果不拥有,则返回 |
protected Collection<Thread> |
getQueuedThreads()
返回包含可能正在等待获取此锁的线程的集合。 |
int |
getQueueLength()
返回等待获取此锁的线程数的估计。 |
protected Collection<Thread> |
getWaitingThreads(Condition condition)
返回包含可能在与此锁相关联的给定条件下等待的线程的集合。 |
int |
getWaitQueueLength(Condition condition)
返回与此锁相关联的给定条件等待的线程数的估计。 |
boolean |
hasQueuedThread(Thread thread)
查询给定线程是否等待获取此锁。 |
boolean |
hasQueuedThreads()
查询是否有线程正在等待获取此锁。 |
boolean |
hasWaiters(Condition condition)
查询任何线程是否等待与此锁相关联的给定条件。 |
boolean |
isFair()
如果此锁的公平设置为true,则返回 |
boolean |
isHeldByCurrentThread()
查询此锁是否由当前线程持有。 |
boolean |
isLocked()
查询此锁是否由任何线程持有。 |
void |
lock()
获得锁。 |
void |
lockInterruptibly()
获取锁定,除非当前线程是 interrupted 。 |
Condition |
newCondition()
|
String |
toString()
返回一个标识此锁的字符串以及其锁定状态。 |
boolean |
tryLock()
只有在调用时它不被另一个线程占用才能获取锁。 |
boolean |
tryLock(long timeout, TimeUnit unit)
如果在给定的等待时间内没有被另一个线程 占用 ,并且当前线程尚未被 保留,则获取该锁( interrupted) 。 |
void |
unlock()
尝试释放此锁。 |
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Collection;
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public int getHoldCount() {
return sync.getHoldCount();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public boolean isLocked() {
return sync.isLocked();
}
public final boolean isFair() {
return sync instanceof FairSync;
}
protected Thread getOwner() {
return sync.getOwner();
}
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
public final int getQueueLength() {
return sync.getQueueLength();
}
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
}
Modifier and Type | Method and Description |
---|---|
void |
lock()
获得锁。 |
void |
lockInterruptibly()
获取锁定,除非当前线程是 interrupted 。 |
Condition |
newCondition()
返回一个新 |
boolean |
tryLock()
只有在调用时才可以获得锁。 |
boolean |
tryLock(long time, TimeUnit unit)
如果在给定的等待时间内是空闲的,并且当前的线程尚未得到 interrupted,则获取该锁。 |
void |
unlock()
释放锁。 |
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
Modifier and Type | Method and Description |
---|---|
void |
await()
导致当前线程等到发信号或 interrupted 。 |
boolean |
await(long time, TimeUnit unit)
使当前线程等待直到发出信号或中断,或指定的等待时间过去。 |
long |
awaitNanos(long nanosTimeout)
使当前线程等待直到发出信号或中断,或指定的等待时间过去。 |
void |
awaitUninterruptibly()
使当前线程等待直到发出信号。 |
boolean |
awaitUntil(Date deadline)
使当前线程等待直到发出信号或中断,或者指定的最后期限过去。 |
void |
signal()
唤醒一个等待线程。 |
void |
signalAll()
唤醒所有等待线程。 |
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.Date;
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
提供一个框架,用于实现依赖先进先出(FIFO)等待队列的阻塞锁和相关同步器(信号量,事件等)。 该类被设计为大多数类型的同步器的有用依据,这些同步器依赖于单个原子int
值来表示状态。 子类必须定义改变此状态的受保护方法,以及根据该对象被获取或释放来定义该状态的含义。 给定这些,这个类中的其他方法执行所有排队和阻塞机制。 子类可以保持其他状态字段,但只以原子方式更新int
使用方法操纵值getState()
, setState(int)
和compareAndSetState(int, int)
被跟踪相对于同步。
子类应定义为非公共内部助手类,用于实现其封闭类的同步属性。 AbstractQueuedSynchronizer
类不实现任何同步接口。 相反,它定义了一些方法,如acquireInterruptibly(int)
,可以通过具体的锁和相关同步器来调用适当履行其公共方法。
此类支持默认独占模式和共享模式。 当以独占模式获取时,尝试通过其他线程获取不能成功。 多线程获取的共享模式可能(但不需要)成功。 除了在机械意义上,这个类不理解这些差异,当共享模式获取成功时,下一个等待线程(如果存在)也必须确定它是否也可以获取。 在不同模式下等待的线程共享相同的FIFO队列。 通常,实现子类只支持这些模式之一,但是两者都可以在ReadWriteLock
中发挥作用 。 仅支持独占或仅共享模式的子类不需要定义支持未使用模式的方法。
这个类定义的嵌套AbstractQueuedSynchronizer.ConditionObject
可用于作为一类Condition
由子类支持独占模式用于该方法的实施isHeldExclusively()
份报告是否同步排他相对于保持在当前线程,方法release(int)
与当前调用getState()
值完全释放此目的,和acquire(int)
,给定此保存的状态值,最终将此对象恢复到其先前获取的状态。 AbstractQueuedSynchronizer
方法将创建此类条件,因此如果不能满足此约束,请勿使用该约束。 AbstractQueuedSynchronizer.ConditionObject
的行为当然取决于其同步器实现的语义。
该类为内部队列提供检查,检测和监控方法,以及条件对象的类似方法。 这些可以根据需要导出到类中,使用AbstractQueuedSynchronizer
进行同步机制。
此类的序列化仅存储底层原子整数维持状态,因此反序列化对象具有空线程队列。 需要可序列化的典型子类将定义一个readObject
方法,可以将其恢复为readObject
时的已知初始状态。
Modifier and Type | Method and Description |
---|---|
void |
acquire(int arg)
以独占模式获取,忽略中断。 |
void |
acquireInterruptibly(int arg)
以独占方式获得,如果中断,中止。 |
void |
acquireShared(int arg)
以共享模式获取,忽略中断。 |
void |
acquireSharedInterruptibly(int arg)
以共享方式获取,如果中断,中止。 |
protected boolean |
compareAndSetState(int expect, int update)
如果当前状态值等于期望值,则将同步状态原子地设置为给定的更新值。 |
Collection<Thread> |
getExclusiveQueuedThreads()
返回一个包含可能正在等待以独占模式获取的线程的集合。 |
Thread |
getFirstQueuedThread()
返回队列中第一个(等待时间最长的)线程,或 |
Collection<Thread> |
getQueuedThreads()
返回一个包含可能正在等待获取的线程的集合。 |
int |
getQueueLength()
返回等待获取的线程数的估计。 |
Collection<Thread> |
getSharedQueuedThreads()
返回包含可能正在等待在共享模式下获取的线程的集合。 |
protected int |
getState()
返回同步状态的当前值。 |
Collection<Thread> |
getWaitingThreads(AbstractQueuedSynchronizer.ConditionObject condition)
返回一个集合,其中包含可能正在等待与此同步器相关联的给定条件的线程。 |
int |
getWaitQueueLength(AbstractQueuedSynchronizer.ConditionObject condition)
返回等待与此同步器相关联的给定条件的线程数量的估计。 |
boolean |
hasContended()
查询任何线程是否有争取获取此同步器; 那就是收购方法是否被阻止。 |
boolean |
hasQueuedPredecessors()
查询任何线程是否等待获取比当前线程更长的时间。 |
boolean |
hasQueuedThreads()
查询任何线程是否等待获取。 |
boolean |
hasWaiters(AbstractQueuedSynchronizer.ConditionObject condition)
查询任何线程是否等待与此同步器相关联的给定条件。 |
protected boolean |
isHeldExclusively()
返回 |
boolean |
isQueued(Thread thread)
如果给定的线程当前排队,则返回true。 |
boolean |
owns(AbstractQueuedSynchronizer.ConditionObject condition)
查询给定的ConditionObject是否将此同步器用作其锁。 |
boolean |
release(int arg)
以专属模式发布。 |
boolean |
releaseShared(int arg)
以共享模式发布。 |
protected void |
setState(int newState)
设置同步状态的值。 |
String |
toString()
返回一个标识此同步器的字符串及其状态。 |
protected boolean |
tryAcquire(int arg)
尝试以独占模式获取。 |
boolean |
tryAcquireNanos(int arg, long nanosTimeout)
尝试以独占模式获取,如果中断则中止,如果给定的超时时间失败。 |
protected int |
tryAcquireShared(int arg)
尝试以共享模式获取。 |
boolean |
tryAcquireSharedNanos(int arg, long nanosTimeout)
尝试以共享模式获取,如果中断则中止,如果给定的时间超过,则失败。 |
protected boolean |
tryRelease(int arg)
尝试设置状态以独占模式反映版本。 |
protected boolean |
tryReleaseShared(int arg)
尝试将状态设置为以共享模式反映发布。 |
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import sun.misc.Unsafe;
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// Queuing utilities
static final long spinForTimeoutThreshold = 1000L;
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
// Utilities for various versions of acquire
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// Main exported methods
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// Queue inspection methods
public final boolean hasQueuedThreads() {
return head != tail;
}
public final boolean hasContended() {
return head != null;
}
public final Thread getFirstQueuedThread() {
// handle only fast path, else relay
return (head == tail) ? null : fullGetFirstQueuedThread();
}
private Thread fullGetFirstQueuedThread() {
Node h, s;
Thread st;
if (((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null) ||
((h = head) != null && (s = h.next) != null &&
s.prev == head && (st = s.thread) != null))
return st;
Node t = tail;
Thread firstThread = null;
while (t != null && t != head) {
Thread tt = t.thread;
if (tt != null)
firstThread = tt;
t = t.prev;
}
return firstThread;
}
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
}
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
// Instrumentation and monitoring methods
public final int getQueueLength() {
int n = 0;
for (Node p = tail; p != null; p = p.prev) {
if (p.thread != null)
++n;
}
return n;
}
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
public final Collection<Thread> getExclusiveQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (!p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
public final Collection<Thread> getSharedQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
if (p.isShared()) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
}
return list;
}
public String toString() {
int s = getState();
String q = hasQueuedThreads() ? "non" : "";
return super.toString() +
"[State = " + s + ", " + q + "empty queue]";
}
// Internal support methods for Conditions
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// Instrumentation methods for conditions
public final boolean owns(ConditionObject condition) {
return condition.isOwnedBy(this);
}
public final boolean hasWaiters(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.hasWaiters();
}
public final int getWaitQueueLength(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitQueueLength();
}
public final Collection<Thread> getWaitingThreads(ConditionObject condition) {
if (!owns(condition))
throw new IllegalArgumentException("Not owner");
return condition.getWaitingThreads();
}
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
public ConditionObject() { }
// Internal methods
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// public methods
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// support for instrumentation
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
}
可以由线程专有的同步器。 该类提供了创建可能需要所有权概念的锁和相关同步器的基础。 AbstractOwnableSynchronizer
类本身不管理或使用此信息。 然而,子类和工具可能会使用适当维护的值来帮助控制和监视访问并提供诊断。
Modifier and Type | Method and Description |
---|---|
protected Thread |
getExclusiveOwnerThread()
返回由 |
protected void |
setExclusiveOwnerThread(Thread thread)
设置当前拥有独占访问权限的线程。 |
package java.util.concurrent.locks;
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/** Use serial ID even though all fields transient. */
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}