Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

到目前为止,我们在Java世界里看到了两种实现key-value的数据结构:Hash、TreeMap,这两种数据结构各自都有着优缺点。

  1. Hash表:插入、查找最快,为O(1);如使用链表实现则可实现无锁;数据有序化需要显式的排序操作。
  2. 红黑树:插入、查找为O(logn),但常数项较小;无锁实现的复杂性很高,一般需要加锁;数据天然有序。

然而,这次介绍第三种实现key-value的数据结构:SkipList。SkipList有着不低于红黑树的效率,但是其原理和实现的复杂度要比红黑树简单多了。

SkipList

什么是SkipList?Skip List ,称之为跳表,它是一种可以替代平衡树的数据结构,其数据元素默认按照key值升序,天然有序。Skip list让已排序的数据分布在多层链表中,以0-1随机数决定一个数据的向上攀升与否,通过“空间来换取时间”的一个算法,在每个节点中增加了向前的指针,在插入、删除、查找时可以忽略一些不可能涉及到的结点,从而提高了效率。

我们先看一个简单的链表,如下:

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

如果我们需要查询9、21、30,则需要比较次数为3 + 6 + 8 = 17 次,那么有没有优化方案呢?有!我们将该链表中的某些元素提炼出来作为一个比较“索引”,如下:

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

我们先与这些索引进行比较来决定下一个元素是往右还是下走,由于存在“索引”的缘故,导致在检索的时候会大大减少比较的次数。当然元素不是很多,很难体现出优势,当元素足够多的时候,这种索引结构就会大显身手。

SkipList的特性

SkipList具备如下特性:

  1. 由很多层结构组成,level是通过一定的概率随机产生的
  2. 每一层都是一个有序的链表,默认是升序,也可以根据创建映射时所提供的Comparator进行排序,具体取决于使用的构造方法
  3. 最底层(Level 1)的链表包含所有元素
  4. 如果一个元素出现在Level i 的链表中,则它在Level i 之下的链表也都会出现
  5. 每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素

我们将上图再做一些扩展就可以变成一个典型的SkipList结构了

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueueJava多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue转存失败重新上传取消Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

SkipList的查找

SkipListd的查找算法较为简单,对于上面我们我们要查找元素21,其过程如下:

  1. 比较3,大于,往后找(9),
  2. 比9大,继续往后找(25),但是比25小,则从9的下一层开始找(16)
  3. 16的后面节点依然为25,则继续从16的下一层找
  4. 找到21

如图

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

红色虚线代表路径。

SkipList的插入

SkipList的插入操作主要包括:

  1. 查找合适的位置。这里需要明确一点就是在确认新节点要占据的层次K时,采用丢硬币的方式,完全随机。如果占据的层次K大于链表的层次,则重新申请新的层,否则插入指定层次
  2. 申请新的节点
  3. 调整指针

假定我们要插入的元素为23,经过查找可以确认她是位于25后,9、16、21前。当然需要考虑申请的层次K。

如果层次K > 3

需要申请新层次(Level 4)

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

如果层次 K = 2

直接在Level 2 层插入即可

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueueJava多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue转存失败重新上传取消Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

这里会涉及到以个算法:通过丢硬币决定层次K,该算法我们通过后面ConcurrentSkipListMap源码来分析。还有一个需要注意的地方就是,在K层插入元素后,需要确保所有小于K层的层次都应该出现新节点。

SkipList的删除

删除节点和插入节点思路基本一致:找到节点,删除节点,调整指针。

比如删除节点9,如下:

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

ConcurrentSkipListMap

通过上面我们知道SkipList采用空间换时间的算法,其插入和查找的效率O(logn),其效率不低于红黑树,但是其原理和实现的复杂度要比红黑树简单多了。一般来说会操作链表List,就会对SkipList毫无压力。

ConcurrentSkipListMap其内部采用SkipLis数据结构实现。为了实现SkipList,ConcurrentSkipListMap提供了三个内部类来构建这样的链表结构:Node、Index、HeadIndex。其中Node表示最底层的单链表有序节点、Index表示为基于Node的索引层,HeadIndex用来维护索引层次。到这里我们可以这样说ConcurrentSkipListMap是通过HeadIndex维护索引层次,通过Index从最上层开始往下层查找,一步一步缩小查询范围,最后到达最底层Node时,就只需要比较很小一部分数据了。在JDK中的关系如下图:

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

** Node **

 

static final class Node<K,V> {
    final K key;
    volatile Object value;
    volatile ConcurrentSkipListMap.Node<K, V> next;

    /** 省略些许代码 */
}

 

Node的结构和一般的单链表毫无区别,key-value和一个指向下一个节点的next。

Index

 

static class Index<K,V> {
    final ConcurrentSkipListMap.Node<K,V> node;
    final ConcurrentSkipListMap.Index<K,V> down;
    volatile ConcurrentSkipListMap.Index<K,V> right;

    /** 省略些许代码 */
}

 

Index提供了一个基于Node节点的索引Node,一个指向下一个Index的right,一个指向下层的down节点。

HeadIndex

 

static final class HeadIndex<K,V> extends Index<K,V> {
    final int level;  //索引层,从1开始,Node单链表层为0
    HeadIndex(Node<K,V> node, Index<K,V> down, Index<K,V> right, int level) {
        super(node, down, right);
        this.level = level;
    }
}

 

HeadIndex内部就一个level来定义层级。

ConcurrentSkipListMap提供了四个构造函数,每个构造函数都会调用initialize()方法进行初始化工作。

 

final void initialize() {
    keySet = null;
    entrySet = null;
    values = null;
    descendingMap = null;
    randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
    head = new ConcurrentSkipListMap.HeadIndex<K,V>(new ConcurrentSkipListMap.Node<K,V>(null, BASE_HEADER, null),
            null, null, 1);
}

 

注意,initialize()方法不仅仅只在构造函数中被调用,如clone,clear、readObject时都会调用该方法进行初始化步骤。这里需要注意randomSeed的初始化。

 

private transient int randomSeed;
randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero

 

randomSeed一个简单的随机数生成器(在后面介绍)。

put操作

CoucurrentSkipListMap提供了put()方法用于将指定值与此映射中的指定键关联。源码如下:

 

public V put(K key, V value) {
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}

 

首先判断value如果为null,则抛出NullPointerException,否则调用doPut方法,其实如果各位看过JDK的源码的话,应该对这样的操作很熟悉了,JDK源码里面很多方法都是先做一些必要性的验证后,然后通过调用do**()方法进行真正的操作。

doPut()方法内容较多,我们分步分析。

 

private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z;             // added node
    if (key == null)
        throw new NullPointerException();
    // 比较器
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K, V> b = findPredecessor(key, cmp), n = b.next; ; ) {

        /** 省略代码 */

 

doPut()方法有三个参数,除了key,value外还有一个boolean类型的onlyIfAbsent,该参数作用与如果存在当前key时,该做何动作。当onlyIfAbsent为false时,替换value,为true时,则返回该value。用代码解释为:

 

 if (!map.containsKey(key))
    return map.put(key, value);
else
     return map.get(key);

 

首先判断key是否为null,如果为null,则抛出NullPointerException,从这里我们可以确认ConcurrentSkipList是不支持key或者value为null的。然后调用findPredecessor()方法,传入key来确认位置。findPredecessor()方法其实就是确认key要插入的位置。

 

private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
     if (key == null)
         throw new NullPointerException(); // don't postpone errors
     for (;;) {
         // 从head节点开始,head是level最高级别的headIndex
         for (Index<K,V> q = head, r = q.right, d;;) {

             // r != null,表示该节点右边还有节点,需要比较
             if (r != null) {
                 Node<K,V> n = r.node;
                 K k = n.key;
                 // value == null,表示该节点已经被删除了
                 // 通过unlink()方法过滤掉该节点
                 if (n.value == null) {
                     //删掉r节点
                     if (!q.unlink(r))
                         break;           // restart
                     r = q.right;         // reread r
                     continue;
                 }

                 // value != null,节点存在
                 // 如果key 大于r节点的key 则往前进一步
                 if (cpr(cmp, key, k) > 0) {
                     q = r;
                     r = r.right;
                     continue;
                 }
             }

             // 到达最右边,如果dowm == null,表示指针已经达到最下层了,直接返回该节点
             if ((d = q.down) == null)
                 return q.node;
             q = d;
             r = d.right;
         }
     }
 }

 

findPredecessor()方法意思非常明确:寻找前辈。从最高层的headIndex开始向右一步一步比较,直到right为null或者右边节点的Node的key大于当前key为止,然后再向下寻找,依次重复该过程,直到down为null为止,即找到了前辈,看返回的结果注意是Node,不是Item,所以插入的位置应该是最底层的Node链表。

在这个过程中ConcurrentSkipListMap赋予了该方法一个其他的功能,就是通过判断节点的value是否为null,如果为null,表示该节点已经被删除了,通过调用unlink()方法删除该节点。

 

final boolean unlink(Index<K,V> succ) {
    return node.value != null && casRight(succ, succ.right);
}

 

删除节点过程非常简单,更改下right指针即可。

通过findPredecessor()找到前辈节点后,做什么呢?看下面:

 

for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
       // 前辈节点的next != null
       if (n != null) {
           Object v; int c;
           Node<K,V> f = n.next;

           // 不一致读,主要原因是并发,有节点捷足先登
           if (n != b.next)               // inconsistent read
               break;

           // n.value == null,该节点已经被删除了
           if ((v = n.value) == null) {   // n is deleted
               n.helpDelete(b, f);
               break;
           }

           // 前辈节点b已经被删除
           if (b.value == null || v == n) // b is deleted
               break;

           // 节点大于,往前移
           if ((c = cpr(cmp, key, n.key)) > 0) {
               b = n;
               n = f;
               continue;
           }

           // c == 0 表示,找到一个key相等的节点,根据onlyIfAbsent参数来做判断
           // onlyIfAbsent ==false,则通过casValue,替换value
           // onlyIfAbsent == true,返回该value
           if (c == 0) {
               if (onlyIfAbsent || n.casValue(v, value)) {
                   @SuppressWarnings("unchecked") V vv = (V)v;
                   return vv;
               }
               break; // restart if lost race to replace value
           }
           // else c < 0; fall through
       }

       // 将key-value包装成一个node,插入
       z = new Node<K,V>(key, value, n);
       if (!b.casNext(n, z))
           break;         // restart if lost race to append to b
       break outer;
   }

 

找到合适的位置后,就是在该位置插入节点咯。插入节点的过程比较简单,就是将key-value包装成一个Node,然后通过casNext()方法加入到链表当中。当然是插入之前需要进行一系列的校验工作。

在最下层插入节点后,下一步工作是什么?新建索引。前面博主提过,在插入节点的时候,会根据采用抛硬币的方式来决定新节点所插入的层次,由于存在并发的可能,ConcurrentSkipListMap采用ThreadLocalRandom来生成随机数。如下:

 

int rnd = ThreadLocalRandom.nextSecondarySeed();

 

抛硬币决定层次的思想很简单,就是通过抛硬币如果硬币为正面则层次level + 1 ,否则停止,如下:

 

// 抛硬币决定层次
while (((rnd >>>= 1) & 1) != 0)
    ++level;

 

在阐述SkipList插入节点的时候说明了,决定的层次level会分为两种情况进行处理,一是如果层次level大于最大的层次话则需要新增一层,否则就在相应层次以及小于该level的层次进行节点新增处理。

level <= headIndex.level

 

// 如果决定的层次level比最高层次head.level小,直接生成最高层次的index
// 由于需要确认每一层次的down,所以需要从最下层依次往上生成
if (level <= (max = h.level)) {
    for (int i = 1; i <= level; ++i)
        idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);
}

 

从底层开始,小于level的每一层都初始化一个index,每次的node都指向新加入的node,down指向下一层的item,右侧next全部为null。整个处理过程非常简单:为小于level的每一层初始化一个index,然后加入到原来的index链条中去。

level > headIndex.level

 

