ConcurrentHashMap

java.util.HashMap是线程不安全的集合类,如果想要使用线程安全的Map,可以选择使用java.util.Hashtable(不建议),或者java.util.Collections.synchronizedMap,或者java.util.concurrent.ConcurrentHashMap。本篇文章就是ConcurrentHashMap(JDK1.8)的源码笔记。

继承/实现关系图

1
2
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable { ... }

ConcurrentHashMap

关键常量与成员变量

常量

1
2
3
4
5
6
7
8
9
10
11
private static final int MAXIMUM_CAPACITY = 1 << 30;
private static final int DEFAULT_CAPACITY = 16;
private static final float LOAD_FACTOR = 0.75f;
static final int TREEIFY_THRESHOLD = 8;
static final int UNTREEIFY_THRESHOLD = 6;
static final int MIN_TREEIFY_CAPACITY = 64;

static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/** bucket桶数组, 在第一次插入时才懒加载 */
transient volatile Node<K,V>[] table;
/** 在 resize 时使用的桶数组, 只有在 resize 的时候非空 */
private transient volatile Node<K,V>[] nextTable;
/**
* table 初始化与 resize 的标志
* 为负数时, table 正在被初始化或者 resize
* -1 代表正在初始化
* -x: 有 x - 1 个线程正在进行 resize, 即-(1 + 正在进行 resize 的线程数量)
* 0: table还未被初始化.
* 正数: 下一次进行扩容的容量大小
*/
private transient volatile int sizeCtl;

/** Unsafe 类 用于实现CAS操作 */
private static final sun.misc.Unsafe U;

CHM的实现中可以看到大量的U.compareAndSwapXxxx的方法去操作CHM的一些属性,也就是大量地使用CASCAS是乐观锁的一种实现方式,每次操作前都假设不会产生冲突,等到发生冲突的时候就会去自旋重试

CAS的思想不再多说,问题在于会产生ABA问题,可通过version number机制加以解决。

内部类

  • Node类,普通的结点。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;

Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}

public final K getKey() { return key; }
public final V getValue() { return val; }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException();
}

public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}

/**
* Virtualized support for map.get(); overridden in subclasses.
*/
Node<K,V> find(int h, Object k) { ... }
}
  • TreeNode类,继承于Node类,封装为红黑树的结点,但是没有具体操作,同时被TreeBin再一次封装
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Nodes for use in TreeBins
*/
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;

TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}

Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}

/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) { ... }
}
  • TreeBin,不处理用户的key-value信息,而是包装了TreeNode结点,封装了大量的红黑树操作。在实际的ConcurrentHashMap数组中,存放的是TreeBin对象,而不是TreeNode
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;
volatile Thread waiter;
volatile int lockState;
// values for lockState
static final int WRITER = 1; // set while holding write lock
static final int WAITER = 2; // set when waiting for write lock
static final int READER = 4; // increment value for setting read lock

static int tieBreakOrder(Object a, Object b) { ... }
TreeBin(TreeNode<K,V> b) { ... }
private final void lockRoot() { ... }
private final void unlockRoot() { lockState = 0; }
private final void contendedLock() { ... }
final Node<K,V> find(int h, Object k) { ... }
final TreeNode<K,V> putTreeVal(int h, K k, V v) { ... }
final boolean removeTreeNode(TreeNode<K,V> p) { ... }

/** 红黑树操作方法 */
/** ---------------------- */
/** 左旋操作 */
static <K,V> TreeNode<K,V> rotateLeft(TreeNode<K,V> root, TreeNode<K,V> p) { ... }
/** 右旋操作 */
static <K,V> TreeNode<K,V> rotateRight(TreeNode<K,V> root, TreeNode<K,V> p) { ... }
/** 插入后进行树的调整 */
static <K,V> TreeNode<K,V> balanceInsertion(TreeNode<K,V> root, TreeNode<K,V> x) { ... }
/** 删除后进行树的调整 */
static <K,V> TreeNode<K,V> balanceDeletion(TreeNode<K,V> root, TreeNode<K,V> x) { ... }

/** Unsafe 类 */
private static final sun.misc.Unsafe U;

