<
深入理解AQS和CAS原理
>
上一篇

DVM及ART是如何对JVM进行优化的?
下一篇

偏向锁,轻量级锁,重量级锁

深入理解AQS和CAS原理

AQS全称是Abstract Queued Synchronizer,一般翻译为同步器。它是一套实现多线程同步功能的框架。AQS在源码中被广泛使用尤其是JUC(java Util Concurrent)中,比如ReentrantLock,Semaphore,CountDownLatch,ThreadPoolExecutor。理解AQS对我们理解JUC中其他组件至关重要,并且在实际开发中也可以通过自定义AQS 来实现各种需求场景。

理解AQS需要熟悉双端队列,Unsafe

一、ReentrantLock 和 AQS 的关系

通过ReentrantLock来理解AQS内部工作机制。

从ReentrantLock的lock()方法可以看到

public void lock(){
    sync.lock();
}

只是调用了一个sync的lock方法,这个sync是什么呢?

public class ReentrantLock implements Lock{
    private final Sync sync;
    
    abstract static class Sync extends AbstractQueuedSynchronizer {
        //......
    }
}

可以看到,sync是ReentrantLock中的一个内部类。ReentrantLock并没有直接继承AQS,而是通过内部类sync来扩展AQS的功能。然后ReentrantLock中存有sync的全局变量引用。

sync在ReentrantLock中两种实现:NonfairSync和FairSync,对应非公平锁和公平锁。以非公平锁代码为例:sync.lock()

static final class NofairSync extends Sync{
    final void lock(){
    //通过CAS操作来修改state状态,表示争抢锁的操作
        if(compareAndSetState(0,1)){
          //设置当前获得锁状态的线程setExclusiveOwnerThread(Thread.currentThread);
        }else{
        //修改状态失败,尝试去获取锁
            acquire(1);
        }
    }
    
    //复写。真正功能由子类同步器实现
    protected final boolean tryAcquire(int acquires){
    	return nofairTryAcquire(acquires);
	}
}

可以看出在非公平锁中的lock()方法中 ,主要做了如下操作:

  1. 通过CAS设置变量State(同步状态)成功,表示当前线程获取锁成功,则将当前线程设置为独占线程。
  2. 通过CAS设置State失败,表示当前锁正在被其他线程持有,则进入acquire()方法进行后续处理

acquire()方法 定义在AQS中:

public final void acquire(int arg){
 if(!tryAcquire(arg)&&
 acquireQueued( addWaiter(Node.EXCLUSIVE),arg) ){
        selfInterrupt();
    }
}

acquire()方法是一个比较重要的方法,将其拆为3个主要步骤:

  1. tryAcquire():方法主要目的是尝试获取锁
  2. addWaiter():如果tryAcquire()尝试获取锁失败则调用addWaiter,将当前线程添加到一个等待队列中
  3. acquireQueued:处理加入到队列中的节点,通过自旋 去尝试获取锁,根据情况将线程挂起或取消。

以上3个方法都被定义在AQS中,但其中tryAcquire()有点特殊,实现如下:

protected boolean tryAcuqire(int arg){
    throw new UnsupportedOperationException()
}

默认情况下直接抛异常,因此它需要在子类中复写 ,也就是说真正的获取锁的逻辑由子类同步器自己实现

ReentrantLock中tryAcquire的实现(非公平锁)如下:

//NonfairSync extends Sync
protect final boolean tryAcuqire(int acquires){
    return nofairTryAcquire(acquires);
}

//Sync extends AbstractQueuedSynchronizer
final boolean nofairTryAcquire(int acquires){
    //获得当前执行的线程
    final Thread cur = Thread.currentThread();
    int c = getState();//获得state的值,volatile
    //c==0当前无锁状态
    if (c==0){
    	//通过CAS操作来替换state的值为1
        if(compareAndSetSate(0,acquires)){
        	setExclusiveOwnerThread(cur);
        	return true;
        }
        //如果是同一个线程来获得锁,则直接增加重入次数
    }else if (cur == getExclusiveOwnerThread()){
    	//增加重入次数
        int nextc = c+acquires;
        //overflow
        if(nextc<0){
            throw new Error("maximum lock count exceeded");
        }else{
            setState(nextc);
            return true;
        }
    }
    return false;
}

解释说明:

  1. 获取当前线程,判断当前的锁的状态
  2. 如果state=0,表示当前是无锁状态,通过CAS更新state状态的值,返回true
  3. 如果当前线程属于重入,则增加重入次数,返回true
  4. 上述情况都不满足,则获取锁失败,返回false

