CAS原理 LongAdder思想

CAS底层

我们直接来看下JVM的CAS源码

//openjdk-jdk8u源码:/hotspot/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
  int mp = os::is_MP();
  // LOCK_IF_MP是宏定义
  // #define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}
  • __asm__ 表示的是嵌入一段汇编代码。
  • cmpxchgl 是汇编语言中 比较并交换 ,能保证原子操作。(关于硬件层面的指令,这里就不深入研究了)
  • LOCK_IF_MP是根据当前处理器的类型来决定是否为cmpxchg指令添加lock前缀。MP是指multiple processor,即多核处理器。如果程序是在多处理器上运行,就为cmpxchg指令加上lock前缀(lock cmpxchg)。反之,如果程序是在单处理器上运行,就省略lock前缀(单处理器自身会维护单处理器内的顺序一致性,不需要lock前缀提供的内存屏障效果)。

为什么要lock 可以参见我的Volatile原理篇

罗政:JMM+Volatile 理论


总结

利用了底层的汇编指令cmpxchgl(原子比较交换)来实现,如果是多核处理器,还添加了lock执行保证CPU缓存的一致。所以CAS操作是原子可靠的。



CAS应用层

CAS的全称为Compare And Swap,直译就是比较交换。是一条CPU的原子指令,其作用是让CPU先进行比较两个值是否相等,然后原子地更新某个位置的值,其实现方式是基于硬件平台的汇编指令,在intel的CPU中,使用的是cmpxchg指令,就是说CAS是靠硬件实现的,从而在硬件层面提升效率。

CAS有三个操作数:内存值V、旧的预期值A、要修改的值B,当且仅当预期值A和内存值V相同时,将内存值修改为B并返回true,否则什么都不做并返回false。

sun.misc.Unsafe介绍

工欲善其事必先利其器,为什么要先讲Unsafe?

Unsafe类是进行底层操作的方法集合,可以直接操作内存,进行一些非常规操作,所以说是"不安全"的操作,但是因为直接操作内存,它的效率很高,通常在在对性能有要求或者有底层操作需求的时候使用。

我们的CAS操作就是通过 sun.misc.Unsafe 类操作的(java8以下),Unsafe在jdk1.8.0/jre/lib/rt.jar包下。

怎么获取Unsafe实例?

public final class Unsafe {
    private static final Unsafe theUnsafe;
    private Unsafe() {}
    static{
         theUnsafe = new Unsafe();
    }
    public static Unsafe getUnsafe() {
         return theUnsafe;
    }
}

这里我们没法直接new对象,必须要通过反射来获取 theUnsafe 变量,下面来看下里面的几个重要方法。

public long objectFieldOffset(Field f)

获取字段的内存偏移地址,cas要用。内部是native代码实现的,不讲, 看一段实例代码:

