• 0

  • 418

ConcurrentHashMap分析(一)整体结构

机器猫

机器学习

1个月前

前言

对于ConcurrentHashMap的分析,网上已经有很多完善的资料,对于它的源码分析对于巩固和提升关于Java并发思想以及Map集合的思考很有帮助。此文是作者自己关于ConcurrentHashMap的分析和总结。由于本人水平有限,分析过程中可能存在纰漏和错误,希望大家可以指出,一起学习,一起进步。

对于此次分析希望能达到以下目的:

  1. 了解ConcurrentHashMap对于并发的优化方法。
  2. 了解并发措施和Map集合的结合。

ConcurrentHashMap结构

下面是ConcurrentHashMap的类继承关系图:

1600838485715

对于ConcurrentHashMap继承了AbstractMap我们并不奇怪,因为这是每个Map集合都要继承的。倒是这个ConcurrentMap好像是个新鲜玩意,从名字中我们可以推测它里面包括Map集合的并发操作。而ConcurrentMap注释也说明了,它是一个提高线程安全和原子性保证的接口。

ConcurrentHashMap的代码加上注释有6000+行,如果直接从源码入手可能会猝死。所以我们先从它的整体架构,再深入其中的关键方法。我们先来看看它的内部结构。

1600862074854

节点种类

从上图中我们可以看到ConcurrentHashMap有五种节点,存储在一个table里,它们分别为:NodeTreeBinTreeNodeForwardingNodeReservationNode,对于NodeTreeNode我们可以理解,毕竟一个是链表节点,一个是树节点。可是为什么树的根节点是TreeBin而不是TreeNode,另外ForwardingNodeReservationNode又是什么呢?

上图中树的红黑节点不是随便标注的,其实ConcurrentHashMap的树形结构就是红黑树,如果了解了红黑树的同学肯定会对它左旋、右旋的平衡操作的复杂性记忆深刻。所以这里用TreeBin充当代理节点来进行这些操作,而TreeNode节点仅有查找方法。

这里也许会有人疑惑,为什么不用现成的TreeMap来代替TreeBin

TreeMap在调用put存放数据数据时,会默认对键进行比较排序,如果给TreeMap传入了Comparator或者键本身实现了Comparable接口,就会按照自定义规则进行排序。而ConcurrentHashMap并不需要这个功能,而且TreeMap在put方法中,会强转键为Comparable类型:

1600841664308

那么问题就来了,以下代码就会出现错误:

1600841768708

调用Double.compareTo时,会对入参Integer进行转型,但是Double和Integer不属于父子类关系,所以会报错类型转换异常。

关于ForwardingNode 节点,它的注释写道:

A node inserted at head of bins during transfer operations.
复制代码

在转换操作时插入到书头部的一个节点,应该是和ConcurrentHashMap的扩容缩容有关系,等到后面讲到时再具体分析

对于 ReservationNode类型的节点根据它的注释判断出它是起到一个类似占位的作用,好像现在没有什么和它有关系,暂时就先放着。

/**
 * A place-holder node used in computeIfAbsent and compute.
 * 用于 computeIfAbsent 和 compute 的占位节点
 */
复制代码

我们还要先了解ConcurrentHashMao的常量和变量字段,便于在后面分析代码时更好地理解。

常量

/**
 * 容量的最大值
 */
private static final int MAXIMUM_CAPACITY = 1 << 30;

/**
 * 容量的默认值
 */
private static final int DEFAULT_CAPACITY = 16;

/**
 * 数组的最大容量,这个在ArrayList里也有,大部分人赞同的答案是:
 * 减少一些机器发生内存溢出的可能性【https://www.zhihu.com/question/27999759】
 */
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

/**
 * 这个哈希表默认的并发级别,用于兼容以前的版本
 */
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

/**
 * 负载因子
 */
private static final float LOAD_FACTOR = 0.75f;

/**
 * 链表转换为树的阈值
 */
static final int TREEIFY_THRESHOLD = 8;

/**
 * 树转换为链表的阈值
 */
static final int UNTREEIFY_THRESHOLD = 6;

/**
 * 在链表转换为树之前,当前槽节点的数量必须大于这个值
 */
static final int MIN_TREEIFY_CAPACITY = 64;

/**
 * 在树转换为链表之前,当前槽节点的数量必须不大于这个值
 */
private static final int MIN_TRANSFER_STRIDE = 16;

/**
 * 用于在扩容时生成随机数的种子
 */
private static final int RESIZE_STAMP_BITS = 16;

/**
 * 并行进行扩容的最大线程数
 */
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;

/**
 * 记录sizeCtl大小所需要进行的偏移位数【现在没看懂什么意思】
 */
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;


