Java中CAS机制详解 - Java技术债务


概述

传统的并发控制手段,如使用synchronized关键字或者ReentrantLock等互斥锁机制,虽然能够有效防止资源的竞争冲突,但也可能带来额外的性能开销,如上下文切换、锁竞争导致的线程阻塞等。而此时就出现了一种乐观锁的策略,以其非阻塞、轻量级的特点,在某些场合下能更好地提升并发性能,其中最为关键的技术便是Compare And Swap(简称CAS)。

CAS是一种无锁算法,它在硬件级别提供了原子性的条件更新操作,允许线程在不加锁的情况下实现对共享变量的修改。在Java中,CAS机制被广泛应用于java.util.concurrent.atomic包下的原子类以及高级并发工具类如AbstractQueuedSynchronizer(AQS)的实现中。

CAS的基本概念

CAS是原子指令,一种基于锁的操作,而且是乐观锁,又称无锁机制。CAS操作包含三个基本操作数:内存位置、期望值和新值。

  1. 主内存中存放的共享变量的值:V(一般情况下这个V是内存的地址值,通过这个地址可以获得内存中的值)。
  2. 工作内存中共享变量的副本值,也叫预期值:A。
  3. 需要将共享变量更新到的最新值:B。

CAS基本原理

在执行CAS操作时,计算机会检查内存位置当前是否存放着期望值,如果是,则将内存位置的值更新为新值;若不是,则不做任何修改,保持原有值不变,并返回当前内存位置的实际值。

CAS操作通过一条CPU的原子指令,保证了比较和更新的原子性。在执行CAS操作时,CPU会判断当前系统是否为多核系统,如果是,则会给总线加锁,确保只有一个线程能够执行CAS操作。这种独占式的原子性实现方式,比起使用synchronized等重量级锁,具有更短的排他时间,因此在多线程情况下性能更佳。

Java中的CAS实现

在Java中,CAS机制被封装在jdk.internal.misc.Unsafe类中,尽管这个类并不建议在普通应用程序中直接使用,但它是构建更高层次并发工具的基础,例如java.util.concurrent.atomic包下的原子类如AtomicIntegerAtomicLong等。这些原子类通过JNI调用底层硬件提供的CAS指令,从而在Java层面上实现了无锁并发操作。

Java的标准库中,特别是jdk.internal.misc.Unsafe类提供了一系列compareAndSwapXXX方法,这些方法底层确实是通过C++编写的内联汇编来调用对应CPU架构的cmpxchg指令,从而实现原子性的比较和交换操作。

Java中CAS机制详解 - Java技术债务

public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    //由这里可以看出来,依赖jdk.internal.misc.Unsafe实现的
    private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
    private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");

    private volatile int value;

	public final boolean compareAndSet(int expectedValue, int newValue) { 
	    // 调用 jdk.internal.misc.Unsafe的compareAndSetInt方法
	    return U.compareAndSetInt(this, VALUE, expectedValue, newValue);  
	}
}

Unsafe中的compareAndSetInt使用了@HotSpotIntrinsicCandidate注解修饰,@HotSpotIntrinsicCandidate注解是Java HotSpot虚拟机(JVM)的一个特性注解,它表明标注的方法有可能会被HotSpot JVM识别为“内联候选”,当JVM发现有方法被标记为内联候选时,会尝试利用底层硬件提供的原子指令(比如cmpxchg指令)直接替换掉原本的Java方法调用,从而在运行时获得更好的性能。

