CAS与Unsafe源码笔记

乐观锁与悲观锁

乐观锁

总是假设最好的情况,每次去操作数据的时候都认为别人不会修改,所以不会加锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制CAS实现。在Java中java.util.concurrent.atomic包下面的原子变量类就是使用了乐观锁的一种实现方式CAS实现的。乐观锁适用于读多写少的应用场景,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。

悲观锁

总是假设最坏的情况,每次去操作数据的时候都认为别人会修改,所以每次在拿数据的时候都会先上锁,这样别人想拿这个数据就会阻塞,直到它拿到锁(共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。传统的关系型数据库里边就用到了很多这种锁机制,比如行锁,表锁等,读锁,写锁等,都是在操作之前先上锁。$Java$中synchronizedReentrantLock等独占锁就是悲观锁思想的实现。悲观锁比较适合多写的场景。

CAS作用原理

$CAS(Compare \ And \ Swap)$ 也就是比较并交换的意思,在 $CAS$ 中有三个参数:

  • $V$ : 要更新的变量( $var$ )
  • $E$ : 期望值( $expected$ )
  • $U$: 新值( $update$ )

在 $CAS$ 操作过程中,判断 $V$ 是否等于 $E$,如果等于,将 $V$ 的值设置为 $U$;如果不等,说明已经有其它线程更新了 $V$,则当前线程放弃更新,什么都不做。如果用代码可以表示为:

1
2
3
4
5
6
7
8
public boolean compareAndSwap(int value, int expected, int update) {
if (value == expect) {
value = update;
return true;
} else {
return false;
}
}

AtomicInteger

AtomicIntegerjava.util.concurrent.atomic包下的一个原子类,那么AtomicInteger是怎么实现原子性的呢?点开源码看到:

1
2
3
private static final Unsafe unsafe = Unsafe.getUnsafe(); // Unsafe 类
private static final long valueOffset; // 偏移量
private volatile int value; // 整型变量

往下随手一翻可以看到很多使用了 $CAS$ 的方法

1
2
3
4
5
6
7
8
9
10
11
12
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

public final int getAndAccumulate(int x, IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}

Unsafe

AtomicInteger的源码中可以看到一个动作出现了不止一次。

1
2
3
4
5
6
7
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}

这个理的unsafe就是前面所说的Unsafe对象,加上限定名后可以表示为:

1
private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();

我们拿下面这个变量来分析一下。

1
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

compareAndSwapInt(x, x, x, x)这个方法是比较内存中的一个整形值是否和我们的期望值var4一样,如果一样则将内存中的这个值更新为var5,参数var1是值所在的对象(AtomicInteger),参数var2是值在对象中var1的内存偏移量,它们的组合作用是计算出值所在的内存的地址。

看到native这和关键字就应该想到这已经不是java语言编写的库了,通常是C/C++编写的。

在此借用 薛8的一张图。

img

  • Unsafe.java
    • 将对象引用、值在对象中的偏移量、期望的值和打算更新的新值传递给Unsafe.cpp
    • 如果值更新成功则返回true给调用者,否则返回false
  • Unsafe.cpp
    • 接收从Unsafe传递过来的对象引用、偏移量、期望值与更新值,根据对象引用和偏移量计算出值的地址,然后将值地址,期望值,更新值传递给$CPU$
    • 如果更新成功则返回trueUnsafe.java,否则返回false
  • CPU
    • 接收从Unsafe.cpp传递过来的地址、期望值、更新值,执行指令 $cmpxchg(x, addr, e)$,比较地址中的值是否和期望值一样。是则更新,否则什么也不做。在此cmpxchg可以理解为 $(compare\ and\ change)$,也就是比较并改变的意思。
    • 将操作结果返回给Unsafe.cpp

CAS实现原子操作的代价

ABA问题

所谓 $ABA$ 问题,也就是说一个值原来是 $A$ ,然后被改成了 $B$ ,最后又被改成了 $A$ 。而 $CAS$ 比较的重点就是它期待的值是不是 $A$ ,如果是就更新。

$A$ :“二狗,你再也不是当年的那个二狗了。”

二狗:“$A$,你再也不是当时的那个 $A$ 了。”

但是 $CAS$ 本身是检查不出这个变化的。

为了解决这个问题,可以使用版本号机制或者时间戳,也就是加上时间的判断(现在的我们总还不能与时间为敌吧?),在 $MySQL$ 的写操作常常会加上这个,在 $J.U.C$ 下也提供了相应的实现,也就是java.util.concurrent.atomic下的AtomicStampedReference

这个类的compareAndSet方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果二者都相等,才使用 $CAS$ 设置为新的值和标志。

1
2
3
4
5
6
7
8
9
10
11
12
public boolean compareAndSet(V   expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

循环开销可能会很大

$CAS$多与自旋结合。如果自旋 $CAS$ 长时间不成功,会占用大量的 $CPU$ 资源。

解决思路是

  • 让 $JVM$ 支持处理器提供的 $pause$ 指令。$pause$ 指令能让自旋失败时 $CPU$ 睡眠一小段时间再继续自旋,从而使得读操作的频率低很多,为解决内存顺序冲突而导致的 $CPU$ 流水线重排的代价也会小很多。
  • 限制自旋次数。比如重试 $10$ 次失败后暂时放弃,歇歇再来。

只能保证一个共享变量的原子操作

CAS的原子操作只能针对一个共享变量,可以用以下两种方法解决。

  • 使用 $JDK 1.5$ 开始就提供的AtomicReference类保证对象之间的原子性,把多个变量放到一个对象里面进行 $CAS$ 操作
  • 使用锁,锁内的临界区代码可以保证只有当前线程能操作。

CAS的应用

CAS操作并不会锁住共享变量,是一种非阻塞的同步机制,CAS就是乐观锁的实现。

Java利用CAS的乐观锁、原子性的特性高效解决了多线程的安全性问题,例如 $JDK1.8$ 中的集合类ConcurrentHashMap、关键字volatileReentrantLock等。

参考

评论