Loading
Loading...

深入浅出 JAVA 之集合 - ConcurrentHashMap

系统环境:

  • JDK 版本: OpenJDK 8

深入浅出 Java 集合系列文章

一、ConcurrentHashMap 概述

ConcurrentHashMap 类似于 HashMap,都是实现了 Map 接口,用于存储键值对的数据结构。然而,两者之间存在关键区别,比如 ConcurrentHashMap 在提供与 HashMap 相似的功能的同时,更注重线程安全性和并发处理能力。这意味着在多线程环境下,ConcurrentHashMap 能够确保更高的稳定性和一致性,同时也提供了额外的并发操作方法和迭代器的一致性保证。

由于其设计初衷是为了支持高并发访问,ConcurrentHashMap 引入了特定的同步机制,这使得它在某些场景下的性能略逊于 HashMap。不过,这种牺牲换来的是在并发环境中的高效表现和数据安全性。

因此,在选择使用 ConcurrentHashMap 还是 HashMap 时,应当基于应用的具体需求来决定。如果应用程序需要在多线程环境中高效且安全地操作,则应优先考虑 ConcurrentHashMap;反之,若是在单线程环境下或者对同步要求不高,则 HashMap 可能是更为合适的选择。

二、ConcurrentHashMap 底层结构

2.1 整体结构

在 JDK 8 中,ConcurrentHashMap 底层结构和 HashMap 相同,都是采用了 数组 (table)链表红黑树 组成。其结构如下图所示:

2.2 数组 table

ConcurrentHashMap 的核心是一个名为 table 的数组,也常被称为”桶数组”。这个数组中的每个元素都是一个”桶”,用于存储键值对数据。当发生哈希冲突时,这些桶会进一步扩展为链表或红黑树结构。

2.3 链表和红黑树

  • 链表: 当一个桶中存储的节点数量较少时,它会以链表的形式存在。链表由一系列 Node 类型的节点组成,每个节点包含键值对的 key、value 和 hash 信息,并通过 next 属性链接到下一个节点。
  • 红黑树: 当链表长度达到一定阈值 (默认为 8) 且数组长度大于等于 64 时,链表会转换为红黑树结构。红黑树使用 TreeNode 类型的节点,同样存储 key、value 和 hash 信息,并通过 left 和 right 属性进行关联。红黑树的引入是为了提高查找效率,特别是在链表较长的情况下。

2.4 动态调整机制

为了在不同场景下兼顾性能与内存开销,ConcurrentHashMap 引入了一套动态结构转换机制。这个机制会根据链表长度和数组容量的变化,在链表和红黑树之间进行智能切换。

主要规则如下:

  • 链表转红黑树: 当某个桶(bucket)中的链表长度达到 8,并且当前 table 数组的长度大于等于 64 时,该链表会被转换为红黑树结构。这样做的目的是为了提高在高冲突的场景中的数据查找的效率。红黑树的查找时间复杂度是 O(log n),优于链表的 O(n)
  • 红黑树还原为链表: 如果某个桶中的节点数量减少到 6 个或更少,即使它原本是红黑树结构,也会被退化回链表形式。这种 “滞后还原” 策略可以避免频繁地在链表和红黑树之间切换,从而减少不必要的性能损耗。

这种机制确保了在不同情况下都能选择最合适的结构,从而尽可能地提升访问效率。

为什么设定为 8 和 6?

  • ① 基于泊松分布统计得出,在默认负载因子 (0.75) 下,链表长度超过 8 的概率非常低,只有在极端哈希碰撞情况下才会触发树化。
  • ② 避免在节点数波动时反复切换结构,这样可以保证数据查询的稳定性。

为什么要求数组长度 ≥ 64?

  • ① 在数组容量较小时,哈希冲突更容易集中,此时盲目树化反而会浪费资源。
  • ② 当数组较大时,说明已经进入高负载状态,才值得引入红黑树来优化访问效率。

三、ConcurrentHashMap 属性

3.1 ConcurrentHashMap 中的默认常量值

/**
* 集合最大容量限制.
*/
private static final int MAXIMUM_CAPACITY = 1 << 30;
/**
* 默认初始容量。该值必须是 2 的 n 次幂,默认值为 16
*/
private static final int DEFAULT_CAPACITY = 16;
/**
* 数组的最大长度
*/
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/**
* 默认的并行度,为了兼容 JDK7 版本中的 Segment
*/
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/**
* 扩容因子,当元素个数达到0.75*数组长度的时候触发扩容,也是jdk1.7使用,jdk1.8直接使用位操作计算出扩容阈值
*/
private static final float LOAD_FACTOR = 0.75f;
/**
* 桶中-链表转换为红黑树的阈值
*/
static final int TREEIFY_THRESHOLD = 8;
/**
* 桶中-红黑树还原为链表的阈值
*/
static final int UNTREEIFY_THRESHOLD = 6;
/**
* 链表转化为红黑树时对集合最小容量的限制值,如果集合容量小于该值则链表不会转换为红黑树
*/
static final int MIN_TREEIFY_CAPACITY = 64;
/**
* 并发扩容时每个线程最少处理16个桶
*/
private static final int MIN_TRANSFER_STRIDE = 16;
/**
* 这三个常量控制并发扩容的结束,其实就是开始扩容的时候设置sizeCtl成一个负数,每次加入一个线程去扩容就sizeCtl++,
* 每次一个线程扩容结束就sizeCtl–-,那么当所有线程扩容结束后sizeCtl就等于最开始设置的那个负数
*/
private static int RESIZE_STAMP_BITS = 16;
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/**
* 当node节点的hash值 为-1时,表示当前节点为FWD节点
*/
static final int MOVED = -1;
/**
* 当node节点的hash值 为-2时, 表示当前节点已经树化,且当前节点为TreeBin对象,TreeBin对象代理操作红黑树
*/
static final int TREEBIN = -2;
/**
* 标识为ReservationNode节点(hash=-3),用于map.compute操作
*/
static final int RESERVED = -3;
/**
* 可以将一个负数通过位与运算后得到正数,但是不是取绝对值
*/
static final int HASH_BITS = 0x7fffffff;

