HashMap 和 ConcurrentHashMap 源码解析(JDK1.8)(红黑树部分没有解析)
以前也有看过 HashMap 的源码,但看的比较浅,这次对比 ConcurrentHashMap 一起再学习一遍。
HashMap 部分:
看了许多博客对比分析觉得这一篇还不错,参考了其中的解析思维
HashMap的结构图:数据结构图
首先我们先把 HashMap 中的成员变量看一下
/**默认的初始化长度,1<<4,位运算左移四位,乘以2的4次方为16*/
static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
/**map的最大容量值,必须是2的幂次*/
static final int MAXIMUM_CAPACITY = 1 << 30;
/**默认的负载因子*/
static final float DEFAULT_LOAD_FACTOR = 0.75f;
/**链表转换为红黑树的临界值*/
static final int TREEIFY_THRESHOLD = 8;
/**红黑树转换成链表的临界值*/
static final int UNTREEIFY_THRESHOLD = 6;
/**树形化最小hash表元素个数,如果桶内元素已经达到转化红黑树阈值,
* 但是表元素总数未达到阈值,则值进行扩容,不进行树形化
* 暂时不理解*/
static final int MIN_TREEIFY_CAPACITY = 64;
/**存储值的Node数组*/
transient Node<K,V>[] table;
/**map的大小*/
transient int size;
/**该map被修改的次数*/
transient int modCount;
/**扩容的临界值*/
int threshold;
/**装载的负载因子*/
final float loadFactor;
大概的成员变量就是这些,threshold 扩容临界值是 HashMap 扩容的重要变量。
接下来我们看一下 HashMap的构造函数:
public HashMap() {
this.loadFactor = DEFAULT_LOAD_FACTOR;
}
public HashMap(Map<? extends K, ? extends V> m) {
this.loadFactor = DEFAULT_LOAD_FACTOR;
putMapEntries(m, false);
}
public HashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR);
}
public HashMap(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
if (initialCapacity > MAXIMUM_CAPACITY) // 初始化长度大于最大值,则把初始化长度置为最大值
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);
this.loadFactor = loadFactor; // 负载因子符合规则就赋值
this.threshold = tableSizeFor(initialCapacity); // tableSizeFor 方法是将不规则的长度变为 2^n 次方
}
第一个是空构造,不会初始化长度,只会初始化默认负载因子
第二个直接传 Map
第三个调用了第四个构造,传入默认负载因子
第四个构造会对传入的 Map 长度做处理,转换成 2^n 次方
下面看一下 HashMap 中比较常用的一些方法
put方法
public V put(K key, V value) {
// 调用putVal方法
return putVal(hash(key), key, value, false, true);
}
put方法实际上是调用了内部的putVal方法,如下:
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
// 判断是否已经加载table,使用的是懒加载,第一次put后才加载的
if ((tab = table) == null || (n = tab.length) == 0)
// 如果为空,执行resize方法,将加载默认长度的table
n = (tab = resize()).length;
// 计算hash桶位置有没有值
if ((p = tab[i = (n - 1) & hash]) == null)
// 没有值就把传进来的key-value放入该hash桶位置
tab[i] = newNode(hash, key, value, null);
else {// hash桶位置处已经存在key-value值,产生hash冲突,该位置的key-value值赋值给了P
Node<K,V> e; K k;
// 如果p和传入key的hash值相同,并且key值也相同,则将旧的值替换为新的值
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
// 如果P是红黑树节点,使用红黑树方式添加节点,如果节点已存在,返回该节点,如果不存在,返回null
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {// p节点既不是链表首节点也不是红黑树节点
// 遍历链表
for (int binCount = 0; ; ++binCount) {
// 获取每一个节点赋值给e
if ((e = p.next) == null) {
// 如果该节点是空节点,将传入的key-value作为新值存入该位置
p.next = newNode(hash, key, value, null);
// 判断是否需要将链表转换成红黑树
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
// 如果不为空,判断hash和key值是否都相同,如果相同则则为重复节点直接跳出循环
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
// 将e赋值给p,并进入下一个循环,下一个循环的e = p.next
p = e;
}
}
// 判断是否存在重复元素,前面代码中如果存在重复值会将旧值赋值给到e,如果没有则为空
if (e != null) { // existing mapping for key
// 获取旧值
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
// 新的value值赋值给该节点
e.value = value;
// HashMap中未实现该方法,LinkedHashMap中有实现,暂时不清楚
afterNodeAccess(e);
return oldValue;
}
}
// 如果不是替换重复元素而是添加
// 修改计数器加1
++modCount;
// 如果增加后的大小大于扩容的临界值就调用扩容方法
if (++size > threshold)
resize();
// HashMap中未实现该方法,LinkedHashMap中有实现,暂时不清楚
afterNodeInsertion(evict);
return null;
}
putVal方法比较长,但是总结下来就是几点:
1.判断是否初始化过了,没有就去初始化,初始化完成之后再 put 元素。
2.判断 hash 值与数组长度取模的位置是否存在节点,没有就直接 put ,有的话就比较该位置处的节点和将要 put 的节点 hash 和 key 是否都相等,相等就会把该位置处节点赋值给 e ,一个临时变量,用来存放重复的元素。
3.如果该 hash 位置处的节点不相等,说明 put 进来的节点可能在链表或者红黑树中,hash 位置处的头节点是红黑树节点,则会按照红黑树处理,红黑树部分暂时不分析,剩下就只可能是链表了。
4.遍历链表,直到找到重复元素,就把该节点赋值给 e 跳出循环,否则一直循环完毕,如果依然没有重复元素,则补在链表尾。
5.判断 e 是否为空,不为空说明有重复元素,将新 value 赋值给该节点,返回旧 value 值。
6.修改次数 +1 ,size +1。
其中在 hash 取模过程中,我发现了 & 位与运算的神奇之处,如果 一个数 l 等于 2^n 次方,那么 hash % l == hash & (l - 1)。因为之前在判断某个 hash 值,应该放到数组中哪个位置时,并没有用到 % 运算,查了一些资料发现位运算的速度是成倍于 % 运算的。
我放上我看的一篇关于 &符号 的博客 供大家学习。
看完putVal,可能你会疑惑 resize 扩容方法到底做了什么操作,那么接下来就一起看一下 resize 方法到底做了什么事。
final Node<K,V>[] resize() {
// 把Map中原Node数组赋值给oldTab
Node<K,V>[] oldTab = table;
// 获取原table长度赋值给oldCap,预防空指针异常
int oldCap = (oldTab == null) ? 0 : oldTab.length;
// 获取原扩容临界值赋值给oldThr
int oldThr = threshold;
int newCap, newThr = 0;
// 原数组长度大于0也就是不是首次初始化数组
if (oldCap > 0) {
// 如果原数组大于等于最大容量值
if (oldCap >= MAXIMUM_CAPACITY) {
// 扩容临界值赋值为整数最大值,返回
threshold = Integer.MAX_VALUE;
return oldTab;
}
// 扩容为原数组长度的两倍,新数组长度要小于最大容量值并且原长度要大于等于默认长度
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
// 临界值也变为原来的两倍
newThr = oldThr << 1; // double threshold
}
// 如果数组长度为0,但是有扩容临界值,可能是原来有值,但是被删除了,导致长度为0
else if (oldThr > 0) // initial capacity was placed in threshold
// 将长度赋值为当前的临界值
newCap = oldThr;
// 首次初始化,既没有长度,扩容临界值也为0
else { // zero initial threshold signifies using defaults
// 长度和扩容临界值都赋值为默认值
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
// 如果原长度小于默认长度则会进入该判断
if (newThr == 0) {
// 新长度乘以加载因子
float ft = (float)newCap * loadFactor;
// 新长度和计算后的扩容临界值都小于最大容量,赋值给新的扩容临界值,否则,新临界值赋值为整数最大值
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
// 设置新的扩容临界值
threshold = newThr;
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
// 设置新长度的Node数组
table = newTab;
// 如果原数组不为空
if (oldTab != null) {
// 遍历原数组
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
// 获取每一个Node,并判断是否为空,如果为空,则进行下一个循环
if ((e = oldTab[j]) != null) {// 不为空,先用e保存,然后将Node移动到新数组中
// 将原数组该位置置为空,方便数组的回收,释放无用资源
oldTab[j] = null;
// 判断是否有next,如果没有,说明该位置处没有出现hash冲突,只有一个Node
if (e.next == null)
// 计算hash值在新数组的位置,并且存入新位置
newTab[e.hash & (newCap - 1)] = e;
// 如果e是红黑树TreeNode节点
else if (e instanceof TreeNode)
// 使用红黑树的处理方式,暂不清楚
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
// 遍历链表
do {
next = e.next;
/* 位与运算,可以百度一下,因为数组长度一定是2的幂次方,故二进制中是100...00这样的形式
跟原数组长度做位与运算,如果结果为0,则表示该hash值与新长度取模的值不会变化,
该值在新数组中的位置和原数组中的位置一致,可以直接放入低位链表中
*/
if ((e.hash & oldCap) == 0) {
// loTail是地位链表的尾,如果为空,表示链表为空,把头节点指向该元素
if (loTail == null)
loHead = e;
else
// 不为空表示链表有值,把链表尾的next指向该元素
loTail.next = e;
//把链表尾指向该元素
loTail = e;
}
/* 如果该hash值与原数组长度的位与运算结果不为0,则表示该hash值与新长度取模会发生变化
该值在新数组中的位置是原数组的位置加上原数组长度,放入高位链表中
*/
else {
// 跟低位链表操作一致
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);//e 赋值为下一个节点
// 低位链表不为空时,将低位链表的尾节点的next置为空,然后将新数组中该位置指向低位链表的头节点
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
// 高位链表不为空时,将高位链表的尾节点的next置为空,然后将新数组中j + oldCap的位置指向高位链表的头节点
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}
其实 resize 方法的逻辑很简单,第一个是判断扩容后的长度是否符合长度的规定,不符合就调整。第二步,调整完长度,创建好新数组后,需要对原有的链表或者红黑树重新分配位置,因为一般来说新长度为原来长度的两倍,所以 hash 取模后的位置有可能发生变化,故对整个 HashMap 元素重新排位置。
这中间用了大量的 & 位运算,中间有一个 hash & oldCap 的操作,我自己实验后发现,他的数值会与自身长度相关,比如默认长度16为原长度,hash值从0向上递增,操作的结果会非常规律,16个0和16个16交替出现,而且如果结果为0的 hash 与原长度的两倍取模时和原来一致,而结果为16的 hash 与原长度的两倍取模时结果为 原值加上原长度 ,你们自己也可以尝试一下。
这样就可以把原链表中可能会发生位置改变的节点 和 不会发生改变的节点 分成两个链表。分别是 低位链表 和 高位链表。将原位置处指向低位链表的头节点,原位置加上原长度位置处指向高位链表的头节点,那么就可以把 长度改变带来的 hash 位置不一致的问题解决。
看完了 put 和 resize 方法 我们来看一下 get方法,相比于前面的两个方法,get会容易很多。
public V get(Object key) {
// 临时变量
Node<K,V> e;
return (e = getNode(hash(key), key)) == null ? null : e.value;
}
final Node<K,V> getNode(int hash, Object key) {
// 临时变量
Node<K,V>[] tab; Node<K,V> first, e; int n; K k;
// 数组为空或者不存在该hash值对应的节点直接返回空
// 否则将hash值对应位置的节点赋值给first
if ((tab = table) != null && (n = tab.length) > 0 &&
(first = tab[(n - 1) & hash]) != null) {
// 首先检查是否就是数组该位置处节点,是就直接返回该节点
if (first.hash == hash && // always check first node
((k = first.key) == key || (key != null && key.equals(k))))
return first;
// 该位置处产生了hash冲突,有链表或者红黑树结构
if ((e = first.next) != null) {
// 节点为TreeNode,红黑树处理
if (first instanceof TreeNode)
return ((TreeNode<K,V>)first).getTreeNode(hash, key);
do {// 否则循环链表
// 链表中hash值和key值都相同则返回该节点,否则未找到,返回null
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
return e;
} while ((e = e.next) != null);
}
}
return null;
}
跟 put 方法一样,他也会调用内部的getNode方法去处理
逻辑也很简单,第一,是查看数组是否是有值,并且 hash 对应位置处是否有值,这里也用到了 & 运算。没有就会直接返 null。第二,找该位置的节点是否就是要找的节点,是就返回该节点,不是就从链表或者红黑树中找。第三,遍历链表,直到找到为止,找到就返回节点,否则返 null。
最后看一下remove方法
public V remove(Object key) {
// 临时变量
Node<K,V> e;
// removeNode方法,第一个参数是key的hash值,第二个是key值,第三个是value值
// 第四个参数是是否删除值而不是节点,一般为false,第五个参数表示删除后是否移动节点,红黑树使用
return (e = removeNode(hash(key), key, null, false, true)) == null ?
null : e.value;
}
final Node<K,V> removeNode(int hash, Object key, Object value,
boolean matchValue, boolean movable) {
Node<K,V>[] tab; Node<K,V> p; int n, index;
// 数组不为空时,将对应hash值所在位置的Node赋值给p
if ((tab = table) != null && (n = tab.length) > 0 &&
(p = tab[index = (n - 1) & hash]) != null) {
Node<K,V> node = null, e; K k; V v;
// 如果hash值相等并且key值也相同,将node赋值为p
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
node = p;
else if ((e = p.next) != null) {//存在hash冲突该位置不止一个值
// p如果是TreeNode节点,按红黑树处理
if (p instanceof TreeNode)
node = ((TreeNode<K,V>)p).getTreeNode(hash, key);
else {// 遍历链表
do {
// 如果hash值相等并且key值也相同,将node赋值为e,结束循环
if (e.hash == hash &&
((k = e.key) == key ||
(key != null && key.equals(k)))) {
node = e;
break;
}
// p赋值为e
p = e;
} while ((e = e.next) != null);
}
}
// node如果不为空,则有对应的需要删除的值,正常情况下!matchValue都为true
if (node != null && (!matchValue || (v = node.value) == value ||
(value != null && value.equals(v)))) {
// 如果是TreeNode,使用红黑树删除操作
if (node instanceof TreeNode)
((TreeNode<K,V>)node).removeTreeNode(this, tab, movable);
// 相等则说明该值是在数组中,而非链表中,直接将该节点的下一个节点放到数组的该位置上即可
else if (node == p)
tab[index] = node.next;
// 不等,说明在链表中,将要删除的节点的上一个节点的next指向待删除节点的下一个节点即可
else
p.next = node.next;
// 修改计数器加1
++modCount;
// 长度减1
--size;
afterNodeRemoval(node);
// 返回删除的节点
return node;
}
}
// 不存在要删除的值,返回null
return null;
}
remove方法跟get方法很像,只不过remove方法找到元素后,会将该节点的上一个节点的next指向该节点的下一个节点。
HashMap 部分的源码就写到这里了,主要说了一下比较常用的 put ,get,remove方法 以及 一个 resize 扩容方法。如果有兴趣的话可以自己多研究研究。
ConcurrentHashMap部分:
ConcurrentHashMap的数据结构和HashMap一致,不同的是,ConcurrentHashMap使用了CAS算法和synchronized来控制多线程的数据安全问题。所以这一部分,会更加的难以理解,所以我查了比较多的资料来加强我的理解,在此罗列一下:
https://blog.****.net/u010412719/article/details/52145145 ConcurrentHashMap源码
https://blog.****.net/dou_yuan/article/details/77773530 ConcurrentHashMap源码
CAS算法;unsafe.compareAndSwapInt(this, valueOffset, expect, update); CAS(Compare And Swap),意思是如果valueOffset位置包含的值与expect值相同,则更新valueOffset位置的值为update,并返回true,否则不更新,返回false。
当然不止是这一些资料,之后分析到,我再贴出来给大家,方便理解。
我依然会使用分析 HashMap 一样的方式给大家 看一看 ConcurrentHashMap 的源码
首先就是他的成员变量,ConcurrentHashMap中大部分的变量跟 HashMap 一样, 不过有一个变量必须要提前说一下,方便之后看源码能更加快的理解
/**sizeCtl是控制标识符,不同的值表示不同的意义。
* 负数代表正在进行初始化或扩容操作 ,其中-1代表正在初始化 ,-N 表示有N-1个线程正在进行扩容操作
* 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,类似于扩容阈值。
* 它的值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。实际容量>=sizeCtl,则扩容。*/
private transient volatile int sizeCtl;
ConcurrentHashMap的构造函数:
public ConcurrentHashMap() {
}
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1);
}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
看完构造函数,你会发现和 HashMap 基本一致,就不多说了。
我们重点看他的 put 方法
public V put(K key, V value) {
// 调用了putVal方法
return putVal(key, value, false);
}
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
// 计算hash值,两次hash操作,
int hash = spread(key.hashCode());
int binCount = 0;
// 死循环,没有跳出条件
for (Node<K,V>[] tab = table;;) {
// 临时变量
Node<K,V> f; int n, i, fh;
// 如果没有初始化,则执行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
/* 如果tab[i]位置处没有值,直接使用casTabAt方法存储在该位置,然后跳出循环
如果tab[i]处有值,把值赋值给f
*/
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// f的hash值如果为MOVED(-1),则说明正在扩容,则帮助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else {// f的hash值不为MOVED,那么没有线程在进行扩容操作
// 创建旧值临时变量
V oldVal = null;
// 锁定f为头节点的hash链表
synchronized (f) {
if (tabAt(tab, i) == f) {// 重复检查,避免多线程造成数据不一致
if (fh >= 0) {// 链表节点
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {// 遍历链表
K ek;
// 如果hash值和key值都相同,则将该值赋值给旧值临时变量
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
// 一般的put操作,!onlyIfAbsent的结果都为true,会把新的value赋值给该节点,结束循环
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
// 当前节点为尾节点时进入,并且意味着链表中不存在传入的key,将尾节点的next指向新的Node,结束循环
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {// 红黑树节点
Node<K,V> p;
binCount = 2;
// 使用红黑树节点方式添加
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// binCount不为0代表插入成功
if (binCount != 0) {
// 如果binCount大于链表转红黑树临界值,则调用treeifyBin,变为红黑树结构
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)// 旧值临时变量不为空返回旧值
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
我把我在看这个源码时的一些疑问或者见解分享一下。
第一,看到 putVal 方法,你会发现你遇到了一个死循环,这是 ConcurrentHashMap 中遇到的第一个死循环,这个死循环是因为在多线程环境下,线程进来修改 map 发现要修改的桶被其他线程锁住了,那只能不断地循环来抢这个桶的锁。
第二,抢到锁以后,又重复的检查了一次,此时桶中元素与我抢到锁的 f 是否还是一致的。这是因为之前线程已经获取到桶节点元素并且赋值给了 f ,但是他抢到 f 锁的过程中桶节点的值却已经被修改过了,那么此时使用 f 势必会造成线程安全问题,导致数据异常,所以要再次判断是否是正确的数据,如果不是就会重新抢锁,直到数据正常,再进行操作。
其他的放节点部分的逻辑和 HashMap 是比较类似的,我就不做过多的分析了。
接下来我们看一下 putVal 中出现的两个方法,分别是 initTable 和 addCount 方法
initTable:
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 死循环
while ((tab = table) == null || tab.length == 0) {
// sizeCtl < 0 表示已有线程在进行初始化
if ((sc = sizeCtl) < 0)
// 停止初始化,让出服务器,进行旋转
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// CAS操作将sizeCtl置为-1,如果失败,则再循环一次
try {
/* 再次判断是否已经初始化过了
这里再次判断的原因是因为多线程环境下
可能会有多个线程进到了循环,而且到了else这里
如果不再次判断可能会初始化两次以上
*/
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);// 减去1/4, 位运算自己百度,或者看上面有给一篇博客
}
} finally {
// 把扩容临界值赋值给sizeCtl控制符,结束循环
sizeCtl = sc;
}
break;
}
}
return tab;
}
这是ConcurrentHashMap中初始化数组的方法,跟HashMap不同的是,扩容和初始化分成了两个方法。initTable方法里面再次出现了死循环+重复判断的逻辑,保证table只被初始化一次。
addCount :
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 这一部分,没看懂,望有明白的大牛解释解释
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {// 修改 baseCount 值,加 1
CounterCell a; long v; int m;
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
// s 赋值为 Map 的 size
s = sumCount();
}
// 如果 putVal 方法过来的,肯定是 true
if (check >= 0) {
// 临时变量
Node<K,V>[] tab, nt; int n, sc;
/* map 的实际长度大于等于扩容临界值
并且 table 不为空,并且 table 长度小于最大容量
*/
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 根据长度得到一个标识
int rs = resizeStamp(n);
if (sc < 0) {// sc < 0 说明正在扩容
/* 1.如果 sc 的低 16 位不等于 标识符(校验异常 sizeCtl 变化了)
2.如果 sc == 标识符 + 1 (扩容结束了,不再有线程进行扩容)
(默认第一个线程设置 sc == rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
3.如果 sc == 标识符 + 65535(帮助线程数已经达到最大)
4.如果 nextTable == null(结束扩容了)
5.如果 transferIndex <= 0 (转移状态变化了)
结束循环
*/
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果可以帮助扩容,则 sc 加 1 ,扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 没有扩容,更新 sc 标识符左移 16 位 然后 + 2 变为一个负数,扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
addCount方法在我当时看的时候是特别的懵,所以我看了比较多的关于addCount方法的博客
https://blog.****.net/weixin_34032779/article/details/87363128
https://blog.****.net/weixin_34034261/article/details/87564963
addCount方法就是在判断是不是正在扩容,如果是,就在可以帮助扩容的情况下帮助扩容
否则就自己扩容。
看完 addCount,我们最后看一下它里面的 transfer 方法
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// cpu 数 > 1 ,tab 长度 / 8 / cpu 数 结果如果小于 16 则使用 16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 等于空,说明未初始化新数组
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];// 扩容为原来的两倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 扩容可能会产生 OOME ,把 sizeCtl 赋值为整数最大值
sizeCtl = Integer.MAX_VALUE;
return;
}
// 把新数组赋值给成员变量 nextTable 此变量只在扩容时有用, 其他时间都是 null
nextTable = nextTab;
// 把数组长度赋值给 transferIndex
transferIndex = n;
}
// 新数组长度
int nextn = nextTab.length;
// 特殊的占位节点,如果其他线程发现该类型节点,则跳过
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
/* i >= bound 说明任务还没有完成,继续进行扩容操作
finishing 为 true 说明已经扩容完成,将 advance 置为 false 跳出当前死循环
*/
if (--i >= bound || finishing)
advance = false;
/* 如果此判断成立,说明没有区间需要扩容,此线程不用再做扩容操作
把 i 置为 -1,advance 置为 false 跳出当前死循环
i 置为 -1 是为了下面的 if 中结束扩容做判断
*/
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 修改 transferIndex 的值,每次减去一个区间长度
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 当前线程能处理区间的最小值
bound = nextBound;
// 给 i 赋值,当前线程能处理区间的最大值
i = nextIndex - 1;
advance = false;
}
}
/* 此处有三个条件
1.i < 0:一般出现在当前线程将负责区间内桶内元素都扩容完成会递减为 -1,或者是map所有的桶都已经被线程处理,
导致 transferIndex 为 0,会将 i 置为-1 上面注释有提到 新进来的线程直接结束扩容方法
2.i >= n:一般 i 初始赋值为 n - 1,并且逐次递减,暂时不清楚此条件什么时候会满足
3.i + n >= nextn:如上一条所述,前两值相加应该是不会大于等于新数组长度的,新数组长度为原来的两倍,不清楚什么条件下满足
*/
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
/* 判断是否已经结束扩容
成立:
1.将成员变量中的 nextTable 置为 null,结束扩容后 nextTable 不再使用
2.将新数组赋值给 table ,即扩容完成
3.新的 sizeCtl(扩容临界值) 赋值为原长度的两倍减去原长度的一半即 2n - n/2
*/
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
// 将扩容线程数减 1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
/* 如果 sc - 2 == 标识符左移 16 位,说明没有线程在进行扩容
将finishing 和 advance 都置为 true
i = n ,再次检查一遍整个区间
*/
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 从原数组中获取第 i 个元素,如果为空,该位置指向特殊占位符,advance 置为 true 进入下一个循环
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)// 如果该节点不为空且 hash 值等于 MOVED 说明正在被别的线程修改 进入下一个循环
advance = true; // already processed
else {// 节点不为空,且没有被别的线程修改,锁住该节点,防止其他线程在过程中修改该位置链表
synchronized (f) {
//再次判断,防止中途已经被修改过了
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {// 链表
/* 跟原数组长度做位与运算,结果为0或者长度本身(因数组长度只可能为 2^n 次方)
如果为 0 ,则在新数组中的位置跟原数组中取模的位置一致,不会变化
如果不为 0 ,则在新数组中位置跟原数组中位置不一致,新位置为当前位置加上原数组长度
(可以自己测试)
*/
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
/* 链表的下一个节点也会做位与运算
会生成一个 lastRun,他之后的节点hash值与长度位与是一致的
*/
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 为0即是低位
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 不为0即是高位
else {
hn = lastRun;
ln = null;
}
// 再次遍历,但只遍历到 lastRun 为止
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 为 0 说明在低位链表上,新建低位节点,把 next 指向此时的 ln ,再赋值给 ln 形成一个新的低位链表
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
// 不为 0 说明在高位,新建高位节点,把 新建节点的 next 指向此时的 hn,再赋值给 hn 形成新的高位链表
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
/* 设置 nextTab 的 i 位置处为新的低位链表
设置 nextTab 的 i + n 位置为新高位链表
设置原 tab 的 i 位置为特殊占位节点
*/
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
// 设置 advance 为 true 进行下一个循环
advance = true;
}
// 红黑树部分逻辑差不多,只是最后操作不同
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {// 遍历
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {// 位与运算为0 低位
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {// 位与运算不为0 高位
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 小于红黑树转换链表的额临界值就转换,否则重新建新树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
扩容机制跟 HashMap 是差不多的,不同的是它可以支持多线程分组扩容
他会按照 cpu 来分配每一个所能操作的数组区间,从区间的最大值开始一直到区间的最小值,在处理过程中,处理到index位置时,会把该位置锁住,变成特殊的占位节点,此时,其他线程无法修改该位置的数据,而里面的 链表重新分配的机制和HashMap一致,也是使用高低位链表的方式。而且该方法也用到了死循环+重新判断的机制来保证线程安全。
最后看一下 get 方法
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 重复hash
int h = spread(key.hashCode());
/* table 不为空
获取 hash 值所在位置的节点赋值给 e
*/
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 该 hash 位置处是链表
if ((eh = e.hash) == h) {
// 如果该节点 hash 和 key 都相同 则返回该节点的 value
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)// 该 hash 位置是红黑树,调用红黑树的查找节点方法
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {// 遍历链表,找到 hash 和 key 都相同的话返回该节点的 value
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
这个方法就比较简单了,找到了对应的节点就返回 value 值
到此,两个 Map 的基本方法都看的差不多了,总结一下。
在单线程的环境下肯定是HashMap会更好一些,虽然ConcurrentHashMap,在Hashtable的锁全部基础上改进成了,锁一部分数据,但依然还是会有影响。
然而在多线程环境下,ConcurrentHashMap就非常值得使用,该类基本是在线程安全的情况下,效率比较好的,通过CAS和synchronized来控制,每次只锁住其中一个桶,可以保证多个线程并行对不同桶操作。
因为我自己能力有限,不可能保证说的都是正确的,只是我理解的是这样,希望有更有能力的人看到我的错误,可以指证出来,以免误导他人,也能够让我学的更加深刻,谢谢浏览。