// leve > head.level 则新增一层
 else { // try to grow by one level
     // 新增一层
     level = max + 1;

     // 初始化 level个item节点
     @SuppressWarnings("unchecked")
     ConcurrentSkipListMap.Index<K,V>[] idxs =
             (ConcurrentSkipListMap.Index<K,V>[])new ConcurrentSkipListMap.Index<?,?>[level+1];
     for (int i = 1; i <= level; ++i)
         idxs[i] = idx = new ConcurrentSkipListMap.Index<K,V>(z, idx, null);

     //
     for (;;) {
         h = head;
         int oldLevel = h.level;
         // 层次扩大了,需要重新开始(有新线程节点加入)
         if (level <= oldLevel) // lost race to add level
             break;
         // 新的头结点HeadIndex
         ConcurrentSkipListMap.HeadIndex<K,V> newh = h;
         ConcurrentSkipListMap.Node<K,V> oldbase = h.node;
         // 生成新的HeadIndex节点,该HeadIndex指向新增层次
         for (int j = oldLevel+1; j <= level; ++j)
             newh = new ConcurrentSkipListMap.HeadIndex<K,V>(oldbase, newh, idxs[j], j);

         // HeadIndex CAS替换
         if (casHead(h, newh)) {
             h = newh;
             idx = idxs[level = oldLevel];
             break;
         }
     }

 

当抛硬币决定的level大于最大层次level时,需要新增一层进行处理。处理逻辑如下:

  1. 初始化一个对应的index数组,大小为level + 1,然后为每个单位都创建一个index,个中参数为:Node为新增的Z,down为下一层index,right为null
  2. 通过for循环来进行扩容操作。从最高层进行处理,新增一个HeadIndex,个中参数:节点Node,down都为最高层的Node和HeadIndex,right为刚刚创建的对应层次的index,level为相对应的层次level。最后通过CAS把当前的head与新加入层的head进行替换。

通过上面步骤我们发现,尽管已经找到了前辈节点,也将node插入了,也确定确定了层次并生成了相应的Index,但是并没有将这些Index插入到相应的层次当中,所以下面的代码就是将index插入到相对应的层当中。

 

// 从插入的层次level开始
  splice: for (int insertionLevel = level;;) {
      int j = h.level;
      //  从headIndex开始
      for (ConcurrentSkipListMap.Index<K,V> q = h, r = q.right, t = idx;;) {
          if (q == null || t == null)
              break splice;

          // r != null;这里是找到相应层次的插入节点位置,注意这里只横向找
          if (r != null) {
              ConcurrentSkipListMap.Node<K,V> n = r.node;

              int c = cpr(cmp, key, n.key);

              // n.value == null ,解除关系,r右移
              if (n.value == null) {
                  if (!q.unlink(r))
                      break;
                  r = q.right;
                  continue;
              }

              // key > n.key 右移
              if (c > 0) {
                  q = r;
                  r = r.right;
                  continue;
              }
          }

          // 上面找到节点要插入的位置,这里就插入
          // 当前层是最顶层
          if (j == insertionLevel) {
              // 建立联系
              if (!q.link(r, t))
                  break; // restart
              if (t.node.value == null) {
                  findNode(key);
                  break splice;
              }
              // 标志的插入层 -- ,如果== 0 ,表示已经到底了,插入完毕,退出循环
              if (--insertionLevel == 0)
                  break splice;
          }

          // 上面节点已经插入完毕了,插入下一个节点
          if (--j >= insertionLevel && j < level)
              t = t.down;
          q = q.down;
          r = q.right;
      }
  }

 

这段代码分为两部分看,一部分是找到相应层次的该节点插入的位置,第二部分在该位置插入,然后下移。

至此,ConcurrentSkipListMap的put操作到此就结束了。代码量有点儿多,这里总结下:

  1. 首先通过findPredecessor()方法找到前辈节点Node
  2. 根据返回的前辈节点以及key-value,新建Node节点,同时通过CAS设置next
  3. 设置节点Node,再设置索引节点。采取抛硬币方式决定层次,如果所决定的层次大于现存的最大层次,则新增一层,然后新建一个Item链表。
  4. 最后,将新建的Item链表插入到SkipList结构中。

get操作

相比于put操作 ,get操作会简单很多,其过程其实就只相当于put操作的第一步:

 

private V doGet(Object key) {
      if (key == null)
          throw new NullPointerException();
      Comparator<? super K> cmp = comparator;
      outer: for (;;) {
          for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
              Object v; int c;
              if (n == null)
                  break outer;
              ConcurrentSkipListMap.Node<K,V> f = n.next;
              if (n != b.next)                // inconsistent read
                  break;
              if ((v = n.value) == null) {    // n is deleted
                  n.helpDelete(b, f);
                  break;
              }
              if (b.value == null || v == n)  // b is deleted
                  break;
              if ((c = cpr(cmp, key, n.key)) == 0) {
                  @SuppressWarnings("unchecked") V vv = (V)v;
                  return vv;
              }
              if (c < 0)
                  break outer;
              b = n;
              n = f;
          }
      }
      return null;
  }

 