3.2 ConcurrentHashMap 中的成员变量

/**
* 系统CPU的数量
*/
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* .散列表,长度一定是2次方数
*/
transient volatile Node<K,V>[] table;
/**
* 扩容过程中,会将扩容中的新table 赋值给nextTable 保持引用,扩容结束之后,这里会被设置为null
*/
private transient volatile Node<K,V>[] nextTable;
/**
* 未发生竞争时,或者当前LongAdder 处于加锁状态时,增量累加到baseCount
*/
private transient volatile long baseCount;
/**
* sizeCtl < 0
* 1. -1 表示当前table正在初始化(有线程在创建table数组),当前线程需要自旋等待
* 2.表达当前map正在进行扩容 高16位表示:扩容的标识戳 低16位: 表示 1+N Thread 当前参与并发扩容的线程数量
*
* sizeCtl = 0;
* 表示创建map时,使用DEFALUT_CAPACITY 为大小
*
* sizeCtl > 0;
* 1.如果table未初始化,表示初始化大小;
* 2.如果table已经初始化了,表示下次扩容时触发条件(阈值)
*/
private transient volatile int sizeCtl;
/**
* 扩容过程中,记录当前进度。所有线程都需要从transferIndex中分配区间任务,去执行自己的任务
*/
private transient volatile int transferIndex;
/**
* LongAdder中的 cellsBuzy 0表示当前LongAdder对象无锁状态,1表示当前LongAdder对象加锁状态
*/
private transient volatile int cellsBusy;
/**
* LongAdder 中的cells数组,当baseCount发生竞争后,会创建cells数组.线程会通过计算hash值取到自己的cell,将增量累加到指定cell中
*/
private transient volatile CounterCell[] counterCells;

四、ConcurrentHashMap 关键方法

4.1 构造方法 ConcurrentHashMap

(1) 无参构造方法

/**
* 无参构造方法
*/
public ConcurrentHashMap() {
}

(2) 支持配置初始化容量的构造方法

/**
* 支持配置初始化容量的构造方法
*
* @param initialCapacity 初始化容量
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

(3) 支持配置初始化容量与负载因子的构造方法

支持配置初始化容量与负载因子的构造方法,可以配置 初始化容量负载因子,不过在对参数进行配置前需要对参数进行一系列校验。

HashMap(int initialCapacity, float loadFactor)

/**
* 支持配置初始化容量与负载因子的构造方法
*
* @param initialCapacity 初始化容量
* @param loadFactor 负载因子
*/
public HashMap(int initialCapacity, float loadFactor) {
// 如果初始化容量为负数,则抛出 IllegalArgumentException 异常
if (initialCapacity < 0) {
throw new IllegalArgumentException("Illegal initial capacity: " + initialCapacity);
}
// 如果初始化容量大于 MAXIMUM_CAPACITY 值,则就设置初始化容量为 MAXIMUM_CAPACITY
if (initialCapacity > MAXIMUM_CAPACITY) {
initialCapacity = MAXIMUM_CAPACITY;
}
// 如果负载因子为负数,或者非数字 (NaN),则抛出 IllegalArgumentException 异常
if (loadFactor <= 0 || Float.isNaN(loadFactor)) {
throw new IllegalArgumentException("Illegal load factor: " + loadFactor);
}
// 设置【负载因子】和【扩容阈值】变量
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}

tableSizeFor(int c)

/**
* 返回一个大于等于且最接近 cap 的 2 的幂次方:
* - 比如传入初始化容量 cap = 9,则返回 2^4,即 16
* - 比如传入初始化容量 cap = 25,则返回 2^5,即 32
*
* @param c 传入构造方法中初始化容量
* @return 传入初始化容量 cap 的 2 的幂次方
*/
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}

(4) 支持根据已有 Map 集合创建新集合的构造方法

ConcurrentHashMap(Map<? extends K, ? extends V> m)

/**
* 支持根据已有 Map 集合创建新集合的构造方法
*
* @param m 已有的 Map 集合
*/
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}

putAll(Map<? extends K, ? extends V> m)

