AQS全称是Abstract Queued Synchronizer,一般翻译为同步器。它是一套实现多线程同步功能的框架。AQS在源码中被广泛使用尤其是JUC(java Util Concurrent)中,比如ReentrantLock,Semaphore,CountDownLatch,ThreadPoolExecutor。理解AQS对我们理解JUC中其他组件至关重要,并且在实际开发中也可以通过自定义AQS 来实现各种需求场景。
理解AQS需要熟悉双端队列,Unsafe
通过ReentrantLock来理解AQS内部工作机制。
从ReentrantLock的lock()方法可以看到
public void lock(){
sync.lock();
}
只是调用了一个sync的lock方法,这个sync是什么呢?
public class ReentrantLock implements Lock{
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
//......
}
}
可以看到,sync是ReentrantLock中的一个内部类。ReentrantLock并没有直接继承AQS,而是通过内部类sync来扩展AQS的功能。然后ReentrantLock中存有sync的全局变量引用。
sync在ReentrantLock中两种实现:NonfairSync和FairSync,对应非公平锁和公平锁。以非公平锁代码为例:sync.lock()
static final class NofairSync extends Sync{
final void lock(){
//通过CAS操作来修改state状态,表示争抢锁的操作
if(compareAndSetState(0,1)){
//设置当前获得锁状态的线程setExclusiveOwnerThread(Thread.currentThread);
}else{
//修改状态失败,尝试去获取锁
acquire(1);
}
}
//复写。真正功能由子类同步器实现
protected final boolean tryAcquire(int acquires){
return nofairTryAcquire(acquires);
}
}
可以看出在非公平锁中的lock()方法中 ,主要做了如下操作:
acquire()方法 定义在AQS中:
public final void acquire(int arg){
if(!tryAcquire(arg)&&
acquireQueued( addWaiter(Node.EXCLUSIVE),arg) ){
selfInterrupt();
}
}
acquire()方法是一个比较重要的方法,将其拆为3个主要步骤:
以上3个方法都被定义在AQS中,但其中tryAcquire()有点特殊,实现如下:
protected boolean tryAcuqire(int arg){
throw new UnsupportedOperationException();
}
默认情况下直接抛异常,因此它需要在子类中复写 ,也就是说真正的获取锁的逻辑由子类同步器自己实现 。
ReentrantLock中tryAcquire的实现(非公平锁)如下:
//NonfairSync extends Sync
protect final boolean tryAcuqire(int acquires){
return nofairTryAcquire(acquires);
}
//Sync extends AbstractQueuedSynchronizer
final boolean nofairTryAcquire(int acquires){
//获得当前执行的线程
final Thread cur = Thread.currentThread();
int c = getState();//获得state的值,volatile
//c==0当前无锁状态
if (c==0){
//通过CAS操作来替换state的值为1
if(compareAndSetSate(0,acquires)){
setExclusiveOwnerThread(cur);
return true;
}
//如果是同一个线程来获得锁,则直接增加重入次数
}else if (cur == getExclusiveOwnerThread()){
//增加重入次数
int nextc = c+acquires;
//overflow
if(nextc<0){
throw new Error("maximum lock count exceeded");
}else{
setState(nextc);
return true;
}
}
return false;
}
解释说明:
在ReentrantLock执行lock()的过程中,大部分同步机制的核心逻辑都已经在AQS中实现 ,ReentrantLock自身只要实现某些特定步骤下的方法即可,这种设计模式叫模板模式 。Activity的生命周期就是这样的,生命周期的执行流程都已在framework中定义好了,子Activity只要在相应的onCreate等方法中提供相应的实现即可。
首先看下AQS中几个关键属性
static final class Node{
...
}
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;//The synchronization state.
展示了AQS中两个比较重要的属性Node和state
state表示当前锁状态。当state=0时表示无锁状态;state>0时,表示已经有线程获得了锁,也就是state=1,如果一个线程多次获得同步锁的时候,state会递增,比如重入5次,那么state=5.而在释放锁的时候,同样需要释放5次直到state=0,其他线程才有资格获得锁 。
state还可以实现锁的独占模式或者共享模式:
独占模式:只有一个线程能够持有同步锁
在独占模式下,我们可以把state的初始值设置为0,当某个线程申请锁对象时,需要判断state的值是不是0,如果不是0就代表其他线程已经持有该锁 ,则本线程需要阻塞等待。
共享模式:可以有多个线程持有同步锁
比如某项操作允许10个线程同时进行,超过这个数量的线程就需要阻塞等待。那么只需要在线程申请对象时判断state的值是否小于10。如果小于10,就将state+1后继续执行同步语句;如果=10,说明已经有10线程在同时执行,本线程需要阻塞等待。
Node是一个先进先出的双端队列,并且是等待队列 ,当多个线程争用资源被阻塞时,会进入此队列 。这个队列是AQS实现多线程同步的核心。
ReentrantLock中看到,在AQS中有两个Node的指针,分别指向队列的head,tail。Node主要结构如下:
static final class Node{
//该等待同步的节点处于共享模式
static final Node SHARED = new Node();
//该等待同步的节点处于独占模式
static final Node EXCLUSIVE = null;
//Node中的线程状态,和state不一样:有1,0,-1,-2,-3五个
volatile int waitStatus;
static final int CANCELLED =1;
static final int SIGNAL =-1;
static final int CONDITION =-2;
static final int PROPAGATE =-3;
volatile Node prev;//前驱节点
volatile Node next;//后继节点
volatile Thread thread;//等待锁的线程
...
}
锁的意义:就是使竞争到锁对象的线程执行同步代码块 ,多个线程竞争锁时,竞争失败的线程需要被阻塞等待后续唤醒。那么ReentrantLock如何实现让线程等待并唤醒的? (队列!!!)
前面提到过在ReentrantLock.lock()阶段,在acquire()方法中会先后调用tryAcquire,addWaiter,acquireQueued这3方法来处理。tryAcquire在ReentrantLock中被复写,如果返回true说明获取锁成功,就继续执行同步代码块。
可是tryAcquire返回false,也就是当前锁对象被其他线程持有,那么当前线程会被AQS如何处理?(addWaiter,acquireQueued)
addWaiter
首先当前获取锁失败的线程会被添加到一个等待队列的末端
//将线程以Node的方式添加到队列中
private Node addWriter(Node node){
//当前线程封装一个新Node
Node node = new Node(Thread.cur(),node);
Node pred = tail;
//将Node插入队列
if(pred!=null){
node.prev=pred;
//CAS替换当前尾部,成功则返回
if(compareAndSetTail(pred,node)){
pred.next=node;
return node;
}
}
//插入队列失败,进入enq自旋重试入队
enq(node);
return node;
}
//插入节点到队列中,如果队列未初始化,则初始化再插入
private Node enq(final Node node){
for(;;){
Node t =tail;
//如果队列从未被初始化好,需要初始化一个空Node
if(t==null){
if(compareAndSetHead(new Node())){
tail = head;
}
}else{
node.prev=t;
if(compareAndSetTail(t,node)){
t.next=node;
return t;
}
}
}
}
//SDK28,直接改成了enq的方式,直接自旋,在里面进行插入或者就是初始化。
有两种情况会导致插入队列失败:
经过addWaiter之后,此时线程以Node方式被加入到队列的末端 ,但是线程还有没有被执行阻塞操作,真正的阻塞操作是在下面的acquireQueued方法中
acquireQueued
在acquireQueued方法中并不会立即挂起该节点中的线程 ,因为在插入节点的过程中,之前持有锁的线程可能已经执行完了释放了锁(严谨),所以这里使用自旋再次去尝试获取锁 。如果自旋还没获取到!那么就将该线程挂起(阻塞)。
//在队列中的节点通过此方法获取锁
final boolean acquireQueued(final Node node,int arg){
boolean failed = true;
try{
boolean interrupted =false;
for(;;){
final Node p = node.predecessor();//Returns previous node
/*
*检测当前节点前驱是否head,这是尝试获取锁的资格
*如果是的话,则调用tryAcquire尝试获取锁,
*成功,则将head置为当前节点
*/
if(p==head && tryAcquire(arg)){
setHead(node);
p.next = null; //help GC
failed = false;
return interrupted;
}
/*
*如果未成功获取锁则根据前驱节点判断是否要阻塞
*如阻塞过程中被中断,则置interrupt标志位为true
*shouldParkAfterFailedAcquire方法在前驱状态不
*为SIGNAL的情况下都会循环重试获取锁
*/
if(shouldParkAfterFailedAcquire(p,node))&&
parkAndCheckInterrupt()){
interrupt = true;
}
}
}finally{
if(failed){
//Cancels an ongoing attempt to acquire.
cancelAcquire(node);
}
}
}
在shouldParkAfterFailedAcquire方法中会判断当前线程是否应该被挂起
//根据前驱节点中的waitStatus来判断是否需要阻塞当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred,Node node){
//获取前驱节点的状态
int ws = pred.waitStatus;
if(ws == Node.SIGNAL){
//如果是SIGNAL状态,返回true将当前线程挂起
return true;
}
if(ws > 0){
/*
*前驱节点状态为取消,向前遍历,更新当前节点
*的前驱为往前第一个非取消节点。当前线程之后会
*再次回到循环并尝试获取锁
*/
do{
node.prev = pred = pred.prev;
}while(pred.waitStatus > 0);
pred.next = node;
}else{
/*
*等待状态为0或者PROPAGATE(-3),设置前驱的
*等待状态为SIGNAL。并且之后会回到循环再次
*重试获取锁
*/
compareAndSetWaitStatus(pred,ws,Node.SIGNAL);
}
return false;
}
首先获取前驱节点的waitStatus值,Node中的waitStatus一共5种取值:
waitStatue值 | 描述 |
---|---|
CANCELLED(1) | 当前线程因为超时或中断被取消。这是一个终结态,也就是状态到此为止 |
SIGNAL(-1) | 当前线程的后继线程被阻塞或即将被阻塞,当前线程释放锁或取消后需要唤醒后继线程。这个状态一般都是后继线程来设置前驱节点的 |
CONDITION(-2) | 当前线程在condition队列种 |
PROPAGATE(-3) | 用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在一个节点成为头节点之前,是不会跃迁为此状态的 |
0 | 表示无锁状态 |
接下来根据waitStatus不同的值进行不同的操作,有以下情况:
代码再回到acquireQueued中,如果shouldParkAfterFailedAcquire返回true表示线程需要被挂起,那么会继续调用parkAndCheckInterrupt()方法执行真正的阻塞线程代码 。
private final boolean parkAndCheckInterrupt(){
LockSupport.park(this);
return Thread.interrupted();
}
//LockSupport.java
public static void park(Object blocker){
Thread t = Thread.currentThread();
setBlocker(t,blocker);
U.park(false,0L);
setBlocker(t,null);
}
只是调用了LockSupport中的park()方法。在LockSupport.park()方法中调用了Unsafe API来执行底层natice方法将线程挂起,代码到这已经到OS层面,没必要再深入分析。
至此,获取锁的大体流程已分析完,总结:
在上面加锁阶段被阻塞的线程需要被唤醒后才可以重新执行 (阻塞状态)。那具体AQS是何时尝试唤醒等待队列中被阻塞的线程呢?
同加锁过程一样,释放锁需要从ReentrantLock.unlock()方法开始
public void unlock(){
sync.release(1);
}
//AbstractQueuedSynchronizer
public final boolean release(int arg){
if(tryRelease(arg)){
Node h = head;
if(h!=null && h.waitStatus!=0){
unparkSuccessor(h);
}
return true;
}
return false;
}
首先调用tryRelease()方法尝试释放锁,如果成功最终会调用AQS中的unparkSuccessor方法来实现释放锁的操作。
private void unparkSuccessor(Node node){
//获取头节点waitStatus
int ws = node.waitStatus;
if(ws < 0){
compareAndSetWaitStatus(node,ws,0);
}
//获取当前节点(实际是head节点)的下一个节点
Node s = node.next;
//如果下个节点是null或是CANCEL状态
//就找到队列最开始的非CANCEL的节点
if(s == null || s.waitStatus > 0){
s = null;
//从尾部节点开始找,到队首
//找到队列第一个waitStatus < 0的节点
for(Node t=tail;t!=null&&t!=node;t=t.prev){
if(t.waitStatus <= 0){
s = t;
}
}
}
//如果当前节点的下个节点!=null,且状态<=0。unpark
if(s != null){
LockSupport.unpark(s.thread);
}
}
首先获取当前节点(实际上传入的是head节点)的状态,如果head节点的下一个节点是null,或者下一个节点的状态为CANCEL,则从等待队列的尾部开始遍历,直到寻找到第一个waitStatus小于0的节点。
如果最终遍历到的节点!=null,再调用LockSupport.unpar()方法,调用底层方法唤醒线程。至此,线程被唤醒的时机也分析完了。
不管是加锁还是释放锁的阶段,多次提到一种通用操作:compareAndSetXXX。这种操作最终会调用Unsafe中的API进行CAS操作。
CAS全称是:Compare And Swap,译为比较和替换,是一种通过硬件实现并发安全 的常用技术,底层是利用CPU的CAS指令对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作。
它的实现过程主要有3个操作数:内存值V,旧的预期值E,要修改的新值U,当且仅当预期值E和内存值V相同时 ,才将内存值V修改为U,否则什么都不做。
public class MyLock{
private Sync sync = new Sync();
//加锁
public void lock(){
sync.acquire(1);
}
//释放锁
public void unlock(){
sync.release(1);
}
static class Sync extends AbstractQueuedSynchroizer{
@Override
protected boolean tryAcquire(int arg){
return compareAndSetState(0,1);
}
@Override
protected boolean tryRelease(int arg){
setState(0);
return true;
}
}
}
代码中的MyLock就是一个最简单的独占锁,通过使用MyLock也能实现同synchronized和ReentrantLock相同的功能,如下:
static int count = 0;
static MyLock myLock = new MyLock();
@Test
public void testMyLock() throws InterruptException{
Runnable runnable = new Runnable(){
@Override
public void run(){
try{
myLock.lock();
for(int i=0;i<1000;i++){
count++;
}
}catch(Exception e){
print(e)
}finally{
myLock.unlock();
}
}
};
Thread thread1 = new Thread(runnable);
Thread thread2 = new Thread(runnable);
thread1.start();
thread2.start();
thread1.join();
thread2.join();
print(count);
}
打印200,说明线程安全的同步操作。
总体来说,AQS是一套框架,在框架内部已经封装好了大部分同步需要的逻辑,,在AQS内部维护了一个状态指示器state和一个等待队列Node,而通过state的操作又分为两种:独占式和共享式,这就导致AQS有两种不同的实现:独占锁(ReentrantLock)和分享锁(CountDownLatch,读写锁)。这里主要从独占锁的角度来跟进的分析AQS加锁和释放锁的流程。
自定义时有几个可能需要子类同步器实现: