第16章

🔧 AQS抽象队列同步器

深入理解Java并发编程的核心基础设施,掌握同步器的设计思想和实现原理

学习目标

AQS概述

AbstractQueuedSynchronizer(AQS)是Java并发包中最重要的基础设施之一,它为实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器提供了一个框架。AQS是ReentrantLock、CountDownLatch、Semaphore等同步工具的基础。

核心理解

AQS使用模板方法模式,定义了同步器的基本框架,子类只需要实现特定的方法来定义同步状态的获取和释放逻辑。

AQS的核心组件

同步状态
使用volatile int state变量表示同步状态,通过CAS操作保证原子性修改。
等待队列
基于CLH队列的变种,管理等待获取同步状态的线程。
模板方法
定义了获取和释放同步状态的标准流程,子类实现具体逻辑。

AQS设计思想

AQS的设计基于模板方法模式,它定义了同步器的骨架,而具体的同步语义由子类来实现。这种设计使得不同类型的同步器可以复用AQS的基础设施。

模板方法模式的应用

AQS核心方法
// AQS提供的模板方法
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// 子类需要实现的方法
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// 共享模式的获取
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// 子类需要实现的共享获取方法
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

设计原则

同步状态管理

AQS使用一个32位的整数来维护同步状态,这个状态的含义完全由子类来定义。例如,在ReentrantLock中,state表示锁的重入次数;在CountDownLatch中,state表示计数器的值。

状态操作方法

状态管理API
// 获取当前同步状态
protected final int getState() {
    return state;
}

// 设置同步状态
protected final void setState(int newState) {
    state = newState;
}

// CAS操作更新状态
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

// 示例:ReentrantLock中的tryAcquire实现
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
重要提示

state变量被声明为volatile,确保了可见性。所有对state的修改都应该使用CAS操作来保证原子性。

等待队列机制

AQS维护了一个FIFO的双向链表作为等待队列,当线程获取同步状态失败时,会被包装成Node节点加入到队列尾部,并阻塞等待。

Node节点结构

Node节点定义
static final class Node {
    // 共享模式标记
    static final Node SHARED = new Node();
    // 独占模式标记
    static final Node EXCLUSIVE = null;

    // 节点状态常量
    static final int CANCELLED =  1;  // 节点已取消
    static final int SIGNAL    = -1;  // 后继节点需要唤醒
    static final int CONDITION = -2;  // 节点在条件队列中
    static final int PROPAGATE = -3;  // 共享模式下需要传播

    volatile int waitStatus;          // 节点状态
    volatile Node prev;               // 前驱节点
    volatile Node next;               // 后继节点
    volatile Thread thread;           // 关联的线程
    Node nextWaiter;                  // 条件队列中的下一个节点
}

队列操作

入队操作
线程获取同步状态失败时,会被包装成Node节点,通过CAS操作加入到队列尾部。
唤醒机制
当同步状态被释放时,会唤醒队列中的头节点,头节点获取成功后会唤醒后继节点。
取消机制
支持中断和超时,被取消的节点会被从队列中移除。

独占锁和共享锁

AQS支持两种同步模式:独占模式(Exclusive)和共享模式(Shared)。独占模式下,同一时刻只有一个线程能够获取同步状态;共享模式下,多个线程可以同时获取同步状态。

独占模式

独占模式实现
// 独占模式获取
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// 独占模式释放
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 子类实现的获取方法
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

// 子类实现的释放方法
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

共享模式

共享模式实现
// 共享模式获取
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// 共享模式释放
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// 子类实现的共享获取方法
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}

// 子类实现的共享释放方法
protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}

模式对比

独占模式
如ReentrantLock,同一时刻只有一个线程能够获取锁,具有排他性。
共享模式
如Semaphore、CountDownLatch,多个线程可以同时获取同步状态。

自定义同步器

通过继承AQS并实现相应的方法,我们可以创建自定义的同步器。下面演示如何实现一个简单的互斥锁。

自定义互斥锁
public class CustomMutex {
    private final Sync sync = new Sync();
    
    // 内部同步器
    private static class Sync extends AbstractQueuedSynchronizer {
        // 是否处于独占状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }
        
        // 尝试获取锁
        protected boolean tryAcquire(int acquires) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        
        // 尝试释放锁
        protected boolean tryRelease(int releases) {
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        
        // 创建条件变量
        Condition newCondition() {
            return new ConditionObject();
        }
    }
    
    // 公共接口
    public void lock() {
        sync.acquire(1);
    }
    
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }
    
    public void unlock() {
        sync.release(1);
    }
    
    public boolean isLocked() {
        return sync.isHeldExclusively();
    }
    
    public Condition newCondition() {
        return sync.newCondition();
    }
}

实现要点

实战应用

理解AQS的原理对于使用和优化Java并发程序非常重要。下面是一些实际应用中的考虑因素。

性能优化建议

减少竞争
合理设计锁的粒度,避免不必要的锁竞争,考虑使用读写锁分离读写操作。
公平性权衡
根据应用场景选择公平锁或非公平锁,非公平锁通常有更好的性能。
超时机制
合理使用带超时的获取方法,避免线程无限期等待。
最佳实践
  • 优先使用JUC包提供的同步器,而不是自己实现
  • 理解不同同步器的适用场景和性能特点
  • 在自定义同步器时,仔细考虑状态的定义和转换
  • 充分测试并发场景下的正确性和性能
上一章:ThreadLocal详解 返回目录 下一章:并发编程模式