/**
* 将插入的 Map 集合中的数据转移到当前集合中
*
* @param m 需要将数据插入到新集合中的 Map 集合
*/
public void putAll(Map<? extends K, ? extends V> m) {
tryPresize(m.size());
for (Map.Entry<? extends K, ? extends V> e : m.entrySet()) {
putVal(e.getKey(), e.getValue(), false);
}
}

4.2 初始化数组方法 initTable()

initTable()

/**
* 初始化数组
*
* @return 初始后的数组
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; // 记录待初始的数组的变量
int sc; // 记录 sizeCtl 值的变量
// 循环执行初始化操作
while ((tab = table) == null || tab.length == 0) {
// 设置变量 sc = sizeCtl,然后判断 sc 变量大小是否小于 0:
// - 如果 sc = -1,则说明有其他线程在执行初始化,为了保证线程安全,使当前线程先让出 CPU 时间片;
// - 如果 sc > 0,则说明没有其它线程执行初始化,当前线程可以执行初始化操作;
if ((sc = sizeCtl) < 0) {
Thread.yield();
}
// 使用 CAS 方法,将当前对象中的 SIZECTL 属性值与 sc 进行比较:
// - 如果值相等,则将 SIZECTL 的值更新为 -1,然后执行 if 中的逻辑;
// - 如果值不相等,不执行 if 中的逻辑;
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// 设置变量 tab = table,然后判断 table 是否为空,是则执行初始化操作。
if ((tab = table) == null || tab.length == 0) {
// 如果初始化容量了,则设置为 sc,否则设置为默认容量 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
// 初始化 table 数组
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
// 计算下次扩容的大小,实际就是当前容量的 0.75 倍,这里使用了右移来计算
sc = n - (n >>> 2);
}
} finally {
// 设置 sizeCtl 为 sc,如果默认是 16 的话,则 sc 为 16*0.75=12
sizeCtl = sc;
}
break;
}
}
// 返回初始后的表
return tab;
}

tabAt(Node<K,V>[] tab, int i)

/**
* 获取 tab 数组中下标为 i 的元素。
*
* 这里之所以使用 getObjectVolatile() 方法获取数组中指定位置的元素,这是因为该方法属于 volatile 类型方法,
* 可以保证数组中数据的可见性,其它线程对当前数组进行修改后,当前线程可以马上观察到最新的改动。
*
* @param tab 待操作的数组
* @param i 数组的下标位置
* @return 数组指定下标位置的元素
*/
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
// ABASE 表示数组地址,(long)i << ASHIFT 表示偏移量
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v)

/**
* 将 tab 数组中下标为 i 的元素与预期值 c 进行比较,如果相同则将 i 位置元素更新为 v,否则不更新。
*
* 使用 compareAndSwapObject 方法进行更新,可以保证多线程环境下的操作的线程安全。
*
* @param tab 待操作的数组
* @param i 数组的下标位置
* @param c 预期值
* @param v 如果符合预期进行更新的值
* @return 如果 tab 数组中下标为 i 的元素与预期值 c 相同,则返回 ture,否则返回 false
*/
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

4.3 插入方法 putVal(K key, V value)

put(K key, V value)

/**
* 将指定的键值对插入到 ConcurrentHashMap 集合中。 (不允许插入null键和null值)
*
* @param key 待插入的键
* @param value 待插入的值
* @return 返回与 Key 关联的旧值,分两种情况:
* - 如果集合中不存在对应的 key,即插入一个新的 key,则返回 null;
* - 如果集合中已经存在一个相同的 key,则新 value 覆盖旧 value,然后返回旧 value;
*/
public V put(K key, V value) {
return putVal(key, value, false);
}

putVal(K key, V value, boolean onlyIfAbsent)

