第7章

🔄 线程间通信

掌握多种线程间通信方式,实现高效的线程协作和数据交换

学习目标

共享变量通信

共享变量是线程间通信最基本的方式。多个线程通过访问同一个变量来实现数据交换和状态同步。为了保证线程安全,需要使用适当的同步机制。

核心概念

共享变量通信的关键在于保证可见性和原子性,避免数据竞争和不一致状态。

volatile变量

volatile关键字保证变量的可见性和有序性,适用于简单的状态标志和单一变量的读写操作。

VolatileExample.java
public class VolatileExample {
    private volatile boolean running = true;
    private volatile int counter = 0;
    
    public void start() {
        new Thread(() -> {
            while (running) {
                counter++;
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            System.out.println("Worker thread stopped, counter: " + counter);
        }).start();
    }
    
    public void stop() {
        running = false;
    }
    
    public int getCounter() {
        return counter;
    }
}

原子变量

java.util.concurrent.atomic包提供了线程安全的原子变量,支持无锁的原子操作。

AtomicExample.java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class AtomicExample {
    private AtomicInteger counter = new AtomicInteger(0);
    private AtomicReference message = new AtomicReference<>("初始消息");
    
    public void incrementCounter() {
        int newValue = counter.incrementAndGet();
        System.out.println("Counter incremented to: " + newValue);
    }
    
    public void updateMessage(String newMessage) {
        String oldMessage = message.getAndSet(newMessage);
        System.out.println("Message updated from '" + oldMessage + "' to '" + newMessage + "'");
    }
    
    public void compareAndSetMessage(String expected, String newMessage) {
        boolean updated = message.compareAndSet(expected, newMessage);
        System.out.println("Message update " + (updated ? "successful" : "failed"));
    }
}

管道通信

管道通信通过PipedInputStream和PipedOutputStream实现线程间的数据流传输,适用于生产者-消费者模式。

字节流管道
PipedInputStream和PipedOutputStream用于传输字节数据。
字符流管道
PipedReader和PipedWriter用于传输字符数据。
PipeExample.java
import java.io.*;

public class PipeExample {
    public static void main(String[] args) throws IOException {
        PipedOutputStream outputStream = new PipedOutputStream();
        PipedInputStream inputStream = new PipedInputStream(outputStream);
        
        // 生产者线程
        Thread producer = new Thread(() -> {
            try (PrintWriter writer = new PrintWriter(outputStream)) {
                for (int i = 1; i <= 5; i++) {
                    String message = "Message " + i;
                    writer.println(message);
                    writer.flush();
                    System.out.println("Sent: " + message);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // 消费者线程
        Thread consumer = new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream))) {
                String message;
                while ((message = reader.readLine()) != null) {
                    System.out.println("Received: " + message);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        
        producer.start();
        consumer.start();
    }
}

信号量Semaphore

Semaphore是一个计数信号量,用于控制同时访问特定资源的线程数量。它维护了一组许可证,线程在访问资源前必须获取许可证。

应用场景
  • 连接池管理:限制同时使用的数据库连接数
  • 限流控制:控制并发请求的数量
  • 资源池:管理有限的共享资源
SemaphoreExample.java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreExample {
    // 创建一个许可证数量为3的信号量
    private static final Semaphore semaphore = new Semaphore(3);
    
    public static void main(String[] args) {
        // 启动10个线程模拟并发访问
        for (int i = 1; i <= 10; i++) {
            new Thread(new Worker(i)).start();
        }
    }
    
    static class Worker implements Runnable {
        private final int id;
        
        public Worker(int id) {
            this.id = id;
        }
        
        @Override
        public void run() {
            try {
                // 获取许可证
                semaphore.acquire();
                System.out.println("Worker " + id + " 获得许可证,开始工作");
                
                // 模拟工作
                TimeUnit.SECONDS.sleep(2);
                
                System.out.println("Worker " + id + " 完成工作,释放许可证");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // 释放许可证
                semaphore.release();
            }
        }
    }
}

同步辅助类

Java并发包提供了多个同步辅助类,用于实现复杂的线程协调场景。

CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。它维护一个计数器,当计数器减到0时,等待的线程被唤醒。

CountDownLatchExample.java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CountDownLatchExample {
    public static void main(String[] args) throws InterruptedException {
        int workerCount = 5;
        CountDownLatch latch = new CountDownLatch(workerCount);
        
        System.out.println("主线程等待所有工作线程完成...");
        
        // 启动工作线程
        for (int i = 1; i <= workerCount; i++) {
            new Thread(new Worker(i, latch)).start();
        }
        
        // 等待所有工作线程完成
        latch.await();
        System.out.println("所有工作线程已完成,主线程继续执行");
    }
    
    static class Worker implements Runnable {
        private final int id;
        private final CountDownLatch latch;
        
        public Worker(int id, CountDownLatch latch) {
            this.id = id;
            this.latch = latch;
        }
        
        @Override
        public void run() {
            try {
                System.out.println("Worker " + id + " 开始工作");
                // 模拟工作时间
                TimeUnit.SECONDS.sleep(2);
                System.out.println("Worker " + id + " 完成工作");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                // 计数器减1
                latch.countDown();
            }
        }
    }
}

CyclicBarrier

CyclicBarrier允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同,CyclicBarrier可以重复使用。

CyclicBarrierExample.java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierExample {
    public static void main(String[] args) {
        int participantCount = 4;
        
        // 创建屏障,当4个线程都到达时执行屏障动作
        CyclicBarrier barrier = new CyclicBarrier(participantCount, () -> {
            System.out.println("所有参与者都已到达屏障,开始下一阶段!");
        });
        
        // 启动参与者线程
        for (int i = 1; i <= participantCount; i++) {
            new Thread(new Participant(i, barrier)).start();
        }
    }
    
    static class Participant implements Runnable {
        private final int id;
        private final CyclicBarrier barrier;
        
        public Participant(int id, CyclicBarrier barrier) {
            this.id = id;
            this.barrier = barrier;
        }
        
        @Override
        public void run() {
            try {
                // 第一阶段工作
                System.out.println("参与者 " + id + " 开始第一阶段工作");
                TimeUnit.SECONDS.sleep((long) (Math.random() * 3 + 1));
                System.out.println("参与者 " + id + " 完成第一阶段,等待其他参与者");
                
                // 等待其他参与者
                barrier.await();
                
                // 第二阶段工作
                System.out.println("参与者 " + id + " 开始第二阶段工作");
                TimeUnit.SECONDS.sleep((long) (Math.random() * 2 + 1));
                System.out.println("参与者 " + id + " 完成第二阶段");
                
            } catch (InterruptedException | BrokenBarrierException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

数据交换Exchanger

Exchanger提供了一个同步点,两个线程可以在此交换数据。每个线程在进入exchange方法后会等待另一个线程到达,然后交换数据。

使用场景

Exchanger适用于两个线程之间需要定期交换数据的场景,如生产者-消费者模式中的缓冲区交换。

ExchangerExample.java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;

public class ExchangerExample {
    public static void main(String[] args) {
        Exchanger exchanger = new Exchanger<>();
        
        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    String data = "Data-" + i;
                    System.out.println("生产者准备交换数据: " + data);
                    
                    // 交换数据
                    String received = exchanger.exchange(data);
                    System.out.println("生产者收到: " + received);
                    
                    TimeUnit.SECONDS.sleep(1);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        // 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    String feedback = "Feedback-" + i;
                    System.out.println("消费者准备交换反馈: " + feedback);
                    
                    // 交换数据
                    String received = exchanger.exchange(feedback);
                    System.out.println("消费者收到: " + received);
                    
                    TimeUnit.SECONDS.sleep(1);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        
        producer.start();
        consumer.start();
    }
}

实践建议

选择合适的通信方式
根据具体场景选择最适合的线程间通信机制,避免过度设计。
避免死锁
合理设计锁的获取顺序,使用超时机制避免无限等待。
性能考虑
评估不同通信方式的性能开销,选择最优方案。
最佳实践
  • 优先使用高级同步工具:如CountDownLatch、CyclicBarrier等
  • 避免忙等待:使用阻塞操作而不是轮询
  • 合理设置超时:避免无限期等待
  • 异常处理:正确处理InterruptedException
  • 资源清理:确保在finally块中释放资源
上一章:锁机制详解 返回目录 下一章:线程池基础