在ReentrantLock执行lock()的过程中,大部分同步机制的核心逻辑都已经在AQS中实现 ,ReentrantLock自身只要实现某些特定步骤下的方法即可,这种设计模式叫模板模式 。Activity的生命周期就是这样的,生命周期的执行流程都已在framework中定义好了,子Activity只要在相应的onCreate等方法中提供相应的实现即可。

二、AQS核心功能原理分析

首先看下AQS中几个关键属性

static final class Node{
    ...
}
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;//The synchronization state.

展示了AQS中两个比较重要的属性Node和state

1.state锁状态

state表示当前锁状态。当state=0时表示无锁状态;state>0时,表示已经有线程获得了锁,也就是state=1,如果一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5.而在释放锁的时候,同样需要释放5次直到state=0,其他线程才有资格获得锁

state还可以实现锁的独占模式或者共享模式

  1. 独占模式:只有一个线程能够持有同步锁

    在独占模式下,我们可以把state的初始值设置为0,当某个线程申请锁对象时,需要判断state的值是不是0,如果不是0就代表其他线程已经持有该锁 ,则本线程需要阻塞等待。

  2. 共享模式:可以有多个线程持有同步锁

    比如某项操作允许10个线程同时进行,超过这个数量的线程就需要阻塞等待。那么只需要在线程申请对象时判断state的值是否小于10。如果小于10,就将state+1后继续执行同步语句;如果=10,说明已经有10线程在同时执行,本线程需要阻塞等待。

2.Node双端队列节点

Node是一个先进先出的双端队列,并且是等待队列 ,当多个线程争用资源被阻塞时,会进入此队列 。这个队列是AQS实现多线程同步的核心。

ReentrantLock中看到,在AQS中有两个Node的指针,分别指向队列的head,tail。Node主要结构如下:

static final class Node{
	//该等待同步的节点处于共享模式
    static final Node SHARED = new Node();
    //该等待同步的节点处于独占模式
    static final Node EXCLUSIVE = null;
    
    //Node中的线程状态,和state不一样:有1,0,-1,-2,-3五个
    volatile int waitStatus;
    static final int CANCELLED =1;
    static final int SIGNAL =-1;
    static final int CONDITION =-2;
    static final int PROPAGATE =-3;
    
    volatile Node prev;//前驱节点
    volatile Node next;//后继节点
    volatile Thread thread;//等待锁的线程
    ...
}

3.获取锁失败后续流程分析

锁的意义:就是使竞争到锁对象的线程执行同步代码块 ,多个线程竞争锁时,竞争失败的线程需要被阻塞等待后续唤醒。那么ReentrantLock如何实现让线程等待并唤醒的? (队列!!!)

前面提到过在ReentrantLock.lock()阶段,在acquire()方法中会先后调用tryAcquire,addWaiter,acquireQueued这3方法来处理。tryAcquire在ReentrantLock中被复写,如果返回true说明获取锁成功,就继续执行同步代码块。