/**
* 实现 Map.put() 和相关方法。
*
* @param key 待插入的键
* @param value 待插入的值
* @param onlyIfAbsent 如果为 true 则不更改现有的值
* @return 如果插入的 Key 已经存在,则返回已存在 Key 的节点值,否则返回 null
*/
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 如果插入的 key 或者 value 为 null,则抛出空指针异常
if (key == null || value == null) {
throw new NullPointerException();
}
// 计算 hash 值
int hash = spread(key.hashCode());
// 创建变量 binCount,使用改变量记录,当数组下标位置关联的为链表或红黑树时:
// - 如果是链表,则设置 binCount 值为链表的节点数量,后续用于判断是否需要将链表转换为红黑树;
// - 如果是红黑树,则设置 binCount 值为固定值 2,表示在红黑树中插入或者替换了值,用于后续判断是否需要返回旧值;
int binCount = 0;
// 死循环执行循环代码块中的逻辑 (自旋操作)
for (Node<K,V>[] tab = table;;) {
// f: 记录待插入 key 经计算找到在 table 数组中的下标位置节点的变量
// n: 记录 table 数组长度的变量
// i: 记录待插入 key 经计算找到在 table 数组中的下标位置的变量
// fh: 记录 table 数组中的下标位置节点 hash 值的变量
Node<K,V> f; int n, i, fh;
// --- 1、判断 table 数组是否初始化 ---
// 判断 table 数组是否为空,如果是则调用 initTable() 方法进行初始化操作
if (tab == null || (n = tab.length) == 0) {
tab = initTable();
}
// --- 2、判断 table 数组下标位置是否哈希冲突 ---
// 调用 tabAt() 方法获取插入 key 在 table 数组中下标位置的节点,并赋给变量 f,然后判断节点是否为 null:
// - 如果为 null,则说明当前数组下标位置为空,则执行 else if 条件中的代码逻辑;
// - 如果不为 null,则说明当前数组下标位置已经存在数据,发生了哈希冲突,不执行 else if 条件中的代码逻辑;
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
// 如果数组下标位置的节点为 null,则使用 CAS 方法,判断当前数组下标位置是否为 null:
// - 如果当前数组下标位置为 null,则将新创建的键值对 Node 节点加入到该位置;
// - 如果当前数组下标位置不为 null,则不进行任何操作;
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) {
break; // no lock when adding to empty bin
}
}
// --- 3、判断当前集合是否正在执行扩容 ---
// 获取数组下标位置节点的 hash 值并赋给变量 fh,然后判断 hash 值是否为 MOVED(-1):
// - 如果 hash 值为 -1,则说明有其它线程正在对当前集合进行扩容,当前桶位置的节点为 ForwardingNode 转移节点,
// 这时当前线程就会执行 helpTransfer() 方法协助其它线程进行扩容操作,然后将插入的数据加入扩容后的新数组中;
// - 如果 hash 值不为 -1,则说明当前结合没有进行扩容,不执行 if 中的代码逻辑;
else if ((fh = f.hash) == MOVED) {
tab = helpTransfer(tab, f);
}
// --- 4、将插入的键值对加入到链表或者红黑树中 ---
else {
// 记录找到的节点的原先 value 值的变量
V oldVal = null;
// 使用 synchronized 进行加锁,并以当前数组下标位置节点 f 作为锁对象
synchronized (f) {
// 再次判断当前数组下标位置的节点是否为 f,只所以需要再次判断,这是因为在添加元素的时候,
// 有可能同时又有其它线程在移除元素,这种情况需要自旋操作来重新插入。
if (tabAt(tab, i) == f) {
// 判断数组下标位置节点的 hash 值是否大于等于 0:
// - 如果 hash >= 0,则说明当前数组下标位置关联的是链表;
// - 如果 hash < 0,则说明当前数组下标位置关联的是红黑树;
if (fh >= 0) {
// 遍历链表,寻找是否存在与插入 key 相同的节点,并且使 binCount 值 +1
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
// 记录遍历过程中当前节点的 key 的变量
K ek;
// 如果找到插入 key 的节点,则记录当前节点的值赋给变量 oldVal,然后将插入的新 value 替换掉旧值,
// 最后执行 break 指令终止继续遍历链表。
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent) {
e.val = value;
}
break;
}
// 记录前驱节点
Node<K,V> pred = e;
// 记录后继节点,并判断当前遍历到的位置是否到了链表末尾节点,如果是就创建新的节点加入到链表末尾
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
// 如果数组下标位置的节点 f 为红黑树节点,则将键值对数据加入到红黑树中
else if (f instanceof TreeBin) {
// 记录调用 putTreeVal() 方法的返回结果
Node<K,V> p;
// 设置 binCount 为 2
binCount = 2;
// 调用 putTreeVal() 方法将键值对数据加入到红黑树中,并且判断方法返回结果:
// - 如果方法返回结果为 null,则说明执行了添加操作,不用记录老值;
// - 如果方法返回结果不为 null,则说明执行了替换操作,需要记录老值赋给变量 oldVal,
// 然后将插入的新 value 替换掉旧值;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent) {
p.val = value;
}
}
}
}
}
// 如果 binCount 不为 0 则说明在链表或者红黑树中进行了插入或替换操作,则执行 if 中的代码逻辑
if (binCount != 0) {
// 判断变量 binCount 是否大于等于 TREEIFY_THRESHOLD(8),如果是则尝试将链表转换为红黑树,不过:
// - 如果当前集合大小 < 64 树化最小限制容量 MIN_TREEIFY_CAPACITY(64),则不会转换为红黑树,
// 只会对当前集合进行一次扩容操作;
// - 如果当前集合大小 ≥ 树化最小限制容量 IN_TREEIFY_CAPACITY(64),则就将链表转换为红黑树;
if (binCount >= TREEIFY_THRESHOLD) {
treeifyBin(tab, i);
}
// 判断老值是否不为空,是则返回老值
if (oldVal != null) {
return oldVal;
}
break;
}
}
}
// 执行到这里说明进行了添加操作,而没有进行替换操作,添加数据就使结合大小 +1,然后判断是否达到了库容阈值,是就进行扩容
addCount(1L, binCount);
return null;
}

4.4 删除方法 remove(Object key)

remove(Object key)

/**
* 从集合中删除指定 key 对应的键值对数据
*
* @param key 待删除的键值对 key
* @return 判断集合中存在指定的 key 对应的键值对数据,不存在则返回 null,存在则返回键值对的 value
*/
public V remove(Object key) {
return replaceNode(key, null, null);
}

