java数据结构之并发队列中的阻塞队列 BlockingDeque 接口实现之 LinkedBlockingDeque

1 双向并发阻塞队列。

所谓双向是指可以从队列的头和尾同时操作,并发只是线程安全的实现,阻塞允许在入队出队不满足条件时挂起线程,这里说的队列是指支持FIFO/FILO实现的链表。

首先看下LinkedBlockingDeque的数据结构。通常情况下从数据结构上就能看出这种实现的优缺点,这样就知道如何更好的使用工具了。

java数据结构之并发队列中的阻塞队列 BlockingDeque 接口实现之 LinkedBlockingDeque

从数据结构和功能需求上可以得到以下结论:

  1. 要想支持阻塞功能,队列的容量一定是固定的,否则无法在入队的时候挂起线程。也就是capacity是final类型的。
  2. 既然是双向链表,每一个结点就需要前后两个引用,这样才能将所有元素串联起来,支持双向遍历。也即需要prev/next两个引用。
  3. 双向链表需要头尾同时操作,所以需要first/last两个节点,当然可以参考LinkedList那样采用一个节点的双向来完成,那样实现起来就稍微麻烦点。
  4. 既然要支持阻塞功能,就需要锁和条件变量来挂起线程。这里使用一个锁两个条件变量来完成此功能。

2、LinkedBlockingDeque源码分析