static final int MOVED     = -1; // 标识 ForwardingNode 节点
static final int TREEBIN   = -2; // 标识红黑树根节点
static final int RESERVED  = -3; // 标识 ReservationNode 节点
static final int HASH_BITS = 0x7fffffff; // 标识普通节点

/** CPU的核心数量,用于扩容时使用 */
static final int NCPU = Runtime.getRuntime().availableProcessors();
复制代码

变量

/**
 * Node 数组
 */
transient volatile Node<K,V>[] table;

/**
 * 扩容过程中使用的 Node 数组,采用渐进式数据迁移的方式
 */
private transient volatile Node<K,V>[] nextTable;

/**
 * 基础计数器,主要在无线程竞争的条件下使用,在 table 初始化时也被用来做 fallback 操作,通过 CAS 进行更新
 */
private transient volatile long baseCount;

/**
 * table 初始化和扩容的状态标识:
 * >0:数组初始化后的容量
 * 0:默认初始值
 * -1:单线程扩容
 * -1(1 + nThread):多线程扩容
 */
private transient volatile int sizeCtl;

/**
 * 扩容时另一个表的下标
 */
private transient volatile int transferIndex;

/**
 * 在扩容或创建 CounterCells 时的自旋锁.
 */
private transient volatile int cellsBusy;

/**
 * CounterCell 数组,用于热点数据分段计算,跟 LongAddr 的 Cell 差不多
 */
private transient volatile CounterCell[] counterCells;
复制代码

操作分析

put

put系列方法都是调用了putVal方法,我们来看下它的源码:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    // 根据 key 计算出 hash 值
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; K fk; V fv;
        // 如果数组为空就先初始化数组
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 下标的计算方式:(table.length - 1) & key.hashCode
        // 如果计算出的下标的槽的元素为空,直接放入然后返回
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break;                   // no lock when adding to empty bin
        }
        // 判断槽的状态是否为 MOVED,即该节点是一个 Fowarding 节点,
        // 正在进行扩容操作,尝试协助数据迁移。
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // 无锁状态下节点首节点的 hash 是否和要添加的元素的 hash 一样,
        // 一样就直接设置值就可以了
        else if (onlyIfAbsent
                 && fh == hash
                 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                 && (fv = f.val) != null)
            return fv;
        else {
            V oldVal = null;
            synchronized (f) {
                // 判断一下 f 节点是不是首节点
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        // 遍历链表,如果链表中存在和 key 的 hash 值相同的元素,替换该节点的 value 值
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                            }
                        }
                    }
                    // 开始遍历树节点了
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // 将 key-value 值放入红黑树中
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    // 前面写了,ReservationNode 节点是一个占位的辅助节点而已,所以这里直接抛出异常
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            // 判断要不要链表转红黑树
            if (binCount != 0) {
                // TREEIFY_THRESHOLD = 8
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}
复制代码

虽然putVal的方法很长,但是实际上只是对各种情况的判断而已。我们先来看看treeifyBin方法。

private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n;
    if (tab != null) {
        // MIN_TREEIFY_CAPACITY=64
        // 当 table 容量小于64时,只是对 table 进行扩容
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    // 链表转换成红黑树
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    // 包装成 TreeBin 类型,并放入 table[index] 中
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}
复制代码

对于其中的initTablehelpTransferputTreeVal方法留到后面再说。

get

我们来看看get方法的源码:

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    // 计算 hash 值
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        // 首节点对应上了就直接返回
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        // 从红黑树/ForwardingNode/ReservationNode开始找
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        // 从链表开始找
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}
复制代码

对于链表是从头开始查找这是没什么可说的,但是红黑树的插入删除会涉及整个结构的调整,我们来看下红黑树的find的代码:

final Node<K,V> find(int h, Object k) {
            if (k != null) {
                for (Node<K,V> e = first; e != null; ) {
                    int s; K ek;
                    // 用链表的方式查找元素,有两种情况:
                    // 1. 有线程正持有写锁
                    // 2. 有线程等待获取写锁
                    if (((s = lockState) & (WAITER|WRITER)) != 0) {
                        if (e.hash == h &&
                            ((ek = e.key) == k || (ek != null && k.equals(ek))))
                            return e;
                        e = e.next;
                    }
                    // 读线程+1,同时更新读状态
                    else if (U.compareAndSetInt(this, LOCKSTATE, s,
                                                 s + READER)) {
                        TreeNode<K,V> r, p;
                        try {
                            p = ((r = root) == null ? null :
                                 r.findTreeNode(h, k, null));
                        } finally {
                            Thread w;
                            // 如果当前线程是最后一个读线程,且有写线程因为读锁而阻塞,则告诉写线程可以尝试获取写锁了
                            if (U.getAndAddInt(this, LOCKSTATE, -READER) ==
                                (READER|WAITER) && (w = waiter) != null)
                                LockSupport.unpark(w);
                        }
                        return p;
                    }
                }
            }
            return null;
        }