replaceNode(Object key, V value, Object cv)

/**
* 替换/删除方法
* - 如果传入参数 value 为空,则执行删除操作;
* - 如果传入参数 value 不为空,则执行替换操作,将 cv 替换为 value;
*
* @param key 键值对 key
* @param value 键值对 value
* @param cv 待替换的旧值
* @return 如果执行删除操作则返回 null,如果执行替换操作则返回旧值
*/
final V replaceNode(Object key, V value, Object cv) {
int hash = spread(key.hashCode());
for ( Node<K,V>[] tab = table;;) {
Node<K,V> f; // 记录数组下标位置节点的变量
int n; // 记录数组 table 长度的变量
int i; // 记录数组下标位置索引的变量
int fh; // 记录数组下标位置哈希值的变量
// 如果当前数组 table 为空,或者数组下标位置为空,则表示数据已经被删除,直接结束循环
if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) {
break;
}
// 获取数组下标位置节点的 hash 值并赋给变量 fh,然后判断 hash 值是否为 MOVED(-1):
// - 如果 hash 值为 -1,则说明有其它线程正在对当前集合进行扩容,当前桶位置的节点为 ForwardingNode 转移节点,
// 这时当前线程就会执行 helpTransfer() 方法协助其它线程进行扩容操作,然后将插入的数据加入扩容后的新数组中;
// - 如果 hash 值不为 -1,则说明当前结合没有进行扩容,不执行 if 中的代码逻辑;
else if ((fh = f.hash) == MOVED) {
tab = helpTransfer(tab, f);
}
else {
V oldVal = null; // 记录找到的节点的原先 value 值的变量
boolean validated = false; // 记录是否处理过当前数组下标位置关联的链表或者红黑树
// 使用 synchronized 进行加锁,并以当前数组下标位置节点 f 作为锁对象
synchronized (f) {
// 再次判断当前数组下标位置的节点是否为 f,只所以需要再次判断,这是因为在添加元素的时候,
// 有可能同时又有其它线程在移除元素,这种情况需要自旋操作来重新操作。
if (tabAt(tab, i) == f) {
// 判断数组下标位置节点的 hash 值是否大于等于 0:
// - 如果 hash >= 0,则说明当前数组下标位置关联的是链表;
// - 如果 hash < 0,则说明当前数组下标位置关联的是红黑树;
if (fh >= 0) {
// 标记为 true,表示处理过
validated = true;
// 从链表头结点开始遍历链表
for (Node<K,V> e = f, pred = null;;) {
// 记录遍历过程中当前节点的 key 的变量
K ek;
// 如果找到指定 key 的节点,则记录当前节点的值赋给变量 oldVal,然后将插入的新 value 替换掉旧值,
// 最后执行 break 指令终止继续遍历链表。
if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
// 记录当前节点的 value 值
V ev = e.val;
// 判断是否传入 cv:
// - 如果传入的旧值 cv 为 null 则执行下面替换/删除逻辑;
// - 如果当前节点 value 和传入旧值 cv 相同,则执行下面替换/删除逻辑;
if (cv == null || cv == ev || (ev != null && cv.equals(ev))) {
// 将旧值赋给变量 oldVal
oldVal = ev;
// 判断 value 是否为空:
// - 如果 value 不为空,则说明要执行替换操作;
// - 如果 value 为空,则说明要执行删除操作;
if (value != null) {
// 将旧值替换为 value
e.val = value;
}
// 判断将当前节点是否有前驱节点:
// - 如果有前驱节点,则设置 pred.next = e.next 来进行删除当前节点;
// - 如果没有前驱节点,则说明当前节点是头结点,设置 setTabAt(tab, i, e.next) 直接删除当前节点;
else if (pred != null) {
pred.next = e.next;
}
else {
setTabAt(tab, i, e.next);
}
}
break;
}
// 设置前驱节点为当前节点
pred = e;
// 将后继节点赋给变量 e,然后判断其是否为 null,如果是则说明已经遍历到尾节点,直接结束循环
if ((e = e.next) == null) {
break;
}
}
}
// 判断当前节点是否为树节点
else if (f instanceof TreeBin) {
// 标记为 true,表示处理过
validated = true;
// 将节点 Node 强转为 TreeBin
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> r; // 记录树 root 节点的变量
TreeNode<K,V> p; // 记录接收 findTreeNode 方法返回结果的变量
// 将 root 节点赋给变量 r,然后调用 findTreeNode 方法遍历树查找指定 key 节点:
// - 如果调用 findTreeNode() 方法返回找到的指定 key 的树节点,则执行下面逻辑;
// - 如果调用 findTreeNode() 方法返回 null,则没有找到,不执行下面逻辑;
if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) {
// 将找到的树节点赋给变量 pv
V pv = p.val;
// 判断是否传入 cv:
// - 如果传入的旧值 cv 为 null 则执行下面替换/删除逻辑;
// - 如果当前节点 value 和传入旧值 cv 相同,则执行下面替换/删除逻辑;
if (cv == null || cv == pv || (pv != null && cv.equals(pv))) {
// 将旧值赋给变量 oldVal
oldVal = pv;
// 判断 value 是否为空:
// - 如果 value 不为空,则说明要执行替换操作;
// - 如果 value 为空,则说明要执行删除操作;
if (value != null) {
p.val = value;
}
// 执行到这里说明 value 为空,执行删除操作,调用 t.removeTreeNode(p) 方法删除当前节点,
// 然后判断方法返回结果:
// - 如果方法返回 true,则表示删除节点后树的元素个数较少,则将红黑树转换为链表;;
// - 如果方法返回 false,则表示删除节点后树的元素个数依旧很多,则不将红黑树转换为链表;
else if (t.removeTreeNode(p)) {
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}
}
// 如果处理过,不管有没有找到元素都返回
if (validated) {
// 判断变量 oldVal 是否为空:
// - 如果不为 null 则说明存在旧值,返回结果将旧值返回;
// - 如果为 null 则说明不存在旧值,返回结果返回 null;
if (oldVal != null) {
// 判断 value 是否为空:
// - 如果 value 不为空,则说明要执行替换操作;
// - 如果 value 为空,则说明要执行删除操作,这里执行 addCount 方法减小集合大小;
if (value == null) {
addCount(-1L, -1);
}
return oldVal;
}
break;
}
}
}
return null;
}

4.5 协助扩容方法 helpTransfer(Node<K,V>[] tab, Node<K,V> f)

/**
* 协助扩容
*
* @param tab 待操作的数组
* @param f 需要查找的节点
* @return 如果扩容结束则返回当前 tabel 数组,否则返回新的 table 数组
*/
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; // 下一个节点
int sc; // 记录 sizeCtl 的变量
// 如果旧 table 不为空,并且要查找的节点是转移节点,则判断新 table 是否为空:
// - 如果新 table 为空,则说明扩容已经结束;
// - 如果新 table 不为空,则说明扩容正在进行,当前线程协助集合进行扩容操作;
if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
// 根据数组长度得到一个扩容戳
int rs = resizeStamp(tab.length);
// 循环判断是否扩容完成
while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
// 判断你是否符合下面条件,如果符合则说明扩容结束,则直接退出循环,判断条件如下:
// - sc >>> RESIZE_STAMP_SHIFT != rs 表示同一轮扩容中 sc 无符号右移比较高位和 rs 的值应该是相等的,
// 如果不相等说明扩容结束;
// - rs + 1 表示扩容结束;
// - rs + MAX_RESIZERS 表示扩容线程数达到最大扩容线程数;
// - transferIndex <= 0 表示所有的 Node 都已经分配了线程;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) {
break;
}
// 每增加一个线程来协助迁移就使 sizeCtl 值 +1, 然后当前线程协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
// 返回新的 table 数组
return nextTab;
}
// 返回当前 table 数组
return table;
}

