首页 > 编程 > Java > 正文

Java集合类框架学习 5.1 —— ConcurrentHashMap(JDK1.6)

2019-11-06 07:15:56
字体:
来源:转载
供稿:网友
以下内容,如有问题,烦请指出,谢谢!这一篇讲1.6的ConcurrentHashMapConcurrentHashMap是java程序员接触得最多的有关并发和线程安全的类,它兼顾了并发的两个基本点——安全,高效,在很多地方都有用到,每一个Java程序员都应该真正掌握这个类。虽然1.6版本过了10年多了,现有的1.8跟1.6的差异很大。不过1.6的最适合用来学习,它实现得简单直接,分段锁的思想也讲清楚了,所以先学习1.6的,然后看看后续版本对它的改进。

废话就不多说了,直入正题。

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable

一、基本性质1、支持完全并发的读操作和可控并发级别的写操作的Map,方法的基本功能与Hashtable大致相同,但是使用锁分段技术,没有使用全局锁,整体上更有效率,能够在大多数并发场景中替代Hashtable。2、不允许null key和null value(1.7之前的代码中,指出非常特殊情况可能会读到null value,但是在jsr133后的新内存模型中是不会发生的,也就是实际中不会读到null value,1.7把这个无用的代码去掉了)。3、专为并发设计的集合类。它的读操作不加锁(null value的问题不说了),可以和写操作并行,但是读操作只反映最近完成的结果,也可能和它开始执行的那一瞬间的状态有出入,这在一般的并发场景中可以接受。基本的写操作加锁,putAll和clear这两个批量操作没有使用全局锁,因此并不保证结果和预期的一致(比如clear执行后的一瞬间,可能map中会有其他线程添加的数据)。它还不是真正意义上的线程安全,记住线程安全的关键:线程一个个串行,和多线程并发的执行,产生的效果(对外界可见的影响)一样。很明显ConcurrentHashMap达不到这种程度。它的这种并发控制更像是数据库隔离级别。它没有全局锁(数据库进程锁/库锁),使用的是Segment的锁(数据库表级别的锁)。根据写阻塞写不阻塞读这一点,可以认为它具体的隔离级别是"Read uncommitted"这个级别,即不会丢失写操作带来的影响,但是会读取到"中间态数据/脏数据"(写操作方法还没有返回,就能看到写操作带来的部分影响)。至于”不可重复读“、”幻读“这两个,根据具体实现,Segment的读方法都只读一次(加锁读不管),可以认为是没有的。“脏数据”这个,可以认为只在putAll、clear中才会出现。基本的put/remove/replace,因为都只操作一个K-V,因此可以认为它们的执行过程中是没有“脏数据”出现的。ConcurrentHashMap在多线程之间可能会存在不一致状态。但是针对简单的读写操作来说,这些不一致的状态,要么是历史的一致状态,要么是未来的一致状态,不会出现错误的状态(HashMap多线程操作就会出现错误状态,比如死循环、更新丢失)。在很多程序中还是能接受这种不一致的,当然,如果你需要更高的一致性,比如获取需要同时获取ConcurrentHashMap中某几个key在某个绝对的同一时刻对应的value,那么它自带的方法就不能完成了,这时候就要依赖更外部的范围更大锁。这种差不多就算是通常所说的事务了,api是基本不提供自带事务性的方法的,需要你自己控制。4、弱一致性。第3点中一起说了。5、ConcurrentHashMap的迭代器是弱一致性的,不是fail-fast快速失败的。即使它在迭代时发现了并发修改,它也是会继续执行下去的,它不会抛出ConcurrentModificationException,因此它的迭代器可能会反映出迭代过程中的并发修改,预期迭代结果可能和调用的瞬间情况有出入。虽然迭代器在并发时不会抛出异常,但是仍然不建议多线程并发使用此迭代器。6、继承了HashMap的一些性质。7、不支持clone,不过clone本身用得就很少,用Map拷贝构造创建一个实例能够做到一样的效果。基本结构的简单示意图如下(圆形中的数字是乱写的,非真实数据)。

