java数据结构之并发队列中的阻塞队列 BlockingDeque 接口实现之 LinkedBlockingDeque
1 双向并发阻塞队列。
所谓双向是指可以从队列的头和尾同时操作,并发只是线程安全的实现,阻塞允许在入队出队不满足条件时挂起线程,这里说的队列是指支持FIFO/FILO实现的链表。
首先看下LinkedBlockingDeque的数据结构。通常情况下从数据结构上就能看出这种实现的优缺点,这样就知道如何更好的使用工具了。
从数据结构和功能需求上可以得到以下结论:
- 要想支持阻塞功能,队列的容量一定是固定的,否则无法在入队的时候挂起线程。也就是capacity是final类型的。
- 既然是双向链表,每一个结点就需要前后两个引用,这样才能将所有元素串联起来,支持双向遍历。也即需要prev/next两个引用。
- 双向链表需要头尾同时操作,所以需要first/last两个节点,当然可以参考LinkedList那样采用一个节点的双向来完成,那样实现起来就稍微麻烦点。
- 既然要支持阻塞功能,就需要锁和条件变量来挂起线程。这里使用一个锁两个条件变量来完成此功能。
2、LinkedBlockingDeque源码分析
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable {
- /** 包含前驱和后继节点的双向链式结构 */
- static final class Node<E> {
- E item;
- Node<E> prev;
- Node<E> next;
- Node(E x, Node<E> p, Node<E> n) {
- item = x;
- prev = p;
- next = n;
- }
- }
- /** 头节点 */
- private transient Node<E> first;
- /** 尾节点 */
- private transient Node<E> last;
- /** 元素个数*/
- private transient int count;
- /** 队列容量 */
- private final int capacity;
- /** 锁 */
- private final ReentrantLock lock = new ReentrantLock();
- /** notEmpty条件 */
- private final Condition notEmpty = lock.newCondition();
- /** notFull条件 */
- private final Condition notFull = lock.newCondition();
- /** 构造方法 */
- public LinkedBlockingDeque() {
- this(Integer.MAX_VALUE);
- }
- public LinkedBlockingDeque(int capacity) {
- if (capacity <= 0) throw new IllegalArgumentException();
- this.capacity = capacity;
- }
- public LinkedBlockingDeque(Collection<? extends E> c) {
- this(Integer.MAX_VALUE);
- for (E e : c)
- add(e);
- }
- /**
- * 添加元素作为新的头节点
- */
- private boolean linkFirst(E e) {
- if (count >= capacity)
- return false;
- ++count;
- Node<E> f = first;
- Node<E> x = new Node<E>(e, null, f);
- first = x;
- if (last == null)
- last = x;
- else
- f.prev = x;
- notEmpty.signal();
- return true;
- }
- /**
- * 添加尾元素
- */
- private boolean linkLast(E e) {
- if (count >= capacity)
- return false;
- ++count;
- Node<E> l = last;
- Node<E> x = new Node<E>(e, l, null);
- last = x;
- if (first == null)
- first = x;
- else
- l.next = x;
- notEmpty.signal();
- return true;
- }
- /**
- * 返回并移除头节点
- */
- private E unlinkFirst() {
- Node<E> f = first;
- if (f == null)
- return null;
- Node<E> n = f.next;
- first = n;
- if (n == null)
- last = null;
- else
- n.prev = null;
- --count;
- notFull.signal();
- return f.item;
- }
- /**
- * 返回并移除尾节点
- */
- private E unlinkLast() {
- Node<E> l = last;
- if (l == null)
- return null;
- Node<E> p = l.prev;
- last = p;
- if (p == null)
- first = null;
- else
- p.next = null;
- --count;
- notFull.signal();
- return l.item;
- }
- /**
- * 移除节点x
- */
- private void unlink(Node<E> x) {
- Node<E> p = x.prev;
- Node<E> n = x.next;
- if (p == null) {//x是头的情况
- if (n == null)
- first = last = null;
- else {
- n.prev = null;
- first = n;
- }
- } else if (n == null) {//x是尾的情况
- p.next = null;
- last = p;
- } else {//x是中间的情况
- p.next = n;
- n.prev = p;
- }
- --count;
- notFull.signalAll();
- }
- public void putFirst(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- lock.lock();
- try {
- while (!linkFirst(e))
- notFull.await();
- } finally {
- lock.unlock();
- }
- }
- public void putLast(E e) throws InterruptedException {
- if (e == null) throw new NullPointerException();
- lock.lock();
- try {
- while (!linkLast(e))
- notFull.await();
- } finally {
- lock.unlock();
- }
- }
- public E takeFirst() throws InterruptedException {
- lock.lock();
- try {
- E x;
- while ( (x = unlinkFirst()) == null)
- notEmpty.await();
- return x;
- } finally {
- lock.unlock();
- }
- }
- public E takeLast() throws InterruptedException {
- lock.lock();
- try {
- E x;
- while ( (x = unlinkLast()) == null)
- notEmpty.await();
- return x;
- } finally {
- lock.unlock();
- }
- }
- public void put(E e) throws InterruptedException {
- putLast(e);
- }
- public E take() throws InterruptedException {
- return takeFirst();
- }
- ]
- }
3、LinkedBlockingDeque的优缺点
同时由于采用一个独占锁,因此实现起来也比较简单。所有对队列的操作都加锁就可以完成。同时独占锁也能够很好的支持双向阻塞的特性。
也由于独占锁,所以不能同时进行两个操作,这样性能上就大打折扣。从性能的角度讲LinkedBlockingDeque要比LinkedBlockingQueue要低很多,比CocurrentLinkedQueue就低更多了,这在高并发情况下就比较明显了。
4、LinkedBlockingDeque的序列化、反序列化
有趣的是此类支持序列化,但是Node并不支持序列化,因此fist/last就不能序列化,那么如何完成序列化/反序列化过程呢?
清单4 LinkedBlockingDeque的序列化、反序列化
private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException {
- lock.lock();
- try {
- // Write out capacity and any hidden stuff
- s.defaultWriteObject();
- // Write out all elements in the proper order.
- for (Node<E> p = first; p != null; p = p.next)
- s.writeObject(p.item);
- // Use trailing null as sentinel
- s.writeObject(null);
- } finally {
- lock.unlock();
- }
- }
- private void readObject(java.io.ObjectInputStream s)
- throws java.io.IOException, ClassNotFoundException {
- s.defaultReadObject();
- count = 0;
- first = null;
- last = null;
- // Read in all elements and place in queue
- for (;;) {
- E item = (E)s.readObject();
- if (item == null)
- break;
- add(item);
- }
- }
清单4 描述的是LinkedBlockingDeque序列化/反序列化的过程。序列化时将真正的元素写入输出流,最后还写入了一个null。读取的时候将所有对象列表读出来,如果读取到一个null就表示结束。这就是为什么写入的时候写入一个null的原因,因为没有将count写入流,所以就靠null来表示结束,省一个整数空间