JAVA并发编程离不开JDK的JUC包,JUC包又离不开ReentrantLock、CountdownLatch、ThreadPoolExecutor等并发编程常用类,而AQS作为这位这些常用类的核心原子操作的实现,了解其原理就是体现技术深度的一个重要指标。
本文将从AQS的基本概念出发,从具体常用实践类从上至下地剖析AQS的作用以及功能,最后也引出更底层基本概念不同CPU架构下锁选择以及不同锁选择的一些讨论,
AQS是什么东西?-> 从ReentrantLock看AQS怎么用 -> 剖析AQS的原理 -> 引出CLH自旋锁概念以及NUMA架构的概念,再拓展出自旋锁和互斥锁之争。(明天写)
JAVA中讲的AQS指的是JUC模块java.util.concurrent.locks包下的AbstractQueuedSynchronizer类,之所以这么被重视是因为我们常用的JDK中的锁和一些同步器就是依赖其实现的。
以下是官方Javadoc的说法,意思是AQS就是一个提供依赖先进先出等待队列来实现阻塞锁和相关同步器的框架,是大多数依赖原子数字值来表示状态的同步器的核心。
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. 单看上面的说法可能还是不太好理解,所以接下来会结合具体案例来看AQS是如何起作用的,进而剖析其
ReentrantLock是可重入锁,主要用到的方法有tryLock和unLock方法,接下来看下其具体实现
public boolean tryLock() {
return sync.tryLock();
}public void unlock() {
sync.release(1);
}public Condition newCondition() {
return sync.newCondition();
}public void lock() {
sync.lock();
}可以看出核心逻辑完全是由sync这个对象实现的,其实现代码如下:
abstract static class Sync extends AbstractQueuedSynchronizer {
//tryLock方法
final boolean tryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
//lock方法
final void lock() {
if (!initialTryLock())
acquire(1);
}
//release方法继承AQS
//newCondition方法
final ConditionObject newCondition() {
return new ConditionObject();
}
//还包含很多继承的方法,如tryAcquireShared
}tryLock的方法逻辑比较清晰
核心处理方法在getState上,所以看一下getState的实现
getState实现
protected final int getState() {
return state;
}仅仅是返回了一个当前状态的变量,整体都没什么核心处理逻辑,那就只能直接看构造了
先通过子类(公平/不公平)的initialTryLock 初始化状态,然后通过acquire方法获取锁,这里acquire的参数写死为1,表明ReentrantLock只能争抢一个资源,源码如下
// FairSync实现
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
//UnFairSync实现
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { // first attempt isunguarded
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}
}
//acquire方法,AQS中定义
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
//FairSync的tryAcquire实现
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && !hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//NoneFairSync的tryAcquire实现
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0,acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}上面可以看出公平锁和不公平锁的区别在于条件判断上,公平锁会先看下是否队列中有有线程,非公平锁直接争抢,而这里的线程判断 hasQueuedPredecessors()和hasQueuedThreads()方法就是在AQS中定义的,具体逻辑就不展开了,有兴趣可以自行查看源码。
核心的Lock方法需要查看acquire方法内部逻辑如下,内部使用双向队列存储线程的状态,本质是实现了CLH锁的一个变种,当获取到锁时会被作为头结点放入CLH队列中,如果没有获取到锁会被放到CLH队列中并自旋等待,支持共享和独占两种模式,还支持公平和不公平锁两种模式。
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if queue is not initialized, do so by attaching new header node
* resort to spinwait on OOME trying to create node
* else if node not yet created, create it
* resort to spinwait on OOME trying to create node
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
Node t;
if ((t = tail) == null) { // initialize queue
if (tryInitializeHead() == null)
return acquireOnOOME(shared, arg);
} else if (node == null) { // allocate; retry before enqueue
try {
node = (shared) ? new SharedNode() : new ExclusiveNode();
} catch (OutOfMemoryError oome) {
return acquireOnOOME(shared, arg);
}
} else if (pred == null) { // try to enqueue
node.waiter = current;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
private int acquireOnOOME(boolean shared, int arg) {
for (long nanos = 1L;;) {
if (shared ? (tryAcquireShared(arg) >= 0) : tryAcquire(arg))
return 1;
U.park(false, nanos); // must use Unsafe park to sleep
if (nanos < 1L << 30) // max about 1 second
nanos <<= 1;
}
}总结: 可以看的出核心的同步逻辑都在AQS中定义,仅提供状态的设置以及几个重写方法由子类扩展来实现高级功能。 可以重写的方法如下,默认都是抛出unsupported异常:
提供了公平锁和非公平锁两种锁以及独占锁和共享锁两种模式的的代码封装支持,方便快速实现线程同步操作。
依赖于CLH变种实现了线程资源争抢时同步阻塞等待的核心逻辑,且依赖于自旋方式来避免线程上下文的切换进而节省耗时。
核心操作主要涉及几个情况