ConcurrentHashMap源码分析
ConcurrentHashMap是util.concurrent 的重要成员,是Java 5中支持高并发、高吞吐量的线程安全HashMap实现。类似于Hashtable, ConcurrentHashMap是线程安全的;但是,区别在于ConcurrentHashMap使用了一种完全不同的加锁策略来提供更高的并发性和伸缩性。它并不是将每个方法都在同一个锁上同步并使得每次只能有一个线程访问容器,而是使用一种粒度更细的加锁机制来实现更大程度的共享,这也就是分段锁(Lock Striping)。在这种机制下,任意数量的读取线程可以并发地访问HashMap,并且允许一定数量的写线程并发地修改HaspMap。
ConcurrentHashMap内部划分了多个Segments,默认的并发级别是16,所以划分出了16个Segment;而每个Segment相当于一个Hash Table。它的这种内部结构使得可以并发地锁定其中的某些Segments,对这些Segment并发地写;而读操作则是完全并发的,而且读操作之前不需要加锁。下面引用一张图来描述ConcurrentHashMap的这种分段结构:
ConcurrentHashMap 类中包含两个静态内部类 HashEntry 和 Segment。HashEntry 用来封装映射表的键 / 值对;Segment 用来充当锁的角色,每个 Segment 对象守护整个散列映射表的若干个桶。每个桶是由若干个 HashEntry 对象链接起来的链表。一个 ConcurrentHashMap 实例中包含由若干个 Segment 对象组成的数组。
HashEntry类的定义
static final class HashEntry<K,V> {
final int hash; //声明hash值为final类型
final K key; //声明key为final类型
volatile V value; //声明value为volatile类型
volatile HashEntry<K,V> next; //声明next域为volatile类型
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
/**
* Sets next field with volatile write semantics.
*/
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
}
这是JDK 1.7的定义(本文的源码全部来自JDK 1.7),它定义了hash值,key、value键值对和next域。这里的next也被声明成volatile变量了,而在JDK 1.6是final的。声明成volatile的变量可以保证此变量对所有线程的可见性,也就是说线程修改了volatile变量的值,其他线程可以立刻看到新的值。ConcurrentHashMap允许多个线程并发修改HashEntry、添加或删除HashEntry,由于value和next是volatile变量,所以这种修改对于其他线程是立即可见的,读线程可以立即得到最新的修改。
另一个跟JDK 1.6不同的地方是,setNext()方法调用了Unsafe类的putOrderedObject()。Unsafe.java是jdk并发类库java.util.concurrent包中使用的底层类,该类中的方法都是通过native方法调用操作系统命令。putOrderedObject()方法,只对volatile字段有用,但是它不能保证内存对其他线程的即时可见性。
Segment类
Segment其实是Hash Table的特殊版本,它继承了ReentrantLock,优化了锁操作。 Segments维护着一组Hash桶的头结点,通过这个头结点可以遍历整个Hash桶对应的链表。这个类的读操作不需要加锁,写操作则需要先获得锁,再执行写操作,然后释放锁。这里锁是可控的自旋机制,这种机制通过Segment的scanAndLock()和scanAndLockForPut()方法来实现,进一步减少了加锁的可能性。
先看一下scanAndLockForPut()和scanAndLock()这两个方法的实现:
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}
在scanAndLockForPut()方法中,首先根据hash值得到Segment中Hash桶的头结点,然后在while循环中不断尝试加锁。这里完全有可能尝试一次就获得了锁,或者尚未遍历完Hash桶的链表并且没找到key对应的HashEntry的情况下就获得了锁,这种两种情况下会该方法会返回null。
如果遍历完Hash桶,但是没找到key对应的HashEntry,则创建一个新的HashEntry结点,将retries设置成0。如果在Hash桶中找到了key对应的HashEntry结点,同样会把retries设置成0。一旦retries从-1变成0,就表明对Hash桶中链表的遍历已经结束。接下来,会不断尝试加锁,直到尝试的次数达到了最大限制(MAX_SCAN_RETRIES),才会进入等待的状态,这就是所谓的自旋等待。MAX_SCAN_RETRIES是根据系统是否是核来决定的,单核系统最大限制次数为1,多核系统最大限制次数为64。这自旋等待到什么时候结束呢?直到真正获得了锁,才退出while循环,所以scanAndLockForPut()方法是可以保证获得了锁的。
另外要注意的是,如果在自旋的过程中检测到Hash桶的头结点发生了变化,需要获取新的Hash桶头结点,将retries设成-1,重新为获取锁而进行自旋遍历。
private void scanAndLock(Object key, int hash) {
// similar to but simpler than scanAndLockForPut
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
int retries = -1;
while (!tryLock()) {
HashEntry<K,V> f;
if (retries < 0) {
if (e == null || key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f;
retries = -1;
}
}
}
scanAndLock()方法是用于Segment的remove()和replace()方法的,主要的思路跟scanAndLockForPut()基本一致,主要区别就是在没找到key对应的HashEntry结点时不需要创建新的HashEntry结点。无论是否遍历到key对应的HashEntry结点,最后都会将retries设置成-1,进入自旋等待。尝试加锁的过程,Hash桶头结点改变后为获取锁而重新进行自旋遍历的实现逻辑与scanAndLockForPut()是一样的。在这个方法返回的时候,它可以保证已经获得了锁,而且即使没找到key对应的HashEntry结点,也一样需要执行加锁操作,从而保证写操作的一致性。
接着分析一下Segment里面的put(), rehash(), remove(), replace()和clear()方法:
put()操作
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}
put()方法一开始就会尝试获取锁,如果成功得到了锁,就不再需要执行scanAndLockForPut()了,否则就执行这个方法保证能够取到锁。然后根据hash值计算出Segment中相应的Hash桶的头结点。
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i,
HashEntry<K,V> e) {
UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e);
}
static final <K,V> HashEntry<K,V> entryAt(HashEntry<K,V>[] tab, int i) {
return (tab == null) ? null :
(HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)i << TSHIFT) + TBASE);
}
根据hash值计算出Hash桶的头结点之后,就开始遍历Hash桶的链表。如果找到key对应的HashEntry结点,则将它的value值赋值给oldValue;如果这不是putIfAbsent,则修改该结点的value,同时modCount自增1,put操作完毕,释放锁。如果遍历的过程中找不到key对应的HashEntry结点,说明这是一个新结点,需要添加到这个Hash桶中。put()方法获取了锁,但是新结点可能在scanAndLockForPut()中创建出来了,当然也可能返回null。所以,在put()方法中,要检测一下这个新结点是不是null。如果不是null,说明它已经在scanAndLockForPut()准备好了,直接将它的next域指向当前Hash桶的头结点;否则,需要生成一个新的HashEntry结点,并将它的next域指向当前Hash桶的头结点。至此,插入新结的操作已经完成了一半。接着,计算Hash桶链表的结点个数,如果它超过了阀值并且tab数组长度没超过最大容量,就需要做rehash();如果个数不超过阀值,就把新的HashEntry结点当作头结点插入到当前Hash桶中。这里使用setEntryAt()操作以实现对链头的延时写,以提升性能,因为此时并不需要将该更新写入到内存,而在锁退出后该更新自然会写入内存。最后,modCount自增1,更新count变量和oldValue变量,退出循环,put操作完毕,释放锁。
private void rehash(HashEntry<K,V> node) {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
int newCapacity = oldCapacity << 1;
threshold = (int)(newCapacity * loadFactor);
HashEntry<K,V>[] newTable =
(HashEntry<K,V>[]) new HashEntry[newCapacity];
int sizeMask = newCapacity - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null) // Single node on list
newTable[idx] = e;
else { // Reuse consecutive sequence at same slot
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// Clone remaining nodes
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
}
}
}
}
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
在rehash()方法里面,将容器的容量扩大为原来的2倍,重新计算阀值,然后创建一个长度是原来的table数组2倍的新数组。接着遍历每一个table数组项,也就是Hash桶,以及Hash桶对应的HashEntry链表。对每个节点重新计算它的数组索引,然后创建一个新的节点插入到新数组。重新创建一个新节点,而不是直接修改已有节点的next域,其实是为了在做rehash过程中能够保证其他线程的遍历操作可以正常在原有的链表上正常进行。final V remove(Object key, int hash, Object value) {
if (!tryLock())
scanAndLock(key, hash);
V oldValue = null;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> e = entryAt(tab, index);
HashEntry<K,V> pred = null;
while (e != null) {
K k;
HashEntry<K,V> next = e.next;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
V v = e.value;
if (value == null || value == v || value.equals(v)) {
if (pred == null)
setEntryAt(tab, index, next);
else
pred.setNext(next);
++modCount;
--count;
oldValue = v;
}
break;
}
pred = e;
e = next;
}
} finally {
unlock();
}
return oldValue;
}
在JDK 1.6版本中,remove操作先找到key对应的Hash桶的头结点,然后遍历该Hash桶链表,如果在链表中找到key对应的结点,则为该结点之前的所有结点重新创建结点并组成一条新链表,将该新链表的尾结点指向找到结点的下一个结点。这样就会同时存在两条链表,即使有另一个线程正在该链表上遍历也不会出问题。final boolean replace(K key, int hash, V oldValue, V newValue) {
if (!tryLock())
scanAndLock(key, hash);
boolean replaced = false;
try {
HashEntry<K,V> e;
for (e = entryForHash(this, hash); e != null; e = e.next) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
if (oldValue.equals(e.value)) {
e.value = newValue;
++modCount;
replaced = true;
}
break;
}
}
} finally {
unlock();
}
return replaced;
}
replace()是ConcurrentHashMap新添加的接口,它与put()的区别是: put()方法如果没找到与key相应的HashEntry结点,就会添加一个新的结点,而replace()方法则不会这样做,它只会在找与key相应的HashEntry结点才会更新它的值,如果找不到,就不会做任何操作,返回false。同样,replace()方法也通过tryLock()或者scanAndLock()获得锁,完成操作之后,释放锁。 final void clear() {
lock();
try {
HashEntry<K,V>[] tab = table;
for (int i = 0; i < tab.length ; i++)
setEntryAt(tab, i, null);
++modCount;
count = 0;
} finally {
unlock();
}
}
clear()操作不像其他的put(), remove(), replace()那样通过scanAndLock()或者scanAndLockForPut()来加锁,它直接调用ReentrantLock的lock()方法获得锁,从而不存在自旋等待的过程。这有可能是因为它不像其他操作只是对table中的一个Hash桶链表操作,而是需要对整个table进行操作,因而需要等到所有在table上操作的线程退出后才能执行。对一个Hash桶链表操作的线程执行得比较快,因而自旋可以后获得锁的可能性比较大,对table操作的等待相对要比较久,因而自旋等待意义不大。clear()操作只是将table数组的每个项设置为null,将count变量重置为0。另外,注意到它使用了setEntryAt的延迟设置,从而能够保证其他读线程的正常工作。private Segment<K,V> ensureSegment(int k) {
final Segment<K,V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // raw offset
Segment<K,V> seg;
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K,V> proto = ss[0]; // use segment 0 as prototype
int cap = proto.table.length;
float lf = proto.loadFactor;
int threshold = (int)(cap * lf);
HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) { // recheck
Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
== null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
break;
}
}
}
return seg;
}
该方法返回给定索引位置的Segment,如果Segment不存在,则参考Segment表中的第一个Segment的参数创建一个Segment并通过CAS操作将它记录到Segment表中去。public int size() {
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
final Segment<K,V>[] segments = this.segments;
int size;
boolean overflow; // true if size overflows 32 bits
long sum; // sum of modCounts
long last = 0L; // previous sum
int retries = -1; // first iteration isn't retry
try {
for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
sum = 0L;
size = 0;
overflow = false;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
if (sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
首先看看RETRIES_BEFORE_LOCK,它是在未获行锁之前非同步的情况下尝试计算的次数,这个常量在size()和containsValue()中都用到了。这是用来避免在有其他线程不断地对Hash桶及其链表进行修改的情况下无限次尝试计算而设计的。public boolean containsValue(Object value) {
// Same idea as size()
if (value == null)
throw new NullPointerException();
final Segment<K,V>[] segments = this.segments;
boolean found = false;
long last = 0;
int retries = -1;
try {
outer: for (;;) {
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
ensureSegment(j).lock(); // force creation
}
long hashSum = 0L;
int sum = 0;
for (int j = 0; j < segments.length; ++j) {
HashEntry<K,V>[] tab;
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null && (tab = seg.table) != null) {
for (int i = 0 ; i < tab.length; i++) {
HashEntry<K,V> e;
for (e = entryAt(tab, i); e != null; e = e.next) {
V v = e.value;
if (v != null && value.equals(v)) {
found = true;
break outer;
}
}
}
sum += seg.modCount;
}
}
if (retries > 0 && sum == last)
break;
last = sum;
}
} finally {
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return found;
}
containsValues()方法跟size()方法的设计思路是一致的,都是在非同步的情况下先尝试计算3次,如果未能得到结果,就在所有Segment加锁的情况下,再计算,直到获得正确的结果。跟size()不同的地方是其中的计算逻辑,它遍历了每个Hash桶的链表去查找HashEntry结点的value是否和给定的value相等。如果相等,则返回true。如果遍历完所有Hash桶都找不到给定的value,则返回false。contains()其实就是简单地调用了containsValue(),不再赘述。public boolean isEmpty() {
long sum = 0L;
final Segment<K,V>[] segments = this.segments;
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum += seg.modCount;
}
}
if (sum != 0L) { // recheck unless no modifications
for (int j = 0; j < segments.length; ++j) {
Segment<K,V> seg = segmentAt(segments, j);
if (seg != null) {
if (seg.count != 0)
return false;
sum -= seg.modCount;
}
}
if (sum != 0L)
return false;
}
return true;
}
跟containsValue()等方法一样,isEmpty()也汇总每个Segment的修改次数并比较前后两次的汇总结果,从而可以避免在查检一个Segment的时候另一个Segment添加或删除了HashEntry结点。如果在检查某个Segment的时候发现它的count不是0,就直接返回false。另外,这个方法并没有加锁操作,它是在非同步的状态下执行的。public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
get()方法并不需要获得锁,ConcurrentHashMap的读操作是完全并发的。get()方法的实现比较简单,它首先根据key计算hash值,然后通过hash值计算Segment的索引,通过UnSafe.getObjectVolatile()方法找到Segment,最后对Hash桶的链表进行遍历查找key,找到就返回key对应的value。如果找不到,返回null。abstract class HashIterator {
int nextSegmentIndex;
int nextTableIndex;
HashEntry<K,V>[] currentTable;
HashEntry<K, V> nextEntry;
HashEntry<K, V> lastReturned;
HashIterator() {
nextSegmentIndex = segments.length - 1;
nextTableIndex = -1;
advance();
}
/**
* Set nextEntry to first node of next non-empty table
* (in backwards order, to simplify checks).
*/
final void advance() {
for (;;) {
if (nextTableIndex >= 0) {
if ((nextEntry = entryAt(currentTable,
nextTableIndex--)) != null)
break;
}
else if (nextSegmentIndex >= 0) {
Segment<K,V> seg = segmentAt(segments, nextSegmentIndex--);
if (seg != null && (currentTable = seg.table) != null)
nextTableIndex = currentTable.length - 1;
}
else
break;
}
}
final HashEntry<K,V> nextEntry() {
HashEntry<K,V> e = nextEntry;
if (e == null)
throw new NoSuchElementException();
lastReturned = e; // cannot assign until after null check
if ((nextEntry = e.next) == null)
advance();
return e;
}
public final boolean hasNext() { return nextEntry != null; }
public final boolean hasMoreElements() { return nextEntry != null; }
public final void remove() {
if (lastReturned == null)
throw new IllegalStateException();
ConcurrentHashMap.this.remove(lastReturned.key);
lastReturned = null;
}
}
HashIterator是KeyIterator、ValueIterator、EntryIterator父类,其子类的方法大都是基于HashIterator进行的,比较简单。这里关键理解一下advance()方法。初始化HashIterator的时候,将nextSegmentIndex初始值设为最后一个Segment的index,将nextTableIndex的初始值设为-1,然后调用advance()方法,将nextEntry设为第一个非空Segment的第一个非空Hash桶的头结点。 在nextEntry()方法中,通过advance()方法找到下一个非空Hash桶的头结点。