二、常量和变量1、常量
// 默认初始化容量,这个容量指的是所有Segment中的hash桶的数量和static final int DEFAULT_INITIAL_CAPACITY = 16;// 每个Segment的默认的加载因子static final float DEFAULT_LOAD_FACTOR = 0.75f;// 整个Map的默认的并发级别,代表最大允许16个线程同时进行并发修改操作。实际并发级别要是 2^n 这种数,同时这个变量也是数组segments的长度static final int DEFAULT_CONCURRENCY_LEVEL = 16;// 每个Segment的最大hash桶的大小(数组Segment.table的最大长度),初始容量也不能大于此值static final int MAXIMUM_CAPACITY = 1 << 30;// 数组segments的最大长度,也是最大并发级别static final int MAX_SEGMENTS = 1 << 16; // slightly conservative,翻译过来是略保守,实际远远用不到这么多Segment啊// size方法和containValue方法最大的不加锁尝试次数// 简单点说就是先不加锁执行一次,如果没发现改变,就返回,发现改变就再重复一次,再发现改变就所有对segment都加锁再操作一次,这样设计主要是为了避免加锁提高效率。static final int RETRIES_BEFORE_LOCK = 2;2、变量
// 段掩码,跟子网掩码以及HashMap中的 table.length - 1 的作用差不多,都是用为了用位运算加速hash散列定位final int segmentMask;// 段偏移量,定位到一个segment使用的是hash值的高位,segments数组长度为2^n的话,此数值为32-n,也就是把最高的n位移动到最低的n位final int segmentShift;// 段数组final Segment<K,V>[] segments;// 下面都懂transient Set<K> keySet;transient Set<Map.Entry<K,V>> entrySet;transient Collection<V> values;

三、基本类

也就是HashEntry和Segments,这两个是所有方法实现的基础。先看看HashEntry,代码如下:
static final class HashEntry<K,V> {    final K key;    final int hash;    volatile V value;    final HashEntry<K,V> next;    HashEntry(K key, int hash, HashEntry<K,V> next, V value) {        this.key = key;        this.hash = hash;        this.next = next;        this.value = value;    }    @SupPRessWarnings("unchecked")    static final <K,V> HashEntry<K,V>[] newArray(int i) {        return new HashEntry[i];    }}HashEntry相对HashMap.Entry的变化:1、value变为volatile了,对多线程具有相同的可见性,直接读取value、给value赋值为一个已经知道的值(就是写value可以不依赖它当前的值),这两个操作是是不用加锁的;2、next变为final了,这也就是说不能从HashEntry链的中间或尾部添加或删除节点,因为这需要修改 next 引用值。这让删除变得复杂了一些,要复制前面的节点并重新添加,删除的效率变低。说两个问题:1、为什么使用final修饰next?key和hash因为是判别两个hashEntry是否“相等”的重要属性,实际中不允许修改,设置为final合情合理,更重要的原因是扩容时有用到hash值不会改变这个重要的前提。扩容时一部分节点重用,本质上和HashMap1.8中的高低位时一个思想。next指针在删除节点时需要被修改,设置为final使得remove操作很麻烦。这个不是为了remove不影响正在进行在HashEntry链的读操作,虽然这个类是弱一致的,但是是允许迭代读时看到其他线程的修改的,再对比看下1.7的,可以印证这一点。关于这一点,需要知道Java中final的内存语义。关于final的内存语义,可以看下这篇文章,我这里结合HashEntry简单说下。final的内存语义,能够保证读线程读去一个HashEntry节点时,它的next指针在这之前已经被正确初始化了(构造方法中this不提前逸出的情况下,HashEntry满足这一点)。如果使用普通的变量next,处理器重排序后,有可能读线程先读取到这个节点引用,然后通过引用读取next的值。next的初始化赋值因为重排序,可能被放在后面再执行,这种情况下读取到的next是默认值null。因为get/put/remove/replace都需要使用next指针进行链表遍历,在遍历链表时,next == null本身就是一个正常的状态 ,因此这种重排序基本上对所有读写操作都有严重的影响,会造成混淆,所有方法的准确性大大降低。然后,因为next == null是正常状态,不能像value一样,在读取到null时使用readUnderLock再来一次(这也是ConcurrentHashMap为什么不能允许null value的一个原因)。为了避免上面说的这种问题,把next设置为final的,这样就能保证不会读取到未被初始化赋值时的默认值null,遍历时读到null就一定能保证是遍历到了链表末尾。

2、jdk1.7开始不再使用final next,改为使用volatile next。