private static final long LOCKSTATE; // 锁状态
static {
try {
U = sun.misc.Unsafe.getUnsafe();
Class<?> k = TreeBin.class;
LOCKSTATE = U.objectFieldOffset
(k.getDeclaredField("lockState"));
} catch (Exception e) {
throw new Error(e);
}
}
}
  • ForwardingNode,特殊结点,在扩容时才会出现的特殊结点,key,value,hash全部为null,由nextTable指向新的table数组。
1
2
3
4
5
6
7
8
9
10
/** A node inserted at head of bins during transfer operations. */
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}

Node<K,V> find(int h, Object k) { ... }
}

CAS操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/** 返回 tab 中索引为 i 处的元素 */
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

/** 利用 CAS 设置 tab 中索引为 i 处的元素 */
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

/** 设置 tab 中索引为 i 处的元素为 v */
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

关键方法

构造方法

  • CHM为我们提供了 $5$ 种构造方法,从中可以看出比HashMap多了一个加入了concurrencyLevel的构造方法,这个代表预先估计的并发量。不同的构造函数计算sizeCtl的方法不一样。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/** 返回一个默认容量为 16 的CHM */
public ConcurrentHashMap() {}

/** 手动设置初始最大容量
* 小于 0 则抛出异常
* 大于MAXIMUM_CAPACITY/2,设置为MAXIMUM_CAPACITY
* 否则利用 tableSizeFor()方法将 initialCapacity * 1.5 + 1 的值转化为 2 的幂的大小
* 设置sizeCtl = cap, 即下一次扩容时的容量.
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, 1); // 调用下面的全参构造 1 代表1个线程
}

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
// 初始容量 < 并发线程量
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

初始化方法 initTable()

  1. 首先检查tab是否为空,如果为空的话则尝试去初始化。
  2. 检查sizeCtl是否小于 0,因为sizeCtl 的默认值为0,所以最先进入方法的一个或多个线程会尝试使用CAS去初始化数组,设置成功的话sizeCtl=-1,表示正在初始化,后续的线程将会进入yield状态。
  3. 在将sizeCtl设置为 -1 后,会再次检查 tab 是否已被初始化,这是双端检锁的思想,避免tab被重复初始化。
  4. 此时将会去设置指定容量的 Node 数组,默认为 16,并将成员变量table指向新创建的 Node 数组。
  5. 接着降回去计算阈值,这里使用sc=n - (n >>> 2);,可以保证新的阈值=新容量*0.75。
  6. 最后再更新sizeCtl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
// 当 tab == null 或者长度为 0 时, 则一直循环试图初始化 table
while ((tab = table) == null || tab.length == 0) {
// 1. sizeCtl < 0, 说明有其他线程正在进行初始化或者扩容, 则挂起当前线程
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
// 2. sizeCtl >= 0, 调用 CAS 试图将 sizeCtl 设置为 -1, 表示正在初始化.
// 此时如果别的线程也进入了 initTable() 方法, 将会执行上面的 Thread.yield();
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
// 3. 如果设置 sizeCtl = -1 成功的话
try {
// 4. 在此确认 table 是否被初始化 双端检锁的思想
if ((tab = table) == null || tab.length == 0) {
// 5. 如果 sc > 0 则 n = sc, 否则 n = 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 6. 创建指定容量大小的 Node 数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
// 7. 将 table 指向新创建的 Node 数组
table = tab = nt;
// 8. 计算阈值, sc = 0.75n 当 CHM 储存的键值对数量大于这个值时将会扩容
// 8.1 这里的 0.75 等于默认负载因子,
// HashMap、Hashtable如果使用传入了负载因子的构造函数的话
// 那么每次扩容, 新阈值=新容量*负载因子
// 8.2 CHM不管使用哪种构造函数初始化, 新阈值=新容量*0.75
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
  • Thread.yield()方法,使当前线程由执行状态变为就绪状态,让出CPU时间,在下一个线程执行时候,此线程可能被执行,也可能没被执行。

put(K key, V val)

  1. 检查k-v是否为null,是的话则抛出NPE
  2. 根据 key 计算 hash 值。
  3. 检查 table 是否已被初始化,如果没有的话将会尝试去初始化,此时允许多个线程去尝试初始化,但是initTable()中的CAS操作保证了只有一个线程能够初始化成功。
  4. 利用 (n-1) & hash 计算索引值,然后检查 table 中这个位置的首结点是否为空,为空的话则使用CAS尝试去设置值,设置成功则跳出。
  5. 如果首结点不为空,说明发生了碰撞,这个时候将会使用synchronized去加锁头结点,但是不影响table中的其他 Node 的工作。接下来将会去判断加锁的头结点是属于链表结点还是红黑树节点。
    1. 如果是链表结点的话, 比较 key 和 hash 是否均相等,如果相等则进行覆盖,如果找到链表尾部 key 都不相等则在尾部进行插入(尾插法)。然后还要检查链表结点数量是否达到了树化阈值,是的话则进行树化。
    2. 如果是红黑树节点的话,则调用红黑树的操作进行节点插入,并进行相应地调整。
  6. 如果此时进来的其他线程检查到正在扩容操作的话,将会去协助扩容,减少扩容时间。
  7. 最后调用addCount(1L, binCount) 方法区近似地估计一下table中的总的元素数量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* Maps the specified key to the specified value in this table.
* Neither the key nor the value can be null.
* 也就是说 key-value 均不能为 null
*/
public V put(K key, V value) {
return putVal(key, value, false);
}

