多线程理解(十一) 并发容器J.U.C AQS组件
一、AQS是什么:AQS全名是AbstractQueuedSynchronizer,是并发容器J.U.C(java.lang.concurrent)下locks包内的一个类。它实现了一个FIFO(FirstIn、FisrtOut先进先出)的队列。底层实现的数据结构是一个双向列表。
Sync queue:同步队列,是一个双向列表。包括head节点和tail节点。head节点主要用作后续的调度。
Condition queue:非必须,单向列表。当程序中存在cindition的时候才会存在此列表。
二、AQS设计思想:
使用Node实现FIFO队列,可以用于构建锁或者其他同步装置的基础框架。
利用int类型标识状态。在AQS类中有一个叫做state的成员变量
/** * The synchronization state. */
private volatile int state;
基于AQS有一个同步组件,叫做ReentrantLock。在这个组件里,stste表示获取锁的线程数,假如state=0,表示还没有线程获取锁,1表示有线程获取了锁。大于1表示重入锁的数量。
继承:子类通过继承并通过实现它的方法管理其状态(acquire和release方法操纵状态)。
可以同时实现排它锁和共享锁模式(独占、共享),站在一个使用者的角度,AQS的功能主要分为两类:独占和共享。它的所有子类中,要么实现并使用了它的独占功能的api,要么使用了共享锁的功能,而不会同时使用两套api,即便是最有名的子类ReentrantReadWriteLock也是通过两个内部类读锁和写锁分别实现了两套api来实现的。
三、AQS的大致实现思路:
AQS内部维护了一个CLH(自旋队列)队列来管理锁。线程会首先尝试获取锁,如果失败就将当前线程及等待状态等信息包装成一个node节点加入到同步队列sync queue里。
接着会不断的循环尝试获取锁,条件是当前节点为head的直接后继才会尝试。如果失败就会阻塞自己直到自己被唤醒。而当持有锁的线程释放锁的时候,会唤醒队列中的后继线程。
四、AQS组件:CountDownLatch
通过一个计数来保证线程是否需要被阻塞。实现一个或多个线程等待其他线程执行的场景。
我们定义一个CountDownLatch,通过给定的计数器为其初始化,该计数器是原子性操作,保证同时只有一个线程去操作该计数器。调用该类await方法的线程会一直处于阻塞状态。只有其他线程调用countDown方法(每次使计数器-1),使计数器归零才能继续执行。
private final static int threadCount = 200;
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
final CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadNum = i;
exec.execute(() -> {
try {
test(threadNum);
} catch (Exception e) {
log.error("exception", e);
} finally {
countDownLatch.countDown();
}
});
}
countDownLatch.await();
log.info("finish");
exec.shutdown();
}
CountDownLatch的await方法还有重载形式,可以设置等待的时间,如果超过此时间,计数器还未清零,则不继续等待:
countDownLatch.await(10, TimeUnit.MILLISECONDS);
//参数1:等待的时间长度
//参数2:等待的时间单位
五、AQS组件:Semaphore
用于保证同一时间并发访问线程的数目。
信号量在操作系统中是很重要的概念,Java并发库里的Semaphore就可以很轻松的完成类似操作系统信号量的控制。Semaphore可以很容易控制系统中某个资源被同时访问的线程个数。
在数据结构中我们学过链表,链表正常是可以保存无限个节点的,而Semaphore可以实现有限大小的列表。
使用场景:仅能提供有限访问的资源。比如数据库连接。
Semaphore使用acquire方法和release方法来实现控制:
/** *
- 普通调用
*/
try {
semaphore.acquire(); // 获取一个许可
test();//需要并发控制的内容
semaphore.release(); // 释放一个许可
} catch (Exception e) {
log.error("exception", e);
}
/** *
2、同时获取多个许可,同时释放多个许可
*/
try {
semaphore.acquire(2);
test();
semaphore.release(2);
} catch (Exception e) {
log.error("exception", e); }
/* *
3、尝试获取许可,获取不到不执行
*/
try{
if(semaphore.tryAcquire())
{
test(threadNum);
semaphore.release();
}
} catch (Exception e) {
log.error("exception", e);
}
/* *
4、尝试获取许可一段时间,获取不到不执行
* 参数1:等待时间长度 参数2:等待时间单位
*/ try {
if(semaphore.tryAcquire(5000,TimeUnit.MILLISECONDS))
{
test(threadNum); semaphore.release();
}
} catch (Exception e) {
log.error("exception", e);
}
六、AQS组件:CyclicBarrier
也是一个同步辅助类,它允许一组线程相互等待,直到到达某个公共的屏障点(循环屏障)
通过它可以完成多个线程之间相互等待,只有每个线程都准备就绪后才能继续往下执行后面的操作。
每当有一个线程执行了await方法,计数器就会执行+1操作,待计数器达到预定的值,所有的线程再同时继续执行。由于计数器释放之后可以重用(reset方法),所以称之为循环屏障。
CountDownLatch、CyclicBarrier区别
1.CoumtDownLatch 计时器只能使用一次,CyclicBarrier 可以通过reset循环使用,例如:当发生错误的时候可以重置计数器。
所以 CyclicBarrier 可以处理更为复杂的业务场景。
2.CyclicBarrier还提供了其他有用方法,比如getNumberWaiting方法可以获取CyclicBarrier阻塞线程数量,isBroken()方法
用来了解阻塞的线程是否被中断
3.CountDownLatch 一个或n个线程等待其他线程,CyclicBarrier 是多个线程之间相互等待,直到所有线程都满足条件之后 一起执行
//使用方法1:每个线程都持续等待
public class CyclicBarrierExample1 {
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++){
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
}
//使用方法2:每个线程只等待一段时间
private static CyclicBarrier barrier = new CyclicBarrier(5);
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
try {
barrier.await(2000, TimeUnit.MILLISECONDS);
} catch (Exception e) {
log.warn("BarrierException", e);
}
log.info("{} continue", threadNum);
}
//使用方法3:在初始化的时候设置runnable,当线程达到屏障时优先执行runnable
private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
log.info("callback is running");
});
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++){
final int threadNum = i;
Thread.sleep(1000);
executor.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executor.shutdown();
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000);
log.info("{} is ready", threadNum);
barrier.await();
log.info("{} continue", threadNum);
}
七、AQS组件:ReentrantLock
java中有两类锁,一类是Synchronized,而另一类就是J.U.C中提供的锁。ReentrantLock与Synchronized都是可重入锁,本质上都是lock与unlock的操作。接下来我们介绍三种J.U.C中的锁,其中 ReentrantLock使用synchronized与之比对介绍。
1.ReentrantLock与synchronized的区别
可重入性:两者的锁都是可重入的,差别不大,有线程进入锁,计数器自增1,等下降为0时才可以释放锁
锁的实现:synchronized是基于JVM实现的(用户很难见到,无法了解其实现),ReentrantLock是JDK实现的。
性能区别:在最初的时候,二者的性能差别差很多,当synchronized引入了偏向锁、轻量级锁(自选锁)后,二者的性能差别不大,官方推荐synchronized(写法更容易、在优化时其实是借用了ReentrantLock的CAS技术,试图在用户态就把问题解决,避免进入内核态造成线程阻塞)
功能区别:
(1)便利性:synchronized更便利,它是由编译器保证加锁与释放。ReentrantLock是需要手动释放锁,所以为了避免忘记手工释放锁造成死锁,所以最好在finally中声明释放锁。
(2)锁的细粒度和灵活度,ReentrantLock优于synchronized
2.ReentrantLock独有的功能
可以指定是公平锁还是非公平锁,synchronized只能是非公平锁。(所谓公平锁就是先等待的线程先获得锁)
提供了一个Condition类,可以分组唤醒需要唤醒的线程。不像是synchronized要么随机唤醒一个线程,要么全部唤醒。
提供能够中断等待锁的线程的机制,通过lock.lockInterruptibly()实现,这种机制 ReentrantLock是一种自选锁,通过循环调用CAS操作来实现加锁。性能比较好的原因是避免了进入内核态的阻塞状态。
3.要放弃synchronized?
从上边的介绍,看上去ReentrantLock不仅拥有synchronized的所有功能,而且有一些功能synchronized无法实现的特性。性能方面,ReentrantLock也不比synchronized差,那么到底我们要不要放弃使用synchronized呢?答案是不要这样做。
J.U.C包中的锁定类是用于高级情况和高级用户的工具,除非说你对Lock的高级特性有特别清楚的了解以及有明确的需要,或这有明确的证据表明同步已经成为可伸缩性的瓶颈的时候,否则我们还是继续使用synchronized。相比较这些高级的锁定类,synchronized还是有一些优势的,比如synchronized不可能忘记释放锁。还有当JVM使用synchronized管理锁定请求和释放时,JVM在生成线程转储时能够包括锁定信息,这些信息对调试非常有价值,它们可以标识死锁以及其他异常行为的来源。
4.如何使用ReentrantLock?
//创建锁:使用Lock对象声明,使用ReentrantLock接口创建
private final static Lock lock = new ReentrantLock();
//使用锁:在需要被加锁的方法中使用
private static void add()
{
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
分析一下源码:
//初始化方面:
//在new ReentrantLock的时候默认给了一个不公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//也可以加参数来初始化指定使用公平锁还是不公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync(); }
内置函数(部分)
基础特性:
- tryLock():仅在调用时锁定未被另一个线程保持的情况下才获取锁定。
- tryLock(long timeout, TimeUnit unit):如果锁定在给定的时间内没有被另一个线程保持且当前线程没有被中断,则获取这个锁定。
- lockInterruptbily:如果当前线程没有被中断的话,那么就获取锁定。如果中断了就抛出异常。
- isLocked:查询此锁定是否由任意线程保持
- isHeldByCurrentThread:查询当前线程是否保持锁定状态。
- isFair:判断是不是公平锁
Condition相关特性:
- hasQueuedThread(Thread):查询指定线程是否在等待获取此锁定
- hasQueuedThreads():查询是否有线程在等待获取此锁定
- getHoldCount():查询当前线程保持锁定的个数,也就是调用Lock方法的个数
5.Condition的使用:Condition可以非常灵活的操作线程的唤醒,下面是一个线程等待与唤醒的例子,其中用1234序号标出了日志输出顺序
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();
Condition condition = reentrantLock.newCondition();
new Thread(() -> {
try {
reentrantLock.lock();
log.info("wait signal"); // 1
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("get signal"); // 4
reentrantLock.unlock();
}).start();
new Thread(() -> {
reentrantLock.lock();
log.info("get lock"); // 2
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
condition.signalAll();
log.info("send signal ~ "); // 3
reentrantLock.unlock();
}).start();
}
输出过程讲解:
1、 线程1调用了reentrantLock.lock(),线程进入AQS等待队列,输出1号log
2、接着调用了awiat方法,线程从AQS队列中移除,锁释放,直接加入condition的等待队列中
3、线程2因为线程1释放了锁,拿到了锁,输出2号log
4、线程2执行condition.signalAll()发送信号,输出3号log
5、condition队列中线程1的节点接收到信号,从condition队列中拿出来放入到了AQS的等待队列,这时线程1并没有被唤醒。
6、线程2调用unlock释放锁,因为AQS队列中只有线程1,因此AQS释放锁按照从头到尾的顺序,唤醒线程1
7、线程1继续执行,输出4号log,并进行unlock操作。
八、ReentrantReadWriteLock读写锁
在没有任何读写锁的时候才可以取得写入锁(悲观读取,容易写线程饥饿),也就是说如果一直存在读操作,那么写锁一直在等待没有读的情况出现,这样我的写锁就永远也获取不到,就会造成等待获取写锁的线程饥饿。
平时使用的场景并不多。
public class LockExample3 {
private final Map<String, Data> map = new TreeMap<>();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();//读锁
private final Lock writeLock = lock.writeLock();//写锁
//加读锁
public Data get(String key) {
readLock.lock();
try {
return map.get(key);
} finally {
readLock.unlock();
}
}
public Set<String> getAllKeys() {
readLock.lock();
try {
return map.keySet();
} finally {
readLock.unlock();
}
}
//加写锁
public Data put(String key, Data value) {
writeLock.lock();
try {
return map.put(key, value);
} finally {
readLock.unlock();
}
}
class Data {
}
}
九、票据锁:StempedLock
它控制锁有三种模式(写、读、乐观读)。一个StempedLock的状态是由版本和模式两个部分组成。锁获取方法返回一个数字作为票据(stamp),他用相应的锁状态表示并控制相关的访问。数字0表示没有写锁被锁写访问,在读锁上分为悲观锁和乐观锁。
乐观读:
如果读的操作很多写的很少,我们可以乐观的认为读的操作与写的操作同时发生的情况很少,因此不悲观的使用完全的读取锁定。程序可以查看读取资料之后是否遭到写入资料的变更,再采取之后的措施。
如何使用:
//定义
private final static StampedLock lock = new StampedLock();
//需要上锁的方法
private static void add() {
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlock(stamp);
}
}
分析一下源码:
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();
void move(double deltaX, double deltaY) { // an exclusively locked method
long stamp = sl.writeLock();
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp);
}
}
//下面看看乐观读锁案例
double distanceFromOrigin() { // A read-only method
long stamp = sl.tryOptimisticRead();//获得一个乐观读锁
double currentX = x, currentY = y; //将两个字段读入本地局部变量
if (!sl.validate(stamp)) { //检查发出乐观读锁后同时是否有其他写锁发生?
stamp = sl.readLock(); //如果没有,我们再次获得一个读悲观锁
try {
currentX = x; // 将两个字段读入本地局部变量
currentY = y; // 将两个字段读入本地局部变量
} finally {
sl.unlockRead(stamp);
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
//下面是悲观读锁案例
void moveIfAtOrigin(double newX, double newY) { // upgrade
// Could instead start with optimistic, not read mode
long stamp = sl.readLock();
try {
while (x == 0.0 && y == 0.0) { //循环,检查当前状态是否符合
long ws = sl.tryConvertToWriteLock(stamp); //将读锁转为写锁
if (ws != 0L) { //这是确认转为写锁是否成功
stamp = ws; //如果成功 替换票据
x = newX; //进行状态改变
y = newY; //进行状态改变
break;
} else { //如果不能成功转换为写锁
sl.unlockRead(stamp); //我们显式释放读锁
stamp = sl.writeLock(); //显式直接进行写锁 然后再通过循环再试
}
}
} finally {
sl.unlock(stamp); //释放读锁或写锁
}
}
}
如何选择锁?
- 当只有少量竞争者,使用synchronized 。
- 竞争者不少但是线程增长的趋势是能预估的,使用ReetrantLock 。
- synchronized不会造成死锁,jvm会自动释放死锁。
十、FutureTask
FutureTask是J.U.C中的类,是一个可删除的异步计算类。这个类提供了Future接口的的基本实现,使用相关方法启动和取消计算,查询计算是否完成,并检索计算结果。只有在计算完成时才能使用get方法检索结果;如果计算尚未完成,get方法将会阻塞。一旦计算完成,计算就不能重新启动或取消(除非使用runAndReset方法调用计算)。
Runnable与Callable对比
通常实现一个线程我们会使用继承Thread的方式或者实现Runnable接口,这两种方式有一个共同的缺陷就是在执行完任务之后无法获取执行结果。从Java1.5之后就提供了Callable与Future,这两个接口就可以实现获取任务执行结果。
Runnable接口:代码非常简单,只有一个方法run
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
Callable泛型接口:有泛型参数,提供了一个call方法,执行后可返回传入的泛型参数类型的结果。
public interface Callable<V> {
V call() throws Exception;
}
Future接口
Future接口提供了一系列方法用于控制线程执行计算,如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);//取消任务
boolean isCancelled();//是否被取消
boolean isDone();//计算是否完成
V get() throws InterruptedException, ExecutionException;//获取计算结果,在执行过程中任务被阻塞
V get(long timeout, TimeUnit unit)//timeout等待时间、unit时间单位
throws InterruptedException, ExecutionException, TimeoutException;
}
使用方法:
public class FutureExample {
static class MyCallable implements Callable<String> {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
}
public static void main(String[] args) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
Future<String> future = executorService.submit(new MyCallable());
log.info("do something in main");
Thread.sleep(1000);
String result = future.get();
log.info("result:{}", result);
}
}
运行结果:阻塞效果
FutureTask
Future实现了RunnableFuture接口,而RunnableFuture接口继承了Runnable与Future接口,所以它既可以作为Runnable被线程中执行,又可以作为callable获得返回值。
public class FutureTask<V> implements RunnableFuture<V> {
}
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
FutureTask支持两种参数类型,Callable和Runnable,在使用Runnable 时,还可以多指定一个返回结果类型。
public FutureTask(Callable<V> callable) {
if (callable == null) throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable }
使用方法:
public class FutureTaskExample {
public static void main(String[] args) throws Exception {
FutureTask<String> futureTask = new FutureTask<String>(new Callable<String>() {
@Override
public String call() throws Exception {
log.info("do something in callable");
Thread.sleep(5000);
return "Done";
}
});
new Thread(futureTask).start();
log.info("do something in main");
Thread.sleep(1000);
String result = futureTask.get();
log.info("result:{}", result);
}
}
运行结果:
十一、ForkJoin
ForkJoin是Java7提供的一个并行执行任务的框架,是把大任务分割成若干个小任务,待小任务完成后将结果汇总成大任务结果的框架。主要采用的是工作窃取算法,工作窃取算法是指某个线程从其他队列里窃取任务来执行。
在窃取过程中两个线程会访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,通常我们会使用双端队列来实现工作窃取算法。被窃取任务的线程永远从队列的头部拿取任务,窃取任务的线程从队列尾部拿取任务。
局限性:
1、任务只能使用fork和join作为同步机制,如果使用了其他同步机制,当他们在同步操作时,工作线程就不能执行其他任务了。比如在fork框架使任务进入了睡眠,那么在睡眠期间内在执行这个任务的线程将不会执行其他任务了。
2、我们所拆分的任务不应该去执行IO操作,如读和写数据文件。
3、任务不能抛出检查异常。必须通过必要的代码来处理他们。
框架核心:
核心有两个类:ForkJoinPool | ForkJoinTask
ForkJoinPool:负责来做实现,包括工作窃取算法、管理工作线程和提供关于任务的状态以及他们的执行信息。
ForkJoinTask:提供在任务中执行fork和join的机制。
使用方式:(模拟加和运算)
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一个计算任务,计算1+2+3+4
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
//执行一个任务
Future<Integer> result = forkjoinPool.submit(task);
try {
log.info("result:{}", result.get());
} catch (Exception e) {
log.error("exception", e);
}
}
}
十二、BlockingQueue阻塞队列
主要应用场景:生产者消费者模型,是线程安全的
阻塞情况:
1、当队列满了进行入队操作
2、当队列空了的时候进行出队列操作
四套方法:
BlockingQueue提供了四套方法,分别来进行插入、移除、检查。每套方法在不能立刻执行时都有不同的反应。
- Throws Exceptions :如果不能立即执行就抛出异常。
- Special Value:如果不能立即执行就返回一个特殊的值。
- Blocks:如果不能立即执行就阻塞
- Times Out:如果不能立即执行就阻塞一段时间,如果过了设定时间还没有被执行,则返回一个值
实现类:
- ArrayBlockingQueue:它是一个有界的阻塞队列,内部实现是数组,初始化时指定容量大小,一旦指定大小就不能再变。采用FIFO方式存储元素。
- DelayQueue:阻塞内部元素,内部元素必须实现Delayed接口,Delayed接口又继承了Comparable接口,原因在于DelayQueue内部元素需要排序,一般情况按过期时间优先级排序。
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
DalayQueue内部采用PriorityQueue与ReentrantLock实现。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
}
- LinkedBlockingQueue:大小配置可选,如果初始化时指定了大小,那么它就是有边界的。不指定就无边界(最大整型值)。内部实现是链表,采用FIFO形式保存数据。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
//不指定大小,无边界采用默认值,最大整型值
}
- PriorityBlockingQueue:带优先级的阻塞队列。无边界队列,允许插入null。插入的对象必须实现Comparator接口,队列优先级的排序规则就是按照我们对Comparable接口的实现来指定的。我们可以从PriorityBlockingQueue中获取一个迭代器,但这个迭代器并不保证能按照优先级的顺序进行迭代。
public boolean add(E e) {
//添加方法
return offer(e);
}
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock; lock.lock();
int n, cap; Object[] array;
while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap);
try { Comparator<? super E> cmp = comparator;
//必须实现Comparator接口
if (cmp == null) siftUpComparable(n, e, array);
else siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
}
finally
{
lock.unlock();
}
return true;
}
- SynchronusQueue:只能插入一个元素,同步队列,无界非缓存队列,不存储元素。