• ConcurrentHashMap是如何实现线程安全的?
  • 发布于 2个月前
  • 290 热度
    0 评论

众所周知 ConcurrentHashMap 是 HashMap 的多线程版本,HashMap 在并发操作时会有各种问题,比如死循环问题、数据覆盖等问题。而这些问题,只要使用 ConcurrentHashMap 就可以完美解决了,那问题来了,ConcurrentHashMap 是如何保证线程安全的?它的底层又是如何实现的?


 ConcurrentHashMap 线程安全实现简述

ConcurrentHashMap 在 JDK 1.7 时,使用的是分段锁也就是 Segment 来实现线程安全的。 然而它在 JDK 1.8 之后,使用的是 CAS + synchronized 或 CAS + volatile 来实现线程安全的。


JDK 1.7 底层结构

ConcurrentHashMap 在不同的 JDK 版本中实现是不同的,**在 JDK 1.7 中它使用的是数组加链表的形式实现的,而数组又分为:大数组 Segment 和小数组 HashEntry。**大数组 Segment 可以理解为 MySQL 中的数据库,而每个数据库(Segment)中又有很多张表 HashEntry,每个 HashEntry 中又有多条数据,这些数据是用链表连接的,如下图所示:

JDK 1.7 线程安全实现

了解了 ConcurrentHashMap 的底层实现,再看它的线程安全实现就比较简单了。 接下来,我们通过添加元素 put 方法,来看 JDK 1.7 中 ConcurrentHashMap 是如何保证线程安全的,具体实现源码如下:

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 堆代码 duidaima.com
    // 在往该 Segment 写入前,先确保获取到锁
    HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); 
    V oldValue;
    try {
        // Segment 内部数组
        HashEntry<K,V>[] tab = table;
        int index = (tab.length - 1) & hash;
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            if (e != null) {
                K k;
                // 更新已有值...
            }
            else {
                // 放置 HashEntry 到特定位置,如果超过阈值则进行 rehash
                // 忽略其他代码...
            }
        }
    } finally {
        // 释放锁
        unlock();
    }
    return oldValue;
}

从上述源码我们可以看出,Segment 本身是基于 ReentrantLock 实现的加锁和释放锁的操作,这样就能保证多个线程同时访问 ConcurrentHashMap 时,同一时间只有一个线程能操作相应的节点,这样就保证了 ConcurrentHashMap 的线程安全了。


也就是说 ConcurrentHashMap 的线程安全是建立在 Segment 加锁的基础上的,所以我们把它称之为分段锁或片段锁,如下图所示:

JDK 1.8 底层结构

在 JDK 1.7 中,ConcurrentHashMap 虽然是线程安全的,但因为它的底层实现是数组 + 链表的形式,所以在数据比较多的情况下访问是很慢的,因为要遍历整个链表,而 JDK 1.8 则使用了数组 + 链表/红黑树的方式优化了 ConcurrentHashMap 的实现,具体实现结构如下:  

链表升级为红黑树的规则:当链表长度大于 8,并且数组的长度大于 64 时,链表就会升级为红黑树的结构。

PS:ConcurrentHashMap 在 JDK 1.8 虽然保留了 Segment 的定义,但这仅仅是为了保证序列化时的兼容性,不再有任何结构上的用处了。


JDK 1.8 线程安全实现在 

JDK 1.8 中 ConcurrentHashMap 使用的是 CAS + volatile 或 synchronized 的方式来保证线程安全的,它的核心实现源码如下:

final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException();

    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; K fk; V fv;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 节点为空
            // 利用 CAS 去进行无锁线程安全操作,如果 bin 是空的
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break; 
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else if (onlyIfAbsent
                 && fh == hash
                 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                 && (fv = f.val) != null)
            return fv;
        else {
            V oldVal = null;
            synchronized (f) {
                   // 细粒度的同步修改操作... 
                }
            }
            // 如果超过阈值,升级为红黑树
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

从上述源码可以看出,在 JDK 1.8 中,添加元素时首先会判断容器是否为空,如果为空则使用 volatile 加 CAS 来初始化。如果容器不为空则根据存储的元素计算该位置是否为空,如果为空则利用 CAS 设置该节点;如果不为空则使用 synchronize 加锁,遍历桶中的数据,替换或新增节点到桶中,最后再判断是否需要转为红黑树,这样就能保证并发访问时的线程安全了。


我们把上述流程简化一下,我们可以简单的认为在 JDK 1.8 中,ConcurrentHashMap 是在头节点加锁来保证线程安全的,锁的粒度相比 Segment 来说更小了,发生冲突和加锁的频率降低了,并发操作的性能就提高了。而且 JDK 1.8 使用的是红黑树优化了之前的固定链表,那么当数据量比较大的时候,查询性能也得到了很大的提升,从之前的 O(n) 优化到了 O(logn) 的时间复杂度,具体加锁示意图如下: 

小结

ConcurrentHashMap 在 JDK 1.7 时使用的是数据加链表的形式实现的,其中数组分为两类:大数组 Segment 和小数组 HashEntry,而加锁是通过给 Segment 添加 ReentrantLock 锁来实现线程安全的。而 JDK 1.8 中 ConcurrentHashMap 使用的是数组+链表/红黑树的方式实现的,它是通过 CAS 或 synchronized 来实现线程安全的,并且它的锁粒度更小,查询性能也更高。


用户评论