并发之Striped64(累加器)和 LongAdder
閱讀本文約花費: 10 (分鐘)
并发之Striped64(累加器)
Striped64是在java8中添加用来支持累加器的并发组件,它可以在并发环境下使用来做某种计数,Striped64的设计思路是在竞争激烈的时候尽量分散竞争,在实现上,Striped64维护了一个base Count和一个Cell数组,计数线程会首先试图更新base变量,如果成功则退出计数,否则会认为当前竞争是很激烈的,那么就会通过Cell数组来分散计数,Striped64根据线程来计算哈希,然后将不同的线程分散到不同的Cell数组的index上,然后这个线程的计数内容就会保存在该Cell的位置上面,基于这种设计,最后的总计数需要结合base以及散落在Cell数组中的计数内容。这种设计思路类似于java7的ConcurrentHashMap实现,也就是所谓的分段锁算法,ConcurrentHashMap会将记录根据key的hashCode来分散到不同的segment上,线程想要操作某个记录只需要锁住这个记录对应着的segment就可以了,而其他segment并不会被锁住,其他线程任然可以去操作其他的segment,这样就显著提高了并发度,虽然如此,java8中的ConcurrentHashMap实现已经抛弃了java7中分段锁的设计,而采用更为轻量级的CAS来协调并发,效率更佳。
原理如下图所示:
/** * cell数组,容量为2的次幂,初始值为2 */ transient volatile Cell[] cells; /** * 基础值,在更新操作时基于CAS无锁技术实现原子更新 */ transient volatile long base; /** * 自旋锁 用于保护创建或者扩展Cell表。 */ transient volatile int cellsBusy; /** * Package-private default constructor */ Striped64() { } /** * 使用CAS来更橷值 */ final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); } /** * CASes the cellsBusy field from 0 to 1 to acquire lock. * 它的作用是当要修改cells数组时加锁,防止多线程同时修改cells数组,0为无锁,1为加锁,加锁的状况有三种 * 1. cells数组初始化的时候; * 2. cells数组扩容的时候; * 3. 如果cells数组中某个元素为null,给这个位置创建新的Cell对象的时候; */ final boolean casCellsBusy() { return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1); } /** * Returns the probe value for the current thread. * Duplicated from ThreadLocalRandom because of packaging restrictions. */ static final int getProbe() { return UNSAFE.getInt(Thread.currentThread(), PROBE); }
JavaCopy
AtomicLong只有一个value,所有线程累加都要通过cas竞争value这一个变量,高并发下线程争用非常严重;
而LongAdder则有两个值用于累加,一个是base,它的作用类似于AtomicInteger里面的value,在没有竞争的情况不会用到cells数组,它为null,这时使用base做累加,有了竞争后cells数组就上场了,第一次初始化长度为2,以后每次扩容都是变为原来的两倍,直到cells数组的长度大于等于当前服务器cpu的数量为止就不在扩容(想下为什么到超过cpu数量的时候就不再扩容);每个线程会通过线程对cells[threadLocalRandomProbe & cells.length]位置的Cell对象中的value做累加,这样相当于将线程绑定到了cells中的某个cell对象上;
看一下LongAdder类中的几个关键方法:
/** * 如果一下两种条件则继续执行if内的语句 * 1. cells数组不为null(不存在争用的时候,cells数组一定为null,一旦对base的cas操作失败,才会初始化cells数组) * 2. 如果cells数组为null,如果casBase执行成功,则直接返回,如果casBase方法执行失败(casBase失败,说明第一次争用冲突产生,需要对cells数组初始化)进入if内; * casBase方法很简单,就是通过UNSAFE类的cas设置成员变量base的值为base+要累加的值 * casBase执行成功的前提是无竞争,这时候cells数组还没有用到为null,可见在无竞争的情况下是类似于AtomticInteger处理方式,使用cas做累加。 */ public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { //uncontended判断cells数组中,当前线程要做cas累加操作的某个元素是否存在争用,如果cas失败则存在争用;uncontended=false代表存在争用,uncontended=true代表不存在争用。 boolean uncontended = true; //如果进入if语句执行longAccumulate方法,有三种情况 //1. 前两个条件代表cells没有初始化, //2. 第三个条件指当前线程hash到的cells数组中的位置还没有其它线程做过累加操作, //3. 第四个条件代表产生了冲突,uncontended=false if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } } /** * 自增1 */ public void increment() { add(1L); } /** * 自减1 */ public void decrement() { add(-1L); } /** * 将多个cell数组中的值加起来的和就类似于AtomicLong中的value */ public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
JavaCopy
Striped64中最重要的方法longAccumulate,实现不同的线程更新各自Cell中的值,相当于分段锁。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { //获取当前线程的threadLocalRandomProbe值作为hash值,如果当前线程的threadLocalRandomProbe为0,说明当前线程是第一次进入该方法,则强制设置线程的threadLocalRandomProbe为ThreadLocalRandom类的成员静态私有变量probeGenerator的值,后面会详细将hash值的生成; //另外需要注意,如果threadLocalRandomProbe=0,代表新的线程开始参与cell争用的情况 //1.当前线程之前还没有参与过cells争用(也许cells数组还没初始化,进到当前方法来就是为了初始化cells数组后争用的),是第一次执行base的cas累加操作失败; //2.或者是在执行add方法时,对cells某个位置的Cell的cas操作第一次失败,则将wasUncontended设置为false,那么这里会将其重新置为true;第一次执行操作失败; //凡是参与了cell争用操作的线程threadLocalRandomProbe都不为0; int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } //cas冲突标志,表示当前线程hash到的Cells数组的位置,做cas累加操作时与其它线程发生了冲突,cas失败;collide=true代表有冲突,collide=false代表无冲突 boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; //这个主干if有三个分支 //1.主分支一:处理cells数组已经正常初始化了的情况(这个if分支处理add方法的四个条件中的3和4) //2.主分支二:处理cells数组没有初始化或者长度为0的情况;(这个分支处理add方法的四个条件中的1和2) //3.主分支三:处理如果cell数组没有初始化,并且其它线程正在执行对cells数组初始化的操作,及cellbusy=1;则尝试将累加值通过cas累加到base上 if ((as = cells) != null && (n = as.length) > 0) { //这个是处理add方法内部if分支的条件3:如果被hash到的位置为null,说明没有线程在这个位置设置过值,没有竞争, //可以直接使用,则用x值作为初始值创建一个新的Cell对象,对cells数组使用cellsBusy加锁,然后将这个Cell对象放到cells[m & cells.length]位置上 if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail //如果add方法中条件4的通过cas设置cells[m%cells.length]位置的Cell对象中的value值设置为v+x失败,说明已经发生竞争,将wasUncontended设置为true,跳出内部的if判断,最后重新计算一个新的probe,然后重新执行循环; wasUncontended = true; // Continue after rehash else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) //新的争用线程参与争用的情况:处理刚进入当前方法时threadLocalRandomProbe=0的情况,也就是当前线程第一次参与cell争用的cas失败,这里会尝试将x值加到cells[m%cells.length]的value ,如果成功直接退出 break; else if (n >= NCPU || cells != as) //分支3处理新的线程争用执行失败了,这时如果cells数组的长度已经到了最大值(大于等于cup数量),或者是当前cells已经做了扩容,则将collide设置为false,后面重新计算prob的值 collide = false; // At max size or stale else if (!collide) collide = true; else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
JavaCopy
参考文章:https://www.cnblogs.com/gosaint/p/9129867.html