🔄 线程间通信
掌握多种线程间通信方式,实现高效的线程协作和数据交换
学习目标
- 掌握多种线程间通信方式
- 理解管道通信的原理和使用
- 学会使用信号量控制资源访问
- 掌握CountDownLatch和CyclicBarrier的使用
- 了解Exchanger的数据交换机制
共享变量通信
共享变量是线程间通信最基本的方式。多个线程通过访问同一个变量来实现数据交换和状态同步。为了保证线程安全,需要使用适当的同步机制。
共享变量通信的关键在于保证可见性和原子性,避免数据竞争和不一致状态。
volatile变量
volatile关键字保证变量的可见性和有序性,适用于简单的状态标志和单一变量的读写操作。
public class VolatileExample {
private volatile boolean running = true;
private volatile int counter = 0;
public void start() {
new Thread(() -> {
while (running) {
counter++;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("Worker thread stopped, counter: " + counter);
}).start();
}
public void stop() {
running = false;
}
public int getCounter() {
return counter;
}
}
原子变量
java.util.concurrent.atomic包提供了线程安全的原子变量,支持无锁的原子操作。
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicExample {
private AtomicInteger counter = new AtomicInteger(0);
private AtomicReference message = new AtomicReference<>("初始消息");
public void incrementCounter() {
int newValue = counter.incrementAndGet();
System.out.println("Counter incremented to: " + newValue);
}
public void updateMessage(String newMessage) {
String oldMessage = message.getAndSet(newMessage);
System.out.println("Message updated from '" + oldMessage + "' to '" + newMessage + "'");
}
public void compareAndSetMessage(String expected, String newMessage) {
boolean updated = message.compareAndSet(expected, newMessage);
System.out.println("Message update " + (updated ? "successful" : "failed"));
}
}
管道通信
管道通信通过PipedInputStream和PipedOutputStream实现线程间的数据流传输,适用于生产者-消费者模式。
import java.io.*;
public class PipeExample {
public static void main(String[] args) throws IOException {
PipedOutputStream outputStream = new PipedOutputStream();
PipedInputStream inputStream = new PipedInputStream(outputStream);
// 生产者线程
Thread producer = new Thread(() -> {
try (PrintWriter writer = new PrintWriter(outputStream)) {
for (int i = 1; i <= 5; i++) {
String message = "Message " + i;
writer.println(message);
writer.flush();
System.out.println("Sent: " + message);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
String message;
while ((message = reader.readLine()) != null) {
System.out.println("Received: " + message);
}
} catch (IOException e) {
e.printStackTrace();
}
});
producer.start();
consumer.start();
}
}
信号量Semaphore
Semaphore是一个计数信号量,用于控制同时访问特定资源的线程数量。它维护了一组许可证,线程在访问资源前必须获取许可证。
- 连接池管理:限制同时使用的数据库连接数
- 限流控制:控制并发请求的数量
- 资源池:管理有限的共享资源
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreExample {
// 创建一个许可证数量为3的信号量
private static final Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) {
// 启动10个线程模拟并发访问
for (int i = 1; i <= 10; i++) {
new Thread(new Worker(i)).start();
}
}
static class Worker implements Runnable {
private final int id;
public Worker(int id) {
this.id = id;
}
@Override
public void run() {
try {
// 获取许可证
semaphore.acquire();
System.out.println("Worker " + id + " 获得许可证,开始工作");
// 模拟工作
TimeUnit.SECONDS.sleep(2);
System.out.println("Worker " + id + " 完成工作,释放许可证");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 释放许可证
semaphore.release();
}
}
}
}
同步辅助类
Java并发包提供了多个同步辅助类,用于实现复杂的线程协调场景。
CountDownLatch
CountDownLatch允许一个或多个线程等待其他线程完成操作。它维护一个计数器,当计数器减到0时,等待的线程被唤醒。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatchExample {
public static void main(String[] args) throws InterruptedException {
int workerCount = 5;
CountDownLatch latch = new CountDownLatch(workerCount);
System.out.println("主线程等待所有工作线程完成...");
// 启动工作线程
for (int i = 1; i <= workerCount; i++) {
new Thread(new Worker(i, latch)).start();
}
// 等待所有工作线程完成
latch.await();
System.out.println("所有工作线程已完成,主线程继续执行");
}
static class Worker implements Runnable {
private final int id;
private final CountDownLatch latch;
public Worker(int id, CountDownLatch latch) {
this.id = id;
this.latch = latch;
}
@Override
public void run() {
try {
System.out.println("Worker " + id + " 开始工作");
// 模拟工作时间
TimeUnit.SECONDS.sleep(2);
System.out.println("Worker " + id + " 完成工作");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 计数器减1
latch.countDown();
}
}
}
}
CyclicBarrier
CyclicBarrier允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同,CyclicBarrier可以重复使用。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class CyclicBarrierExample {
public static void main(String[] args) {
int participantCount = 4;
// 创建屏障,当4个线程都到达时执行屏障动作
CyclicBarrier barrier = new CyclicBarrier(participantCount, () -> {
System.out.println("所有参与者都已到达屏障,开始下一阶段!");
});
// 启动参与者线程
for (int i = 1; i <= participantCount; i++) {
new Thread(new Participant(i, barrier)).start();
}
}
static class Participant implements Runnable {
private final int id;
private final CyclicBarrier barrier;
public Participant(int id, CyclicBarrier barrier) {
this.id = id;
this.barrier = barrier;
}
@Override
public void run() {
try {
// 第一阶段工作
System.out.println("参与者 " + id + " 开始第一阶段工作");
TimeUnit.SECONDS.sleep((long) (Math.random() * 3 + 1));
System.out.println("参与者 " + id + " 完成第一阶段,等待其他参与者");
// 等待其他参与者
barrier.await();
// 第二阶段工作
System.out.println("参与者 " + id + " 开始第二阶段工作");
TimeUnit.SECONDS.sleep((long) (Math.random() * 2 + 1));
System.out.println("参与者 " + id + " 完成第二阶段");
} catch (InterruptedException | BrokenBarrierException e) {
Thread.currentThread().interrupt();
}
}
}
}
数据交换Exchanger
Exchanger提供了一个同步点,两个线程可以在此交换数据。每个线程在进入exchange方法后会等待另一个线程到达,然后交换数据。
Exchanger适用于两个线程之间需要定期交换数据的场景,如生产者-消费者模式中的缓冲区交换。
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
public class ExchangerExample {
public static void main(String[] args) {
Exchanger exchanger = new Exchanger<>();
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 3; i++) {
String data = "Data-" + i;
System.out.println("生产者准备交换数据: " + data);
// 交换数据
String received = exchanger.exchange(data);
System.out.println("生产者收到: " + received);
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 3; i++) {
String feedback = "Feedback-" + i;
System.out.println("消费者准备交换反馈: " + feedback);
// 交换数据
String received = exchanger.exchange(feedback);
System.out.println("消费者收到: " + received);
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producer.start();
consumer.start();
}
}
实践建议
- 优先使用高级同步工具:如CountDownLatch、CyclicBarrier等
- 避免忙等待:使用阻塞操作而不是轮询
- 合理设置超时:避免无限期等待
- 异常处理:正确处理InterruptedException
- 资源清理:确保在finally块中释放资源