/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 1. 检查 key-value 是否为空
if (key == null || value == null) throw new NullPointerException();
// 2. 计算 hash 索引, return (h ^ (h >>> 16)) & HASH_BITS(0x7fffffff);
// 加 & HASH_BITS(0x7fffffff)是为了保障结果为正数
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
// 3. 检查 table 是否为空, 这里没加 synchronized
// 也就是允许多个线程去尝试初始化
// 但是 initTable() 方法里使用CAS保证了只有一个线程执行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 4. (n - 1) & hash 这个操作其实就是取模, 跟HashMap一样
// 检查 tab 中索引为 i 处的位置是否为空
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 5. 如果为空的话则使用 CAS 的方式尝试去设置值 设置成功则跳出
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
// 6. 如果正在扩容
else if ((fh = f.hash) == MOVED)
// 7. tab 等于其他线程帮助扩容后的结果 下面有代码
tab = helpTransfer(tab, f);
else {
// 8. 出现了碰撞
V oldVal = null;
synchronized (f) { // 加锁当前下标的链表, 其他bucket并未加锁, 提升了效率
if (tabAt(tab, i) == f) { // 9. hashcode值此时相等了
// 10. 此时为链表, 在链表中寻找插入点
if (fh >= 0) { // 如果 hashcode >= 0
binCount = 1;
for (Node<K,V> e = f;; ++binCount) { // 遍历链表
K ek;
// 10. 比较待插入点的 key 是不是相等
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
// 11. 如果相等则覆盖
oldVal = e.val;
// 12. 如果不是只在空的时候才插入, 并跳出
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
// 13. 如果找到链表尾部都没找到相同key的, 则插入到尾部.
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
// 14. 如果 f 是红黑树结点, 则插入树结点.
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
// 15. binCount在遍历链表的时候自增, 如果大于阈值则树化
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 16. 对于table的size的近似估计
// 在多线程环境下没有办法去估计究竟有多少线程做了多少操作
// 产生了什么样的结果.
addCount(1L, binCount);
return null;
}

helpTransfer()

transfer()方法为CHM扩容的核心方法。在此过程中,CHM支持多线程扩容。扩容主要分为两个步骤:

  • 构建nextTable桶数组,大小为之前的两倍,这个操作在单线程下完成。
  • oldTable里面的内容复制到nextTable,这个操作允许多线程操作。可以减少扩容时间。

基本思路与HashMap差不多,主要区别在于CHM多线程加锁synchronized (f)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/** 如果 resize 正在进行的话则其他线程帮助一起 resize */
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
// 1. 声明一个新数组, 以及sc, 也就是 sizeCtl
Node<K,V>[] nextTab; int sc;
// 2. 如果tab不空, 并且是扩容结点 ForwardingNode(上面有这个结点的介绍)
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) { // 3. sc < 0, 说明可能有多个线程在扩容
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 4. 调用下面的 transfer 方法
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

transfer()

在此过程中用到的常量或方法

1
2
3
static final int NCPU = Runtime.getRuntime().availableProcessors();
private static final int MIN_TRANSFER_STRIDE = 16;
private transient volatile Node<K,V>[] nextTable;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
* 把旧的桶中的数组中的结点移动或者复制到新的 table 中
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 1. 根据CPU数量来为每个线程分配"力所能及"的bucket(bin)数量, 避免因扩容线程过多反而影响性能
// 主要是计算 stride, 如果CPU核大于1,则stride=n无符号右移3位除以CPU核
// 如果只有1核的话, 那 stride=n, 那么接下来就辛苦你了, 仅有的CPU, 全部都要交给你了.
// 如果stride最后小于16的话, 则强制给每个线程分配16个bin(桶)
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
// 2. 如果辅助数组为空的话则进行初始化, 且初始化为原数组的n<<1倍,也就是2倍
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 捕获 OOM
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true; // 前进, 继续的意思.
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 迁移过程
// 逻辑与HashMap类似, 拿旧数组的容量当做mask(掩码),然后与当前结点的hash值做 & 运算
// 可以得出新结点的新增有效位
// 是 0 的话索引没变,是 1 的话索引变成索引+旧数组容量
// 可以参考HashMap那篇文章分析.
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) { // 如果 forwardingNode hash>=0, 为链表
int runBit = fh & n; // 计算新增的有效位
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); // 为 0, 索引不变
setTabAt(nextTab, i + n, hn); // 为 1, i + n
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) { // 如果是红黑树结点的话, 进行红黑树相关操作
TreeBin<K,V> t = (TreeBin<K,V>)f; // 强转 forwardingNode
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

addCount()

addCount()的作用是估计table.size,也就是table中存储的键值对数量,在使用put()/remove()方法并且成功执行之后都会调用addCount()。需要注意的是,这个值也只是估计值,因为多线程并发的情况下,没有办法精确统计有多少线程做了多少操作,作用的效果如何。

方法中用到的常量或方法

1
2
3
4
5
6
private transient volatile CounterCell[] counterCells; // 计数单元格,当非空时大小为2的幂
private static final long BASECOUNT; // Unsafe
private static final long CELLVALUE; // Unsafe
// 主要在没有竞争时使用, 也就是单线程情况下统计的table中的结点
// 但是也作为table初始化时的回调, 通过 CAS 更新
private transient volatile long baseCount;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
/**
* Adds to count, and if table is too small and not already
* resizing, initiates transfer. If already resizing, helps
* perform transfer if work is available. Rechecks occupancy
* after a transfer to see if another resize is already needed
* because resizings are lagging additions.
* 添加count, 如果table太小或者没有正在扩容,则初始化迁移操作.
* 如果已经正在扩容, 尽可能去帮助迁移操作.
* 在迁移之后再次检查坑位, 看看是否又需要resize, 因为正在扩容这个动作存在滞后(也就是不实时)
*
* @param x the count to add 要添加的统计数量
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
// 尝试修改baseCount, 以求达到计数的目的, 如果修改失败, 说明处于竞争情况, 即多线程操作
boolean uncontended = true; // 未处于竞争状态
/**
* 如果计算单元格为空, 或者随机取一个数组为空, 或者CAS修改失败
* 表示处于并发竞争中, 则执行fullAddCount(x, uncontended)以死循环插入, 同时返回.
* 否则代表这个slot处的变量修改成功, 继续执行.
*
* 每个线程通过 ThreadLocalRandom.getProbe() & m 寻址找到属于它的CounterCell并计数
* ThreadLocalRandom是线程私有的伪随机数生成器, 每个probe都是不同的.
* CounterCell的大小是2的幂, 初始容量为2, 每次扩容为之前的2倍.
* 出于性能考虑, 最大容量为机器的CPU数量, CounterCell碰撞冲突是很严重的.
*/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended); // 死循环插入
return;
}
if (check <= 1) // check if uncontended
return;
s = sumCount();
}
// if < 0, don't check resize, if <= 1 only check if uncontended
if (check >= 0) { // 检查是否需要扩容
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}

