不断的学习,我们才能不断的前进
一个好的程序员是那种过单行线马路都要往两边看的人

ConcurrentHashMap

ConcurrentHashMap是线程安全的Map结构

底层的数据结构

底层的数据结构采用数组+链表+红黑树的结构,其中Node结构里面有hash、key、value、next指针,当发生hash冲突的时候,需要用链地址法来解决冲突。当链表长度大于8且容量大于64的时候,会转换成红黑树结构,TreeNode结点继承于Node 结点,里面有parent、left、right指针,内部也维护了Node结点的next指针,因为在进行扩容迁移的时候,需要使用到这个next指针,把链表或者红黑树转换成高低位两个链表。
除此之外还有sizeCtrl,大于0表示用来控制扩容的阈值,-1表示并发情况下的初始化中(值为-1),小于0 但不等于-1 表示扩容中。
transferIndex 用来记录扩容时,旧表当前扩容的位置

public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {
    // 扩容的时候存储,旧表的索引位置
    private transient volatile int transferIndex;
    private transient volatile Node<K,V>[] nextTable;// 扩容的时候会使用
    
    transient volatile Node<K,V>[] table;// map数组
    
    private transient volatile int sizeCtl;// 控制表的初始化 或者扩容
    
    private static final float LOAD_FACTOR = 0.75f;// 加载因子
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;// 默认初始容量
    
    static final int MOVED     = -1; // hash for forwarding nodes
    static final int TREEBIN   = -2; // hash for roots of trees
    static final int RESERVED  = -3; // hash for transient reservations
    static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
    
    static final int UNTREEIFY_THRESHOLD = 6; // 树化退为链化的阈值
    static final int TREEIFY_THRESHOLD = 8; // 树化阈值
}
    

负载因子可以修改吗?

负载因子不可以修改,因为是final关键字修饰的,这点跟HashMap不同

Node.hash字段为什么必须大于等于0?

hashcode的计算方式跟HashMap不一样,是高16位跟低16位进行异或运算,这个是跟HashMap是一样的,因为在进行寻址运算时,(数组的长度-1)只有低位是1,高位是0,所以导致高位0跟任何数相于结果都是0,没有充分考虑高位的信息;
最后在强制使最高位变成0,让hachcode结果大于0,原因就是,负数有特殊的意义:

  1. 在做数据迁移的时候,可能存在并发访问的情况,这个时候旧表和新表都有数据,就需要标记旧表哪些桶 迁移到了新表。已经迁移的旧结点就称为ForwardingNode,并且结点的hashcode为-1。
  2. 还有红黑树的情况,需要特殊的代理结点TreeBin,其中对应的hashcode是-2;

-1 表示正在做数据迁移
-2 表示红黑树TREEBIN的时候

sizeCtl字段的含义?

sizeCtl字段用来控制并发Map的初始化(sizeCtl==-1),触发扩容的阈值(sizeCtl>0)、处于扩容中(sizeCtl<0 && !=-1)。

初始化

并发Map的创建,是通过懒加载的机制实现的,在创建并发Map对象的时候,会计算sizeCtl的大小,但是不会进行创建:

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (initialCapacity < concurrencyLevel)   // Use at least as many bins
            initialCapacity = concurrencyLevel;   // as estimated threads
        long size = (long)(1.0 + (long)initialCapacity / loadFactor);
        int cap = (size >= (long)MAXIMUM_CAPACITY) ?
            MAXIMUM_CAPACITY : tableSizeFor((int)size);
        this.sizeCtl = cap; // 初始容量是16
    }

并发情况initTable

因为采用懒加载的思想,所以在第一次put的时候会进行初始化,触发initTable:

  1. 初始化的阶段可能存放多线程并发访问的情况。
  2. 所以第一线程执行到初始化的时候,先保存SIZECTL扩容阈值,然后CAS设置SIZECTL为-1,然后进行初始化。
  3. 这个时候其他线程也来进行初始化时,会发现SIZECTL为-1,表示当前正在进行扩容中,所以会触发yield让出CPU系统资源。
  4. 然后计算新的扩容阈值,默认为16的话,新的扩容阈值为12;

用了双重检测null,来避免被重复初始化
sizeCtl=-1 表示正在初始化中

     private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0) // 其他线程在来访问时,会发现SIZECTL=-1,所以会yield让出CPU资源
                Thread.yield(); // lost initialization race; just spin
            // 第一个线程进行初始化的时候,CAS把SIZECTL设置为-1,表示初始化中
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2); // 计算新的扩容阈值
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

扩容的时候,sizeCtl<0 && sizeCtl!=-1