与put操作第一步相似,首先调用findPredecessor()方法找到前辈节点,然后顺着right一直往右找即可,同时在这个过程中同样承担了一个删除value为null的节点的职责。

remove操作

remove操作为删除指定key节点,如下:

 

public V remove(Object key) {
    return doRemove(key, null);
}

 

直接调用doRemove()方法,这里remove有两个参数,一个是key,另外一个是value,所以doRemove方法即提供remove key,也提供同时满足key-value。

 

final V doRemove(Object key, Object value) {
       if (key == null)
           throw new NullPointerException();
       Comparator<? super K> cmp = comparator;
       outer: for (;;) {
           for (ConcurrentSkipListMap.Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
               Object v; int c;
               if (n == null)
                   break outer;
               ConcurrentSkipListMap.Node<K,V> f = n.next;

               // 不一致读,重新开始
               if (n != b.next)                    // inconsistent read
                   break;

               // n节点已删除
               if ((v = n.value) == null) {        // n is deleted
                   n.helpDelete(b, f);
                   break;
               }

               // b节点已删除
               if (b.value == null || v == n)      // b is deleted
                   break;

               if ((c = cpr(cmp, key, n.key)) < 0)
                   break outer;

               // 右移
               if (c > 0) {
                   b = n;
                   n = f;
                   continue;
               }

               /*
                * 找到节点
                */

               // value != null 表示需要同时校验key-value值
               if (value != null && !value.equals(v))
                   break outer;

               // CAS替换value
               if (!n.casValue(v, null))
                   break;
               if (!n.appendMarker(f) || !b.casNext(n, f))
                   findNode(key);                  // retry via findNode
               else {
                   // 清理节点
                   findPredecessor(key, cmp);      // clean index

                   // head.right == null表示该层已经没有节点,删掉该层
                   if (head.right == null)
                       tryReduceLevel();
               }
               @SuppressWarnings("unchecked") V vv = (V)v;
               return vv;
           }
       }
       return null;
   }

 

调用findPredecessor()方法找到前辈节点,然后通过右移,然后比较,找到后利用CAS把value替换为null,然后判断该节点是不是这层唯一的index,如果是的话,调用tryReduceLevel()方法把这层干掉,完成删除。

其实从这里可以看出,remove方法仅仅是把Node的value设置null,并没有真正删除该节点Node,其实从上面的put操作、get操作我们可以看出,他们在寻找节点的时候都会判断节点的value是否为null,如果为null,则调用unLink()方法取消关联关系,如下:

 

if (n.value == null) {
    if (!q.unlink(r))
        break;           // restart
    r = q.right;         // reread r
    continue;
}

 

size操作

ConcurrentSkipListMap的size()操作和ConcurrentHashMap不同,它并没有维护一个全局变量来统计元素的个数,所以每次调用该方法的时候都需要去遍历。

 

public int size() {
    long count = 0;
    for (Node<K,V> n = findFirst(); n != null; n = n.next) {
        if (n.getValidValue() != null)
            ++count;
    }
    return (count >= Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int) count;
}

 

调用findFirst()方法找到第一个Node,然后利用node的next去统计。最后返回统计数据,最多能返回Integer.MAX_VALUE。注意这里在线程并发下是安全的。

 

 

要实现一个线程安全的队列有两种方式:阻塞和非阻塞。阻塞队列无非就是锁的应用,而非阻塞则是CAS算法的应用。下面我们就开始一个非阻塞算法的研究:CoucurrentLinkedQueue。

ConcurrentLinkedQueue是一个基于链接节点的无边界的线程安全队列,它采用FIFO原则对元素进行排序。采用“wait-free”算法(即CAS算法)来实现的。

CoucurrentLinkedQueue规定了如下几个不变性:

  1. 在入队的最后一个元素的next为null
  2. 队列中所有未删除的节点的item都不能为null且都能从head节点遍历到
  3. 对于要删除的节点,不是直接将其设置为null,而是先将其item域设置为null(迭代器会跳过item为null的节点)
  4. 允许head和tail更新滞后。这是什么意思呢?意思就说是head、tail不总是指向第一个元素和最后一个元素(后面阐述)。

head的不变性和可变性:

  • 不变性
    1. 所有未删除的节点都可以通过head节点遍历到
    2. head不能为null
    3. head节点的next不能指向自身
  • 可变性
    1. head的item可能为null,也可能不为null 2.允许tail滞后head,也就是说调用succc()方法,从head不可达tail