可是tryAcquire返回false,也就是当前锁对象被其他线程持有,那么当前线程会被AQS如何处理?(addWaiter,acquireQueued)

  1. addWaiter

    首先当前获取锁失败的线程会被添加到一个等待队列的末端

    //将线程以Node的方式添加到队列中
    private Node addWriter(Node node){
    	//当前线程封装一个新Node
        Node node = new Node(Thread.cur(),node);
        Node pred = tail;
        //将Node插入队列
        if(pred!=null){
            node.prev=pred;
            //CAS替换当前尾部,成功则返回
            if(compareAndSetTail(pred,node)){
                pred.next=node;
                return node;
            }
        }
        //插入队列失败,进入enq自旋重试入队
        enq(node);
        return node;
    }
       
    //插入节点到队列中,如果队列未初始化,则初始化再插入
    private Node enq(final Node node){
        for(;;){
            Node t =tail;
            //如果队列从未被初始化好,需要初始化一个空Node
            if(t==null){
                if(compareAndSetHead(new Node())){
                    tail = head;
                }
            }else{
                node.prev=t;
                if(compareAndSetTail(t,node)){
                    t.next=node;
                    return t;
                }
            }
        }
    }
    //SDK28,直接改成了enq的方式,直接自旋,在里面进行插入或者就是初始化。
    

    有两种情况会导致插入队列失败:

    1. tail为空:说明队列从未初始化,因此需要调用enq方法在队列中插入一个空的Node
    2. compareAndSetTail失败:说明插入过程中有线程修改了此队列,因此需要调用enq将当前Node重新插入到队列末端

    经过addWaiter之后,此时线程以Node方式被加入到队列的末端 ,但是线程还有没有被执行阻塞操作,真正的阻塞操作是在下面的acquireQueued方法中

  2. acquireQueued

    在acquireQueued方法中并不会立即挂起该节点中的线程 ,因为在插入节点的过程中,之前持有锁的线程可能已经执行完了释放了锁(严谨),所以这里使用自旋再次去尝试获取锁 。如果自旋还没获取到!那么就将该线程挂起(阻塞)。

    //在队列中的节点通过此方法获取锁
    final boolean acquireQueued(final Node node,int arg){
    boolean failed = true;
    try{
        boolean interrupted =false;
        for(;;){
            final Node p = node.predecessor();//Returns previous node
            /*
            *检测当前节点前驱是否head,这是尝试获取锁的资格
            *如果是的话,则调用tryAcquire尝试获取锁,
            *成功,则将head置为当前节点
            */
            if(p==head && tryAcquire(arg)){
                setHead(node);
                p.next = null; //help GC
                failed = false;
                return interrupted;
            }
            /*
            *如果未成功获取锁则根据前驱节点判断是否要阻塞
            *如阻塞过程中被中断,则置interrupt标志位为true
            *shouldParkAfterFailedAcquire方法在前驱状态不
            *为SIGNAL的情况下都会循环重试获取锁
            */
            if(shouldParkAfterFailedAcquire(p,node))&&
            parkAndCheckInterrupt()){
                interrupt = true;
            }
        }
    }finally{
        if(failed){
            //Cancels an ongoing attempt to acquire.
            cancelAcquire(node);
        }
    }
    }
    

    在shouldParkAfterFailedAcquire方法中会判断当前线程是否应该被挂起

    //根据前驱节点中的waitStatus来判断是否需要阻塞当前线程
    private static boolean shouldParkAfterFailedAcquire(Node pred,Node node){
        //获取前驱节点的状态
        int ws = pred.waitStatus;
        if(ws == Node.SIGNAL){
            //如果是SIGNAL状态,返回true将当前线程挂起
            return true;
        }
        if(ws > 0){
            /*
            *前驱节点状态为取消,向前遍历,更新当前节点
            *的前驱为往前第一个非取消节点。当前线程之后会
            *再次回到循环并尝试获取锁
            */
            do{
                node.prev = pred = pred.prev;
            }while(pred.waitStatus > 0);
            pred.next = node;
        }else{
            /*
            *等待状态为0或者PROPAGATE(-3),设置前驱的
            *等待状态为SIGNAL。并且之后会回到循环再次
            *重试获取锁
            */
            compareAndSetWaitStatus(pred,ws,Node.SIGNAL);
        }
        return false;
    }
    

    首先获取前驱节点的waitStatus值,Node中的waitStatus一共5种取值:

    waitStatue值 描述
    CANCELLED(1) 当前线程因为超时或中断被取消。这是一个终结态,也就是状态到此为止
    SIGNAL(-1) 当前线程的后继线程被阻塞或即将被阻塞,当前线程释放锁或取消后需要唤醒后继线程。这个状态一般都是后继线程来设置前驱节点的
    CONDITION(-2) 当前线程在condition队列种
    PROPAGATE(-3) 用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在一个节点成为头节点之前,是不会跃迁为此状态的
    0 表示无锁状态

    接下来根据waitStatus不同的值进行不同的操作,有以下情况:

    1. 如果waitStatus等于SIGNAL,返回true将当前线程挂起,等待后续唤醒操作即可。
    2. 如果waitStatus大于0也就是CANCLE状态,会将此前驱节点从队列中删除,并在循环中逐步寻找下一个不是“CANCEL”状态的节点作为当前节点的前驱节点。
    3. 如果waitStatus既不是SIGNAL也不是CANCEL,则将当前节点的前驱节点状态设置为SIGNAL,这样做的好处是下一次执行shouldParkAfterFailedAcquire时可以直接返回true,挂起 线程。

    代码再回到acquireQueued中,如果shouldParkAfterFailedAcquire返回true表示线程需要被挂起,那么会继续调用parkAndCheckInterrupt()方法执行真正的阻塞线程代码

    private final boolean parkAndCheckInterrupt(){
        LockSupport.park(this);
        return Thread.interrupted();
    }
       
    //LockSupport.java
    public static void park(Object blocker){
        Thread t = Thread.currentThread();
        setBlocker(t,blocker);
        U.park(false,0L);
        setBlocker(t,null);
    }
    

    只是调用了LockSupport中的park()方法。在LockSupport.park()方法中调用了Unsafe API来执行底层natice方法将线程挂起,代码到这已经到OS层面,没必要再深入分析。

    至此,获取锁的大体流程已分析完,总结:

    1. AQS的模板方法acquire通过调用子类自定义实现的tryAcquire获取锁
    2. 如果获取锁失败,通过addWaiter()方法将线程构造成Node节点插入到同步队列队尾
    3. 在acquireQueued()方法中以自旋的方式尝试获取锁 ,如果失败则判断是否需要将当前线程阻塞,如果需要阻塞则最终执行LockSupport(Unsafe)中的native API 来实现线程阻塞。

    4.释放锁流程分析

    在上面加锁阶段被阻塞的线程需要被唤醒后才可以重新执行 (阻塞状态)。那具体AQS是何时尝试唤醒等待队列中被阻塞的线程呢?

    同加锁过程一样,释放锁需要从ReentrantLock.unlock()方法开始

    public void unlock(){
        sync.release(1);
    }
       
    //AbstractQueuedSynchronizer
    public final boolean release(int arg){
        if(tryRelease(arg)){
            Node h = head;
            if(h!=null && h.waitStatus!=0){
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }
    

    首先调用tryRelease()方法尝试释放锁,如果成功最终会调用AQS中的unparkSuccessor方法来实现释放锁的操作。

    private void unparkSuccessor(Node node){
        //获取头节点waitStatus
        int ws = node.waitStatus;
        if(ws < 0){
            compareAndSetWaitStatus(node,ws,0);
        }
        //获取当前节点(实际是head节点)的下一个节点
        Node s = node.next;
        //如果下个节点是null或是CANCEL状态
        //就找到队列最开始的非CANCEL的节点
        if(s == null || s.waitStatus > 0){
            s = null;
            //从尾部节点开始找,到队首
            //找到队列第一个waitStatus < 0的节点
            for(Node t=tail;t!=null&&t!=node;t=t.prev){
                if(t.waitStatus <= 0){
                    s = t;
                }
            }
        }
        //如果当前节点的下个节点!=null,且状态<=0。unpark
        if(s != null){
            LockSupport.unpark(s.thread);
        }
    }
    

    首先获取当前节点(实际上传入的是head节点)的状态,如果head节点的下一个节点是null,或者下一个节点的状态为CANCEL,则从等待队列的尾部开始遍历,直到寻找到第一个waitStatus小于0的节点。

    如果最终遍历到的节点!=null,再调用LockSupport.unpar()方法,调用底层方法唤醒线程。至此,线程被唤醒的时机也分析完了。

二、CAS

不管是加锁还是释放锁的阶段,多次提到一种通用操作:compareAndSetXXX。这种操作最终会调用Unsafe中的API进行CAS操作。

CAS全称是:Compare And Swap,译为比较和替换,是一种通过硬件实现并发安全 的常用技术,底层是利用CPU的CAS指令对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作。

它的实现过程主要有3个操作数:内存值V,旧的预期值E,要修改的新值U,当且仅当预期值E和内存值V相同时 ,才将内存值V修改为U,否则什么都不做。

三、自定义AQS

public class MyLock{
    private Sync sync = new Sync();
    
    //加锁
    public void lock(){
        sync.acquire(1);
    }
    
    //释放锁
    public void unlock(){
        sync.release(1);
    }
    
    static class Sync extends AbstractQueuedSynchroizer{
        @Override
        protected boolean tryAcquire(int arg){
            return compareAndSetState(0,1);
        }
        
        @Override
        protected boolean tryRelease(int arg){
            setState(0);
            return true;
        }
    }
}

代码中的MyLock就是一个最简单的独占锁,通过使用MyLock也能实现同synchronized和ReentrantLock相同的功能,如下:

static int count = 0;
static MyLock myLock = new MyLock();

@Test
public void testMyLock() throws InterruptException{
    Runnable runnable = new Runnable(){
        @Override
        public void run(){
            try{
                myLock.lock();
                for(int i=0;i<1000;i++){
                    count++;
                }
            }catch(Exception e){
                print(e)
            }finally{
                myLock.unlock();
            }
        }
    };
    
    Thread thread1 = new Thread(runnable);
    Thread thread2 = new Thread(runnable);
    thread1.start();
    thread2.start();
    thread1.join();
    thread2.join();
    print(count);
}

打印200,说明线程安全的同步操作。

四、总结

总体来说,AQS是一套框架,在框架内部已经封装好了大部分同步需要的逻辑,,在AQS内部维护了一个状态指示器state和一个等待队列Node,而通过state的操作又分为两种:独占式和共享式,这就导致AQS有两种不同的实现:独占锁(ReentrantLock)和分享锁(CountDownLatch,读写锁)。这里主要从独占锁的角度来跟进的分析AQS加锁和释放锁的流程。

自定义时有几个可能需要子类同步器实现:

  1. lock()
  2. tryAcquire(int):独占方式。尝试获取资源,成功true,失败false
  3. tryRelease(int)
  4. tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但无剩余资源;正数表示成功且有剩余。
  5. tryReleaseShared(int)
Top
Foot