🔧 AQS抽象队列同步器
深入理解Java并发编程的核心基础设施,掌握同步器的设计思想和实现原理
学习目标
- 深入理解AQS的设计思想和核心概念
- 掌握同步状态的管理机制和CAS操作
- 了解等待队列的实现原理和节点状态
- 区分独占锁和共享锁的实现方式
- 学会自定义同步器的开发技巧
AQS概述
AbstractQueuedSynchronizer(AQS)是Java并发包中最重要的基础设施之一,它为实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器提供了一个框架。AQS是ReentrantLock、CountDownLatch、Semaphore等同步工具的基础。
AQS使用模板方法模式,定义了同步器的基本框架,子类只需要实现特定的方法来定义同步状态的获取和释放逻辑。
AQS的核心组件
AQS设计思想
AQS的设计基于模板方法模式,它定义了同步器的骨架,而具体的同步语义由子类来实现。这种设计使得不同类型的同步器可以复用AQS的基础设施。
模板方法模式的应用
// AQS提供的模板方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 子类需要实现的方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 共享模式的获取
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 子类需要实现的共享获取方法
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
设计原则
- 状态管理:使用单一的state变量表示同步状态
- 队列管理:维护一个FIFO的等待队列
- 公平性:支持公平和非公平两种获取模式
- 可扩展性:通过模板方法模式支持不同的同步语义
- 高性能:使用CAS操作避免锁竞争
同步状态管理
AQS使用一个32位的整数来维护同步状态,这个状态的含义完全由子类来定义。例如,在ReentrantLock中,state表示锁的重入次数;在CountDownLatch中,state表示计数器的值。
状态操作方法
// 获取当前同步状态
protected final int getState() {
return state;
}
// 设置同步状态
protected final void setState(int newState) {
state = newState;
}
// CAS操作更新状态
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// 示例:ReentrantLock中的tryAcquire实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
state变量被声明为volatile,确保了可见性。所有对state的修改都应该使用CAS操作来保证原子性。
等待队列机制
AQS维护了一个FIFO的双向链表作为等待队列,当线程获取同步状态失败时,会被包装成Node节点加入到队列尾部,并阻塞等待。
Node节点结构
static final class Node {
// 共享模式标记
static final Node SHARED = new Node();
// 独占模式标记
static final Node EXCLUSIVE = null;
// 节点状态常量
static final int CANCELLED = 1; // 节点已取消
static final int SIGNAL = -1; // 后继节点需要唤醒
static final int CONDITION = -2; // 节点在条件队列中
static final int PROPAGATE = -3; // 共享模式下需要传播
volatile int waitStatus; // 节点状态
volatile Node prev; // 前驱节点
volatile Node next; // 后继节点
volatile Thread thread; // 关联的线程
Node nextWaiter; // 条件队列中的下一个节点
}
队列操作
独占锁和共享锁
AQS支持两种同步模式:独占模式(Exclusive)和共享模式(Shared)。独占模式下,同一时刻只有一个线程能够获取同步状态;共享模式下,多个线程可以同时获取同步状态。
独占模式
// 独占模式获取
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 独占模式释放
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// 子类实现的获取方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 子类实现的释放方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
共享模式
// 共享模式获取
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 共享模式释放
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 子类实现的共享获取方法
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 子类实现的共享释放方法
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
模式对比
自定义同步器
通过继承AQS并实现相应的方法,我们可以创建自定义的同步器。下面演示如何实现一个简单的互斥锁。
public class CustomMutex {
private final Sync sync = new Sync();
// 内部同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于独占状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 尝试获取锁
protected boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 尝试释放锁
protected boolean tryRelease(int releases) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 创建条件变量
Condition newCondition() {
return new ConditionObject();
}
}
// 公共接口
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public Condition newCondition() {
return sync.newCondition();
}
}
实现要点
- 状态定义:明确state变量的含义,0表示未锁定,1表示锁定
- 原子操作:使用compareAndSetState确保状态修改的原子性
- 线程记录:记录当前持有锁的线程,支持重入检查
- 异常处理:在不合法的状态下抛出适当的异常
- 条件支持:可选择性地支持条件变量
实战应用
理解AQS的原理对于使用和优化Java并发程序非常重要。下面是一些实际应用中的考虑因素。
性能优化建议
- 优先使用JUC包提供的同步器,而不是自己实现
- 理解不同同步器的适用场景和性能特点
- 在自定义同步器时,仔细考虑状态的定义和转换
- 充分测试并发场景下的正确性和性能