4.6 链表转红黑树方法 treeifyBin(Node<K,V>[] tab, int index)

/**
* 链表转换为红黑树
*
* @param tab 数组 table
* @param index 数组索引位置
*/
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; // 记录数组指定索引位置节点的变量
int n, sc; // 记录数组长度
// 如果数组 table 不为空,则执行 if 中的代码逻辑
if (tab != null) {
// 判断当前数组长度是否达到了转换为红黑树要求的最小容量:
// - 如果当前数组长度 < 转换为红黑树要求的最小容量,则不转换为红黑树,只对 table 数组进行扩容;
// - 如果当前数组长度 ≥ 转换为红黑树要求的最小容量,则将数组指定索引位置关联的链表转换为红黑树;
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) {
tryPresize(n << 1);
}
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
// 使用同步锁,并且以数组索引下标位置的节点作为锁对象
synchronized (b) {
// 再次验证数组下标索引下标位置的节点是否和变量 b 记录的节点相同,是则执行链表转换为红黑树的逻辑
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null);
if ((p.prev = tl) == null) {
hd = p;
} else {
tl.next = p;
}
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

4.7 红黑树转链表方法 untreeify(Node<K,V> b)

/**
* 取消树化,将红黑树转换为链表
*
* @param b 待转换的红黑树节点
* @return 链表
*/
static <K,V> Node<K,V> untreeify(Node<K,V> b) {
Node<K,V> hd = null; // 记录链表头节点的变量
Node<K,V> tl = null; // 记录链表尾节点的变量
// 遍历 Node 节点
for (Node<K,V> q = b; q != null; q = q.next) {
// 根据已有的节点创建的数据,创建新节点
Node<K,V> p = new Node<K,V>(q.hash, q.key, q.val, null);
// 如果为遍历的第一次,则记录链表的头结点
if (tl == null) {
hd = p;
}
// 将上一个节点的下一个节点指向当前这个节点
else {
tl.next = p;
}
// 将 tl 节点指向 p
tl = p;
}
// 返回链表的头结点
return hd;
}

4.8 尝试扩容方法 tryPresize(int size)

/**
* 尝试调整当前集合数组 table 的大小,以容纳给定数量的元素。
*
* @param size 带插入集合的大小
*/
private final void tryPresize(int size) {
// 判断集合大小 size 是否大于等于 2^29:
// - 如果是,则设置变量 c 为集合大小为集合最大容量 MAXIMUM_CAPACITY;
// - 如果不是,则设置变量 c 为调用 tableSizeFor 方法的结果;
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1);
int sc; // 记录 sizeCtl 的变量
// 循环执行,并且判断 sizeCtl >= 0:
// - 如果 sizeCtl < 0 则表示其它线程正在对当前集合进行初始化或扩容;
// - 如果 sizeCtl ≥ 0 则表示集合已经扩容完成,则执行下面 while 代码块中的逻辑;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; // 记录数组 table 的变量
int n; // 记录数组 table 长度的变量
// 如果数组 table 当前为空,则说明数组 table 还没有初始化,先执行初始化操作
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
// 相当于执行 n*0.75,即计算扩容阈值
sc = n - (n >>> 2);
}
} finally {
// 设置 sizeCtl 值为 sc 值
sizeCtl = sc;
}
}
}
// 数组容量已经最大或足够了,不需要扩容
// size 大小 ≤ 扩容阈值,或者 ≥ 最大容量限制值时,不需要进行扩容
else if (c <= sc || n >= MAXIMUM_CAPACITY) {
break;
}
// 验证 tab 记录的变量是否为 table 数组,如果是就进行扩容操作
else if (tab == table) {
// 根据数组长度得到一个扩容戳
// (注: 每次扩容时都会生成一个类似时间戳的校验标记,上一次扩容和下一次扩容都不一样,这个值由当前数组的长度决定)
int rs = resizeStamp(n);
// 判断变量 sizeCtl 的值是否小于 0:
// - 如果 sizeCtl ≥ 0 则表示集合已经扩容完成;
// - 如果 sizeCtl < 0 则表示其它线程正在对当前集合进行初始化或扩容,则当前线程进行协助扩容;
// (注: 扩容时 sizeCtl 高 16 位表示扩容戳(校验标记),低 16 位表示(正在参与扩容的线程数+1))
if (sc < 0) {
Node<K,V>[] nt;
// 判断你是否符合下面条件,如果符合则说明扩容结束,则直接退出循环,判断条件如下:
// - sc >>> RESIZE_STAMP_SHIFT != rs 表示同一轮扩容中 sc 无符号右移比较高位和 rs 的值应该是相等的,
// 如果不相等说明扩容结束;
// - rs + 1 表示扩容结束;
// - rs + MAX_RESIZERS 表示扩容线程数达到最大扩容线程数;
// - (nt = nextTable) == null 表示 nextTable 还没有初始化或者正在进行初始化;
// - transferIndex <= 0 表示所有的 Node 都已经分配了线程;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs
|| sc == rs + 1
|| sc == rs + MAX_RESIZERS
|| (nt = nextTable) == null
|| transferIndex <= 0) {
break;
}
// 每增加一个线程来协助迁移,就使用 CAS 方法使 sizeCtl 值 +1, 然后当前线程协助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nt);
}
}
// 如果当前没有在扩容,那么 rs 肯定是一个正数
// 通过 rs<<RESIZE_STAMP_SHIFT 将 sc 设置为一个负数,+2 表示有一个线程在执行扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) {
transfer(tab, null);
}
}
}
}

4.9 扩容方法 transfer(Node<K,V>[] tab, Node<K,V>[] nextTab)

