众所周知 ConcurrentHashMap 是 HashMap 的多线程版本,HashMap 在并发操作时会有各种问题,比如死循环问题、数据覆盖等问题。而这些问题,只要使用 ConcurrentHashMap 就可以完美解决了,那问题来了,ConcurrentHashMap 是如何保证线程安全的?它的底层又是如何实现的?
ConcurrentHashMap 线程安全实现简述
ConcurrentHashMap 在 JDK 1.7 时,使用的是分段锁也就是 Segment 来实现线程安全的。 然而它在 JDK 1.8 之后,使用的是 CAS + synchronized 或 CAS + volatile 来实现线程安全的。
JDK 1.7 底层结构
ConcurrentHashMap 在不同的 JDK 版本中实现是不同的,**在 JDK 1.7 中它使用的是数组加链表的形式实现的,而数组又分为:大数组 Segment 和小数组 HashEntry。**大数组 Segment 可以理解为 MySQL 中的数据库,而每个数据库(Segment)中又有很多张表 HashEntry,每个 HashEntry 中又有多条数据,这些数据是用链表连接的,如下图所示:
JDK 1.7 线程安全实现
了解了 ConcurrentHashMap 的底层实现,再看它的线程安全实现就比较简单了。 接下来,我们通过添加元素 put 方法,来看 JDK 1.7 中 ConcurrentHashMap 是如何保证线程安全的,具体实现源码如下:
final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 堆代码 duidaima.com // 在往该 Segment 写入前,先确保获取到锁 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { // Segment 内部数组 HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash; HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; // 更新已有值... } else { // 放置 HashEntry 到特定位置,如果超过阈值则进行 rehash // 忽略其他代码... } } } finally { // 释放锁 unlock(); } return oldValue; }
从上述源码我们可以看出,Segment 本身是基于 ReentrantLock 实现的加锁和释放锁的操作,这样就能保证多个线程同时访问 ConcurrentHashMap 时,同一时间只有一个线程能操作相应的节点,这样就保证了 ConcurrentHashMap 的线程安全了。
也就是说 ConcurrentHashMap 的线程安全是建立在 Segment 加锁的基础上的,所以我们把它称之为分段锁或片段锁,如下图所示:
JDK 1.8 底层结构
在 JDK 1.7 中,ConcurrentHashMap 虽然是线程安全的,但因为它的底层实现是数组 + 链表的形式,所以在数据比较多的情况下访问是很慢的,因为要遍历整个链表,而 JDK 1.8 则使用了数组 + 链表/红黑树的方式优化了 ConcurrentHashMap 的实现,具体实现结构如下:
链表升级为红黑树的规则:当链表长度大于 8,并且数组的长度大于 64 时,链表就会升级为红黑树的结构。
PS:ConcurrentHashMap 在 JDK 1.8 虽然保留了 Segment 的定义,但这仅仅是为了保证序列化时的兼容性,不再有任何结构上的用处了。
JDK 1.8 线程安全实现在
JDK 1.8 中 ConcurrentHashMap 使用的是 CAS + volatile 或 synchronized 的方式来保证线程安全的,它的核心实现源码如下:
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; K fk; V fv; if (tab == null || (n = tab.length) == 0) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 节点为空 // 利用 CAS 去进行无锁线程安全操作,如果 bin 是空的 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value))) break; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else if (onlyIfAbsent && fh == hash && ((fk = f.key) == key || (fk != null && key.equals(fk))) && (fv = f.val) != null) return fv; else { V oldVal = null; synchronized (f) { // 细粒度的同步修改操作... } } // 如果超过阈值,升级为红黑树 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); return null; }
从上述源码可以看出,在 JDK 1.8 中,添加元素时首先会判断容器是否为空,如果为空则使用 volatile 加 CAS 来初始化。如果容器不为空则根据存储的元素计算该位置是否为空,如果为空则利用 CAS 设置该节点;如果不为空则使用 synchronize 加锁,遍历桶中的数据,替换或新增节点到桶中,最后再判断是否需要转为红黑树,这样就能保证并发访问时的线程安全了。
我们把上述流程简化一下,我们可以简单的认为在 JDK 1.8 中,ConcurrentHashMap 是在头节点加锁来保证线程安全的,锁的粒度相比 Segment 来说更小了,发生冲突和加锁的频率降低了,并发操作的性能就提高了。而且 JDK 1.8 使用的是红黑树优化了之前的固定链表,那么当数据量比较大的时候,查询性能也得到了很大的提升,从之前的 O(n) 优化到了 O(logn) 的时间复杂度,具体加锁示意图如下:
小结
ConcurrentHashMap 在 JDK 1.7 时使用的是数据加链表的形式实现的,其中数组分为两类:大数组 Segment 和小数组 HashEntry,而加锁是通过给 Segment 添加 ReentrantLock 锁来实现线程安全的。而 JDK 1.8 中 ConcurrentHashMap 使用的是数组+链表/红黑树的方式实现的,它是通过 CAS 或 synchronized 来实现线程安全的,并且它的锁粒度更小,查询性能也更高。