All Articles

AQS深入剖析

前言

JAVA并发编程离不开JDK的JUC包,JUC包又离不开ReentrantLock、CountdownLatch、ThreadPoolExecutor等并发编程常用类,而AQS作为这位这些常用类的核心原子操作的实现,了解其原理就是体现技术深度的一个重要指标。

本文将从AQS的基本概念出发,从具体常用实践类从上至下地剖析AQS的作用以及功能,最后也引出更底层基本概念不同CPU架构下锁选择以及不同锁选择的一些讨论,

大纲(to delete)

AQS是什么东西?-> 从ReentrantLock看AQS怎么用 -> 剖析AQS的原理 -> 引出CLH自旋锁概念以及NUMA架构的概念,再拓展出自旋锁和互斥锁之争。(明天写)

一、什么是AQS?

JAVA中讲的AQS指的是JUC模块java.util.concurrent.locks包下的AbstractQueuedSynchronizer类,之所以这么被重视是因为我们常用的JDK中的锁和一些同步器就是依赖其实现的。
以下是官方Javadoc的说法,意思是AQS就是一个提供依赖先进先出等待队列来实现阻塞锁和相关同步器的框架,是大多数依赖原子数字值来表示状态的同步器的核心。

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. 

单看上面的说法可能还是不太好理解,所以接下来会结合具体案例来看AQS是如何起作用的,进而剖析其

二、基于AQS实现的常见类

ReentrantLock

ReentrantLock是可重入锁,主要用到的方法有tryLock和unLock方法,接下来看下其具体实现

  1. trylock
public boolean tryLock() { 
   return sync.tryLock();
}
  1. unLock
public void unlock() {
    sync.release(1);
}
  1. newCondition
public Condition newCondition() {  
      return sync.newCondition();
}
  1. lock
public void lock() {
    sync.lock();
}

可以看出核心逻辑完全是由sync这个对象实现的,其实现代码如下:

abstract static class Sync extends AbstractQueuedSynchronizer {
    //tryLock方法
    final boolean tryLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (getExclusiveOwnerThread() == current) {
                if (++c < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(c);
                return true;
            }
            return false;
        }
    //lock方法
    final void lock() {
        if (!initialTryLock())
            acquire(1);
    }
    //release方法继承AQS

    //newCondition方法
    final ConditionObject newCondition() {
        return new ConditionObject();
    }
    
    //还包含很多继承的方法,如tryAcquireShared
}
  1. tryLock剖析

tryLock的方法逻辑比较清晰

  • 如果锁状态为0,则可以获取锁,返回true
  • 如果锁状态非0,且获取排它锁线程是当前线程,则状态还是设置为+1后设置成当前锁的状态,返回true
  • 其他情况返回false

核心处理方法在getState上,所以看一下getState的实现

getState实现

    protected final int getState() {
        return state;
    }

仅仅是返回了一个当前状态的变量,整体都没什么核心处理逻辑,那就只能直接看构造了

  1. lock方法剖析