final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

get(Object key)

  1. 根据 key 计算 哈希值
  2. 检查table是否为空,非空的话检查table中根据key计算出的索引处的头结点也不为空
    1. 如果这两个检查中有一个为空,则返回null
    2. 否则的话,检查定位到的头结点的 hash 与 key 是否均相等,相等的话则返回结果。
    3. 否则的话,判断是红黑树节点还是链表结点。如果是红黑树节点的话则到红黑树中查找,如果是链表结点的话则遍历链表查找,并返回相应的结果。
  3. 如果最后没有查到,则返回 null
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
// 1. 计算哈希值
int h = spread(key.hashCode());
// 2. 如果table不空 并且 table 长度大于 0 且计算出的下标上的 bucket 不为空
// 说明这个 bucket 存在, 进入到 bucket 中查找
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
// 3. 如果 hashcode 相等 并且 key 相等表示查找到, 返回 val
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 3. 哈希地址小于 0, 代表是红黑树的 bucket, 在红黑树中查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 4. 如果bucket头节点的哈希地址不小于0, 则代表bucket为链表, 遍历链表, 在链表中查找.
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}

remove()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/** 从map中移除key, 如果key不存在的话什么也不做. */
public V remove(Object key) {
return replaceNode(key, null, null);
}

final V replaceNode(Object key, V value, Object cv) {
// 1. 计算 hashcode
int hash = spread(key.hashCode());
for (Node<K,V>[] tab = table;;) {
// 遍历 table
Node<K,V> f; int n, i, fh;
// 2. 如果tab为空或者key所在的bucket为空, 则什么也不做, 跳出返回.
if (tab == null || (n = tab.length) == 0 ||
(f = tabAt(tab, i = (n - 1) & hash)) == null)
break;
else if ((fh = f.hash) == MOVED)
// 3. 如果正在扩容, 则帮助扩容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
boolean validated = false;
synchronized (f) { // 4. 将key所在的bucket加锁, 其他bucket仍未加锁
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 5. 哈希地址大于0, 为链表
validated = true;
for (Node<K,V> e = f, pred = null;;) { // 6. 遍历链表
K ek;
// 7. 检查key是否相等, 如果相等则进行移除.
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
V ev = e.val;
if (cv == null || cv == ev ||
(ev != null && cv.equals(ev))) {
oldVal = ev;
if (value != null)
e.val = value;
else if (pred != null)
pred.next = e.next;
else
setTabAt(tab, i, e.next);
}
break;
}
pred = e;
if ((e = e.next) == null)
break;
}
}
// 8. 如果是树结点则在红黑树中寻找并删除
else if (f instanceof TreeBin) {
validated = true;
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r, p;
if ((r = t.root) != null &&
(p = r.findTreeNode(hash, key, null)) != null) {
V pv = p.val;
if (cv == null || cv == pv ||
(pv != null && cv.equals(pv))) {
oldVal = pv;
if (value != null)
p.val = value;
else if (t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
if (validated) {
if (oldVal != null) {
if (value == null)
addCount(-1L, -1);
return oldVal;
}
break;
}
}
}
return null;
}

比较

比较方面 HashMap Hashtable ConcurrentHashMap
是否线程安全
线程安全采用的方式 采用synchronized类锁,效率低 CAS + synchronized,锁住的只有当前操作的bucket,不影响其他线程对其他bucket的操作,效率高
数据结构 数组+链表+红黑树(链表长度超过8则转红黑树) 数组+链表 数组+链表+红黑树(链表长度超过8则转红黑树)
是否允许null键值
哈希地址算法 (h=key.hashCode())^(h>>>16 key.hashCode() (h=key.hashCode())^(h>>>16)&0x7fffffff
定位算法 (n-1)&hash (hash&0x7fffffff)%n (n-1)&hash
扩容算法 当键值对数量大于阈值,则容量扩容到原来的2倍 当键值对数量大于等于阈值,则容量扩容到原来的2倍+1 当键值对数量大于等于sizeCtl,单线程创建新哈希表,多线程复制bucket到新哈希表,容量扩容到原来的2倍
链表插入 将新节点插入到链表尾部 将新节点插入到链表头部 将新节点插入到链表尾部
继承的类 继承abstractMap抽象类 继承Dictionary抽象类 继承abstractMap抽象类
实现的接口 实现Map接口 实现Map接口 实现ConcurrentMap接口
默认容量大小 16 11 16
默认负载因子 0.75 0.75 0.75
统计size方式 直接返回成员变量size 直接返回成员变量count 遍历CounterCell数组的值进行累加,最后加上baseCount的值即为size

评论