这样在jdk1.5(jsr133内存模型改动)之后也是也是可以的,能够避免读取到未被初始化赋值的next。可以参考下这篇讲volatile的和这篇讲双检锁单例的。这些代码是1.5时写的,1.6基本没变。在写这段代码时作者对内存模型理解有些偏差(看下这里),所以没有使用volatile。1.7开始就改为volatile next,并使用Unsafe.putOrderedObject来降低volatile的写开销,尽量提升性能。因为理解上的偏差,所以才有了readUnderLock方法。readUnderLock是在读取到value == null时重新加锁读一次,避免读取到的value是未执行初始化赋值的默认值null。在jsr133后的新的内存模型中,读取到value == null是不可能的发生的。因此1.7开始,这个设计直接去掉了,因为本身就不允许null存进来,所以不会读取到,也谈不上加锁再读一次。这个问题本身是1.5中的,但是1.6的代码基本没实质性改动,继承了这个问题,1.7的改进了不少,修复了这个问题。另外再扯下,为什么不允许null value?value好像并不影响Map的特性,网上简单找了下,大概四个原因:1、最初设计中,null value可以区别是否要加锁读,如果允许null value,那么不好区别要不要加锁,现在这已经不需要了;2、containsKey方法的有些实现是 get(key) != null,这里也需要区别出 null value,但是可以不这样实现;3、为了完全替换Hashtable,这个类是不允许null value的;4、历史原因,最开始是null value,后面的版本也不好改了,就保留下来了。

再看下Segment,很多要说的都在代码上写注释了,有些注释只是简单的翻译下,各位还是要自己看下源码

