并发编程四ConcurrentHashMap源码分析

概述

ConcurrentHashMap,通过这个名字,可以知道Concurrent是并发的,HashMap是我们常用的一种用来存放键值对的数据结构,所以ConcurrentHashMap就是一种用来解决高并发的HashMap。这个是JDK在1.5之后提供的一种数据结构。

我们知道Java中其实已经有了HashTable这个线程安全的Map,但是他是通过对整个table使用synchronized来保证线程安全的,当有多个线程对HashTable进行读写的时候,每次只能有一个线程会获取到HashTable的对象锁,别的只能处于等待状态,所以在高并发的情况下,它的效率是比较低的。
当然JDK还提供了另外一种方式,那就是Collections.synchronizedMap(hashMap),这个底层也是通过synchronized对要高并发的数据结构来实现线程安全的。
通过上面的分析可以知道,ConcurrentHashMap为了解决synchronized在高并发时的低效问题,ConcurrentHashMap之所以比HashTable高效,是因为他并不是对整个HashMap上锁,而且在JDK1.8跟之前的JDK1.6,1.7的实现都不一样,下面进行对比一下

  • JDK 1.6:ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。
  • JDK 1.8:放弃了Segment的概念,而是直接用Node数组+链表+红黑树的数据结构来实现,并发控制使用Synchronized和CAS来操作,整个看起来就像是优化过且线程安全的HashMap。

继承关系

ConcurrentHashMap

通过继承关系,可以发现,ConcurrentHashMap并没有继承HashMap,而是单独实现了ConcurrentMap这个接口,其余的跟HashMap是一样的

成员变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//table的最大容量
private static final int MAXIMUM_CAPACITY = 1 << 30;
//table的默认容量16
private static final int DEFAULT_CAPACITY = 16;
//默认的并发度为16
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//负载因子
private static final float LOAD_FACTOR = 0.75f;
//链表转化为红黑树的阈值
static final int TREEIFY_THRESHOLD = 8;
//红黑树转链表的阈值
static final int UNTREEIFY_THRESHOLD = 6;
//红黑树的最小容量
static final int MIN_TREEIFY_CAPACITY = 64;

//存放Node的数组
transient volatile Node<K,V>[] table;
private transient volatile Node<K,V>[] nextTable;
private transient volatile long baseCount;
//sizeCtl作为table是否在初始化或者在resize的标志,如果为负数说明正在初始化或者扩容,需要table进行CRUD操作的线程只能等待,稍后进行重试,否则就可以直接进行CRUD操作
//正数则表明table没有初始化
private transient volatile int sizeCtl;
//该节点是否已经移动到新数组中去
static final int MOVED = -1;
//是否是TREEBIN节点
static final int TREEBIN = -2;

除此之外,还有几个类需要解释一下

基本原理

TreeNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; //根节点
TreeNode<K,V> left;//左节点
TreeNode<K,V> right;//右节点
TreeNode<K,V> prev; //上个节点的指针
boolean red;//默认节点为黑
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}

Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}

//查找相应的树节点
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
//此处省略一万行代码......
}

跟一般的树节点都差不多,在前面分析二叉树的时候也差不多是这种数据结构,所以还是比较好理解的,注释说Nodes for use in TreeBins,也就是说这个TreeNode是用来服务TreeBin的,那么继续看Treebins

TreeBins

这个通过注释可以知道,是用来包装TreeNode的,然后作为红黑树的树节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
static final class TreeBin<K,V> extends Node<K,V> {
TreeNode<K,V> root;
volatile TreeNode<K,V> first;//TreeNode的根节点
volatile Thread waiter;//waiter线程
volatile int lockState;//当前节点的锁状态
// values for lockState
static final int WRITER = 1; // 获得写数据的锁状态
static final int WAITER = 2; // 等待写数据的锁状态
static final int READER = 4; // 添加数据时的锁状态

TreeBin(TreeNode<K,V> b) {
super(TREEBIN, null, null, null);
this.first = b;//头结点赋值
TreeNode<K,V> r = null;
//递归遍历传过来的Node,从根节点开始
for (TreeNode<K,V> x = b, next; x != null; x = next)
{
next = (TreeNode<K,V>)x.next;//拿到下一个节点
x.left = x.right = null;//将左右节点置空,因为需要重新赋值
//判断r节点是否为空
if (r == null) {
x.parent = null;//根节点置空
x.red = false;//黑子树
r = x;//x赋值为r节点
}
else {
K k = x.key;//拿到key
int h = x.hash;//计算hash值
Class<?> kc = null;//K对应的泛型实体类
//从根节点r开始遍历
for (TreeNode<K,V> p = r;;) {
int dir, ph;
K pk = p.key;
if ((ph = p.hash) > h)
dir = -1;
else if (ph < h)
dir = 1;
else if ((kc == null &&
(kc = comparableClassFor(k)) == null) ||
(dir = compareComparables(kc, k, pk)) == 0)
dir = tieBreakOrder(k, pk);
TreeNode<K,V> xp = p;
if ((p = (dir <= 0) ? p.left : p.right) == null) {
x.parent = xp;
if (dir <= 0)
xp.left = x;
else
xp.right = x;
r = balanceInsertion(r, x);
break;
}
}
}
}
this.root = r;//将r赋值给根节点
assert checkInvariants(root);
}
}

ForwardingNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}

ForwardingNode有两个功能,一个是用来指向nextTable节点,便于将原有的table数组复制到新的table中,另外一个是用来占位空节点,告诉其他节点这个节点已经被处理过了,自动遍历后面的节点。

CAS操作

tabAt

1
2
3
4

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

获得在i索引位置上的Node节点

casTabAt

1
2
3
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
  • 1.利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少
  • 2.在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改

setTabAt

1
2
3
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

利用volatile方法设置节点位置的值

核心方法

构造方法

ConcurrentHashMap_constructor

之前分析过HashMap的源码,所以看这些还是比较轻松的,ConcurrentHashMap采用的是懒加载,也就是存入第一对KeyValue的时候才会去初始化table,所以前面四个构造方法只是做了一些参数配置,只有最后一个构造方法,才会去真正的初始化,下面重点看一下最后一个方法

1
2
3
4
5
6
7
8
9
10
 public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

public void putAll(Map<? extends K, ? extends V> m) {
tryPresize(m.size());
for (Map.Entry<? extends K, ? extends V> e : m.entrySet())
putVal(e.getKey(), e.getValue(), false);
}

最后调用了putAll方法,这个会在后面的put方法里面再重点讲解。

transfer方法

transfer实际上执行的就是table数组的扩容操作,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
//创建一个容量是原来2倍的table数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
//创建一个ForwardingNode,指向新table,并且用于填充已经移动的Node位置
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
//如果所有的节点都已经完成复制工作
//就把nextTable赋值给table 清空临时对象nextTable

nextTable = null;
table = nextTab;
// 扩容至现在容量的0.75倍
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
else if ((f = tabAt(tab, i)) == null)
//用ForwardingNode来填充Null节点
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
//如果当前节点移动过,将次标志设置为true
//这样后续线程访问便可以直接跳过此节点
advance = true; // already processed
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
//普通Node节点
int runBit = fh & n;//通过runBit将链表打散,便于高效遍历
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
//TreeBin节点,
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
//通过h & n的值将TreeBin的节点进行打散
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//如果扩容后已经不再需要tree的结构 反向转换为链表结构
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//在nextTable的i位置上插入一个链表
setTabAt(nextTab, i, ln);
//在nextTable的i+n的位置上插入另一个链表
setTabAt(nextTab, i + n, hn);
//在table的i位置上插入forwardNode节点 表示已经处理过该节点
setTabAt(tab, i, fwd);
//设置advance为true 返回到上面的while循环中 就可以执行i--操作
advance = true;
}
}
}
}
}
}
  • 1.根据当前数组长度n,新建一个两倍长度的数组nextTable;
  • 2.初始化ForwardingNode节点,其中保存了新数组nextTable的引用,在处理完每个槽位的节点之后当做占位节点,表示该槽位已经处理过了
  • 3.通过for自循环处理每个槽位中的链表元素,默认advace为真,通过CAS设置transferIndex属性值
  • 4.如果槽位中没有节点,则通过CAS插入在第二步中初始化的ForwardingNode节点,用于告诉其它线程该槽位已经处理过了
  • 5.如果当前Node已经被线程A处理了,那么线程B处理到这个节点时,取到该节点的hash值应该为MOVED,值为-1,则直接跳过,继续处理下一个Node
  • 6.处理不为null的节点,如果是链表结构,先定义两个变量节点ln和hn,通过CAS把ln链表设置到新数组的i位置,hn链表设置到i+n的位置
  • 7.如果该槽位是红黑树结构,同样将节点分成两类,通过CAS插入到nextTable中

helpTransfer