private static Object unsafe;
    static {
        try {
            /** Unsafe在rt.jar下,不能直接实例化。必须通过反射 */
            Field field = Class.forName("sun.misc.Unsafe").getDeclaredField("theUnsafe");
            field.setAccessible(true);
            unsafe = field.get(null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    static class Data {
        int intParam;
    }
    public static void main(String[] args) throws Exception {
        // 反射获取objectFieldOffset方法
        Method method = Class.forName("sun.misc.Unsafe").getDeclaredMethod("objectFieldOffset", new Class<?>[] {Field.class});
        method.setAccessible(true);
        // 执行调用, 返回 Data类的intParam成员的偏移地址
        Object ret = method.invoke(unsafe, Data.class.getDeclaredField("intParam"));
        System.err.println(ret);
}

static静态块就是取得了Unsafe 类中的单例theUnsafe ,然后反射调用其objectFieldOffset方法,返回对象成员的内存偏移量。

public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);

这是重头戏,CAS操作的方法实现, 将对象o的偏移地址变量改成x,前提是x的值是expected,请接着上面的代码:

  public static void main(String[] args) throws Exception {
        // 反射获取objectFieldOffset方法
        Method method = Class.forName("sun.misc.Unsafe").getDeclaredMethod("objectFieldOffset", new Class<?>[] {Field.class});
        method.setAccessible(true);
        // 执行调用, 返回 Data类的intParam成员的偏移地址
        long offset = (long) method.invoke(unsafe, Data.class.getDeclaredField("intParam"));
        // 获取 compareAndSwapInt 方法
        method = Class.forName("sun.misc.Unsafe").getDeclaredMethod("compareAndSwapInt", new Class<?>[] {Object.class,
            long.class,int.class,int.class});
        method.setAccessible(true);
        Data data = new Data();
        data.intParam = 78;
        // 第4个参数: 预期的值   第5个参数: 要修改的值
        boolean success = (boolean) method.invoke(unsafe, data,offset,7,90);
        System.err.println("1 修改成功吗:"+success+ " , 修改后intParam:"+data.intParam);
        success = (boolean) method.invoke(unsafe, data,offset,78,90);
        System.err.println("2 修改成功吗:"+success+ " , 修改后intParam:"+data.intParam);
    }

我们发现第一次 预期值传了7 (实际上是78),所以我们修改失败,第二次才成功。

public native int getIntVolatile(Object o, long offset);

获得给定对象的指定偏移量offset的int值,使用volatile语义,总能获取到最新的int值。就是获取的主内存的值,并不是自己线程的副本。

我们都知道JMM内存模型 , 线程自己内存拥有一套副本,和主内存不一致 ,所以一个线程操作一个变量,另一个线程自己的副本不一定马上会更新,这样就会导致线程安全。

请看用CAS是如何解决上述问题的

  public static void main(String[] args) throws Exception {
        // 反射获取objectFieldOffset方法
        Method method = Class.forName("sun.misc.Unsafe").getDeclaredMethod("objectFieldOffset", new Class<?>[] {Field.class});
        method.setAccessible(true);
        // 执行调用, 返回 Data类的intParam成员的偏移地址
        long offset = (long) method.invoke(unsafe, Data.class.getDeclaredField("intParam"));
        // 获取 compareAndSwapInt 方法
        method = Class.forName("sun.misc.Unsafe").getDeclaredMethod("compareAndSwapInt", new Class<?>[] {Object.class,
            long.class,int.class,int.class});
        method.setAccessible(true);
        Data data = new Data();
        data.intParam = 78;
        
        while(true) {
            method = Class.forName("sun.misc.Unsafe").getDeclaredMethod("getIntVolatile", new Class<?>[] {Object.class,
                long.class});
            //通过 getIntVolatile 方法获取主内存的值
            int expected = (int) method.invoke(unsafe, data,offset);
            // 比较主内存的值 和当前 线程副本的值是否一致,一致就更新,否则更新失败, 
            method = Class.forName("sun.misc.Unsafe").getDeclaredMethod("compareAndSwapInt", new Class<?>[] {Object.class,
                long.class,int.class,int.class});
            boolean success = (boolean) method.invoke(unsafe, data,offset,expected,90);
            System.err.println(success);
            if(success) {
                break;
            }
            // 更新失败,循环重试,直到更新成功为止
        }
    }

借助了 getIntVolatile 先获取主内存的值, 然后compareAndSwapInt 将值一直循环更新成功为止。这其实也就是我们所说的 自旋锁

其实java并发编程里面的juc包下的,什么AQS啊,AtomicInteger 等都是以上面这种骚操 作基础的,下面我们看下AtomicInteger 如何骚 的。

AtomicInteger 源码分析

public class AtomicInteger extends Number implements java.io.Serializable {
  private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    private volatile int value;
}

jdk当然可以直接使用getUnsafe方法来获取实例,然后把value的内存偏移量存储到valueOffset变量上,后面CAS操作直接用。 value 就是AtomicInteger 实际存储的值。且是 volatile 的。

incrementAndGet方法

  public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }

 public final int getAndAddInt(Object var1, long var2, int var4) {
   int var5;
   do {
   // 获取主内存的值
      var5 = this.getIntVolatile(var1, var2);
    // 将值变成原有的值var5 加上var4
   } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
   return var5;
}



直接调用Unsafe的getAndAddInt方法。getAndAddInt在多线程下也是安全的。

get方法

  public final int get() {
        return value;
    }

总结一下

(1)AtomicInteger中维护了一个使用volatile修饰的变量value,保证可见性;

(2)AtomicInteger中的主要方法最终几乎都会调用到Unsafe的compareAndSwapInt()方法保证对变量修改的原子性。


CAS带来的问题


ABA问题

假如你很牛逼,扣款的代码直接不加锁而是使用CAS来写。有这样一个场景:

  • A账户上有10块钱,娶媳妇需要提款5元,但是系统问题同时发起了两次扣款,相当于2个线程1,2并发。
  • 假如线程1先执行CAS,预期值是10,要修改成5 ,成功。然后准备到线程2,正常情况是 线程2 发现预期值是10,现在是5了,就会CAS失败不扣钱,这样系统就不会扣两次钱没问题, 但是发生了下面情况。
  • 在线程2 CAS之前,A的妈妈怕儿子娶媳妇钱不够,又往A账户上打了5块钱,这时,A的账户就恢复了10块钱。
  • 然后线程2 CAS 发现 卧槽,预期值是10,现在也是10,就毫不犹豫把钱扣了。A又只剩5块了。

妈妈,五块钱没了,我不取媳妇了,呜呜~~~~~~
其实上述问题原因就是CAS操作将值由A改为B然后又改成A , 另一个线程CAS的话是当做什么都没发生的。

看下JDK怎么利用 AtomicStampedReference 来解决这个问题的

public class AtomicStampedReference<V> {
    private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }
    private volatile Pair<V> pair;
    public boolean compareAndSet(V   expectedReference,
            V   newReference,
            int expectedStamp,
            int newStamp) {
        // 获取当前的(元素值,版本号)对
        Pair<V> current = pair;
        return
        // 引用没变
        expectedReference == current.reference &&
        // 版本号没变
        expectedStamp == current.stamp &&
        // 新引用等于旧引用
        ((newReference == current.reference &&
        // 新版本号等于旧版本号
        newStamp == current.stamp) ||
        // 构造新的Pair对象并CAS更新
        casPair(current, Pair.of(newReference, newStamp)));
        }
        
        private boolean casPair(Pair<V> cmp, Pair<V> val) {
        // 调用Unsafe的compareAndSwapObject()方法CAS更新pair的引用为新引用
        return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
    }
}
  • 首先把我们上面CAS操作的int,变成CAS操作对象Pair,原理是一样。
  • 加了个版本号stamp,只有版本号不一样时,CAS才操作成功。
  • 上面代码流程: 如果元素值和版本号都没有变化,并且和新的也相同,返回true;如果元素值和版本号都没有变化,并且和新的不完全相同,就构造一个新的Pair对象并执行CAS更新pair。