个人觉得一些要注意的点:1、Segment本质上还是一个hash表,很多操作跟HashMap很像,所以有HahsMap的基础,理解这些代码就很简单了。ConcurrentHashMap的操都是由segment对应的操作加上一些额外策略实现的。2、直接读取操作是不加锁的,读取null值才加锁读一次。这一点上面讲了,新内存模型中已经不会发生了,jdk1.7中加锁读也不要了,get在任何情况下都不加锁,所以这一点可以不管了。3、写操作replace/remove/put/clear都需要加锁,扩容rehash操作只在put内部的调用,所以也是隐含加锁的,写操作要联系HashEntry的性质(final next)来看。4、扩容和HashMap有些细小的区别,这个可以看下面写的注释。
// Segment是一个特殊的hash表,写上继承ReentrantLock只是只是为了简化一些锁定,避免单独new一个ReentrantLock。static final class Segment<K,V> extends ReentrantLock implements Serializable {    private static final long serialVersionUID = 2249069246763182397L;    // 类似size,外部修改Segment结构的都会修改这个变量,它是volatile的,几个读方法在一开始就判断这个值是否为0,能尽量保证不做无用的读取操作。    transient volatile int count;    // 跟集合类中的modCount一样,检测到这个数值变化说明Segment一定有被修改过    // 因为modCount不是volatile,有可能无法反映出一次修改操作的中间状态(历史的一致状态/未来的一致状态),    //     这些状态本身在当时就不应该被看见,看见了也没事,所以使用普通变量也合理    transient int modCount;    // 扩容阈值    transient int threshold;    // 类似HashMap的table数组    transient volatile HashEntry<K,V>[] table;    // 加载因子,所有segment都是一样的,放在这里就不需要依赖外部类了    final float loadFactor;    Segment(int initialCapacity, float lf) {        loadFactor = lf;        setTable(HashEntry.<K,V>newArray(initialCapacity));    }    @SuppressWarnings("unchecked")    static final <K,V> Segment<K,V>[] newArray(int i) {        return new Segment[i];    }    // 只有持有本segment的锁或者是构造方法中才能调用这个方法,反序列化构造时也有用这个    void setTable(HashEntry<K,V>[] newTable) {        threshold = (int)(newTable.length * loadFactor);        table = newTable;    }    // 跟indexFor算法一样    HashEntry<K,V> getFirst(int hash) {        HashEntry<K,V>[] tab = table;        return tab[hash & (tab.length - 1)];    }    // 加锁读取value,在直接读取value得到null时调用    // 源码这里有英文注释:读到value为null,只有当某种重排序的HashEntry初始化代码让volatile变量初始化重排序到构造方法外面时才会出现,    //     这一点旧的内存模型下是合法的,但是不知道会不会发生。    // 具体怎么发生,就是节点的构造方法执行完了,但是value的赋值重排序到构造方法外面,然后节点被引用了,挂载成了HashEntry链第一个节点,    //     此时可以读取到这个节点,但是value的赋值不一定被执行了,value是默认值null    // 搜下stackoverflow,说在新内存模型下已经不会发生了,是作者自己理解有些问题,后续的1.7也修复了    // http://stackoverflow.com/questions/5002428/concurrenthashmap-reorder-instruction/5144515#5144515    V readValueUnderLock(HashEntry<K,V> e) {        lock();        try {            return e.value;        } finally {            unlock();        }    }    // 下面的方法是为了实现map中的方法,名字都很像    // 直接读value是不用加锁的,碰到读到value == null,才加锁再读一次,这个前面说了,后面的也一样    V get(Object key, int hash) {        if (count != 0) { // read-volatile            HashEntry<K,V> e = getFirst(hash);            while (e != null) {                if (e.hash == hash && key.equals(e.key)) {                    V v = e.value;                    if (v != null)                        return v;                    return readValueUnderLock(e); // recheck                }                e = e.next;            }        }        return null;    }    boolean containsKey(Object key, int hash) {        if (count != 0) { // read-volatile            HashEntry<K,V> e = getFirst(hash);            while (e != null) {                if (e.hash == hash && key.equals(e.key)) // ConcurrentHashMap禁止添加null key,所以这里不用考虑key为null的情况,下同                    return true;                e = e.next;            }        }        return false;    }    // 跟get差不多    boolean containsValue(Object value) {        if (count != 0) { // read-volatile            HashEntry<K,V>[] tab = table;            int len = tab.length;            for (int i = 0 ; i < len; i++) {                for (HashEntry<K,V> e = tab[i]; e != null; e = e.next) {                    V v = e.value;                    if (v == null) // recheck                        v = readValueUnderLock(e);                    if (value.equals(v))                        return true;                }            }        }        return false;    }    // 几个写操作,需要加锁,下同    boolean replace(K key, int hash, V oldValue, V newValue) {        lock();        try {            HashEntry<K,V> e = getFirst(hash);            while (e != null && (e.hash != hash || !key.equals(e.key)))                e = e.next;            boolean replaced = false;            if (e != null && oldValue.equals(e.value)) {                // replace不会修改modCount,这降低了containValue方法的准确性,jdk1.7修复了这一点                replaced = true;                e.value = newValue;            }            return replaced;        } finally {            unlock();        }    }    V replace(K key, int hash, V newValue) {        lock();        try {            HashEntry<K,V> e = getFirst(hash);            while (e != null && (e.hash != hash || !key.equals(e.key)))                e = e.next;            V oldValue = null;            if (e != null) {                // replace不会修改modCount,这降低了containValue方法的准确性,jdk1.7修复了这一点                oldValue = e.value;                e.value = newValue;            }            return oldValue;        } finally {            unlock();        }    }    // 基本思路跟1.6的HashMap一样    V put(K key, int hash, V value, boolean onlyIfAbsent) {        lock();        try {            int c = count;            // 先执行扩容,再添加节点,1.6的HashMap是先添加节点,再扩容            // 并且Segment这里是先用大于号判断大小,再count++,扩容时实际容量会比HashMap中同情况时多一个,            //     会出现put完成后Segment.count > threshold应该扩容但是却没有扩容的情况,这不太符合设计            // eg:threshold = 12,那么在这个Segment上执行第13次put时,判断语句为 12++ > 12,为false,执行完成后Segment.count = 13,threshold = 12            // 另外默认构造情况下threshold = 0,而且每个Segment的table.length都为1,如果更改为符合设计的思路的扩容方式,第一次put又一定会触发扩容            // 1.7修改了上面两点,判断语句先让count加1,再用大于号执行判断,同时让table.length 最小值为2,这样第一次put也不会扩容,完善了整体的扩容机制            if (c++ > threshold) // ensure capacity                rehash();            HashEntry<K,V>[] tab = table;            int index = hash & (tab.length - 1); // 定位方式和1.6的HashMap一样            HashEntry<K,V> first = tab[index];            HashEntry<K,V> e = first;            while (e != null && (e.hash != hash || !key.equals(e.key)))                e = e.next;            V oldValue;            if (e != null) {                oldValue = e.value;                if (!onlyIfAbsent)                    // put相同的key(相当于replace)不会修改modCount,这降低了containsValue方法的准确性,jdk1.7修复了这一点                    e.value = value;            }            else {                oldValue = null;                ++modCount;                // 也是添加在HashEntry链的头部,前面说了,这里的HashEntry的next指针是final的,new后就不能变                tab[index] = new HashEntry<K,V>(key, hash, first, value);                count = c;            }            return oldValue;        } finally {            unlock();        }    }    // 扩容时为了不影响正在进行的读线程,最好的方式是全部节点复制一次并重新添加    // 这里根据扩容时节点迁移的性质,最大可能的重用一部分节点,这个性质跟1.8的HashMap中的高低位是一个道理,必须要求hash值是final的    void rehash() {        HashEntry<K,V>[] oldTable = table;        int oldCapacity = oldTable.length;        if (oldCapacity >= MAXIMUM_CAPACITY)            return;        // 这里有段注释是说下面的算法及其作用的,跟1.8的HashMap的resize中用到的那个高低温的原理一样:        //     扩容前在一个hash桶中的节点,扩容后只会有两个去向。这里是用 &,后续改为用高低位,实质上是一样的。        //     根据这个去向,找到最末尾的去向都一样的连续的一部分,这部分可以重用,不需要复制        // HashEntry的next是final的,resize/rehash时需要重新new,这里的特殊之处就是最大程度重用HashEntry链尾部的一部分,尽量减少重新new的次数        // 作者说从统计角度看,默认设置下有大约1/6的节点需要被重新复制一次,所以通常情况还是能节省不少时间的        HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);        threshold = (int)(newTable.length * loadFactor); // 跟1.6的HashMap有一样的小问题,可能会过早变为Integer.MAX_VALUE从而导致后续永远不能扩容        int sizeMask = newTable.length - 1;        for (int i = 0; i < oldCapacity ; i++) {            // We need to guarantee that any existing reads of old Map can proceed. So we cannot yet null out each bin.            // 为了保证其他线程能够继续执行读操作,不执行手动将原来table赋值为null,只是再最后修改一次table的引用            // 1.6的HashMap对应的地方有个src[j] = null,这句话在多线程时某些程度上能加快回收旧table数组,但是放在这里会影响其他线程的读操作            // 只要其他线程完成了读操作,就不会再引用旧HashEntry,旧的就会自动被垃圾回收器回收            // 下面有个关于resize中重用节点的示意图,可以看下            HashEntry<K,V> e = oldTable[i];            if (e != null) {                HashEntry<K,V> next = e.next;                int idx = e.hash & sizeMask;                //  Single node on list                if (next == null)                    newTable[idx] = e;                else {                    // Reuse trailing consecutive sequence at same slot                    HashEntry<K,V> lastRun = e;                    int lastIdx = idx;                    // 这个循环是寻找HashEntry链最大的可重用的尾部                    // 看过1.8的HashMap就知道,如果hash值是final的,那么每次扩容,扩容前在一条链表上的节点,扩容后只会有两个去向                    // 这里重用部分中,所有节点的去向相同,它们可以不用被复制                    for (HashEntry<K,V> last = next;                         last != null;                         last = last.next) {                        int k = last.hash & sizeMask;                        if (k != lastIdx) {                            lastIdx = k;                            lastRun = last;                        }                    }                    newTable[lastIdx] = lastRun; // 把重用部分整体放在扩容后的hash桶中                    // 复制不能重用的部分,并把它们插入到rehash后的所在HashEntry链的头部                    for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {                        int k = p.hash & sizeMask;                        HashEntry<K,V> n = newTable[k];                        newTable[k] = new HashEntry<K,V>(p.key, p.hash, n, p.value);                    }                    // 这里也可以看出,重用部分rehash后相对顺序不变,并且还是在rehash后的链表的尾部                    // 不能重用那些节点在rehash后,再一个个重头添加到链表的头部,如果还在一条链表上面,那么不能重用节点的相对顺序会颠倒                    // 所以ConcurrentHashMap的迭代顺序会变化,本身它也不保证迭代顺序不变                }            }        }        table = newTable;    }    // 因为1.6的HashEntry的next指针是final的,所以比普通的链表remove要复杂些,只有被删除节点的后面可以被重用,前面的都要再重新insert一次    V remove(Object key, int hash, Object value) {        lock();        try {            int c = count - 1;            HashEntry<K,V>[] tab = table;            int index = hash & (tab.length - 1);            HashEntry<K,V> first = tab[index];            HashEntry<K,V> e = first;            while (e != null && (e.hash != hash || !key.equals(e.key)))                e = e.next;            V oldValue = null;            if (e != null) {                V v = e.value;                if (value == null || value.equals(v)) {                    oldValue = v;                    // All entries following removed node can stay in list, but all preceding ones need to be cloned.                    // 因为next指针是final的,所以删除不能用简单的链表删除,需要把前面的节点都重新复制再插入一次,后面的节点可以重用                    // 删除后,后面的可以重用的那部分顺序不变且还是放在最后,前面的被复制的那部分顺序颠倒地放在前面                    // 后面Map.remove那里有个remove的示意图                    ++modCount;                    HashEntry<K,V> newFirst = e.next;                    for (HashEntry<K,V> p = first; p != e; p = p.next)                        newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value);                    tab[index] = newFirst;                    count = c; // write-volatile                }            }            return oldValue;        } finally {            unlock();        }    }    void clear() {        if (count != 0) {            lock();            try {                HashEntry<K,V>[] tab = table;                for (int i = 0; i < tab.length ; i++)                    tab[i] = null;                ++modCount;                count = 0; // write-volatile            } finally {                unlock();            }        }    }}下面是jdk1.6 ConcurrentHashMap Segment resize中重用节点的示意图。