sizeCtl大于0的时候,表示扩容的阈值,然后在扩容状态中时,会CAS设置为sizeCtl小于0,但不等于-1

扩容标识戳sizeCtl的计算方式?

sizeCtl<0 && sizeCtl!=-1表示扩容中,其中高16 位表示扩容标识戳,低16位表示参与扩容工作的线程数量 +1。扩容标识戳必须保证每个线程计算出来的值一致。不然无法实现并发扩容。

扩容

扩容在后面有讲

ConcurrentHashMap如何保证并发数据安全?

通过put操作来向ConcurrentHashMap里面put值,首先通过Hash寻址算法找到,需要插入的数组下标;然后分为四个情况:第一个是该位置为NUll、第二个是该位置有链表、第三个是该位置已经变成红黑树、第四种情况是当前处于扩容中;

  • 如果是为NUll的情况,构建一个Node结点,然后通过CAS放进去。
  • 如果其他情况,则给当前桶的头节点加上synchronized锁,保证桶内写操作是线程安全的
    • 当为链表的时候:首先遍历桶中的结点,如果发现key,value都相同的结点,则直接进行覆盖;否则插入到末尾结点,然后判断是否达到树化阈值。
    • 当为红黑树的时候,直接进行红黑树的插入。
  • 如果处于扩容中,则当前线程加入扩容队伍中,帮助进行扩容
    插入完之后,会判断达到扩容阈值,这部分比较复杂。
     final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable(); // 懒加载进行初始化
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            // CAS 放入空的index位置
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED) // 当前线程帮助进行扩容
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) { // 双重判断
                        if (fh >= 0) { // 链表情况
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 红黑树的情况,执行红黑树的插入
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) { // 判断是否达到树化阈值
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);// 表中数据大小加1,会判断是否需要扩容
        return null;
    }

寻址算法?

hashcode的计算方式跟HashMap不一样,是高16位跟低16位进行异或运算,这个是跟HashMap是一样的,主要是因为在进行寻址运算时,(数组的长度-1)只有低位是1,高位是0,所以导致高位0跟任何数相于结果都是0,没有充分考虑高位的信息。然后并发Map需要保证hashcode的值大于0,所以强制使得最高位为1。 因为hashcode=-1 表示forwarding nodes 结点处于扩容中,-2 表示红黑树结点

并发情况下如何统计当前散列表的数据量?

使用的是LongAdder,因为使用CAS的AutomicInteger,在并发量小的时候性能还不错,但是在高并发的情况下性能不够。比如在高并发的情况下,都进行CAS操作,只有一个线程会 CAS成功,其他线程都会失败,所以比较耗费性能。

LongAdder 如何解决大并发下的性能问题?

LongAdder 利用热点数据分离的方法,主要思想采用的是MapReduce的思想,分治再汇总。因为传统的AutomicInteger类,在高并发的情况下每次只有一个线程能够CAS成功,而其他失败的线程,就会进入自旋,进入下一次的CAS比较。就比如有1000个线程进行同时CAS进行增加,只有一个会成功,其他999都会失败,然后拿到新的内存地址后,又进行新的一轮CAS。这个在高并发的情况下是很耗费性能的。

LongAdder的数据结构

一个base单元,一个cell数组,当没有线程竞争的时候,CAS累加base单元,如果有竞争的话,即CAS失败的时候,当前线程会去创建一个cell数组,cell单元里面也是存放的value值,然后通过hash算法,得到当前线程的桶位置,累加cell单元里面的value值;最后将base的值和所有的cell单元的值累加,就得到最终的结果类。

并发情况下的扩容问题?

通过LongAdder来统计元素的个数,当表内数据的容量达到扩容阈值的时候就会触发扩容
当前线程创建一个nextTbale,容量是旧表的两倍,然后设置transferIndex为旧表的容量大小,从后往前开始迁移桶内的元素,会计算每次为该任务分配的桶的大小,在迁移的时候,线程会使用synchronized锁住当前桶的头结点,避免并发情况下数据安全的问题。

默认每个扩容线程分配任务的桶的大小是MIN_TRANSFER_STRIDE=16

如果在扩容的时候遇到get操作,会仍然在旧表里面进行访问
如果遇到插入、删除操作,则会加入到迁移过程中,为当前线程分配扩容任务,等到扩容完毕后,在去新表里面进行相应的操作。

触发扩容条件的线程,做了哪些预处理工作?

修改sizeCtrl 表示正在进行扩容中,根据扩容前的散列表长度,计算出扩容唯一标识戳。是一个16位的值。SizeCtl 高16位存储扩容唯一标识戳,低16位存储值参与扩容工作的线程数+1. 因为线程是触发扩容线程,将 sizeCtl 低于16位直接设置成 2. 就表示一个线程可以工作。