并发自旋耗cpu多

在并发量比较低的情况下,线程冲突的概率比较小,自旋的次数不会很多。但是,高并发情况下,N个线程同时进行自旋操作,会出现大量失败并不断自旋的场景。 JDK8中出现了 LongAdder 来解决AtomicLong的上述并发大的问题。

AtomicLong中有个内部变量value保存着实际的long的值,高并发场景下,value变量就是N个线程竞争的一个热点。

LongAdder的基本思路就是分散热点,将value值分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个值进行CAS操作,这样热点就被分散了,冲突的概率就小很多。如果要获取真正的long值,只要将各个槽中的变量值累加返回即可。

LongAdder 大致原理

我们知道, AtomicLong 中有个内部变量 value 保存着实际的 long 值,所有的操作都是针对该变量进行。也就是说,高并发环境下, value 变量其实是一个 热点数据 ,也就是 N个线程竞争一个热点。

LongAdder 的基本思路就是 分散热点 ,将 value 值的新增操作分散到一个数组中,不同线程会命中到数组的不同槽中,各个线程只对自己槽中的那个 value 值进行 CAS 操作,这样热点就被分散了,冲突的概率就小很多。

LongAdder 有一个全局变量 volatile long base 值,当并发不高的情况下都是通过 CAS 来直接操作 base 值,如果 CAS 失败,则针对 LongAdder 中的 Cell[] 数组中的 Cell 进行 CA S操作,减少失败的概率。

例如当前类中 base = 5 ,有三个线程进行 CAS 原子性的 +1操作 线程一执行成功,此时base=11 线程二、线程三执行失败后 开始针对于 Cell[] 数组中的 Cell 元素进行 +1操作 ,同样也是 CAS 操作,此时数组 index=1 index=2 Cell value 都被设置为了1.

执行完成后,统计累加数据: sum = 5 + 1 + 1 = 7


强烈推荐一个 进阶 JAVA架构师 的博客

Java架构师修炼

支付宝打赏 微信打赏

如果文章对您有帮助,您可以鼓励一下作者