四、构造方法

这里就很简单了,看下代码和注释就行。
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)        throw new IllegalArgumentException();    if (concurrencyLevel > MAX_SEGMENTS)        concurrencyLevel = MAX_SEGMENTS;    // 寻找适合的2^n,以及n,默认构造时是16和4    int sshift = 0;    int ssize = 1;    while (ssize < concurrencyLevel) {        ++sshift;        ssize <<= 1;    }    segmentShift = 32 - sshift; // 定位Segment的index时hash值的移位,为32 - n,默认构造时是28    segmentMask = ssize - 1; // 用于 & 运算定位Segment的index,默认构造时是0x0f    this.segments = Segment.newArray(ssize); // 实际的concurrentLevel就是Segment数组的长度    if (initialCapacity > MAXIMUM_CAPACITY) // 虽然整个Map的capacity可以大于MAXIMUM_CAPACITY,但是初始化时为了满足2^n的要求,最大也只允许这么多        initialCapacity = MAXIMUM_CAPACITY;    int c = initialCapacity / ssize;    if (c * ssize < initialCapacity)        ++c;    int cap = 1;    while (cap < c)        cap <<= 1;    // 上面这段,求每个Segment代表的实际的hash表的table数组的长度(专业点叫hash桶的数量),这个值是cap,默认构造时是1    // 参数initailCapacity是一个容量参考值,用来计算每个Segment在初始化时有多少个hash桶(这个桶的数量要符合2^n)    // 整个ConcurrentHashMap的容量capacity是所有Segment的容量和,在初始化时每个Segment都一样,    //     但是每个Segment都可以单独扩容,所以构造完成后这个capacity值就没啥用了    for (int i = 0; i < this.segments.length; ++i)        this.segments[i] = new Segment< K,V > (cap, loadFactor);}public ConcurrentHashMap(int initialCapacity, float loadFactor) {    this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);}public ConcurrentHashMap(int initialCapacity) {    this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);}public ConcurrentHashMap() {    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);}public ConcurrentHashMap(Map<? extends K, ? extends V> m) {    this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY),         DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);    putAll(m);}