/**
* 扩容方法
*
* @param tab 当前 table
* @param nextTab 扩容后的 table
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
// 记录当前数组 tabel 的长度
int n = tab.length, stride;
// 如果处 CPU 数量大于 1,则通过当前 table 中 Node 总量和 CPU 的数量来计算步长,即每个线程所允许处理的桶的数量
// 注:
// - NCPU 表示可以使用的 CPU 的数量;
// - MIN_TRANSFER_STRIDE 表示并发扩容时每个线程最少处理16个桶;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) {
stride = MIN_TRANSFER_STRIDE;
}
// 如果 nextTab 为 null,则说明是第一个线程进行转移,执行下面 if 中的逻辑。
if (nextTab == null) {
try {
// 创建一个容量为当前数组容量两倍的新数组,并赋给变量 nextTab,
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) {
// 如果执行过程中发生异常,则设置 sizeCtl = Integer.MAX_VALUE,然后直接退出不转移了
sizeCtl = Integer.MAX_VALUE;
return;
}
// 将新数组赋值给 nextTable,此时数组为一个没有数据的空数组
nextTable = nextTab;
// 使变量 transferIndex 记录当前数组的长度 n,表示转移开始的索引值
transferIndex = n;
}
// 将新数组的长度赋给变量 nextn
int nextn = nextTab.length;
// 变量 fwd 是于来标识某个桶处正在转移,其存储了新数组
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 变量 advance 标记用于确认是否需要在区间内继续往前前进处理
boolean advance = true;
// 变量 finishing 用于标记是否将所有的区间都迁移完成了
boolean finishing = false;
// 将 i 和 bound 的值初始化为 0,然后进行自旋,直到数据转移成功后再退出循环
for (int i = 0, bound = 0;;) {
Node<K,V> f; // 记录当前数组指定位置节点的变量
int fh; // 记录当前数组指定位置节点哈希值的变量
// 控制 --i,遍历原数组中的节点
while (advance) {
int nextIndex, nextBound;
// 某个线程的第二次循环时候才会走到这里来,说明 i 还在该线程需要转移的区间内,即 [bound <--> i <--> transferIndex)
if (--i >= bound || finishing) {
advance = false;
}
// transferIndex ≤ 0 说明已经到最前面了,当前 table 中所有数组元素都已经有其他线程负责扩容
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 走到这里说明 i < bound 且 transferIndex > 0 (即整个当前 table 数组中的桶的转移还没处理完成)
// 通过 CAS 继续认领一段区间进行转移,计算出当前线程所需要处理的区间,nextBound 就是下一个线程的下边界,
// 也是当线程的上边界
else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
// 如果 i < 0 或者 i >= n 说明已经超出了所需要处理的区间了
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// 判断遍历 finishing 的值:
// - 如果 finishing 为 true,则说明所有的桶都处理完了,执行下面 if 中的代码逻辑;
// - 如果 finishing 为 false,则说明害有桶没有处理完成,不执行 if 中的代码逻辑;
if (finishing) {
// 将 nextTable 置为空,并将新的数组赋值给变量 table
nextTable = null;
table = nextTab;
// 设置新的阈值,新数组的长度为 2n,所以阈值应该为 0.75 * 2n,即 2n - 0.5n
sizeCtl = (n << 1) - (n >>> 1);
// 跳出循环
return;
}
// CAS 更扩容阈值,在这里面 sizeCtl 值减 -1,说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) {
return;
}
// 设置变量 finishing 和 advance 状态为 true
finishing = advance = true;
i = n;
}
}
// 如果索引为 i 的桶处为 null,则通过 CAS 方法将 fwd 转移节点放在原数组 i 索引的下标位置,
// 然后将 advance 设置为 true,然后退出循环继续往前
else if ((f = tabAt(tab, i)) == null) {
advance = casTabAt(tab, i, null, fwd);
}
// 获取当前数组 table 中 i 索引下标位置节点的 hash 值并赋给变量 fh,然后判断 hash 值是否为 MOVED(-1):
// - 如果 hash 值为 -1,则说明有其它线程正在对当前集合进行扩容,当前桶位置的节点为 ForwardingNode 转移节点,
// 这时就会将 advance 设置为 true,然后退出循环继续往前;
// - 如果 hash 值不为 -1,则说明当前结合没有进行扩容,不执行 if 中的代码逻辑;
else if ((fh = f.hash) == MOVED) {
advance = true;
} else {
// 使用 synchronized 进行加锁,并以当前数组下标位置节点 f 作为锁对象
synchronized (f) {
// 再次判断当前数组下标位置的节点是否为 f,只所以需要再次判断,这是因为在添加元素的时候,
// 有可能同时又有其它线程在移除元素,这种情况需要自旋操作来重新插入。
if (tabAt(tab, i) == f) {
// 判断数组下标位置节点的 hash 值是否大于等于 0:
// - 如果 hash >= 0,则说明当前数组下标位置关联的是链表;
// - 如果 hash < 0,则说明当前数组下标位置关联的是红黑树;
if (fh >= 0) {
// 遍历链表,找到第一个后面全部为相同的 runBit 的节点
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
// 如果 runBit == 0,则说明这些节点还要放在相同 index 的桶里,赋给变量 ln
if (runBit == 0) {
ln = lastRun;
hn = null;
}
// 否则 runBit == n,则说明这些节点还要放在相同 index + n 的桶里,赋给变量 hn
else {
hn = lastRun;
ln = null;
}
// 如果节点只有单个数据则直接拷贝,如果是链表则对链表进行遍历拷贝
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0) {
ln = new Node<K,V>(ph, pk, pv, ln);
} else {
hn = new Node<K,V>(ph, pk, pv, hn);
}
}
// 将节点 ln 插入新数组 table 的 i 处
setTabAt(nextTab, i, ln);
// 将节点 hn 插入新数组 table 的 i+n 处
setTabAt(nextTab, i + n, hn);
// 在原先 table 数组 i 位置设置 ForwardingNode 节点,表示该节点已经处理过了
// 将原数组 table 的索引 i 位置的节点设置为 fwd 节点
setTabAt(tab, i, fwd);
// 更新 advance 为 true,准备下次 for 循环
advance = true;
}
// 如果是 TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null) {
lo = p;
} else {
loTail.next = p;
}
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null) {
hi = p;
} else {
hiTail.next = p;
}
hiTail = p;
++hc;
}
}
// 扩容后树节点个数比较少的话,将红黑树转换为链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
---END---
如果本文对你有帮助,可以关注我的公众号 "小豆丁技术栈" 了解最新动态,顺便也请帮忙 Github 点颗星哦,感谢~

本文作者:超级小豆丁 @ 小豆丁技术栈

本文链接:http://www.mydlq.club/article/141/

本文标题:深入浅出 JAVA 之集合 - ConcurrentHashMap

本文版权:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!