第13章

🔧 同步工具类

掌握CountDownLatch、CyclicBarrier、Semaphore等高级同步工具的使用场景和实战应用

学习目标

CountDownLatch - 倒计时门闩

CountDownLatch是一个同步工具类,它允许一个或多个线程等待其他线程完成操作。它通过一个计数器来实现,计数器的初始值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程都已经完成了任务,然后在闭锁上等待的线程就可以恢复执行任务。

核心特点

CountDownLatch是一次性的,计数器的值只能在构造方法中初始化一次,之后没有任何机制再次对其设置值,当CountDownLatch使用完毕后,它不能再次被使用。

主要方法

countDown()
递减锁存器的计数,如果计数到达零,则释放所有等待的线程。
await()
使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断。
await(timeout, unit)
使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间。

实战示例:并行任务协调

CountDownLatchDemo.java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CountDownLatchDemo {
    private static final int THREAD_COUNT = 5;
    
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(THREAD_COUNT);
        
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
        
        // 创建工作线程
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    // 等待开始信号
                    startSignal.await();
                    System.out.println("任务 " + taskId + " 开始执行");
                    
                    // 模拟任务执行
                    Thread.sleep((long) (Math.random() * 3000));
                    System.out.println("任务 " + taskId + " 执行完成");
                    
                    // 任务完成,计数器减1
                    doneSignal.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        System.out.println("主线程准备启动所有任务...");
        Thread.sleep(1000);
        
        // 发出开始信号
        startSignal.countDown();
        
        // 等待所有任务完成
        doneSignal.await();
        System.out.println("所有任务执行完成!");
        
        executor.shutdown();
    }
}

CyclicBarrier - 循环屏障

CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时CyclicBarrier很有用。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。

循环特性

与CountDownLatch不同,CyclicBarrier可以重复使用。当所有线程都到达屏障点后,屏障会自动重置,可以进行下一轮的同步。

使用场景

实战示例:多阶段数据处理