这个线程需要创建一个新的 table, 大小大概就是扩容前的两倍,也就是map.nextTbale,还需要保存老表的长度到 map.transferIndex 字段, 这个字段记录老表的总迁移进度。迁移工作从高位桶开始,一直迁移到下标为0的桶

迁移完的线程怎么标记的?

迁移的时候会创建一个 ForwardingNode 对象,这个Node 比较特殊,用来表示指定 Slot 已经被迁移完毕的旧桶

ForwardingNode有什么功能?

ForwardingNode除了标记为旧桶,里面还有个字段是新表的位置,然后提供方法find,在新表里面进行查找。

ForwardingNode 结点里面存放了新表的引用

散列表正在扩容时,再来写请求如何处理?

如果写入的结点还没有被迁移,则拿到桶头结点的锁,执行正常的插入流程;
如果写操作访问的桶,头节点正好是 ForwardingNode 节点,则让当前结点,执行桶迁移的过程,等迁移到新桶后,再进行插入。

扩容期间,扩容工作线程如何维护sizeCtl的低16位?

来一个线程执行任务的时候,sizeCtl的低16为执行加1,任务结束的时候就减1

最后退出扩容的线程,需要执行哪些收尾工作?

sizeCtl执行减1,当减为1时,表示扩容工作完成,需要检查旧表查看是否有没有被迁移的桶,如果都被迁移了,则退出并重新计算sizeCtl 为新的扩容大小

扩容源码分析

private final void addCount(long x, int check) {
        // 执行size++操作,增加map的容量
        CounterCell[] as; long b, s;
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        // 如果增加之后的容量大于 指定的阈值 则进行扩容
        if (check >= 0) {
            Node<K,V>[] tab, nt; int n, sc;
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    // 修改SIZECTL的值,为负数,高位表示扩容标识符,低位表示参与扩容的线程数量
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt); // 执行扩容操作
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }
    
    // ======================================扩容逻辑=============
    /**
    * 1. 计算每次扩容时分配的任务,桶的数量
    * 2. 初始化nextTab,初始化transferIndex为旧表的大小(从后往前开始迁移)
    * 3. CAS 修改TRANSFERINDEX 为分配任务后的大小(假设旧表容量为32,一次分配的任务大小是16,所以TRANSFERINDEX=32-16=16,再下一次来分配任务后就变为0)
    * 4. 开始迁移,通过hash寻址判断找到当前结点的位置,可能为null、链表、红黑树
    * 当前结点为null,直接CAS设置为ForwardingNode结点
    * 当前位置为链表,直接拆分为高低位链,低位链表不变,高位增加旧表的容量大小。
    * 当前位置为红黑树,高低位链拆分,拆分后判断是否达到链化阈值6
    **/
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        int n = tab.length, stride;
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // 分配的桶的数量
        if (nextTab == null) {            // 初始化nextTab
            try {
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;// transferIndex为旧表的大小
        }
        int nextn = nextTab.length;
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
        boolean advance = true;
        boolean finishing = false; // to ensure sweep before committing nextTab
        for (int i = 0, bound = 0;;) {
            Node<K,V> f; int fh;
            while (advance) { // 分配任务,并CAS修改TRANSFERINDEX
                int nextIndex, nextBound;
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }// 完成任务后SIZECTL-1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            else if ((f = tabAt(tab, i)) == null)// 空结点直接放上Forwarding结点
                advance = casTabAt(tab, i, null, fwd);
            else if ((fh = f.hash) == MOVED)
                advance = true; // already processed
            else {
                synchronized (f) {// 当前桶上加锁
                    if (tabAt(tab, i) == f) {
                        Node<K,V> ln, hn;
                        if (fh >= 0) {// 链表高低位链进行拆分
                            int runBit = fh & n;
                            Node<K,V> lastRun = f;
                            for (Node<K,V> p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        // 红黑树结点的高低位链进行拆分,拆分后要判断是否达到链化阈值6
                        else if (f instanceof TreeBin) {
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>
                                    (h, e.key, e.val, null, null);
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

TreeBin这个字段?

桶升级成红黑树,且当前红黑树上有读线程访问,再来写请求怎么办?

直接拒绝,因为在执行写入操作的时候,红黑树会进行自旋,这会导致结点位置发生改变,而另外一个线程正在执行读操作,所以会出现问题。

红黑树正在执行写操作,再来读请求怎么办?

直接读红黑树的链表结构,也就是使用next指针来读取下一个数据。


目录