第7章

🔄 Condition条件变量

掌握线程间协调通信的高级工具,实现精确的线程控制

学习目标

Condition接口概述

Condition接口是Java并发包中提供的条件变量实现,它必须与Lock配合使用,提供了比Object.wait/notify更加灵活和强大的线程间通信机制。Condition允许线程在特定条件下等待,并在条件满足时被唤醒。

核心概念

Condition提供了一种让线程等待特定条件成立的机制,相比Object.wait/notify,它支持多个等待队列,提供更精确的线程控制。

Condition的主要方法

await()方法
使当前线程等待,直到被signal或interrupt,类似Object.wait()。
signal()方法
唤醒一个等待的线程,类似Object.notify()。
signalAll()方法
唤醒所有等待的线程,类似Object.notifyAll()。

基本使用示例

ConditionBasic.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBasic {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean ready = false;

    public void waitForReady() throws InterruptedException {
        lock.lock();
        try {
            while (!ready) {
                System.out.println(Thread.currentThread().getName() + " 等待条件满足");
                condition.await(); // 等待条件
            }
            System.out.println(Thread.currentThread().getName() + " 条件已满足,继续执行");
        } finally {
            lock.unlock();
        }
    }

    public void setReady() {
        lock.lock();
        try {
            ready = true;
            System.out.println("条件已设置为ready");
            condition.signalAll(); // 唤醒所有等待的线程
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionBasic example = new ConditionBasic();

        // 启动等待线程
        Thread waiter1 = new Thread(() -> {
            try {
                example.waitForReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Waiter-1");

        Thread waiter2 = new Thread(() -> {
            try {
                example.waitForReady();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Waiter-2");

        waiter1.start();
        waiter2.start();

        // 等待2秒后设置条件
        Thread.sleep(2000);
        example.setReady();

        waiter1.join();
        waiter2.join();
    }
}

await和signal机制

await和signal是Condition的核心机制,它们提供了线程间的协调通信能力。理解这两个方法的工作原理对于正确使用Condition至关重要。

await方法详解

signal机制

signal()
  • 唤醒等待队列中的一个线程
  • 被唤醒的线程需要重新获取锁
  • 适用于只需要唤醒一个线程的场景
signalAll()
  • 唤醒等待队列中的所有线程
  • 所有线程竞争获取锁
  • 适用于条件变化影响多个线程的场景
重要提醒

使用await时必须在while循环中检查条件,因为可能发生虚假唤醒(spurious wakeup)。这是一种防御性编程的最佳实践。

生产者消费者模式实现

生产者消费者模式是并发编程中的经典模式,使用Condition可以实现更加精确和高效的控制。下面展示一个完整的有界缓冲区实现。

BoundedBuffer.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BoundedBuffer {
    private final Object[] buffer;
    private int putIndex = 0;
    private int takeIndex = 0;
    private int count = 0;
    
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    
    public BoundedBuffer(int capacity) {
        buffer = new Object[capacity];
    }
    
    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            // 等待缓冲区不满
            while (count == buffer.length) {
                System.out.println(Thread.currentThread().getName() + " 等待缓冲区不满");
                notFull.await();
            }
            
            buffer[putIndex] = item;
            putIndex = (putIndex + 1) % buffer.length;
            count++;
            
            System.out.println(Thread.currentThread().getName() + " 生产了: " + item + ", 当前数量: " + count);
            
            // 通知消费者
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }
    
    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        try {
            // 等待缓冲区不空
            while (count == 0) {
                System.out.println(Thread.currentThread().getName() + " 等待缓冲区不空");
                notEmpty.await();
            }
            
            T item = (T) buffer[takeIndex];
            buffer[takeIndex] = null;
            takeIndex = (takeIndex + 1) % buffer.length;
            count--;
            
            System.out.println(Thread.currentThread().getName() + " 消费了: " + item + ", 当前数量: " + count);
            
            // 通知生产者
            notFull.signal();
            
            return item;
        } finally {
            lock.unlock();
        }
    }
    
    public static void main(String[] args) {
        BoundedBuffer buffer = new BoundedBuffer<>(3);
        
        // 生产者线程
        Thread producer1 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put("Item-P1-" + i);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer-1");
        
        Thread producer2 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.put("Item-P2-" + i);
                    Thread.sleep(150);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer-2");
        
        // 消费者线程
        Thread consumer1 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.take();
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer-1");
        
        Thread consumer2 = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    buffer.take();
                    Thread.sleep(300);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer-2");
        
        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
    }
}

多条件变量的使用

一个Lock可以创建多个Condition,每个Condition维护自己的等待队列。这允许我们对不同的条件进行精确控制,提高程序的效率和可读性。

MultiConditionExample.java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MultiConditionExample {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition readerCondition = lock.newCondition();
    private final Condition writerCondition = lock.newCondition();
    
    private int readers = 0;
    private boolean writing = false;
    
    public void readLock() throws InterruptedException {
        lock.lock();
        try {
            // 等待没有写操作
            while (writing) {
                System.out.println(Thread.currentThread().getName() + " 等待写操作完成");
                readerCondition.await();
            }
            readers++;
            System.out.println(Thread.currentThread().getName() + " 获得读锁,当前读者数: " + readers);
        } finally {
            lock.unlock();
        }
    }
    
    public void readUnlock() {
        lock.lock();
        try {
            readers--;
            System.out.println(Thread.currentThread().getName() + " 释放读锁,当前读者数: " + readers);
            if (readers == 0) {
                // 没有读者了,唤醒等待的写者
                writerCondition.signal();
            }
        } finally {
            lock.unlock();
        }
    }
    
    public void writeLock() throws InterruptedException {
        lock.lock();
        try {
            // 等待没有读者和写者
            while (readers > 0 || writing) {
                System.out.println(Thread.currentThread().getName() + " 等待读写操作完成");
                writerCondition.await();
            }
            writing = true;
            System.out.println(Thread.currentThread().getName() + " 获得写锁");
        } finally {
            lock.unlock();
        }
    }
    
    public void writeUnlock() {
        lock.lock();
        try {
            writing = false;
            System.out.println(Thread.currentThread().getName() + " 释放写锁");
            // 优先唤醒写者,如果没有写者再唤醒读者
            writerCondition.signal();
            readerCondition.signalAll();
        } finally {
            lock.unlock();
        }
    }
    
    public static void main(String[] args) {
        MultiConditionExample rwLock = new MultiConditionExample();
        
        // 读者线程
        for (int i = 1; i <= 3; i++) {
            final int readerId = i;
            new Thread(() -> {
                try {
                    rwLock.readLock();
                    Thread.sleep(1000); // 模拟读操作
                    rwLock.readUnlock();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Reader-" + readerId).start();
        }
        
        // 写者线程
        for (int i = 1; i <= 2; i++) {
            final int writerId = i;
            new Thread(() -> {
                try {
                    Thread.sleep(500); // 延迟启动
                    rwLock.writeLock();
                    Thread.sleep(1000); // 模拟写操作
                    rwLock.writeUnlock();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Writer-" + writerId).start();
        }
    }
}

Condition vs Object.wait/notify

虽然Condition和Object.wait/notify都能实现线程间的协调,但它们在功能和使用方式上有显著差异。

功能对比

Condition的优势
  • 支持多个等待队列
  • 提供更多的等待方法(超时、不可中断等)
  • 与Lock配合使用,支持公平锁
  • 更好的性能和可扩展性
  • 更清晰的语义
Object.wait/notify的限制
  • 只有一个等待队列
  • 必须在synchronized块中使用
  • notify()随机唤醒一个线程
  • 不支持超时等待
  • 性能相对较差

使用建议

最佳实践
  • 新项目推荐使用Condition:功能更强大,性能更好
  • 需要多个等待条件:必须使用Condition
  • 需要超时等待:Condition提供更多选择
  • 性能敏感场景:Condition通常性能更好
  • 遗留代码维护:可以继续使用Object.wait/notify

最佳实践和注意事项

使用原则

常见陷阱

虚假唤醒
线程可能在条件未满足时被唤醒,必须在while循环中检查条件。
锁泄漏
忘记在finally块中释放锁,可能导致死锁。
信号丢失
在线程开始等待之前就发送了信号,导致线程永久等待。
性能优化建议
  • 使用signal()而不是signalAll(),当只需要唤醒一个线程时
  • 合理设计等待条件,避免不必要的唤醒
  • 考虑使用公平锁,在某些场景下可以提高性能
  • 避免在持有锁的情况下执行耗时操作
上一章:读写锁 返回目录 下一章:原子类