public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>,  java.io.Serializable {  

  1.     /** 包含前驱和后继节点的双向链式结构 */  
  2.     static final class Node<E> {  
  3.  E item;  
  4.         Node<E> prev;  
  5.         Node<E> next;  
  6.         Node(E x, Node<E> p, Node<E> n) {  
  7.             item = x;  
  8.             prev = p;  
  9.             next = n;  
  10.         }  
  11.     }  
  12.     /** 头节点 */  
  13.     private transient Node<E> first;  
  14.     /** 尾节点 */  
  15.     private transient Node<E> last;  
  16.     /** 元素个数*/  
  17.     private transient int count;  
  18.     /** 队列容量 */  
  19.     private final int capacity;  
  20.     /** 锁 */  
  21.     private final ReentrantLock lock = new ReentrantLock();  
  22.     /** notEmpty条件 */  
  23.     private final Condition notEmpty = lock.newCondition();  
  24.     /** notFull条件 */  
  25.     private final Condition notFull = lock.newCondition();  
  26.     /** 构造方法 */  
  27.     public LinkedBlockingDeque() {  
  28.         this(Integer.MAX_VALUE);  
  29.     }  
  30.     public LinkedBlockingDeque(int capacity) {  
  31.         if (capacity <= 0) throw new IllegalArgumentException();  
  32.         this.capacity = capacity;  
  33.     }  
  34.     public LinkedBlockingDeque(Collection<? extends E> c) {  
  35.         this(Integer.MAX_VALUE);  
  36.         for (E e : c)  
  37.             add(e);  
  38.     }  
  39.   
  40.     /** 
  41.      * 添加元素作为新的头节点 
  42.      */  
  43.     private boolean linkFirst(E e) {  
  44.         if (count >= capacity)  
  45.             return false;  
  46.         ++count;  
  47.         Node<E> f = first;  
  48.         Node<E> x = new Node<E>(e, null, f);  
  49.         first = x;  
  50.         if (last == null)  
  51.             last = x;  
  52.         else  
  53.             f.prev = x;  
  54.         notEmpty.signal();  
  55.         return true;  
  56.     }  
  57.     /** 
  58.      * 添加尾元素 
  59.      */  
  60.     private boolean linkLast(E e) {  
  61.         if (count >= capacity)  
  62.             return false;  
  63.         ++count;  
  64.         Node<E> l = last;  
  65.         Node<E> x = new Node<E>(e, l, null);  
  66.         last = x;  
  67.         if (first == null)  
  68.             first = x;  
  69.         else  
  70.             l.next = x;  
  71.         notEmpty.signal();  
  72.         return true;  
  73.     }  
  74.     /** 
  75.      * 返回并移除头节点 
  76.      */  
  77.     private E unlinkFirst() {  
  78.         Node<E> f = first;  
  79.         if (f == null)  
  80.             return null;  
  81.         Node<E> n = f.next;  
  82.         first = n;  
  83.         if (n == null)  
  84.             last = null;  
  85.         else  
  86.             n.prev = null;  
  87.         --count;  
  88.         notFull.signal();  
  89.         return f.item;  
  90.     }  
  91.     /** 
  92.      * 返回并移除尾节点 
  93.      */  
  94.     private E unlinkLast() {  
  95.         Node<E> l = last;  
  96.         if (l == null)  
  97.             return null;  
  98.         Node<E> p = l.prev;  
  99.         last = p;  
  100.         if (p == null)  
  101.             first = null;  
  102.         else  
  103.             p.next = null;  
  104.         --count;  
  105.         notFull.signal();  
  106.         return l.item;  
  107.     }  
  108.     /** 
  109.      * 移除节点x 
  110.      */  
  111.     private void unlink(Node<E> x) {  
  112.         Node<E> p = x.prev;  
  113.         Node<E> n = x.next;  
  114.         if (p == null) {//x是头的情况  
  115.             if (n == null)  
  116.                 first = last = null;  
  117.             else {  
  118.                 n.prev = null;  
  119.                 first = n;  
  120.             }  
  121.         } else if (n == null) {//x是尾的情况  
  122.             p.next = null;  
  123.             last = p;  
  124.         } else {//x是中间的情况  
  125.             p.next = n;  
  126.             n.prev = p;  
  127.         }  
  128.         --count;  
  129.         notFull.signalAll();  
  130.     }  
  131.    
  132.     public void putFirst(E e) throws InterruptedException {  
  133.         if (e == nullthrow new NullPointerException();  
  134.         lock.lock();  
  135.         try {  
  136.             while (!linkFirst(e))  
  137.                 notFull.await();  
  138.         } finally {  
  139.             lock.unlock();  
  140.         }  
  141.     }  
  142.     public void putLast(E e) throws InterruptedException {  
  143.         if (e == nullthrow new NullPointerException();  
  144.         lock.lock();  
  145.         try {  
  146.             while (!linkLast(e))  
  147.                 notFull.await();  
  148.         } finally {  
  149.             lock.unlock();  
  150.         }  
  151.     }  
  152.     
  153.  
  154.     public E takeFirst() throws InterruptedException {  
  155.         lock.lock();  
  156.         try {  
  157.             E x;  
  158.             while ( (x = unlinkFirst()) == null)  
  159.                 notEmpty.await();  
  160.             return x;  
  161.         } finally {  
  162.             lock.unlock();  
  163.         }  
  164.     }  
  165.     public E takeLast() throws InterruptedException {  
  166.         lock.lock();  
  167.         try {  
  168.             E x;  
  169.             while ( (x = unlinkLast()) == null)  
  170.                 notEmpty.await();  
  171.             return x;  
  172.         } finally {  
  173.             lock.unlock();  
  174.         }  
  175.     }  
  176.   
  177.   
  178.  
  179.     public void put(E e) throws InterruptedException {  
  180.  putLast(e);  
  181.     }  
  182.       public E take() throws InterruptedException {  
  183.  return takeFirst();  
  184.     }  
  185.  ]
  186.  }  

 

 

3、LinkedBlockingDeque的优缺点

 

同时由于采用一个独占锁,因此实现起来也比较简单。所有对队列的操作都加锁就可以完成。同时独占锁也能够很好的支持双向阻塞的特性。

也由于独占锁,所以不能同时进行两个操作,这样性能上就大打折扣。从性能的角度讲LinkedBlockingDeque要比LinkedBlockingQueue要低很多,比CocurrentLinkedQueue就低更多了,这在高并发情况下就比较明显了。

 

4、LinkedBlockingDeque的序列化、反序列化

 

有趣的是此类支持序列化,但是Node并不支持序列化,因此fist/last就不能序列化,那么如何完成序列化/反序列化过程呢?

清单4 LinkedBlockingDeque的序列化、反序列化

 

private void writeObject(java.io.ObjectOutputStream s)  throws java.io.IOException {  

  1.     lock.lock();  
  2.     try {  
  3.         // Write out capacity and any hidden stuff  
  4.         s.defaultWriteObject();  
  5.         // Write out all elements in the proper order.  
  6.         for (Node<E> p = first; p != null; p = p.next)  
  7.             s.writeObject(p.item);  
  8.         // Use trailing null as sentinel  
  9.         s.writeObject(null);  
  10.     } finally {  
  11.         lock.unlock();  
  12.     }  
  13. }  
  14.   
  15. private void readObject(java.io.ObjectInputStream s)  
  16.     throws java.io.IOException, ClassNotFoundException {  
  17.     s.defaultReadObject();  
  18.     count = 0;  
  19.     first = null;  
  20.     last = null;  
  21.     // Read in all elements and place in queue  
  22.     for (;;) {  
  23.         E item = (E)s.readObject();  
  24.         if (item == null)  
  25.             break;  
  26.         add(item);  
  27.     }  
  28. }  
 

 

清单4 描述的是LinkedBlockingDeque序列化/反序列化的过程。序列化时将真正的元素写入输出流,最后还写入了一个null。读取的时候将所有对象列表读出来,如果读取到一个null就表示结束。这就是为什么写入的时候写入一个null的原因,因为没有将count写入流,所以就靠null来表示结束,省一个整数空间