复制代码

ConcurrentHashMap运用了类似读写锁的方式,当有线程修改红黑树时,读线程采用链表的方式进行查找。

ForwardingNode 同样重写了 Node 的 find 方法,我们来看下它是怎么实现的:

Node<K,V> find(int h, Object k) {
        outer: for (Node<K,V>[] tab = nextTable;;) {
            Node<K,V> e; int n;
            // 键为空,table 为空,键对应的槽位空都是直接返回
            if (k == null || tab == null || (n = tab.length) == 0 ||
                (e = tabAt(tab, (n - 1) & h)) == null)
                return null;
            for (;;) {
                int eh; K ek;
                if ((eh = e.hash) == h &&
                    ((ek = e.key) == k || (ek != null && k.equals(ek))))
                    return e;
                if (eh < 0) {
                    // 如果 e 节点还是 ForwardingNode 节点,那么就去新的用于迁移数据的 table 找
                    if (e instanceof ForwardingNode) {
                        tab = ((ForwardingNode<K,V>)e).nextTable;
                        // 从最外部循环再次开始
                        continue outer;
                    }
                    else
                        // 这个方法就是调用 TreeBin 节点的 find 方法
                        return e.find(h, k);
                }
                // 查到最后了还没有找到那就直接返回 null 
                if ((e = e.next) == null)
                    return null;
            }
        }
    }
}
复制代码

ReservationNode 节点是占位节点,本身没有数据,所以就直接返回 null 了:

Node<K,V> find(int h, Object k) {
    return null;
}
复制代码

size

分析了getput方法后,前面部分的常量和变量已经用到了,但是还有个CounterCell数组没有看到在哪里使用过。前面说它类似于LongAddrCell,那么我们就来看看对于热点数据 ConcurrentHashMap 是怎么处理的,其中的主要应用从 size 方法开始:

public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}
复制代码

这里没什么好说的,就是获取元素总数,然后处理一下整型越界的情况再返回。我们把关注点放在sumCount方法:

final long sumCount() {
    CounterCell[] cs = counterCells;
    long sum = baseCount;
    if (cs != null) {
        for (CounterCell c : cs)
            if (c != null)
                sum += c.value;
    }
    return sum;
}
复制代码

我们来对比一下 LongAddersum 方法:

public long sum() {
    Cell[] cs = cells;
    long sum = base;
    if (cs != null) {
        for (Cell c : cs)
            if (c != null)
                sum += c.value;
    }
    return sum;
}
复制代码

是不是感觉就是copy了一份?很显然它们都是使用了分段计数的方法,我们来看看 CounterCell 这个槽对象在并发冲突时的处理方法。还记得我们在分析get方法时最后调用了一个addCount(1L, binCount);吗,它的处理逻辑可以在addCount 里看到:【后半部分涉及到扩容,这是个很大的知识点,所以需要另开一文,因此现在只对前面分析】

CounterCell[] cs; long b, s;
if ((cs = counterCells) != null ||
    // 如果 CAS 更新失败,说明出现并发竞争,那么就将计数器累加到 CounterCell 数组
    !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
    CounterCell c; long v; int m;
    boolean uncontended = true;
    if (cs == null || (m = cs.length - 1) < 0 ||
        // 根据线程的 random 计算槽的索引
        (c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
        !(uncontended =
          U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
        // 如果还是更新失败则执行该方法
        fullAddCount(x, uncontended);
        return;
    }
    if (check <= 1)
        return;
    s = sumCount();
}
复制代码

addCount前半部分的处理逻辑为:如果counterCells为空,说明之前没有出现并发冲突,那么就可以直接将值加到baseCount上。否则就尝试更新counterCells[i]中的值,如果还是更新失败,那么说明槽也有并发冲突,那么就需要对槽进行扩容,调用方法fullAddCount,该方法的逻辑跟LongAdderlongAccumulate差不多一样,扩容成功后再次更新值。

总结

以上就是这篇文章的分析,主要通过 ConcurrentHashMap 的整体结构入手,先是分析了各个节点,以及各常量和变量的作用,然后通过了解主要的三个方法getputsize对其主要的处理逻辑有了一定的了解。后面会以 ConcurrentHashMap 的扩容入手,通过方法一步步感受 ConcurrentHashMap 那优美的并发处理逻辑。

免责声明:文章版权归原作者所有,其内容与观点不代表Unitimes立场,亦不构成任何投资意见或建议。

机器学习

418

相关文章推荐

未登录头像

暂无评论