public final class Unsafe {
	@HotSpotIntrinsicCandidate  
	public final native boolean compareAndSetInt(Object o, long offset,  
	                                             int expected,  
	                                             int x);
}                                            
  • compareAndSetInt这个方法我们可以从openjdkhotspot源码(位置:hotspot/src/share/vm/prims/unsafe.cpp)中可以找到。

  • hostspot中的Unsafe_CompareAndSetInt函数会统一调用Atomiccmpxchg函数

    UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSetInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x)) {
    
    oop p = JNIHandles::resolve(obj);
    
    jint* addr = (jint *)index_oop_from_field_offset_long(p, offset);
    // 统一调用Atomic的cmpxchg函数
    return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
    
    } UNSAFE_END
    
  • Atomiccmpxchg函数源码(位置:hotspot/src/share/vm/runtime/atomic.hpp)如下:

    /**
    *这是按字节大小进行的`cmpxchg`操作的默认实现。它使用按整数大小进行的`cmpxchg`来模拟按字节大小进行的`cmpxchg`。不同的平台可以通过定义自己的内联定义以及定义`VM_HAS_SPECIALIZED_CMPXCHG_BYTE`来覆盖这个默认实现。这将导致使用特定于平台的实现而不是默认实现。
    *  exchange_value:要交换的新值。
    *  dest:指向目标字节的指针。
    *  compare_value:要比较的值。
    *  order:内存顺序。
    */
    inline jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest,
                                 jbyte compare_value, cmpxchg_memory_order order) {
      STATIC_ASSERT(sizeof(jbyte) == 1);
      volatile jint* dest_int =
          static_cast(align_ptr_down(dest, sizeof(jint)));
      size_t offset = pointer_delta(dest, dest_int, 1);
      // 获取当前整数大小的值,并将其转换为字节数组。
      jint cur = *dest_int;
      jbyte* cur_as_bytes = reinterpret_cast(&cur);
    
      // 设置当前整数中对应字节的值为compare_value。这确保了如果初始的整数值不是我们要找的值,那么第一次的cmpxchg操作会失败。
      cur_as_bytes[offset] = compare_value;
    
      // 在循环中,不断尝试更新目标字节的值。
      do {
        // new_val
        jint new_value = cur;
        // 复制当前整数值,并设置其中对应字节的值为exchange_value。
        reinterpret_cast(&new_value)[offset] = exchange_value;
    	// 尝试使用新的整数值替换目标整数。
        jint res = cmpxchg(new_value, dest_int, cur, order);
        if (res == cur) break; // 如果返回值与原始整数值相同,说明操作成功。
    
        // 更新当前整数值为cmpxchg操作的结果。
        cur = res;
        // 如果目标字节的值仍然是我们之前设置的值,那么继续循环并再次尝试。
      } while (cur_as_bytes[offset] == compare_value);
      // 返回更新后的字节值
      return cur_as_bytes[offset];
    }
    

cmpxchg指令是多数现代CPU支持的原子指令,它能在多线程环境下确保一次比较和交换操作的原子性,有效解决了多线程环境下数据竞争的问题,避免了数据不一致的情况。例如,在更新一个共享变量时,如果期望值与当前值相匹配,则原子性地更新为新值,否则不进行更新操作,这样就能在无锁的情况下实现对共享资源的安全访问。 我们以java.util.concurrent.atomic包下的AtomicInteger为例,分析其compareAndSet方法。

而由cmpxchg函数中的do...while我们也可以看出,当多个线程同时尝试更新同一内存位置,且它们的期望值相同但只有一个线程能够成功更新时,其他线程的CAS操作会失败。对于失败的线程,常见的做法是采用自旋锁的形式,即循环重试直到成功为止。这种方式在低竞争或短时间窗口内的并发更新时,相比于传统的锁机制,它避免了线程的阻塞和唤醒带来的开销,所以它的性能会更优。

什么是unsafe

什么是unsafe呢?Java语言不像C,C++那样可以直接访问底层操作系统,但是JVM为我们提供了一个后门,这个后门就是unsafe。unsafe为我们提供了硬件级别的原子操作。

CAS是一种原子操作。那么Java是怎样来使用CAS的呢?

我们知道,在Java中,如果一个方法是native的,那Java就不负责具体实现它,而是交给底层的JVM使用c或者c++去实现。

Unsafe类是JDK提供的一个不安全的类,它提供了一些底层的操作,包括内存操作、线程调度、对象实例化等。它的作用是让Java可以在底层直接操作内存,从而提高程序的效率。但是,由于Unsafe类是不安全的,所以只有JDK开发人员才能使用它,普通开发者不建议使用。它里面大多是一些native方法,其中就有几个关于CAS的:

boolean compareAndSwapObject(Object o, long offset,Object expected, Object x);
boolean compareAndSwapInt(Object o, long offset,int expected,int x);
boolean compareAndSwapLong(Object o, long offset,long expected,long x);
	```

- 调用`compareAndSwapInt、compareAndSwapLong或compareAndSwapObject`方法时,会传入三个参数,分别是需要修改的**变量V、期望的值A和新值B。**
- 方法会**先读取变量V的当前值**,如果当前值等于期望的值A,则使用新值B来更新变量V,否则不做任何操作。
- 方法会返回更新操作是否成功的标志,如果更新成功,则返回true,否则返回false。
    
    由于CAS操作是**基于底层硬件支持的原子性指令来实现的**,所以它可以保证操作的原子性和线程安全性,同时也可以避免使用锁带来的性能开销。因此,CAS操作广泛应用于并发编程中,比如**实现无锁数据结构、实现线程安全的计数器**等。
    

## 原子操作类解析

看一下`AtomicInteger`当中常用的自增方法`incrementAndGet`

```java
public class AtomicInteger extends Number implements java.io.Serializable {
    private static final long serialVersionUID = 6214790243416807050L;

    // setup to use Unsafe.compareAndSwapInt for updates
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
        try {
            valueOffset = unsafe.objectFieldOffset
                (AtomicInteger.class.getDeclaredField("value"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;
    /**
     * Atomically increments by one the current value.
     *
     * @return the updated value
     */
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
}

public final class Unsafe {
	public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
	public final int getAndAddInt(Object var1, long var2, int var4) {
	        int var5;
	        do {
	            var5 = this.getIntVolatile(var1, var2);
	        } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
	
	        return var5;
	}
}

这段代码是一个无限循环,也就是CAS的自旋(底层为do-while循环),循环体中做了三件事:

  1. 获取当前值
  2. 当前值 + 1,计算出目标值
  3. 进行CAS操作,如果成功则跳出循环,如果失败则重复上述步骤

这里需要注意的重点是get方法这个方法的作用是获取变量的当前值。

volatile关键字来保证(保证线程间的可见性)。

compareAndSet方法的实现很简单,只有一行代码。这里涉及到两个重要的对象,一个是unsafe,一个是valueOffset。 unsafe上面提到,就不用多说了,对于valueOffset对象,是通过unsafe.objectFiledOffset方法得到,所代表的是AtomicInteger对象value成员变量在内存中的偏移量。我们可以简单的把valueOffset理解为value变量的内存地址

我们上面说过,CAS机制中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。

而unsafe的compareAndSwapInt方法的参数包括了这三个基本元素:valueOffset参数代表了V,expect参数代表了A,update参数代表了B。

正是unsafe的compareAndSwapInt方法保证了Compare和Swap操作之间的原子性操作。

CAS机制的优缺点

优点

一开始在文中我们曾经提到过,CAS是一种乐观锁,而且是一种非阻塞的轻量级的乐观锁,什么是非阻塞式的呢?其实就是一个线程想要获得锁,对方会给一个回应表示这个锁能不能获得。在资源竞争不激烈的情况下性能高,相比synchronized重量锁,synchronized会进行比较复杂的加锁,解锁和唤醒操作。

缺点

  • CPU开销过大:CAS摒弃了传统的锁机制,避免了因获取和释放锁产生的上下文切换和线程阻塞,从而显著提升了系统的并发性能。并且由于CAS操作是基于硬件层面的原子性保证,所以它不会出现死锁问题,这对于复杂并发场景下的程序设计特别重要。另外,CAS策略下线程在无法成功更新变量时不需要挂起和唤醒,只需通过简单的循环重试即可。

    但是,在高并发条件下,频繁的CAS操作可能导致大量的自旋重试,消耗大量的CPU资源。尤其是在竞争激烈的场景中,线程可能花费大量的时间在不断地尝试更新变量,而不是做有用的工作。这个由刚才cmpxchg函数可以看出。对于这个问题,我们可以参考synchronize中轻量级锁经过自旋,超过一定阈值后升级为重量级锁的原理,我们也可以给自旋设置一个次数,如果超过这个次数,就把线程挂起或者执行失败。(自适应自旋)。

    另外,Java中的原子类也提供了解决办法,比如LongAdder以及DoubleAdder等,LongAdder过分散竞争点来减少自旋锁的冲突。它并没有像AtomicLong那样维护一个单一的共享变量,而是维护了一个Base值和一组Cell(桶)结构。每个Cell本质上也是一个可以进行原子操作的计数器,多个线程可以分别在一个独立的Cell上进行累加,只有在必要时才将各个Cell的值汇总到Base中。这样一来,大部分时候线程间的修改不再是集中在同一个变量上,从而降低了竞争强度,提高了并发性能。

  • ABA问题: 单纯的CAS无法识别一个值被多次修改后又恢复原值的情况,可能导致错误的判断。在高并发场景下,使用CAS操作可能存在ABA问题,也就是在一个值被修改之前,先被其他线程修改为另外的值,然后再被修改回原值,此时CAS操作会认为这个值没有被修改过,导致数据不一致。

    为了解决ABA问题,Java中提供了AtomicStampedReference类(原子标记参考),该类通过使用版本号的方式来解决ABA问题。每个共享变量都会关联一个版本号,CAS操作时需要同时检查值和版本号是否匹配。因此,如果共享变量的值被改变了,版本号也会发生变化,即使共享变量被改回原来的值,版本号也不同,因此CAS操作会失败。

    在java中锁分为乐观锁和悲观锁。悲观锁是将资源锁住,等一个之前获得锁的线程释放锁之后,下一个线程才可以访问。而乐观锁采取了一种宽泛的态度,通过某种方式不加锁来处理资源,比如通过给记录加version来获取数据,性能较悲观锁有很大的提高。

  • 不能保证代码块的原子性: CAS机制所保证的知识一个变量的原子性操作,而不能保证整个代码块的原子性。比如需要保证3个变量共同进行原子性的更新,就不得不使用synchronized了。

Java的原子类就提供了类似的实现,如AtomicStampedReferenceAtomicMarkableReference引入了附加的标记位或版本号,以便区分不同的修改序列。

Java中CAS机制详解 - Java技术债务

Java中CAS机制详解 - Java技术债务

CAS应用场景

主要在并发编程的应用中非常的广泛,通常用于实现乐观锁和无锁算法

  • 线程安全计数器:由于CAS操作是原子性的,因此CAS可以用来实现一个线程安全的计数器;
  • 队列: 在并发编程中,队列经常用于多线程之间的数据交换。使用CAS可以实现无锁的非阻塞队列(Lock-Free Queue);
  • 数据库并发控制: 乐观锁就是通过CAS实现的,它可以在数据库并发控制中保证多个事务同时访问同一数据时的一致性;
  • 自旋锁: 自旋锁是一种非阻塞锁,当线程尝试获取锁时,如果锁已经被其他线程占用,则线程不会进入休眠,而是一直在自旋等待锁的释放。自旋锁的实现可以使用CAS操作;
  • 线程池: 在多线程编程中,线程池可以提高线程的使用效率。使用CAS操作可以避免对线程池的加锁,从而提高线程池的并发性能。

CAS机制优化

虽然CAS机制具有很多优点,但在实际应用中也存在一些问题,如自旋等待导致的CPU资源浪费等。为了优化CAS机制的性能,可以采取以下措施:

  1. 自适应自旋:根据历史经验动态调整自旋次数,避免过多的自旋等待。
  2. 批量操作:将多个CAS操作组合成一个原子块,减少CAS操作的次数。
  3. 减少锁竞争:通过合理的数据结构设计和线程调度策略,减少CAS操作的竞争,提高并发性能。

总结

Java中的CAS原理及其在并发编程中的应用是一项非常重要的技术。CAS利用CPU硬件提供的原子指令,实现了在无锁环境下的高效并发控制,避免了传统锁机制带来的上下文切换和线程阻塞开销。Java通过JNI接口调用底层的CAS指令,封装在jdk.internal.misc类和java.util.concurrent.atomic包下的原子类中,为我们提供了简洁易用的API来实现无锁编程。

CAS在带来并发性能提升的同时,也可能引发循环开销过大、ABA问题等问题。针对这些问题,Java提供了如LongAdderAtomicStampedReferenceAtomicMarkableReference等工具类来解决ABA问题,同时也通过自适应自旋、适时放弃自旋转而进入阻塞等待等方式降低循环开销。

理解和熟练掌握CAS原理及其在Java中的应用,有助于我们在开发高性能并发程序时作出更明智的选择,既能提高系统并发性能,又能保证数据的正确性和一致性。

   登录后才可以发表呦...

专注分享Java技术干货,包括
但不仅限于多线程、JVM、Spring Boot
Spring Cloud、 Redis、微服务、
消息队列、Git、面试题 最新动态等。

想交个朋友吗
那就快扫下面吧


微信

Java技术债务

你还可以关注我的公众号

会分享一些干货或者好文章

Java技术债务