五、一些基础方法

有了HashMap作为基础,这里就简单在注释上说下。
// Wang/Jenkins hash的变种算法// 因为算出来的hash的最高的几位用于定位段,最低的几位用于段内定位hash桶,所以高位低位都需要扰动,要两个方向移位// HashMap中算出来的hash值只有最低的几位会被用到,所以只对需要对低位进行操作,一个方向移位就行 ,不需要下面这种更复杂的hash函数private static int hash(int h) {    // Spread bits to regularize both segment and index locations,    // using variant of single-Word Wang/Jenkins hash.    h += (h <<  15) ^ 0xffffcd7d;    h ^= (h >>> 10);    h += (h <<   3);    h ^= (h >>>  6);    h += (h <<   2) + (h << 14);    return h ^ (h >>> 16);}// 定位到某个具体的Segment,使用hash值的高位,算法和HashMap的indexFor一样// ConcurrentHashMap中没有抽象出indexFor了,实际实现还是一样的,看Segment的代码final Segment<K,V> segmentFor(int hash) {    return segments[(hash >>> segmentShift) & segmentMask];}这里可以看到,ConcurrentHashMap的有两种hash散列定位,分别使用的是hash值中不同的地方,所以理论上ConcurrentHashMap的冲突要比同参数的HashMap要少。因为ConcurrentHashMap每个Segment都可以单独扩容,扩容方法移动到Segment里面去了,Segment.rehash就是扩容方法。六、常用方法1、读方法比较简单,核心思路是代理到对应的Segment去做相应的操作,要批量读取所有Sement的方法特殊处理下。
// ConcurrentHashMap.get是通过hash值定位到Segment,有这个Segment的get来完成的public V get(Object key) {    int hash = hash(key.hashCode());    return segmentFor(hash).get(key, hash);}// 同getpublic boolean containsKey(Object key) {    int hash = hash(key.hashCode());    return segmentFor(hash).containsKey(key, hash);}// containsValue和size虽然是读操作,但是会批量读取到所有Segment,所以特殊处理// 先不加锁尝试两次以获得比较近似的结果,如果contains就直接返回(因为有一个存在就是存在,不存在才需要遍历全部),不contains才继续,如果发现modCount被其他线程修改,就全部加锁再执行// 不加锁读两次时,可能会碰见写操作的中间状态,也可能在循环到后面时有线程修改了前面,所以这个方法不是100%准确的// 设计成这样主要是为了提高效率,很多业务还是可以接受这种误差,需要更强一致性的时候,可以自己写个方法// 上面Segment的分析中指出了,put相同的key、replace方法不会修改modCount,但是会改变value,这一点使得后面检测modCount是否改变可能成为无用功,让containsValue方法的准确性降低了,1.7进行了修复public boolean containsValue(Object value) {    if (value == null)        throw new NullPointerException();    final Segment<K,V>[] segments = this.segments;    int[] mc = new int[segments.length];    // 先不加锁执行RETRIES_BEFORE_LOCK = 2次    for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {        int sum = 0;        int mcsum = 0;        for (int i = 0; i < segments.length; ++i) {            int c = segments[i].count; // 这个c没哪里用,意义不明            mcsum += mc[i] = segments[i].modCount; // 就是 mc[i] = segments[i].count; mssum += mc[i],临时保存一份modCount            if (segments[i].containsValue(value)) // 碰见contains直接return                return true;        }        boolean cleanSweep = true;        // mcsum是modCount的和,为0可以认为遍历开始时没有任何put完成过任何HashEntry,即方法开始执行时不包含任何HashEntry,可以认为(近似认为,几率比较大)此时也不包含        if (mcsum != 0) {            for (int i = 0; i < segments.length; ++i) {                int c = segments[i].count;                if (mc[i] != segments[i].modCount) { // modCount改变,说明有其他线程修改了Segment的结构,退出循环。会有replace的问题,前面说了                    cleanSweep = false;                    break;                }            }        }        if (cleanSweep)            return false;    }    // 如果连续两次都碰见modCount改变的情况,这时候一次性对全部Segment加锁,最大程度保证遍历时的一致性    // 因为是全部加锁后再遍历,遍历开始后没有线程可以修改任何Segment的结构,可以保证当前线程得到的是准确值    for (int i = 0; i < segments.length; ++i)        segments[i].lock();    boolean found = false;    try {        for (int i = 0; i < segments.length; ++i) {            if (segments[i].containsValue(value)) {                found = true;                break;            }        }    } finally {        for (int i = 0; i < segments.length; ++i)            segments[i].unlock();    }    return found;}// 不是Map接口的方法,为了兼容Hashtable,等价于containsValuepublic boolean contains(Object value) {    return containsValue( value);}// 基本同containValue,但是只执行一次且不会加锁public boolean isEmpty() {    final Segment<K,V>[] segments = this.segments;    int[] mc = new int[segments.length];    int mcsum = 0;    for (int i = 0; i < segments.length; ++i) {        if (segments[i].count != 0)            return false;        else            mcsum += mc[i] = segments[i].modCount;    }    if (mcsum != 0) {        for (int i = 0; i < segments.length; ++i) {            if (segments[i].count != 0 || mc[i] != segments[i].modCount)                return false;        }    }    return true;}// 跟containsValue差不多,但是size不会受put相同的key、replace方法的影响// 注意最后一个int溢出处理,因为HashMap以及ConcurrentHashMap是个特殊的集合类,我们通常所说的容量是hash桶的数目,这并不是实际容量// 因为使用链表解决hash冲突的原因,实际的可以容纳得更多,可能会远远超多Integer.MAX_VALUE,这这时返回值就是个错误的值,但还是尽量返回了一个“比较有用”的值。// 这纯粹是历史原因造成的坑,返回个int,没考虑实际情况,1.8的新增了一个mappingCount方法,返回long型准确数字public int size() {    final Segment<K,V>[] segments = this.segments;    long sum = 0;    long check = 0;    int[] mc = new int[segments.length];    // 不加锁执行两次,如果两次数据不一样,或者碰到modCount++被修改了,就全部加锁在执行一次    for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {        check = 0;        sum = 0;        int mcsum = 0;        for (int i = 0; i < segments.length; ++i) {            sum += segments[i].count;            mcsum += mc[i] = segments[i].modCount;        }        if (mcsum != 0) {            for (int i = 0; i < segments.length; ++i) {                check += segments[i].count;                if (mc[i] != segments[i].modCount) {                    check = -1; // force retry                    break;                }            }        }        if (check == sum)            break;    }    if (check != sum) {        sum = 0;        for (int i = 0; i < segments.length; ++i)            segments[i].lock();        for (int i = 0; i < segments.length; ++i)            sum += segments[i].count;        for (int i = 0; i < segments.length; ++i)            segments[i].unlock();    }    if (sum > Integer.MAX_VALUE) // int溢出处理,因此返回值可能会是错误的。                                 // 并且因为兼容性的原因,这个还无法解决,只能新增一个方法,1.8的ConcurrentHashMap就是新增了一个返回long型的方法        return Integer.MAX_VALUE;    else        return (int)sum;}2、写方法基本思路还是通过hash定位到Segment,由Segment完成。注意下putAll和clear这两个批量操作。
// ConcurrentHashMap.put是通过hash值定位到Segment,有这个Segment的put来完成的// ConcurrentHashMap的实现中不允许null key和null valuepublic V put(K key, V value) {    if (value == null)        throw new NullPointerException();    int hash = hash(key.hashCode()); // 这里 null key 会抛出NPE    return segmentFor(hash).put(key, hash, value, false);}// 下面几个基本思路同put,都是代理给相应的Segment的对应方法进行操作public V putIfAbsent(K key, V value);public V remove(Object key);public boolean remove(Object key, Object value);public boolean replace(K key, V oldValue, V newValue);public V replace(K key, V value);// putAll和clear都是循环操作,没有全局加锁,在执行期间还是可以执行完成其他的写操作的,事务性比较差的方法,设计成不用全局锁是为了提高效率public void putAll(Map<? extends K, ? extends V> m) {    for (Map.Entry<? extends K, ? extends V> e : m.entrySet())        put(e.getKey(), e.getValue());}public void clear() {    for (int i = 0; i < segments.length; ++i)        segments[i].clear();}put的简单个示意图如下(圆形中的数字是乱写的,非真实数据)。remove的简单示意图如下(圆形中的数字是乱写的,非真实数据)。

七、视图以及迭代器

视图指的的 keySet()、values()、entrySet()、keys()、elements()这几个方法,三个视图属性实质上也是用这几个方法。视图的操作都是基本上代理给map本身来进行操作,因此视图中进行写操作(一般是remove,没实现add),也会造成map发生更改。特别注意下values的类型就是Collection,因此操作都很原始,没有任何子类的特性可以使用,平时尽量也不要用这个。迭代器不使用没有全局加锁,没有使用modCount机制,允许迭代时有其他线程并发进行写操作,因此ConcurrentHashMap的迭代器是弱一致的,也不是快速失败的。它永远不会抛出ConcurrentModificationException,也就是迭代过程允许Map被修改,这些修改可以(但是不保证绝对可以)实时地反映在当前的迭代结果中。跟HashMap的迭代器有些区别,CoincurrentHashMap的迭代器是”逆序“的,也就是在segments、Segment.table中都是从大下标开始迭代。这个应该是为了降低迭代与写操作的冲突,但是感觉作用并不明显啊。1.8的ConcurrentHashMap中迭代和扩容就是逆序的,但是1.8的这一点很明显能降低迭代操作与写操作冲突。1.6的这里感觉最多是为了clear()、putAll之类的冲突减少。另外,由于实际的Map.Entry,也就是内部类HashEntry中没有public get/set方法,返回的EntrySet无法被使用,这里多了一个简单的封装类WriteThroughEntry entends Abstract.SimpleEntry,用于封装外部在迭代操作对Entry的写操作。HashMap中的HashMap.Entry本身就有public get/set方法,所以HashMap的EntrySet直接返回HashMap.Entry类型的Set就行,不必再封装一次。
final class WriteThroughEntry extends AbstractMap.SimpleEntry<K,V>{    WriteThroughEntry(K k, V v) {        super(k,v);    }    public V setValue(V value) {        if (value == null) throw new NullPointerException();        V v = super.setValue(value);        ConcurrentHashMap.this.put(getKey(), value);        return v;    }}特别的一点,这里会进行一次put操作,保证value所在的hashEntry在当时是一定存在的。这里把setValue 看成是 put相同的key,如果此时value不存在,也会添加一个进去。另外,由于读操作不加锁,因此value返回的也是一个过期的值,ConcurrentHashMap本身也保证不了更多。1.6的代码跟基本1.5的一样,都是比较古老的代码了。之后的1.7/1.8都进行不少优化,实现上也变化了很多,变得更加复杂了。学习1.6的很重要的一点,就是它把ConcurrentHashMap的基本的原理、分段锁思想、各个方法的基本实现,讲得足够清楚、简单,这一点很重要。现在各种编程语言的使用成本在降低,但是相对的学习成本变高了,为了更加强大、实用,许多标准api都进行了各种细节上的优化,代码不再简单(比如1.6的这个类的源码总共1282行,到了1.8变成了6313行,有大概40%的是函数式、Stream相关的,也就是基本的功能用了大约3倍的代码量来实现),想弄清楚一个基本的思路都得看半天。学习最好是要从简单、基础的东西开始,1.6的源码也是集合类中的基础,看懂了,理解后续版本就会简单了。下一篇说下1.7的ConcurrentHashMap,它相对1.6来说,改动不算特别大。以上内容,如有问题,烦请指出,谢谢!


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表