传统的HashMap
不是线程安全的, 所以多线程进行put()
和get()
操作的时候可能会引发问题. 还有一个叫做HashTable
的数据结构, 它使用的是synchronized
来保证线程安全, 但是效率很低, 因为不能并发读.
ConcurrentHashMap
采用的是锁分段技术, 将数据分成一段一段存储, 然后给每一段数据配一把锁, 当一个线程对某段数据进行写操作的时候, 其他段的数据也能被其他线程安全访问(读写).
ConcurrentHashMap
包含一个Segment
数组;一个Segment
里面包含一个HashEntry
链表.Segment
是一种可重入锁, 在ConcurrentHashMap
中扮演锁的角色. HashEntry
则是实际存储键值对数据的地方. 当对HashEntry
的数据进行修改时, 必须首先获得与它对应的Segment
锁.
初始化segments数组
if (concurrencyLevel > MAX_SEGMENTS) { // 如果自定义的concurrencyLevel超过允许最大的并发数 concurrencyLevel = MAX_SEGMENTS; // 设定为允许的最大的并发数}int sshift = 0;int ssize = 1;while (ssize < concurrencyLevel) { ++sshift; // 为了能够使用按位与的散列算法来定位segments数组的索引, 必须保证segments数组的长度是2的N次方 // 例如concurrencyLevel是14, 15或者16, 那么锁的个数也总是16 ssize <<= 1;}segmentShift = 32 -sshift; // 用于定位参与散列运算的位数长度segmentMask = ssize - 1; // 散列运算的掩码, 各个二进制位都是1this.segments = Segment.newArray(ssize); // 创建Segment数组初始化单个segment
if (initialCapacity > MAXIMUM_CAPACITY) { // 如果自定义的Capacity超过了单个segment允许的最大的Capacity initialCapacity = MAXIMUM_CAPACITY; // 设定为单个segment允许的最大的Capacity}int c = initialCapacity / ssize; // 计算单个segment下的HashEntry数组的长度基值if (c * ssize < initialCapacity) { // 确保数组能够容纳所有键值对 ++c;}int cap = 1;while (cap < c) { cap <<= 1; // 将HashEntry的长度向2的N次方对齐}for (int i = 0; i < this.segments.length; ++i) { // 初始化各个segment元素, 实际上是创建cap长度的HashEntry数组, loadFactor指定了HashEntry数组总体上允许的最大负载百分比 this.segments[i] = new Segment<K, V>(cap, loadFactor)}定位Segment 在插入和获取元素的时候, 必须先通过散列算法定位到Segment
, ConcurrentHashMap
首先使用Wang/Jenkins hash的变种算法对元素的hashCode
进行一次再散列.
变种算法的代码如下:
// 本段代码无需理解, 只需要知道是一个对要插入ConcurrentHashMap元素的原始hash值的再散列(hash)的过程即可PRivate static int hash(int h) { h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16);}/* 用再散列后的元素的的hash值来定位segment数组索引 */final Segment<K, V> segmentFor(int hash) { // hash 是再散列后的元素的hash值 // 使用hash值的高四位进行Segment数组索引的计算 // 这里假设segmentMask为1111, segmentShift为28, 也就是Segment数组的长度为16 return segments[(hash >>> segmetnShift) & segmentMask]; }之所以进行再散列, 目的是减少散列冲突, 使元素能够均匀分布在不同的Segment
上. 如若负载不均, 那么分段锁就会失去意义.
主要介绍三种操作: get
, put
和size
操作
步骤如下:
对元素的原始hash值先进行再散列;通过这个散列值定位到segment
数组索引;通过散列算法定位到具体的HashEntry
数组中的元素;public V get(Object key) { int hash = hash(key.hashCode()); // 再散列 return segmentFor(hash).get(key, hash); // 点调用segmentFor返回具体的segment, 然后通过hash值获取元素}需要注意的是, 定位Segment
参与计算是再散列后的hash值的高位, 而定位HashEntry
参与计算的是再散列后的has值的低位, 这是为了避免两次计算使用的hash值相同, 导致元素在Segment
中散开了, 但是没有在HashEntry
中散开.
get的高效之处在于整个get过程不需要加锁, 因为value
和count
都是volatile
类型的值. 所以可以多线程并发读, 但是写入仍然是独占的(对每个数据段独占).
put操作是需要加锁的. 需要完成以下操作:
通过再散列的hash值定位到需要插入的Segment
;判断是否需要对Segment
的HashEntry
数组进行扩容(通过判断是否超过 负载因子*总容量 来判断);定位添加元素的位置;在扩容的时候, 首先会在当前需要扩容的Segment
中创建一个容量是之前两倍的数组, 然后将原数组里的元素进行再散列后插入到新的数组里. 为了高效, ConcurrentHashMap
不会对整个容器进行扩容, 仅会对某个segment进行扩容.
size
操作获取count
的累加值的具体步骤是:
segment
数组的来累加count
;如果统计的过程中, 容器的count
出现了变化, 则再采用加锁的方式统计Segment
数组中所有元素的个数.上述的判断容器的count
是否发生变化是通过一个modCount
变量来统计的, 任何会造成count变化的操作, 比如put
, remove
, clean
方法操作元素都会将modCount
加1, 所以在进行size操作前后比较modCount
元素是否发生变化, 就可以得知容器的大小是否发生变化.
是一个线程安全的基于链接节点的非阻塞FIFO无界队列.
对于阻塞算法, 有两种实现思路:
使用一个锁, 也就是入队出队用同一个锁使用两个锁, 入队出队用两个锁;对于非阻塞算法, 可以借助循环CAS的方式来实现(注: 实际上是通过加轻量级锁的方式, 循环CAS会消耗较多的时间片, 只有确认同步块运行速度较快的情况才能使用轻量级锁).
以下是ConcurrentLinkedQueue
的结构
阻塞队列是支持阻塞的插入和阻塞的移除的队列.
阻塞的插入 意思是当队列满时, 队列会阻塞插入元素的线程, 直到队列不满; 阻塞的移除 意思是在队列为空时, 获取元素的线程会等待队列变为非空;
常见的应用场景是生产者和消费者的场景, 作为二者获取元素的容器.
ArrayBlockingQueue
一个由数组结构组成的有界阻塞队列LinkedBlockingQueue
一个由链表结构组成的有界(初始化和最大值均为Integer.MAX_VALUE
大小, 可以看做无界队列)阻塞队列PriorityBlockingQueue
一个支持优先级排序的无界阻塞队列DelayQueue
一个使用优先级队列实现的无界阻塞队列, 在创建元素的时候可以指定多久才能从队列中获取当前元素, 只有在延迟期满才能从队列中提取元素.SynchronousQueue
一个不存储元素的阻塞队列, 也就是容量是0, 直接将生产者线程的数据传递给消费者. 每一个put
操作必须等待一个take
操作, 否则会被阻塞LinkedTransferQueue
一个由链表结构组成的无界阻塞队列, 如果有消费者在等待数据, 就直接将生产者的数据传递给消费者, 如果没有消费者在等待获取数据, 就将数据放置在队列尾部.LinkedBlockingDeque
一个由链表结构组成的双向阻塞队列. 由于两端都可以入队和出队, 比前述队列减少了一半的数据竞争, 可以用在”工作窃取”模式中未完待续
新闻热点
疑难解答