JDK7 ConcurrentHashMap源码分析

ConcurrentHashMap是JDK5以后引入的,用于弥补HashMap不可用于多线程环境以及HashTable在多线程并发竞争效率低的缺陷。ConcurrentHashMap不仅是线程安全的HashMap,而且通过引入分段锁概念来提高多线程并发访问的效率。如果你的程序处于多线程环境,而且要用到HashMap的结构来存储数据,那么ConcurrentHashMap将是你的首选。这篇文章主要介绍ConcurrentHashMap的底层实现原理以及一些常用的操作。

1. ConcurrentHashMap底层结构

Segment + HashEntry

一个ConcurrentHashMap由一个Segment数组组成(Segment数组长度默认为16),一个Segment底层是一个HashEntry数组,而HashEntry对象包括keyvaluehash值和指向下一个HashEntry的引用nextSegment中的每一个HashEntry都是一个链表,这样看来一个Segment就对应了一个HashMap结构,那么ConcurrentHashMap内部其实可以看成由多个HashMap,即多个Segment的结构组成。ConcurrentHashMap的结构图如下所示:

concurrent hashmap structure

2. 分段锁介绍

一个Segment代表了一个可重入锁(ReentrantLock),ConcurrentHashMap正是通过将元素散列到不同的Segment,来提高多线程并发访问的效率。我们知道HashTable也是线程安全的类,但是在多线程并发竞争激烈的时候,效率很低,因为HashTable就用了一把锁(自己本身),也就是说同一个时刻,最多有一个线程能够获取该锁对内部元素进行操作,其他线程都必须阻塞直到占有锁的线程释放了对HashTable的使用权。而一个ConcurrentHashMap内部持有多个锁(体现为多个Segment),如果多个线程操作的都是不同的Segment,那么这些线程持有的是不同的锁,允许并发写。因此,采用了分段锁的ConcurrentHashMap在多线程并发访问下效率有了很大的提升。

1
2
3
4
5
static final class Segment<K,V> extends ReentrantLock implements Serializable {
// ...
}

3. ConcurrentHashMap的初始化

初始化过程会涉及到以下参数的使用:

  • initialCapacity
  • loadFactor
  • concurrencyLevel
  • ssize
  • segmentShift
  • segmentMask

其中ssize表示Segment数组的长度,是由concurrencyLevel的值来决定的。ssize的值总是接近于concurrencyLevel值,并且必须是2的N次方。假如concurrencyLevel等于14、15或者16,ssize都会等于16(2的4次方)。

initialCapacity和loadFactor影响每个Segment里面HashEntry数组的长度,代码如下:

1
2
3
4
5
6
7
8
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;

默认的initialCapacity和ssize均为16,cap = MIN_SEGMENT_TABLE_CAPACITY = 2从源码中说明了HashEntry数组的长度是2的N次方

segmentShift、segmentMask两个参数用于根据key散列值来计算元素位于哪个Segment,计算方式如下:

1
(hash >>> segmentShift) & segmentMask

注意初始化过程只初始化了下标为0的Segment,其他的Segment是延迟初始化的:

1
2
3
4
5
6
7
// create segments and segments[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;

ConcurrentHashMap元素的查找需要进行2次定位,第一次是根据hash值定位到某一个Segment上,第二次是根据hash值定位到Segment对应的HashEntry链上。

4. ConcurrentHashMap操作

4.1 get操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key);
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}

get操作首先通过计算key的hash值,然后根据hash值找到对应的Segment所在下标位置,最后从Segment里面查找。从Segment的查找操作又涉及到HashEntry链的定位。即执行一次get操作需要两次定位。第一次定位到Segment,第二次定位到Segment对应的HashEntry链上。两次定位后遍历链表,查找相关元素,找到的情况下返回元素value,找不到就返回null。

从源码中我们看出get操作是不需要加锁的,那么它又是如何保持取到的值是最新的呢?分析如下:

每一个Segment里面HashEntry数组为volatile类型并且HashEntry对象的value属性和next引用均为volatile类型。定义为volatile的变量能够在不同线程之间保持可见性,可以被多个线程同时读,并且不会读到过期的值。

这是因为根据Java内存模型的happen-before原则,对volatile字段的写入操作先于读操作,即使两个线程同时修改和获取volatile变量,get操作也能拿到最新的值。这是用volatile替换锁的经典应用场景。 —— 参考《Java并发编程艺术》一书

4.2 put操作

put操作,同一个时刻只能有一个线程能够访问某个Segment,但是如果其他线程访问的是其他不同的Segment,那么这些线程可以并发写,原因为这些线程持有的是不同的锁。

1
2
3
4
5
6
7
8
9
10
11
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}

put操作首先计算hash值,并定位到对应的Segment,然后在Segment上面执行put操作,Segment的put操作比较复杂,先贴下源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash; // 二次定位,定位到HashEntry链
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) { // hash冲突,遍历查找是否有key值相同的元素
if (e != null) {
K k;
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else { // hash不冲突的情况
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
unlock();
}
return oldValue;
}

Segment的put操作分析如下:

首先会调用tryLock()方法尝试获取锁,如果获取到则node变量为null,如果获取不到锁,那么将调用scanAndLockForPut(...)方法,该操作持续查找对应的节点链上是否存在key对应的节点,如果没有找到就预先创建一个节点,持续尝试n次,直到超过尝试限制,才真正调用lock()方法等待获取锁。对于最大尝试次数MAX_SCAN_RETRIES,单核尝试1次,多核尝试64次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
HashEntry<K,V> first = entryForHash(this, hash);
HashEntry<K,V> e = first;
HashEntry<K,V> node = null;
int retries = -1; // negative while locating node
//尝试加锁
while (!tryLock()) {
HashEntry<K,V> f; // to recheck first below
if (retries < 0) {
if (e == null) {
if (node == null) // speculatively create node
node = new HashEntry<K,V>(hash, key, value, null);
retries = 0;
}
else if (key.equals(e.key))
retries = 0;
else
e = e.next;
}
else if (++retries > MAX_SCAN_RETRIES) {
lock();
break;
}
else if ((retries & 1) == 0 &&
(f = entryForHash(this, hash)) != first) {
e = first = f; // re-traverse if entry changed
retries = -1;
}
}
return node;
}

接下来就是在HashEntry链上进行遍历,如果发现有key相同的元素存在(表示出现hash冲突),那么根据具体情况处理冲突,不出现冲突则新建节点并进行链表节点插入操作。

坚持原创技术分享,您的支持将鼓励我继续创作!