基础
AbstractQueuedSynchronizer抽象同步队列简称AQS,它是实现同步器的基础组件,如常用的ReentrantLock、Semaphore、CountDownLatch等。
AQS定义了一套多线程访问共享资源的同步模板,解决了实现同步器时涉及的大量细节问题,能够极大地减少实现工作,虽然大多数开发者可能永远不会使用AQS实现自己的同步器(JUC包下提供的同步器基本足够应对日常开发),但是知道AQS的原理对于架构设计还是很有帮助的,面试还可以吹吹牛,下面是AQS的组成结构。
三部分组成:volatile int state同步状态
、Node组成的CLH队列
、ConditionObject条件变量
(包含Node组成的条件单向队列)。
状态
getState()
:返回同步状态setState(int newState)
:设置同步状态compareAndSetState(int expect, int update)
:使用CAS设置同步状态isHeldExclusively()
:当前线程是否持有资源
独占资源(不响应线程中断)
tryAcquire(int arg)
:独占式获取资源,子类实现acquire(int arg)
:独占式获取资源模板tryRelease(int arg)
:独占式释放资源,子类实现release(int arg)
:独占式释放资源模板
共享资源(不响应线程中断)
tryAcquireShared(int arg)
:共享式获取资源,返回值大于等于0则表示获取成功,否则获取失败,子类实现acquireShared(int arg)
:共享形获取资源模板tryReleaseShared(int arg)
:共享式释放资源,子类实现releaseShared(int arg)
:共享式释放资源模板
同步状态
在AQS中维护了一个同步状态变量state,getState函数获取同步状态,setState、compareAndSetState函数修改同步状态,对于AQS来说,线程同步的关键是对state的操作,可以说获取、释放资源是否成功都是由state决定的,比如state>0代表可获取资源,否则无法获取,所以state的具体语义由实现者去定义,现有的ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch定义的state语义都不一样。
- ReentrantLock的state用来表示是否有锁资源
- ReentrantReadWriteLock的state高16位代表读锁状态,低16位代表写锁状态
- Semaphore的state用来表示可用信号的个数
- CountDownLatch的state用来表示计数器的值
CLH队列
CLH是AQS内部维护的FIFO(先进先出)双端双向队列(方便尾部节点插入),基于链表数据结构,当一个线程竞争资源失败,就会将等待资源的线程封装成一个Node节点,通过CAS原子操作插入队列尾部,最终不同的Node节点连接组成了一个CLH队列,所以说AQS通过CLH队列管理竞争资源的线程,个人总结CLH队列具有如下几个优点:
- 先进先出保证了公平性
- 非阻塞的队列,通过自旋锁和CAS保证节点插入和移除的原子性,实现无锁快速插入
- 采用了自旋锁思想,所以CLH也是一种基于链表的可扩展、高性能、公平的自旋锁
Node内部类
Node
是AQS
的内部类,每个等待资源的线程都会封装成Node
节点组成CLH
队列、等待队列,所以说Node
是非常重要的部分,理解它是理解AQS
的第一步。
waitStatus等待状态如下
nextWaiter特殊标记
Node
在CLH
队列时,nextWaiter
表示共享式或独占式标记Node
在条件队列时,nextWaiter
表示下个Node
节点指针
流程概述
线程获取资源失败,封装成Node
节点从CLH
队列尾部入队并阻塞线程,某线程释放资源时会把CLH
队列首部Node
节点关联的线程唤醒(此处的首部是指第二个节点,后面会细说),再次获取资源。
入队
获取资源失败的线程需要封装成Node
节点,接着尾部入队,在AQS
中提供addWaiter
函数完成Node
节点的创建与入队。
/**
* @description: Node节点入队-CLH队列
* @param mode 标记Node.EXCLUSIVE独占式 or Node.SHARED共享式
*/
private Node addWaiter(Node mode) {
// 根据当前线程创建节点,等待状态为0
Node node = new Node(Thread.currentThread(), mode);
// 获取尾节点
Node pred = tail;
if (pred != null) {
// 如果尾节点不等于null,把当前节点的前驱节点指向尾节点
node.prev = pred;
// 通过CAS把尾节点指向当前节点
if (compareAndSetTail(pred, node)) {
// 之前尾节点的下个节点指向当前节点
pred.next = node;
return node;
}
}
// 如果添加失败或队列不存在,执行end函数
enq(node);
return node;
}
添加节点的时候,如果从CLH
队列已经存在,通过CAS
快速将当前节点添加到队列尾部,如果添加失败或队列不存在,则指向enq
函数自旋入队。
/**
* @description: 自旋cas入队
* @param node 节点
*/
private Node enq(final Node node) {
for (;;) { //循环
//获取尾节点
Node t = tail;
if (t == null) {
//如果尾节点为空,创建哨兵节点,通过cas把头节点指向哨兵节点
if (compareAndSetHead(new Node()))
//cas成功,尾节点指向哨兵节点
tail = head;
} else {
//当前节点的前驱节点设指向之前尾节点
node.prev = t;
//cas设置把尾节点指向当前节点
if (compareAndSetTail(t, node)) {
//cas成功,之前尾节点的下个节点指向当前节点
t.next = node;
return t;
}
}
}
}
通过自旋CAS
尝试往队列尾部插入节点,直到成功,自旋过程如果发现CLH
队列不存在时会初始化CLH
队列,入队过程流程如下图:
第一次循环
- 刚开始C L H队列不存在,head与tail都指向null
- 要初始化C L H队列,会创建一个哨兵节点,head与tail都指向哨兵节点
第二次循环
- 当前线程节点的前驱节点指向尾部节点(哨兵节点)
- 设置当前线程节点为尾部,tail指向当前线程节点
- 前尾部节点的后驱节点指向当前线程节点(当前尾部节点)
最后结合addWaiter与enq函数的入队流程图如下
出队
CLH
队列中的节点都是获取资源失败的线程节点,当持有资源的线程释放资源时,会将head.next
指向的线程节点唤醒(CLH
队列的第二个节点),如果唤醒的线程节点获取资源成功,线程节点清空信息设置为头部节点(新哨兵节点),原头部节点出队(原哨兵节点)acquireQueued函数中的部分代码
//1.获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是首节点,获取资源(子类实现)
if (p == head && tryAcquire(arg)) {
//2.获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
setHead(node);
//3.原来首节点下个节点指向为null
p.next = null; // help GC
//4.非异常状态,防止指向finally逻辑
failed = false;
//5.返回线程中断状态
return interrupted;
}
private void setHead(Node node) {
//节点设置为头部
head = node;
//清空线程
node.thread = null;
//清空前驱节点
node.prev = null;
}
只需要关注1~3
步骤即可,过程非常简单,假设获取资源成功,更换头部节点,并把头部节点的信息清除变成哨兵节点,注意这个过程是不需要使用CAS
来保证,因为只有一个线程能够成功获取到资源。
条件变量
Object的wait、notify函数是配合Synchronized锁实现线程间同步协作的功能,A Q S的ConditionObject条件变量也提供这样的功能,通过ConditionObject的await和signal两类函数完成。不同于Synchronized锁,一个A Q S可以对应多个条件变量,而Synchronized只有一个。
如上图所示,ConditionObject内部维护着一个单向条件队列,不同于C H L队列,条件队列只入队执行await的线程节点,并且加入条件队列的节点,不能在C H L队列, 条件队列出队的节点,会入队到C H L队列。
当某个线程执行了ConditionObject的await函数,阻塞当前线程,线程会被封装成Node节点添加到条件队列的末端,其他线程执行ConditionObject的signal函数,会将条件队列头部线程节点转移到C H L队列参与竞争资源,具体流程如下图
模板方法
AQS
采用了模板方法设计模式,提供了两类模板,一类是独占式模板,另一类是共享形模式,对应的模板函数如下
- 独占式
acquire
获取资源release
释放资源
- 共享式
acquireShared
获取资源releaseShared
释放资源
独占式获取资源
acquire
是个模板函数,模板流程就是线程获取共享资源,如果获取资源成功,线程直接返回,否则进入CLH
队列,直到获取资源成功为止,且整个过程忽略中断的影响,acquire
函数代码如下
- 执行tryAcquire函数,tryAcquire是由子类实现,代表获取资源是否成功,如果资源获取失败,执行下面的逻辑
- 执行addWaiter函数(前面已经介绍过),根据当前线程创建出独占式节点,并入队CLH队列
- 执行acquireQueued函数,自旋阻塞等待获取资源
- 如果acquireQueued函数中获取资源成功,根据线程是否被中断状态,来决定执行线程中断逻辑
acquire
函数的大致流程都清楚了,下面来分析下acquireQueued
函数,线程封装成节点后,是如何自旋阻塞等待获取资源的,代码如下:
/**
* @description: 自旋机制等待获取资源
* @param node
* @param arg
* @return: boolean
*/
final boolean acquireQueued(final Node node, int arg) {
//异常状态,默认是
boolean failed = true;
try {
//该线程是否中断过,默认否
boolean interrupted = false;
for (;;) {//自旋
//获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是首节点,获取资源(子类实现)
if (p == head && tryAcquire(arg)) {
//获取资源成功,设置当前节点为头节点,清空当前节点的信息,把当前节点变成哨兵节点
setHead(node);
//原来首节点下个节点指向为null
p.next = null; // help GC
//非异常状态,防止指向finally逻辑
failed = false;
//返回线程中断状态
return interrupted;
}
/**
* 如果前驱节点不是首节点,先执行shouldParkAfterFailedAcquire函数,shouldParkAfterFailedAcquire做了三件事
* 1.如果前驱节点的等待状态是SIGNAL,返回true,执行parkAndCheckInterrupt函数,返回false
* 2.如果前驱节点的等大状态是CANCELLED,把CANCELLED节点全部移出队列(条件节点)
* 3.以上两者都不符合,更新前驱节点的等待状态为SIGNAL,返回false
*/
if (shouldParkAfterFailedAcquire(p, node) &&
//使用LockSupport类的静态方法park挂起当前线程,直到被唤醒,唤醒后检查当前线程是否被中断,返回该线程中断状态并重置中断状态
parkAndCheckInterrupt())
//该线程被中断过
interrupted = true;
}
} finally {
// 尝试获取资源失败并执行异常,取消请求,将当前节点从队列中移除
if (failed)
cancelAcquire(node);
}
}
一图胜千言,核心流程图如下:
独占式释放资源
有获取资源,自然就少不了释放资源,A Q S
中提供了release
模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CLH
队列的第二个线程节点(首节点的下个节点),代码如下
/**
* @description: 独占式-释放资源模板函数
* @param arg
* @return: boolean
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {//释放资源成功,tryRelease子类实现
//获取头部线程节点
Node h = head;
if (h != null && h.waitStatus != 0) //头部线程节点不为null,并且等待状态不为0
//唤醒CHL队列第二个线程节点
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
//获取节点等待状态
int ws = node.waitStatus;
if (ws < 0)
//cas更新节点状态为0
compareAndSetWaitStatus(node, ws, 0);
//获取下个线程节点
Node s = node.next;
if (s == null || s.waitStatus > 0) { //如果下个节点信息异常,从尾节点循环向前获取到正常的节点为止,正常情况不会执行
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒线程节点
LockSupport.unpark(s.thread);
}
}
release
逻辑非常简单,流程图如下:
共享式获取资源
acquireShared
是个模板函数,模板流程就是线程获取共享资源,如果获取到资源,线程直接返回,否则进入CLH
队列,直到获取到资源为止,且整个过程忽略中断的影响,acquireShared
函数代码如下
/**
* @description: 共享式-获取资源模板函数
* @param arg
* @return: void
*/
public final void acquireShared(int arg) {
/**
* 1.负数表示失败
* 2.0表示成功,但没有剩余可用资源
* 3.正数表示成功且有剩余资源
*/
if (tryAcquireShared(arg) < 0) //获取资源失败,tryAcquireShared子类实现
//自旋阻塞等待获取资源
doAcquireShared(arg);
}
doAcquireShared
函数与独占式的acquireQueued
函数逻辑基本一致,唯一的区别就是下图红框部分
- 节点的标记是共享式
- 获取资源成功,还会唤醒后续资源,因为资源数可能
>0
,代表还有资源可获取,所以需要做后续线程节点的唤醒
共享式释放资源
AQS
中提供了releaseShared
模板函数来释放资源,模板流程就是线程释放资源成功,唤醒CHL队列的第二个线程节点(首节点的下个节点),代码如下
/**
* @description: 共享式-释放资源模板函数
* @param arg
* @return: boolean
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//释放资源成功,tryReleaseShared子类实现
//唤醒后继节点
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
//获取头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//如果头节点等待状态为SIGNAL
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//更新头节点等待状态为0
continue; // loop to recheck cases
//唤醒头节点下个线程节点
unparkSuccessor(h);
}
//如果后继节点暂时不需要被唤醒,更新头节点等待状态为PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
与独占式释放资源区别不大,都是唤醒头节点的下个节点。
什么是AQS?
AQS
的全称是 AbstractQueuedSynchronizer
,即抽象队列同步器
。是Java并发工具的基础,采用乐观锁,通过CAS与自旋轻量级的获取锁。维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。很多JUC包,比如ReentrantLock、Semaphore、CountDownLatch等并发类均是继承AQS,通过AQS的模板方法,来实现的。
核心思想
- 若请求的共享资源空闲,则将当前请求的线程设置为有效的工作线程,并将共享资源设置为锁定状态
- 若共享资源被占用,则需要阻塞等待唤醒机制保证锁的分配
工作原理
AQS = 同步状态(volatile int state)
+ 同步队列(即等待队列,FIFO的CLH队列)
+ 条件队列(ConditionObject)
- state:代表共享资源。
volatile
保证并发读,CAS
保证并发写 - 同步队列(即等待队列,CLH队列):是CLH变体的虚拟双向队列(先进先出FIFO)来等待获取共享资源。当前线程可以通过signal和signalAll将条件队列中的节点转移到同步队列中
- 条件队列(ConditionObject):当前线程存在于同步队列的头节点,可以通过await从同步队列转移到条件队列中
实现原理
- 通过CLH队列的变体:FIFO双向队列实现的
- 每个请求资源的线程被包装成一个节点来实现锁的分配
- 通过
volatile
的int
类型的成员变量state
表示同步状态 - 通过FIFO队列完成资源获取的排队工作
- 通过CAS完成对
state
的修改
共享方式
AQS定义两种资源共享方式。无论是独占锁还是共享锁,本质上都是对AQS内部的一个变量state
的获取。state
是一个原子的int变量,用来表示锁状态、资源数等。
① 独占锁(Exclusive
)模式:只能被一个线程获取到(Reentrantlock
)。
② 共享锁(Share
)模式:可以被多个线程同时获取(Semaphore/CountDownLatch/ReadWriteLock
)。
state机制
提供volatile
变量state
,用于同步线程之间的共享状态。通过 CAS
和 volatile
保证其原子性和可见性。核心要点:
- state 用
volatile
修饰,保证多线程中的可见性 getState()
和setState()
方法采用final修饰,限制AQS的子类重写它们两compareAndSetState()
方法采用乐观锁思想的CAS算法,也是采用final修饰的,不允许子类重写
state应用案例:
案例 | 描述 |
---|---|
Semaphore | 使用AQS同步状态来保存信号量的当前计数。tryRelease会增加计数,acquireShared会减少计数 |
CountDownLatch | 使用AQS同步状态来表示计数。计数为0时,所有的Acquire操作(CountDownLatch的await方法)才可以通过 |
ReentrantReadWriteLock | 使用AQS同步状态中的16位保存写锁持有的次数,剩下的16位用于保存读锁的持有次数 |
ThreadPoolExecutor | Worker利用AQS同步状态实现对独占线程变量的设置(tryAcquire和tryRelease) |
ReentrantLock | 使用AQS保存锁重复持有的次数。当一个线程获取锁时,ReentrantLock记录当前获得锁的线程标识,用于检测是否重复获取,以及错误线程试图解锁操作时异常情况的处理 |
双队列
- 同步队列(syncQueue):管理多个线程的休眠与唤醒
- 条件队列(waitQueue):类似wait与signal作用,实现在使用锁时对线程管理
注意
- 同步队列与条件队列节点可相互转化
- 一个线程只能存在于两个队列中的一个
同步队列
同步队列是用来管理多个线程的休眠与唤醒。
同步队列依赖一个双向链表(CHL)来完成同步状态的管理,当前线程获取同步状态失败后,同步器会将线程构建成一个节点,并将其加入同步队列中。通过signal
或signalAll
将条件队列中的节点转移到同步队列。(由条件队列转化为同步队列)
如果没有锁竞争,线程可以直接获取到锁,就不会进入同步队列。即没有锁竞争时,同步队列(syncQueue)是空的,当存在锁竞争时,线程会进入到同步队列中。一旦进入到同步队列中,就会有线程切换。标准的 CHL 无锁队列是单向链表,同步队列(syncQueue) 在 CHL 基础上做了改进:
-
同步队列是双向链表。和二叉树一样,双向链表目前也没有无锁算法的实现。双向链表需要同时设置前驱和后继结点,这两次操作只能保证一个是原子性的
-
node.pre一定可以遍历所有结点,是线程安全的。而后继结点 node.next 则是线程不安全的。也就是说,node.pre 一定可以遍历整个链表,而 node.next 则不一定
条件队列
条件队列是类似wait与signal作用,实现在使用锁时对线程管理。且由于实现了Condition,对线程的管理可更加细化。
当线程存在于同步队列的头结点时,调用 await
方法进行阻塞(从同步队列转化到条件队列)。Condition条件队列(waitQueue
)要比Lock同步队列(syncQueue
)简单很多,最重要的原因是 waitQueue
的操作都是在获取锁的线程中执行,不存在数据竞争的问题。
ConditionObject重要的方法说明:
- await:阻塞线程并放弃锁,加入到等待队列中
- signal:唤醒等待线程,没有特殊的要求,尽量使用 signalAll
- addConditionWaiter:将结点(状态为 CONDITION)添加到等待队列 waitQueue 中,不存在锁竞争
- fullyRelease:释放锁,并唤醒后继等待线程
- isOnSyncQueue:根据结点是否在同步队列上,判断等待线程是否已经被唤醒
- acquireQueued:Lock 接口中的方法,通过同步队列方法竞争锁
- unlinkCancelledWaiters:清理取消等待的线程
框架架构图
常见问题
问题1:state为什么要提供setState和compareAndSetState两种修改状态的方法?
这个问题,关键是修改状态时是否存在数据竞争,如果有则必须使用 compareAndSetState。
- lock.lock() 获取锁时会发生数据竞争,必须使用 CAS 来保障线程安全,也就是 compareAndSetState 方法
- lock.unlock() 释放锁时,线程已经获取到锁,没有数据竞争,也就可以直接使用 setState 修改锁的状态
问题2:AQS为什么选择node.prev前驱结点的原子性,而node.next后继结点则是辅助结点?
- next 域:需要修改二处来保证原子性,一是 tail.next;二是 tail 指针
- prev 域:只需要修改一处来保证原子性,就是 tail 指针。你可能会说不需要修改 node.prev 吗?当然需要,但 node 还没添加到链表中,其 node.prev 修改并没有锁竞争的问题,将 tail 指针指向 node 时,如果失败会通过自旋不断尝试
问题3:AQS明知道node.next有可见性问题,为什么还要设计成双向链表?
唤醒同步线程时,如果有后继结点,那么时间复杂为 O(1)。否则只能只反向遍历,时间复杂度为 O(n)。以下两种情况,则认为 node.next 不可靠,需要从 tail 反向遍历。
- node.next=null:可能结点刚刚插入链表中,node.next 仍为空。此时有其它线程通过 unparkSuccessor 来唤醒该线程
- node.next.waitStatus>0:结点已经取消,next 值可能已经改变
评论区