废话就不多说了,直入正题。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable
一、基本性质1、支持完全并发的读操作和可控并发级别的写操作的Map,方法的基本功能与Hashtable大致相同,但是使用锁分段技术,没有使用全局锁,整体上更有效率,能够在大多数并发场景中替代Hashtable。2、不允许null key和null value(1.7之前的代码中,指出非常特殊情况可能会读到null value,但是在jsr133后的新内存模型中是不会发生的,也就是实际中不会读到null value,1.7把这个无用的代码去掉了)。3、专为并发设计的集合类。它的读操作不加锁(null value的问题不说了),可以和写操作并行,但是读操作只反映最近完成的结果,也可能和它开始执行的那一瞬间的状态有出入,这在一般的并发场景中可以接受。基本的写操作加锁,putAll和clear这两个批量操作没有使用全局锁,因此并不保证结果和预期的一致(比如clear执行后的一瞬间,可能map中会有其他线程添加的数据)。它还不是真正意义上的线程安全,记住线程安全的关键:线程一个个串行,和多线程并发的执行,产生的效果(对外界可见的影响)一样。很明显ConcurrentHashMap达不到这种程度。它的这种并发控制更像是数据库隔离级别。它没有全局锁(数据库进程锁/库锁),使用的是Segment的锁(数据库表级别的锁)。根据写阻塞写不阻塞读这一点,可以认为它具体的隔离级别是"Read uncommitted"这个级别,即不会丢失写操作带来的影响,但是会读取到"中间态数据/脏数据"(写操作方法还没有返回,就能看到写操作带来的部分影响)。至于”不可重复读“、”幻读“这两个,根据具体实现,Segment的读方法都只读一次(加锁读不管),可以认为是没有的。“脏数据”这个,可以认为只在putAll、clear中才会出现。基本的put/remove/replace,因为都只操作一个K-V,因此可以认为它们的执行过程中是没有“脏数据”出现的。ConcurrentHashMap在多线程之间可能会存在不一致状态。但是针对简单的读写操作来说,这些不一致的状态,要么是历史的一致状态,要么是未来的一致状态,不会出现错误的状态(HashMap多线程操作就会出现错误状态,比如死循环、更新丢失)。在很多程序中还是能接受这种不一致的,当然,如果你需要更高的一致性,比如获取需要同时获取ConcurrentHashMap中某几个key在某个绝对的同一时刻对应的value,那么它自带的方法就不能完成了,这时候就要依赖更外部的范围更大锁。这种差不多就算是通常所说的事务了,api是基本不提供自带事务性的方法的,需要你自己控制。4、弱一致性。第3点中一起说了。5、ConcurrentHashMap的迭代器是弱一致性的,不是fail-fast快速失败的。即使它在迭代时发现了并发修改,它也是会继续执行下去的,它不会抛出ConcurrentModificationException,因此它的迭代器可能会反映出迭代过程中的并发修改,预期迭代结果可能和调用的瞬间情况有出入。虽然迭代器在并发时不会抛出异常,但是仍然不建议多线程并发使用此迭代器。6、继承了HashMap的一些性质。7、不支持clone,不过clone本身用得就很少,用Map拷贝构造创建一个实例能够做到一样的效果。基本结构的简单示意图如下(圆形中的数字是乱写的,非真实数据)。二、常量和变量1、常量// 默认初始化容量,这个容量指的是所有Segment中的hash桶的数量和static final int DEFAULT_INITIAL_CAPACITY = 16;// 每个Segment的默认的加载因子static final float DEFAULT_LOAD_FACTOR = 0.75f;// 整个Map的默认的并发级别,代表最大允许16个线程同时进行并发修改操作。实际并发级别要是 2^n 这种数,同时这个变量也是数组segments的长度static final int DEFAULT_CONCURRENCY_LEVEL = 16;// 每个Segment的最大hash桶的大小(数组Segment.table的最大长度),初始容量也不能大于此值static final int MAXIMUM_CAPACITY = 1 << 30;// 数组segments的最大长度,也是最大并发级别static final int MAX_SEGMENTS = 1 << 16; // slightly conservative,翻译过来是略保守,实际远远用不到这么多Segment啊// size方法和containValue方法最大的不加锁尝试次数// 简单点说就是先不加锁执行一次,如果没发现改变,就返回,发现改变就再重复一次,再发现改变就所有对segment都加锁再操作一次,这样设计主要是为了避免加锁提高效率。static final int RETRIES_BEFORE_LOCK = 2;2、变量// 段掩码,跟子网掩码以及HashMap中的 table.length - 1 的作用差不多,都是用为了用位运算加速hash散列定位final int segmentMask;// 段偏移量,定位到一个segment使用的是hash值的高位,segments数组长度为2^n的话,此数值为32-n,也就是把最高的n位移动到最低的n位final int segmentShift;// 段数组final Segment<K,V>[] segments;// 下面都懂transient Set<K> keySet;transient Set<Map.Entry<K,V>> entrySet;transient Collection<V> values;三、基本类
也就是HashEntry和Segments,这两个是所有方法实现的基础。先看看HashEntry,代码如下:static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; HashEntry(K key, int hash, HashEntry<K,V> next, V value) { this.key = key; this.hash = hash; this.next = next; this.value = value; } @SupPRessWarnings("unchecked") static final <K,V> HashEntry<K,V>[] newArray(int i) { return new HashEntry[i]; }}HashEntry相对HashMap.Entry的变化:1、value变为volatile了,对多线程具有相同的可见性,直接读取value、给value赋值为一个已经知道的值(就是写value可以不依赖它当前的值),这两个操作是是不用加锁的;2、next变为final了,这也就是说不能从HashEntry链的中间或尾部添加或删除节点,因为这需要修改 next 引用值。这让删除变得复杂了一些,要复制前面的节点并重新添加,删除的效率变低。说两个问题:1、为什么使用final修饰next?key和hash因为是判别两个hashEntry是否“相等”的重要属性,实际中不允许修改,设置为final合情合理,更重要的原因是扩容时有用到hash值不会改变这个重要的前提。扩容时一部分节点重用,本质上和HashMap1.8中的高低位时一个思想。next指针在删除节点时需要被修改,设置为final使得remove操作很麻烦。这个不是为了remove不影响正在进行在HashEntry链的读操作,虽然这个类是弱一致的,但是是允许迭代读时看到其他线程的修改的,再对比看下1.7的,可以印证这一点。关于这一点,需要知道Java中final的内存语义。关于final的内存语义,可以看下这篇文章,我这里结合HashEntry简单说下。final的内存语义,能够保证读线程读去一个HashEntry节点时,它的next指针在这之前已经被正确初始化了(构造方法中this不提前逸出的情况下,HashEntry满足这一点)。如果使用普通的变量next,处理器重排序后,有可能读线程先读取到这个节点引用,然后通过引用读取next的值。next的初始化赋值因为重排序,可能被放在后面再执行,这种情况下读取到的next是默认值null。因为get/put/remove/replace都需要使用next指针进行链表遍历,在遍历链表时,next == null本身就是一个正常的状态 ,因此这种重排序基本上对所有读写操作都有严重的影响,会造成混淆,所有方法的准确性大大降低。然后,因为next == null是正常状态,不能像value一样,在读取到null时使用readUnderLock再来一次(这也是ConcurrentHashMap为什么不能允许null value的一个原因)。为了避免上面说的这种问题,把next设置为final的,这样就能保证不会读取到未被初始化赋值时的默认值null,遍历时读到null就一定能保证是遍历到了链表末尾。2、jdk1.7开始不再使用final next,改为使用volatile next。
这样在jdk1.5(jsr133内存模型改动)之后也是也是可以的,能够避免读取到未被初始化赋值的next。可以参考下这篇讲volatile的和这篇讲双检锁单例的。这些代码是1.5时写的,1.6基本没变。在写这段代码时作者对内存模型理解有些偏差(看下这里),所以没有使用volatile。1.7开始就改为volatile next,并使用Unsafe.putOrderedObject来降低volatile的写开销,尽量提升性能。因为理解上的偏差,所以才有了readUnderLock方法。readUnderLock是在读取到value == null时重新加锁读一次,避免读取到的value是未执行初始化赋值的默认值null。在jsr133后的新的内存模型中,读取到value == null是不可能的发生的。因此1.7开始,这个设计直接去掉了,因为本身就不允许null存进来,所以不会读取到,也谈不上加锁再读一次。这个问题本身是1.5中的,但是1.6的代码基本没实质性改动,继承了这个问题,1.7的改进了不少,修复了这个问题。另外再扯下,为什么不允许null value?value好像并不影响Map的特性,网上简单找了下,大概四个原因:1、最初设计中,null value可以区别是否要加锁读,如果允许null value,那么不好区别要不要加锁,现在这已经不需要了;2、containsKey方法的有些实现是 get(key) != null,这里也需要区别出 null value,但是可以不这样实现;3、为了完全替换Hashtable,这个类是不允许null value的;4、历史原因,最开始是null value,后面的版本也不好改了,就保留下来了。再看下Segment,很多要说的都在代码上写注释了,有些注释只是简单的翻译下,各位还是要自己看下源码。
个人觉得一些要注意的点:1、Segment本质上还是一个hash表,很多操作跟HashMap很像,所以有HahsMap的基础,理解这些代码就很简单了。ConcurrentHashMap的操都是由segment对应的操作加上一些额外策略实现的。2、直接读取操作是不加锁的,读取null值才加锁读一次。这一点上面讲了,新内存模型中已经不会发生了,jdk1.7中加锁读也不要了,get在任何情况下都不加锁,所以这一点可以不管了。3、写操作replace/remove/put/clear都需要加锁,扩容rehash操作只在put内部的调用,所以也是隐含加锁的,写操作要联系HashEntry的性质(final next)来看。4、扩容和HashMap有些细小的区别,这个可以看下面写的注释。// Segment是一个特殊的hash表,写上继承ReentrantLock只是只是为了简化一些锁定,避免单独new一个ReentrantLock。static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; // 类似size,外部修改Segment结构的都会修改这个变量,它是volatile的,几个读方法在一开始就判断这个值是否为0,能尽量保证不做无用的读取操作。 transient volatile int count; // 跟集合类中的modCount一样,检测到这个数值变化说明Segment一定有被修改过 // 因为modCount不是volatile,有可能无法反映出一次修改操作的中间状态(历史的一致状态/未来的一致状态), // 这些状态本身在当时就不应该被看见,看见了也没事,所以使用普通变量也合理 transient int modCount; // 扩容阈值 transient int threshold; // 类似HashMap的table数组 transient volatile HashEntry<K,V>[] table; // 加载因子,所有segment都是一样的,放在这里就不需要依赖外部类了 final float loadFactor; Segment(int initialCapacity, float lf) { loadFactor = lf; setTable(HashEntry.<K,V>newArray(initialCapacity)); } @SuppressWarnings("unchecked") static final <K,V> Segment<K,V>[] newArray(int i) { return new Segment[i]; } // 只有持有本segment的锁或者是构造方法中才能调用这个方法,反序列化构造时也有用这个 void setTable(HashEntry<K,V>[] newTable) { threshold = (int)(newTable.length * loadFactor); table = newTable; } // 跟indexFor算法一样 HashEntry<K,V> getFirst(int hash) { HashEntry<K,V>[] tab = table; return tab[hash & (tab.length - 1)]; } // 加锁读取value,在直接读取value得到null时调用 // 源码这里有英文注释:读到value为null,只有当某种重排序的HashEntry初始化代码让volatile变量初始化重排序到构造方法外面时才会出现, // 这一点旧的内存模型下是合法的,但是不知道会不会发生。 // 具体怎么发生,就是节点的构造方法执行完了,但是value的赋值重排序到构造方法外面,然后节点被引用了,挂载成了HashEntry链第一个节点, // 此时可以读取到这个节点,但是value的赋值不一定被执行了,value是默认值null // 搜下stackoverflow,说在新内存模型下已经不会发生了,是作者自己理解有些问题,后续的1.7也修复了 // http://stackoverflow.com/questions/5002428/concurrenthashmap-reorder-instruction/5144515#5144515 V readValueUnderLock(HashEntry<K,V> e) { lock(); try { return e.value; } finally { unlock(); } } // 下面的方法是为了实现map中的方法,名字都很像 // 直接读value是不用加锁的,碰到读到value == null,才加锁再读一次,这个前面说了,后面的也一样 V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; } boolean containsKey(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) // ConcurrentHashMap禁止添加null key,所以这里不用考虑key为null的情况,下同 return true; e = e.next; } } return false; } // 跟get差不多 boolean containsValue(Object value) { if (count != 0) { // read-volatile HashEntry<K,V>[] tab = table; int len = tab.length; for (int i = 0 ; i < len; i++) { for (HashEntry<K,V> e = tab[i]; e != null; e = e.next) { V v = e.value; if (v == null) // recheck v = readValueUnderLock(e); if (value.equals(v)) return true; } } } return false; } // 几个写操作,需要加锁,下同 boolean replace(K key, int hash, V oldValue, V newValue) { lock(); try { HashEntry<K,V> e = getFirst(hash); while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; boolean replaced = false; if (e != null && oldValue.equals(e.value)) { // replace不会修改modCount,这降低了containValue方法的准确性,jdk1.7修复了这一点 replaced = true; e.value = newValue; } return replaced; } finally { unlock(); } } V replace(K key, int hash, V newValue) { lock(); try { HashEntry<K,V> e = getFirst(hash); while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { // replace不会修改modCount,这降低了containValue方法的准确性,jdk1.7修复了这一点 oldValue = e.value; e.value = newValue; } return oldValue; } finally { unlock(); } } // 基本思路跟1.6的HashMap一样 V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); try { int c = count; // 先执行扩容,再添加节点,1.6的HashMap是先添加节点,再扩容 // 并且Segment这里是先用大于号判断大小,再count++,扩容时实际容量会比HashMap中同情况时多一个, // 会出现put完成后Segment.count > threshold应该扩容但是却没有扩容的情况,这不太符合设计 // eg:threshold = 12,那么在这个Segment上执行第13次put时,判断语句为 12++ > 12,为false,执行完成后Segment.count = 13,threshold = 12 // 另外默认构造情况下threshold = 0,而且每个Segment的table.length都为1,如果更改为符合设计的思路的扩容方式,第一次put又一定会触发扩容 // 1.7修改了上面两点,判断语句先让count加1,再用大于号执行判断,同时让table.length 最小值为2,这样第一次put也不会扩容,完善了整体的扩容机制 if (c++ > threshold) // ensure capacity rehash(); HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); // 定位方式和1.6的HashMap一样 HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { oldValue = e.value; if (!onlyIfAbsent) // put相同的key(相当于replace)不会修改modCount,这降低了containsValue方法的准确性,jdk1.7修复了这一点 e.value = value; } else { oldValue = null; ++modCount; // 也是添加在HashEntry链的头部,前面说了,这里的HashEntry的next指针是final的,new后就不能变 tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; } return oldValue; } finally { unlock(); } } // 扩容时为了不影响正在进行的读线程,最好的方式是全部节点复制一次并重新添加 // 这里根据扩容时节点迁移的性质,最大可能的重用一部分节点,这个性质跟1.8的HashMap中的高低位是一个道理,必须要求hash值是final的 void rehash() { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity >= MAXIMUM_CAPACITY) return; // 这里有段注释是说下面的算法及其作用的,跟1.8的HashMap的resize中用到的那个高低温的原理一样: // 扩容前在一个hash桶中的节点,扩容后只会有两个去向。这里是用 &,后续改为用高低位,实质上是一样的。 // 根据这个去向,找到最末尾的去向都一样的连续的一部分,这部分可以重用,不需要复制 // HashEntry的next是final的,resize/rehash时需要重新new,这里的特殊之处就是最大程度重用HashEntry链尾部的一部分,尽量减少重新new的次数 // 作者说从统计角度看,默认设置下有大约1/6的节点需要被重新复制一次,所以通常情况还是能节省不少时间的 HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1); threshold = (int)(newTable.length * loadFactor); // 跟1.6的HashMap有一样的小问题,可能会过早变为Integer.MAX_VALUE从而导致后续永远不能扩容 int sizeMask = newTable.length - 1; for (int i = 0; i < oldCapacity ; i++) { // We need to guarantee that any existing reads of old Map can proceed. So we cannot yet null out each bin. // 为了保证其他线程能够继续执行读操作,不执行手动将原来table赋值为null,只是再最后修改一次table的引用 // 1.6的HashMap对应的地方有个src[j] = null,这句话在多线程时某些程度上能加快回收旧table数组,但是放在这里会影响其他线程的读操作 // 只要其他线程完成了读操作,就不会再引用旧HashEntry,旧的就会自动被垃圾回收器回收 // 下面有个关于resize中重用节点的示意图,可以看下 HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; int idx = e.hash & sizeMask; // Single node on list if (next == null) newTable[idx] = e; else { // Reuse trailing consecutive sequence at same slot HashEntry<K,V> lastRun = e; int lastIdx = idx; // 这个循环是寻找HashEntry链最大的可重用的尾部 // 看过1.8的HashMap就知道,如果hash值是final的,那么每次扩容,扩容前在一条链表上的节点,扩容后只会有两个去向 // 这里重用部分中,所有节点的去向相同,它们可以不用被复制 for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun; // 把重用部分整体放在扩容后的hash桶中 // 复制不能重用的部分,并把它们插入到rehash后的所在HashEntry链的头部 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { int k = p.hash & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(p.key, p.hash, n, p.value); } // 这里也可以看出,重用部分rehash后相对顺序不变,并且还是在rehash后的链表的尾部 // 不能重用那些节点在rehash后,再一个个重头添加到链表的头部,如果还在一条链表上面,那么不能重用节点的相对顺序会颠倒 // 所以ConcurrentHashMap的迭代顺序会变化,本身它也不保证迭代顺序不变 } } } table = newTable; } // 因为1.6的HashEntry的next指针是final的,所以比普通的链表remove要复杂些,只有被删除节点的后面可以被重用,前面的都要再重新insert一次 V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay in list, but all preceding ones need to be cloned. // 因为next指针是final的,所以删除不能用简单的链表删除,需要把前面的节点都重新复制再插入一次,后面的节点可以重用 // 删除后,后面的可以重用的那部分顺序不变且还是放在最后,前面的被复制的那部分顺序颠倒地放在前面 // 后面Map.remove那里有个remove的示意图 ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } } void clear() { if (count != 0) { lock(); try { HashEntry<K,V>[] tab = table; for (int i = 0; i < tab.length ; i++) tab[i] = null; ++modCount; count = 0; // write-volatile } finally { unlock(); } } }}下面是jdk1.6 ConcurrentHashMap Segment resize中重用节点的示意图。四、构造方法
这里就很简单了,看下代码和注释就行。public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 寻找适合的2^n,以及n,默认构造时是16和4 int sshift = 0; int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; // 定位Segment的index时hash值的移位,为32 - n,默认构造时是28 segmentMask = ssize - 1; // 用于 & 运算定位Segment的index,默认构造时是0x0f this.segments = Segment.newArray(ssize); // 实际的concurrentLevel就是Segment数组的长度 if (initialCapacity > MAXIMUM_CAPACITY) // 虽然整个Map的capacity可以大于MAXIMUM_CAPACITY,但是初始化时为了满足2^n的要求,最大也只允许这么多 initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = 1; while (cap < c) cap <<= 1; // 上面这段,求每个Segment代表的实际的hash表的table数组的长度(专业点叫hash桶的数量),这个值是cap,默认构造时是1 // 参数initailCapacity是一个容量参考值,用来计算每个Segment在初始化时有多少个hash桶(这个桶的数量要符合2^n) // 整个ConcurrentHashMap的容量capacity是所有Segment的容量和,在初始化时每个Segment都一样, // 但是每个Segment都可以单独扩容,所以构造完成后这个capacity值就没啥用了 for (int i = 0; i < this.segments.length; ++i) this.segments[i] = new Segment< K,V > (cap, loadFactor);}public ConcurrentHashMap(int initialCapacity, float loadFactor) { this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);}public ConcurrentHashMap(int initialCapacity) { this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);}public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);}public ConcurrentHashMap(Map<? extends K, ? extends V> m) { this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY), DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); putAll(m);}五、一些基础方法
有了HashMap作为基础,这里就简单在注释上说下。// Wang/Jenkins hash的变种算法// 因为算出来的hash的最高的几位用于定位段,最低的几位用于段内定位hash桶,所以高位低位都需要扰动,要两个方向移位// HashMap中算出来的hash值只有最低的几位会被用到,所以只对需要对低位进行操作,一个方向移位就行 ,不需要下面这种更复杂的hash函数private static int hash(int h) { // Spread bits to regularize both segment and index locations, // using variant of single-Word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16);}// 定位到某个具体的Segment,使用hash值的高位,算法和HashMap的indexFor一样// ConcurrentHashMap中没有抽象出indexFor了,实际实现还是一样的,看Segment的代码final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask];}这里可以看到,ConcurrentHashMap的有两种hash散列定位,分别使用的是hash值中不同的地方,所以理论上ConcurrentHashMap的冲突要比同参数的HashMap要少。因为ConcurrentHashMap每个Segment都可以单独扩容,扩容方法移动到Segment里面去了,Segment.rehash就是扩容方法。六、常用方法1、读方法比较简单,核心思路是代理到对应的Segment去做相应的操作,要批量读取所有Sement的方法特殊处理下。// ConcurrentHashMap.get是通过hash值定位到Segment,有这个Segment的get来完成的public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash);}// 同getpublic boolean containsKey(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).containsKey(key, hash);}// containsValue和size虽然是读操作,但是会批量读取到所有Segment,所以特殊处理// 先不加锁尝试两次以获得比较近似的结果,如果contains就直接返回(因为有一个存在就是存在,不存在才需要遍历全部),不contains才继续,如果发现modCount被其他线程修改,就全部加锁再执行// 不加锁读两次时,可能会碰见写操作的中间状态,也可能在循环到后面时有线程修改了前面,所以这个方法不是100%准确的// 设计成这样主要是为了提高效率,很多业务还是可以接受这种误差,需要更强一致性的时候,可以自己写个方法// 上面Segment的分析中指出了,put相同的key、replace方法不会修改modCount,但是会改变value,这一点使得后面检测modCount是否改变可能成为无用功,让containsValue方法的准确性降低了,1.7进行了修复public boolean containsValue(Object value) { if (value == null) throw new NullPointerException(); final Segment<K,V>[] segments = this.segments; int[] mc = new int[segments.length]; // 先不加锁执行RETRIES_BEFORE_LOCK = 2次 for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { int sum = 0; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { int c = segments[i].count; // 这个c没哪里用,意义不明 mcsum += mc[i] = segments[i].modCount; // 就是 mc[i] = segments[i].count; mssum += mc[i],临时保存一份modCount if (segments[i].containsValue(value)) // 碰见contains直接return return true; } boolean cleanSweep = true; // mcsum是modCount的和,为0可以认为遍历开始时没有任何put完成过任何HashEntry,即方法开始执行时不包含任何HashEntry,可以认为(近似认为,几率比较大)此时也不包含 if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { int c = segments[i].count; if (mc[i] != segments[i].modCount) { // modCount改变,说明有其他线程修改了Segment的结构,退出循环。会有replace的问题,前面说了 cleanSweep = false; break; } } } if (cleanSweep) return false; } // 如果连续两次都碰见modCount改变的情况,这时候一次性对全部Segment加锁,最大程度保证遍历时的一致性 // 因为是全部加锁后再遍历,遍历开始后没有线程可以修改任何Segment的结构,可以保证当前线程得到的是准确值 for (int i = 0; i < segments.length; ++i) segments[i].lock(); boolean found = false; try { for (int i = 0; i < segments.length; ++i) { if (segments[i].containsValue(value)) { found = true; break; } } } finally { for (int i = 0; i < segments.length; ++i) segments[i].unlock(); } return found;}// 不是Map接口的方法,为了兼容Hashtable,等价于containsValuepublic boolean contains(Object value) { return containsValue( value);}// 基本同containValue,但是只执行一次且不会加锁public boolean isEmpty() { final Segment<K,V>[] segments = this.segments; int[] mc = new int[segments.length]; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { if (segments[i].count != 0) return false; else mcsum += mc[i] = segments[i].modCount; } if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { if (segments[i].count != 0 || mc[i] != segments[i].modCount) return false; } } return true;}// 跟containsValue差不多,但是size不会受put相同的key、replace方法的影响// 注意最后一个int溢出处理,因为HashMap以及ConcurrentHashMap是个特殊的集合类,我们通常所说的容量是hash桶的数目,这并不是实际容量// 因为使用链表解决hash冲突的原因,实际的可以容纳得更多,可能会远远超多Integer.MAX_VALUE,这这时返回值就是个错误的值,但还是尽量返回了一个“比较有用”的值。// 这纯粹是历史原因造成的坑,返回个int,没考虑实际情况,1.8的新增了一个mappingCount方法,返回long型准确数字public int size() { final Segment<K,V>[] segments = this.segments; long sum = 0; long check = 0; int[] mc = new int[segments.length]; // 不加锁执行两次,如果两次数据不一样,或者碰到modCount++被修改了,就全部加锁在执行一次 for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { check = 0; sum = 0; int mcsum = 0; for (int i = 0; i < segments.length; ++i) { sum += segments[i].count; mcsum += mc[i] = segments[i].modCount; } if (mcsum != 0) { for (int i = 0; i < segments.length; ++i) { check += segments[i].count; if (mc[i] != segments[i].modCount) { check = -1; // force retry break; } } } if (check == sum) break; } if (check != sum) { sum = 0; for (int i = 0; i < segments.length; ++i) segments[i].lock(); for (int i = 0; i < segments.length; ++i) sum += segments[i].count; for (int i = 0; i < segments.length; ++i) segments[i].unlock(); } if (sum > Integer.MAX_VALUE) // int溢出处理,因此返回值可能会是错误的。 // 并且因为兼容性的原因,这个还无法解决,只能新增一个方法,1.8的ConcurrentHashMap就是新增了一个返回long型的方法 return Integer.MAX_VALUE; else return (int)sum;}2、写方法基本思路还是通过hash定位到Segment,由Segment完成。注意下putAll和clear这两个批量操作。// ConcurrentHashMap.put是通过hash值定位到Segment,有这个Segment的put来完成的// ConcurrentHashMap的实现中不允许null key和null valuepublic V put(K key, V value) { if (value == null) throw new NullPointerException(); int hash = hash(key.hashCode()); // 这里 null key 会抛出NPE return segmentFor(hash).put(key, hash, value, false);}// 下面几个基本思路同put,都是代理给相应的Segment的对应方法进行操作public V putIfAbsent(K key, V value);public V remove(Object key);public boolean remove(Object key, Object value);public boolean replace(K key, V oldValue, V newValue);public V replace(K key, V value);// putAll和clear都是循环操作,没有全局加锁,在执行期间还是可以执行完成其他的写操作的,事务性比较差的方法,设计成不用全局锁是为了提高效率public void putAll(Map<? extends K, ? extends V> m) { for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) put(e.getKey(), e.getValue());}public void clear() { for (int i = 0; i < segments.length; ++i) segments[i].clear();}put的简单个示意图如下(圆形中的数字是乱写的,非真实数据)。remove的简单示意图如下(圆形中的数字是乱写的,非真实数据)。七、视图以及迭代器
视图指的的 keySet()、values()、entrySet()、keys()、elements()这几个方法,三个视图属性实质上也是用这几个方法。视图的操作都是基本上代理给map本身来进行操作,因此视图中进行写操作(一般是remove,没实现add),也会造成map发生更改。特别注意下values的类型就是Collection,因此操作都很原始,没有任何子类的特性可以使用,平时尽量也不要用这个。迭代器不使用没有全局加锁,没有使用modCount机制,允许迭代时有其他线程并发进行写操作,因此ConcurrentHashMap的迭代器是弱一致的,也不是快速失败的。它永远不会抛出ConcurrentModificationException,也就是迭代过程允许Map被修改,这些修改可以(但是不保证绝对可以)实时地反映在当前的迭代结果中。跟HashMap的迭代器有些区别,CoincurrentHashMap的迭代器是”逆序“的,也就是在segments、Segment.table中都是从大下标开始迭代。这个应该是为了降低迭代与写操作的冲突,但是感觉作用并不明显啊。1.8的ConcurrentHashMap中迭代和扩容就是逆序的,但是1.8的这一点很明显能降低迭代操作与写操作冲突。1.6的这里感觉最多是为了clear()、putAll之类的冲突减少。另外,由于实际的Map.Entry,也就是内部类HashEntry中没有public get/set方法,返回的EntrySet无法被使用,这里多了一个简单的封装类WriteThroughEntry entends Abstract.SimpleEntry,用于封装外部在迭代操作对Entry的写操作。HashMap中的HashMap.Entry本身就有public get/set方法,所以HashMap的EntrySet直接返回HashMap.Entry类型的Set就行,不必再封装一次。final class WriteThroughEntry extends AbstractMap.SimpleEntry<K,V>{ WriteThroughEntry(K k, V v) { super(k,v); } public V setValue(V value) { if (value == null) throw new NullPointerException(); V v = super.setValue(value); ConcurrentHashMap.this.put(getKey(), value); return v; }}特别的一点,这里会进行一次put操作,保证value所在的hashEntry在当时是一定存在的。这里把setValue 看成是 put相同的key,如果此时value不存在,也会添加一个进去。另外,由于读操作不加锁,因此value返回的也是一个过期的值,ConcurrentHashMap本身也保证不了更多。1.6的代码跟基本1.5的一样,都是比较古老的代码了。之后的1.7/1.8都进行不少优化,实现上也变化了很多,变得更加复杂了。学习1.6的很重要的一点,就是它把ConcurrentHashMap的基本的原理、分段锁思想、各个方法的基本实现,讲得足够清楚、简单,这一点很重要。现在各种编程语言的使用成本在降低,但是相对的学习成本变高了,为了更加强大、实用,许多标准api都进行了各种细节上的优化,代码不再简单(比如1.6的这个类的源码总共1282行,到了1.8变成了6313行,有大概40%的是函数式、Stream相关的,也就是基本的功能用了大约3倍的代码量来实现),想弄清楚一个基本的思路都得看半天。学习最好是要从简单、基础的东西开始,1.6的源码也是集合类中的基础,看懂了,理解后续版本就会简单了。下一篇说下1.7的ConcurrentHashMap,它相对1.6来说,改动不算特别大。以上内容,如有问题,烦请指出,谢谢!
新闻热点
疑难解答