先通过子类(公平/不公平)的initialTryLock 初始化状态,然后通过acquire方法获取锁,这里acquire的参数写死为1,表明ReentrantLock只能争抢一个资源,源码如下

 // FairSync实现
 final boolean initialTryLock() {
     Thread current = Thread.currentThread();
     int c = getState();
     if (c == 0) {
         if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
             setExclusiveOwnerThread(current);
             return true;
         }
     } else if (getExclusiveOwnerThread() == current) {
         if (++c < 0) // overflow
             throw new Error("Maximum lock count exceeded");
         setState(c);
         return true;
     }
     return false;

    //UnFairSync实现
    final boolean initialTryLock() {
        Thread current = Thread.currentThread();
        if (compareAndSetState(0, 1)) { // first attempt isunguarded
            setExclusiveOwnerThread(current);
            return true;
        } else if (getExclusiveOwnerThread() == current) {
            int c = getState() + 1;
            if (c < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(c);
            return true;
        } else
            return false;
    }
 }
    //acquire方法,AQS中定义
     public final void acquire(int arg) {
        if (!tryAcquire(arg))
            acquire(null, arg, false, false, false, 0L);
    }
    
    //FairSync的tryAcquire实现
    protected final boolean tryAcquire(int acquires) {
            if (getState() == 0 && !hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    //NoneFairSync的tryAcquire实现
    protected final boolean tryAcquire(int acquires) {
        if (getState() == 0 && compareAndSetState(0,acquires)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
    return false;
    }

上面可以看出公平锁和不公平锁的区别在于条件判断上,公平锁会先看下是否队列中有有线程,非公平锁直接争抢,而这里的线程判断 hasQueuedPredecessors()和hasQueuedThreads()方法就是在AQS中定义的,具体逻辑就不展开了,有兴趣可以自行查看源码。

核心的Lock方法需要查看acquire方法内部逻辑如下,内部使用双向队列存储线程的状态,本质是实现了CLH锁的一个变种,当获取到锁时会被作为头结点放入CLH队列中,如果没有获取到锁会被放到CLH队列中并自旋等待,支持共享和独占两种模式,还支持公平和不公平锁两种模式。

    final int acquire(Node node, int arg, boolean shared,
                      boolean interruptible, boolean timed, long time) {
        Thread current = Thread.currentThread();
        byte spins = 0, postSpins = 0;   // retries upon unpark of first thread
        boolean interrupted = false, first = false;
        Node pred = null;               // predecessor of node when enqueued

        /*
         * Repeatedly:
         *  Check if node now first
         *    if so, ensure head stable, else ensure valid predecessor
         *  if node is first or not yet enqueued, try acquiring
         *  else if queue is not initialized, do so by attaching new header node
         *     resort to spinwait on OOME trying to create node
         *  else if node not yet created, create it
         *     resort to spinwait on OOME trying to create node
         *  else if not yet enqueued, try once to enqueue
         *  else if woken from park, retry (up to postSpins times)
         *  else if WAITING status not set, set and retry
         *  else park and clear WAITING status, and check cancellation
         */

        for (;;) {
            if (!first && (pred = (node == null) ? null : node.prev) != null &&
                !(first = (head == pred))) {
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            if (first || pred == null) {
                boolean acquired;
                try {
                    if (shared)
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                        acquired = tryAcquire(arg);
                } catch (Throwable ex) {
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                if (acquired) {
                    if (first) {
                        node.prev = null;
                        head = node;
                        pred.next = null;
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node);
                        if (interrupted)
                            current.interrupt();
                    }
                    return 1;
                }
            }
            Node t;
            if ((t = tail) == null) {           // initialize queue
                if (tryInitializeHead() == null)
                    return acquireOnOOME(shared, arg);
            } else if (node == null) {          // allocate; retry before enqueue
                try {
                    node = (shared) ? new SharedNode() : new ExclusiveNode();
                } catch (OutOfMemoryError oome) {
                    return acquireOnOOME(shared, arg);
                }
            } else if (pred == null) {          // try to enqueue
                node.waiter = current;
                node.setPrevRelaxed(t);         // avoid unnecessary fence
                if (!casTail(t, node))
                    node.setPrevRelaxed(null);  // back out
                else
                    t.next = node;
            } else if (first && spins != 0) {
                --spins;                        // reduce unfairness on rewaits
                Thread.onSpinWait();
            } else if (node.status == 0) {
                node.status = WAITING;          // enable signal and recheck
            } else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                    LockSupport.park(this);
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }
        }
        return cancelAcquire(node, interrupted, interruptible);
    }

    private int acquireOnOOME(boolean shared, int arg) {
        for (long nanos = 1L;;) {
            if (shared ? (tryAcquireShared(arg) >= 0) : tryAcquire(arg))
                return 1;
            U.park(false, nanos);               // must use Unsafe park to sleep
            if (nanos < 1L << 30)               // max about 1 second
                nanos <<= 1;
        }
    }

总结: 可以看的出核心的同步逻辑都在AQS中定义,仅提供状态的设置以及几个重写方法由子类扩展来实现高级功能。 可以重写的方法如下,默认都是抛出unsupported异常:

  1. isHeldExclusively 是否线程独占的判断方法,ReentrantLock中是判断当前线程是不是当前获取锁的线程,CountDownLatch没有用到这个方法,所以没有实现。
  2. tryReleaseShared(int arg):共享模式的tryRealse方法,release逻辑一般同tryRelease
  3. tryAcquireShared(int arg):共享模式的tryAcquire,release逻辑一般同tryRlease
  4. tryRelease(int arg) : 定义可以release的条件,countdown为count减到0来判断,ReentrantLock为当前state减1,如果等于0则可以释放。
  5. tryAcquire(int arg): ReentrantLock里此方法为了区分公平锁和非公平锁,内部有队列判断。

三、总结AQS

  1. AQS核心功能

提供了公平锁和非公平锁两种锁以及独占锁和共享锁两种模式的的代码封装支持,方便快速实现线程同步操作。

  1. 原理

依赖于CLH变种实现了线程资源争抢时同步阻塞等待的核心逻辑,且依赖于自旋方式来避免线程上下文的切换进而节省耗时。

  核心操作主要涉及几个情况
  • 获取锁成功,直接加入CLH列头
  • 获取锁失败,加入CLH列尾
  • 释放锁移除节点,head指向下一个获得锁的节点,新获得锁的节点prev前置节点置为null。

Published Jun 27, 2024

Software Engineer from China, currently more attention to the automation field of ai interaction