如果一个线程在进行扩容的时候中断了,但是此时nextTable已经创建了,那么他就会调用helpTransfer方法,帮助扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}

这是一个协助扩容的方法。这个方法被调用的时候,当前ConcurrentHashMap一定已经有了nextTable对象,首先拿到这个nextTable对象,调用上面讲到的transfer方法来进行扩容

put方法

首先调用了put方法

1
2
3
public V put(K key, V value) {
return putVal(key, value, false);
}

紧接着调用了putVal方法,下面来分析下一年putVal方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
final V putVal(K key, V value, boolean onlyIfAbsent) {
//key和value都不能为空
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());//计算hash值
int binCount = 0;//链表长度
//开启死循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//table为null或者table的长度为0,初始化table
tab = initTable();
//根据hash值计算table的索引
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果索引处为null,则直接在此处进行插入
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
//通过CAS进行插入,插入成功之后跳出循环
break;
}
else if ((fh = f.hash) == MOVED)
//当遇到表ForwardNode时,需要对表进行整合
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//采用同步代码块进行上锁
synchronized (f) {
if (tabAt(tab, i) == f) {
//普通节点
if (fh >= 0) {
binCount = 1;
//遍历所有节点
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果hash值跟key值都相同,则进行值替换
if (e.hash == hash && ((ek = e.key) == key
||(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null)
{
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
//树节点
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
//判断链表长度,如果链表长度大于阈值,进行树节点转换
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);//改变table中的元素的个数
return null;
}

下面分解一下步骤:

  • 1.计算记录的key的hashCode,然后通过hashcode计算table的index位置
  • 2.获取table[index],为null直接采用casTabAt插入一条新的记录
  • 3.不为null,则锁住根节点,然后通过节点的hashcode值来区分是链表还是红黑树
  • 4.遍历链表或者红黑树,找到相应的节点,进行更新操作
  • 5.如果是更新操作,则不需要进行transfer操作,否则判断一下更新之后的size有没有超过阈值,主要是判断链表的节点数跟table的数组长度

如何保证线程安全

当对table中的某个节点进行写操作的时候,如果为null,直接进行插入,注意如果不为null,则用syncronized关键字锁住当前节点,然后进行写操作。为什么为空的时候插入不需要上锁,因为ConcurrentHashMap采用的是CAS的乐观锁机制,如果有线程在进行修改,就会在外面等待,然后过一会儿再过来查询一下,这样可以保证高效并发。

get方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());//计算hash值
//根据hash值定位元素
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
//如果搜索到的节点key与传入的key相同且不为null,直接返回这个节点
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
//树节点
return (p = e.find(h, key)) != null ? p.val : null;
//通过寻址法进行寻找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
  • 1.通过计算key的hash值进而计算key在table中的index
  • 2.拿到table[index],为null,直接返回null
  • 3.如果table[index]不为null,通过Node的hash值来判断是链表还是红黑树
  • 4.遍历到相应的value之后直接返回,否则返回null

如何保证线程安全
首先当多个线程同时读取的时候,ConcurrentHashMap给每个Node节点加了volatile关键字修饰来保证可见性。

总结

看了很多数据结构,感觉ConcurrentHashMap是最复杂的,里面涉及到了很多的CAS操作,以及硬件层的线程安全操作,分析了好几天,才弄懂,下面总结一下c的JDK1.7跟1.8的区别。

JDK 1.7

ConcurrentHashMap1

JDK1.8

ConcurrentHashMap 1

对比分析

  • 1.7采用链表,1.8使用红黑树来替换之前的链表,提高了查找的效率。
  • 1.7版本锁的粒度是基于Segment的,采用的是分段锁,粒度是每个数组1.8的实现降低锁的粒度到每个Node节点,
  • 1.7采用的Segment继承自ReentrantLock,自带锁,1.8的数据结构采用数组+链表+红黑树,使用了大量 的CAS操作,提升了高并发时的性能,丢弃了分段锁的概念。
  • 1.7采用的是ReentrantLock进行同步,1.8 采用了synchronized进行同步,其实一开始没理解,因为ReentrantLock比synchronized更加灵活,后来思考了一下觉得是因为锁的粒度进一步降低,在粗粒度加锁中ReentrantLock可能通过Condition来控制各个低粒度的边界,更加的灵活,而在低粒度中,Condition的优势不大。

参考资料

http://www.importnew.com/22007.html
http://www.jianshu.com/p/f6730d5784ad
http://blog.csdn.net/mine_song/article/details/70140596