CyclicBarrierDemo.java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CyclicBarrierDemo {
    private static final int THREAD_COUNT = 3;
    private static final int PHASE_COUNT = 3;
    
    public static void main(String[] args) {
        // 创建屏障,当3个线程都到达时执行屏障动作
        CyclicBarrier barrier = new CyclicBarrier(THREAD_COUNT, () -> {
            System.out.println("=== 所有线程完成当前阶段,进入下一阶段 ===");
        });
        
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
        
        // 创建工作线程
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int workerId = i;
            executor.submit(() -> {
                try {
                    for (int phase = 1; phase <= PHASE_COUNT; phase++) {
                        System.out.println("工作线程 " + workerId + " 开始阶段 " + phase);
                        
                        // 模拟工作
                        Thread.sleep((long) (Math.random() * 2000));
                        
                        System.out.println("工作线程 " + workerId + " 完成阶段 " + phase);
                        
                        // 等待其他线程完成当前阶段
                        barrier.await();
                    }
                    System.out.println("工作线程 " + workerId + " 完成所有阶段");
                } catch (InterruptedException | BrokenBarrierException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        executor.shutdown();
    }
}

Semaphore - 信号量

Semaphore是一个计数信号量,从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个acquire(),然后再获取该许可。每个release()添加一个许可,从而可能释放一个正在阻塞的获取者。

资源控制

Semaphore通常用于限制可以访问某些资源的线程数目。例如,数据库连接池、线程池等场景。

主要方法

acquire()
从此信号量获取一个许可,在提供一个许可前一直将线程阻塞,否则线程被中断。
release()
释放一个许可,将其返回给信号量。
availablePermits()
返回此信号量中当前可用的许可数。

实战示例:资源池管理

SemaphoreDemo.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    // 模拟数据库连接池,最多允许3个连接
    private static final Semaphore semaphore = new Semaphore(3);
    
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        
        // 模拟10个客户端请求数据库连接
        for (int i = 0; i < 10; i++) {
            final int clientId = i;
            executor.submit(() -> {
                try {
                    System.out.println("客户端 " + clientId + " 请求数据库连接...");
                    
                    // 获取许可(数据库连接)
                    semaphore.acquire();
                    
                    System.out.println("客户端 " + clientId + " 获得数据库连接,开始执行查询");
                    System.out.println("当前可用连接数: " + semaphore.availablePermits());
                    
                    // 模拟数据库操作
                    Thread.sleep((long) (Math.random() * 3000));
                    
                    System.out.println("客户端 " + clientId + " 完成查询,释放连接");
                    
                    // 释放许可(归还数据库连接)
                    semaphore.release();
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        executor.shutdown();
    }
}

Exchanger - 数据交换器

Exchanger是一个用于两个工作线程之间交换数据的封装工具类,简单说就是一个线程在完成一定的事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到来时才能彼此交换对应数据。

使用场景

实战示例:数据交换

ExchangerDemo.java
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerDemo {
    private static final Exchanger exchanger = new Exchanger<>();
    
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        
        // 生产者线程
        executor.submit(() -> {
            try {
                String data = "生产者数据";
                System.out.println("生产者准备数据: " + data);
                
                // 交换数据
                String exchangedData = exchanger.exchange(data);
                
                System.out.println("生产者收到交换数据: " + exchangedData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // 消费者线程
        executor.submit(() -> {
            try {
                Thread.sleep(2000); // 模拟处理时间
                
                String data = "消费者数据";
                System.out.println("消费者准备数据: " + data);
                
                // 交换数据
                String exchangedData = exchanger.exchange(data);
                
                System.out.println("消费者收到交换数据: " + exchangedData);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        executor.shutdown();
    }
}

Phaser - 多阶段同步器

Phaser是JDK 7中新增的一个同步辅助类,它可以实现CyclicBarrier和CountDownLatch类似的功能,而且它支持对任务的动态调整,并支持分层结构来达到更高的吞吐量。

高级特性

Phaser支持动态调整参与同步的线程数量,可以在运行时注册和注销参与者,还支持分层结构来提高性能。

主要方法

register()
动态注册一个新的参与者。
arriveAndAwaitAdvance()
到达并等待其他参与者到达。
arriveAndDeregister()
到达并注销,不再参与后续阶段。

实战示例:多阶段任务处理

PhaserDemo.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class PhaserDemo {
    public static void main(String[] args) {
        final int TASK_COUNT = 3;
        final int PHASE_COUNT = 3;
        
        // 创建Phaser,初始参与者数量为TASK_COUNT
        Phaser phaser = new Phaser(TASK_COUNT) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("=== 阶段 " + (phase + 1) + " 完成,参与者数量: " + registeredParties + " ===");
                return phase >= PHASE_COUNT - 1; // 3个阶段后终止
            }
        };
        
        ExecutorService executor = Executors.newFixedThreadPool(TASK_COUNT);
        
        // 创建任务
        for (int i = 0; i < TASK_COUNT; i++) {
            final int taskId = i;
            executor.submit(() -> {
                for (int phase = 0; phase < PHASE_COUNT; phase++) {
                    System.out.println("任务 " + taskId + " 执行阶段 " + (phase + 1));
                    
                    // 模拟工作
                    try {
                        Thread.sleep((long) (Math.random() * 2000));
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    
                    System.out.println("任务 " + taskId + " 完成阶段 " + (phase + 1));
                    
                    // 到达并等待其他任务完成当前阶段
                    phaser.arriveAndAwaitAdvance();
                }
                System.out.println("任务 " + taskId + " 完成所有阶段");
            });
        }
        
        executor.shutdown();
    }
}

同步工具类对比总结

CountDownLatch
  • 一次性使用
  • 等待多个任务完成
  • 适用于启动信号和完成信号
CyclicBarrier
  • 可重复使用
  • 多线程互相等待
  • 适用于分阶段任务
Semaphore
  • 控制资源访问数量
  • 支持公平和非公平模式
  • 适用于资源池管理
Exchanger
  • 两个线程数据交换
  • 同步点数据传递
  • 适用于数据校验场景
Phaser
  • 动态参与者管理
  • 多阶段同步
  • 支持分层结构
上一章:并发集合 返回目录 下一章:Fork/Join框架