tail的不变性和可变性

  • 不变性
    1. tail不能为null
  • 可变性
    1. tail的item可能为null,也可能不为null
    2. tail节点的next域可以指向自身 3.允许tail滞后head,也就是说调用succc()方法,从head不可达tail

这些特性是否已经晕了?没关系,我们看下面的源码分析就可以理解这些特性了。

ConcurrentLinkedQueue源码分析

CoucurrentLinkedQueue的结构由head节点和tail节点组成,每个节点由节点元素item和指向下一个节点的next引用组成,而节点与节点之间的关系就是通过该next关联起来的,从而组成一张链表的队列。节点Node为ConcurrentLinkedQueue的内部类,定义如下:

 

private static class Node<E> {
      /** 节点元素域 */
      volatile E item;
      volatile Node<E> next;

      //初始化,获得item 和 next 的偏移量,为后期的CAS做准备

      Node(E item) {
          UNSAFE.putObject(this, itemOffset, item);
      }

      boolean casItem(E cmp, E val) {
          return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
      }

      void lazySetNext(Node<E> val) {
          UNSAFE.putOrderedObject(this, nextOffset, val);
      }

      boolean casNext(Node<E> cmp, Node<E> val) {
          return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
      }

      // Unsafe mechanics

      private static final sun.misc.Unsafe UNSAFE;
      /** 偏移量 */
      private static final long itemOffset;
      /** 下一个元素的偏移量 */

     private static final long nextOffset;

      static {
          try {
              UNSAFE = sun.misc.Unsafe.getUnsafe();
              Class<?> k = Node.class;
              itemOffset = UNSAFE.objectFieldOffset
                      (k.getDeclaredField("item"));
              nextOffset = UNSAFE.objectFieldOffset
                      (k.getDeclaredField("next"));
          } catch (Exception e) {
              throw new Error(e);
          }
      }
  }

 

入列

入列,我们认为是一个非常简单的过程:tail节点的next执行新节点,然后更新tail为新节点即可。从单线程角度我们这么理解应该是没有问题的,但是多线程呢?如果一个线程正在进行插入动作,那么它必须先获取尾节点,然后设置尾节点的下一个节点为当前节点,但是如果已经有一个线程刚刚好完成了插入,那么尾节点是不是发生了变化?对于这种情况ConcurrentLinkedQueue怎么处理呢?我们先看源码:

offer(E e):将指定元素插入都队列尾部:

 

public boolean offer(E e) {
        //检查节点是否为null
        checkNotNull(e);
        // 创建新节点
        final Node<E> newNode = new Node<E>(e);

        //死循环 直到成功为止
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            // q == null 表示 p已经是最后一个节点了,尝试加入到队列尾
            // 如果插入失败,则表示其他线程已经修改了p的指向
            if (q == null) {                                // --- 1
                // casNext:t节点的next指向当前节点
                // casTail:设置tail 尾节点
                if (p.casNext(null, newNode)) {             // --- 2
                    // node 加入节点后会导致tail距离最后一个节点相差大于一个,需要更新tail
                    if (p != t)                             // --- 3
                        casTail(t, newNode);                    // --- 4
                    return true;
                }
            }
            // p == q 等于自身
            else if (p == q)                                // --- 5
                // p == q 代表着该节点已经被删除了
                // 由于多线程的原因,我们offer()的时候也会poll(),如果offer()的时候正好该节点已经poll()了
                // 那么在poll()方法中的updateHead()方法会将head指向当前的q,而把p.next指向自己,即:p.next == p
                // 这样就会导致tail节点滞后head(tail位于head的前面),则需要重新设置p
                p = (t != (t = tail)) ? t : head;           // --- 6
            // tail并没有指向尾节点
            else
                // tail已经不是最后一个节点,将p指向最后一个节点
                p = (p != t && t != (t = tail)) ? t : q;    // --- 7
        }
    }

 

光看源码还是有点儿迷糊的,插入节点一次分析就会明朗很多。

初始化

ConcurrentLinkedQueue初始化时head、tail存储的元素都为null,且head等于tail:

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

添加元素A

按照程序分析:第一次插入元素A,head = tail = dummyNode,所有q = p.next = null,直接走步骤2:p.casNext(null, newNode),由于 p == t成立,所以不会执行步骤3:casTail(t, newNode),直接return。插入A节点后如下:

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

添加元素B

q = p.next = A ,p = tail = dummyNode,所以直接跳到步骤7:p = (p != t && t != (t = tail)) ? t : q;。此时p = q,然后进行第二次循环 q = p.next = null,步骤2:p == null成立,将该节点插入,因为p = q,t = tail,所以步骤3:p != t 成立,执行步骤4:casTail(t, newNode),然后return。如下:Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

添加节点C

此时t = tail ,p = t,q = p.next = null,和插入元素A无异,如下:Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

这里整个offer()过程已经分析完成了,可能p == q 有点儿难理解,p 不是等于q.next么,怎么会有p == q呢?这个疑问我们在出列poll()中分析

出列

ConcurrentLinkedQueue提供了poll()方法进行出列操作。入列主要是涉及到tail,出列则涉及到head。我们先看源码:

 

public E poll() {
        // 如果出现p被删除的情况需要从head重新开始
        restartFromHead:        // 这是什么语法?真心没有见过
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {

                // 节点 item
                E item = p.item;

                // item 不为null,则将item 设置为null
                if (item != null && p.casItem(item, null)) {                    // --- 1
                    // p != head 则更新head
                    if (p != h)                                                 // --- 2
                        // p.next != null,则将head更新为p.next ,否则更新为p
                        updateHead(h, ((q = p.next) != null) ? q : p);          // --- 3
                    return item;
                }
                // p.next == null 队列为空
                else if ((q = p.next) == null) {                                // --- 4
                    updateHead(h, p);
                    return null;
                }
                // 当一个线程在poll的时候,另一个线程已经把当前的p从队列中删除——将p.next = p,p已经被移除不能继续,需要重新开始
                else if (p == q)                                                // --- 5
                    continue restartFromHead;
                else
                    p = q;                                                      // --- 6
            }
        }
    }

 

这个相对于offer()方法而言会简单些,里面有一个很重要的方法:updateHead(),该方法用于CAS更新head节点,如下:

 

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

 

我们先将上面offer()的链表poll()掉,添加A、B、C节点结构如下:

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

poll A

head = dumy,p = head, item = p.item = null,步骤1不成立,步骤4:(q = p.next) == null不成立,p.next = A,跳到步骤6,下一个循环,此时p = A,所以步骤1 item != null,进行p.casItem(item, null)成功,此时p == A != h,所以执行步骤3:updateHead(h, ((q = p.next) != null) ? q : p),q = p.next = B != null,则将head CAS更新成B,如下:

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

poll B

head = B , p = head = B,item = p.item = B,步骤成立,步骤2:p != h 不成立,直接return,如下:

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

poll C

head = dumy ,p = head = dumy,tiem = p.item = null,步骤1不成立,跳到步骤4:(q = p.next) == null,不成立,然后跳到步骤6,此时,p = q = C,item = C(item),步骤1成立,所以讲C(item)设置为null,步骤2:p != h成立,执行步骤3:updateHead(h, ((q = p.next) != null) ? q : p),如下:

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

看到这里是不是一目了然了,在这里我们再来分析offer()的步骤5:

 

else if(p == q){
	p = (t != (t = tail))? t : head;
}

 

ConcurrentLinkedQueue中规定,p == q表明,该节点已经被删除了,也就说tail滞后于head,head无法通过succ()方法遍历到tail,怎么做? (t != (t = tail))? t : head;(这段代码的可读性实在是太差了,真他妈难理解:不知道是否可以理解为t != tail ? tail : head)这段代码主要是来判读tail节点是否已经发生了改变,如果发生了改变,则说明tail已经重新定位了,只需要重新找到tail即可,否则就只能指向head了。

就上面那个我们再次插入一个元素D。则p = head,q = p.next = null,执行步骤1: q = null且 p != t ,所以执行步骤4:,如下:

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

再插入元素E,q = p.next = null,p == t,所以插入E后如下:

Java多线程(十一):J.U.C 之并发容器ConcurrentSkipListMap/ConcurrentLinkedQueue

到这里ConcurrentLinkedQueue的整个入列、出列都已经分析完毕了,对于ConcurrentLinkedQueue LZ真心感觉难看懂,看懂之后也感叹设计得太精妙了,利用CAS来完成数据操作,同时允许队列的不一致性,这种弱一致性确实是非常强大。再次感叹Doug Lea的天才。