Java并发编程笔记之PriorityBlockingQueue源码分析
JDK 中无界优先级队列PriorityBlockingQueue 内部使用堆算法保证每次出队都是优先级最高的元素,元素入队时候是如何建堆的,元素出队后如何调整堆的平衡的?
PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最好或者最低的元素,内部是平衡二叉树堆的实现。
首先看一下PriorityBlockingQueue类图结构,如下:
可以看到PriorityBlockingQueue内部有个数组queue用来存放队列元素,size用来存放队列元素个数,allocationSpinLock 是个自旋锁,用CAS操作来保证只有一个线程可以扩容队列,
状态为0 或者1,其中0标示当前没有在进行扩容,1标示当前正在扩容。
我们首先看看PriorityBlockingQueue的构造函数,源码如下:
private static final int DEFAULT_INITIAL_CAPACITY = 11; public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; }
如上构造函数,默认队列容量为11,默认比较器为null,也就是使用元素的compareTo方法进行比较来确定元素的优先级,这意味着队列元素必须实现Comparable接口。
接下来我们主要看PriorityBlockingQueue的几个操作的源码,如下:
1.offer 操作,offer操作的作用是在队列插入一个元素,由于是无界队列,所以一直返回true,源码如下:
public boolean offer(E e) { if (e == null) throw new NullPointerException(); //获取独占锁 final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; //如果当前元素个数>=队列容量,则扩容(1) while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; //默认比较器为null (2) if (cmp == null) siftUpComparable(n, e, array); else //自定义比较器 (3) siftUpUsingComparator(n, e, array, cmp); //队列元素增加1,并且**notEmpty的条件队列里面的一个阻塞线程(9) size = n + 1; notEmpty.signal();//**调用take()方法被阻塞的线程 } finally { //释放独占锁 lock.unlock(); } return true; }
可以看到上面代码,offer操作主流程比较简单,接下来主要关注PriorityBlockingQueue是如何进行扩容的和内部如何建立堆的,首先看扩容源码如下:
private void tryGrow(Object[] array, int oldCap) { lock.unlock(); //释放获取的锁 Object[] newArray = null; //cas成功则扩容(4) if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { //oldGap<64则扩容新增oldcap+2,否者扩容50%,并且最大为MAX_ARRAY_SIZE int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // 如果一开始容量很小,则扩容宽度变大 (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // 可能溢出 int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { allocationSpinLock = 0; } } //第一个线程cas成功后,第二个线程会进入这个地方,然后第二个线程让出cpu,尽量让第一个线程执行下面点获取锁,但是这得不到肯定的保证。(5) if (newArray == null) // 如果两外一个线程正在分配,则让出 Thread.yield(); lock.lock();//(6) if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
tryGrow 目的是扩容,这里要思考下为啥在扩容前要先释放锁,然后使用 cas 控制只有一个线程可以扩容成功呢?
其实这里不先释放锁也是可以的,也就是在整个扩容期间一直持有锁,但是扩容是需要花时间的,如果扩容的时候还占用锁,那么其他线程在这个时候是不能进行出队和入队操作的,
这大大降低了并发性。所以为了提高性能,使用CAS控制只有一个线程可以进行扩容,并且在扩容前释放了锁,让其他线程可以进行入队和出队操作。