第17章

🎭 并发编程模式

掌握高级并发设计模式,提升并发编程的设计能力和代码质量

学习目标

单例模式的线程安全

单例模式是最常用的设计模式之一,但在多线程环境下,传统的单例实现可能存在线程安全问题。我们需要掌握各种线程安全的单例实现方式。

懒汉式单例

线程不安全的懒汉式
UnsafeLazySingleton.java
public class UnsafeLazySingleton {
    private static UnsafeLazySingleton instance;
    
    private UnsafeLazySingleton() {}
    
    // 线程不安全:多个线程可能同时创建实例
    public static UnsafeLazySingleton getInstance() {
        if (instance == null) {
            instance = new UnsafeLazySingleton();
        }
        return instance;
    }
}
线程安全的懒汉式(同步方法)
SafeLazySingleton.java
public class SafeLazySingleton {
    private static SafeLazySingleton instance;
    
    private SafeLazySingleton() {}
    
    // 线程安全但性能较差:每次调用都需要同步
    public static synchronized SafeLazySingleton getInstance() {
        if (instance == null) {
            instance = new SafeLazySingleton();
        }
        return instance;
    }
}

双重检查锁定

双重检查锁定(Double-Checked Locking)是一种优化的懒汉式实现,既保证线程安全又提高性能。

DoubleCheckedSingleton.java
public class DoubleCheckedSingleton {
    // 使用volatile确保可见性和禁止指令重排序
    private static volatile DoubleCheckedSingleton instance;
    
    private DoubleCheckedSingleton() {}
    
    public static DoubleCheckedSingleton getInstance() {
        // 第一次检查:避免不必要的同步
        if (instance == null) {
            synchronized (DoubleCheckedSingleton.class) {
                // 第二次检查:确保只创建一个实例
                if (instance == null) {
                    instance = new DoubleCheckedSingleton();
                }
            }
        }
        return instance;
    }
}

饿汉式单例

饿汉式单例在类加载时就创建实例,天然线程安全,但可能造成资源浪费。

EagerSingleton.java
public class EagerSingleton {
    // 类加载时就创建实例,线程安全
    private static final EagerSingleton INSTANCE = new EagerSingleton();
    
    private EagerSingleton() {}
    
    public static EagerSingleton getInstance() {
        return INSTANCE;
    }
}

静态内部类单例

利用类加载机制保证线程安全,同时实现懒加载。

StaticInnerClassSingleton.java
public class StaticInnerClassSingleton {
    private StaticInnerClassSingleton() {}
    
    // 静态内部类,只有在被引用时才会加载
    private static class SingletonHolder {
        private static final StaticInnerClassSingleton INSTANCE = 
            new StaticInnerClassSingleton();
    }
    
    public static StaticInnerClassSingleton getInstance() {
        return SingletonHolder.INSTANCE;
    }
}

枚举单例

枚举实现的单例是最安全的,能防止反射和序列化攻击。

EnumSingleton.java
public enum EnumSingleton {
    INSTANCE;
    
    public void doSomething() {
        System.out.println("枚举单例执行操作");
    }
}

// 使用方式
EnumSingleton.INSTANCE.doSomething();

不变性模式

不变性模式通过创建不可变对象来避免并发问题。不可变对象一旦创建就不能修改,天然线程安全。

不可变对象的特征

ImmutablePerson.java
public final class ImmutablePerson {
    private final String name;
    private final int age;
    private final List<String> hobbies;
    
    public ImmutablePerson(String name, int age, List<String> hobbies) {
        this.name = name;
        this.age = age;
        // 防御性复制,避免外部修改
        this.hobbies = Collections.unmodifiableList(new ArrayList<>(hobbies));
    }
    
    public String getName() {
        return name;
    }
    
    public int getAge() {
        return age;
    }
    
    public List<String> getHobbies() {
        // 返回不可修改的视图
        return hobbies;
    }
    
    // 修改操作返回新对象
    public ImmutablePerson withAge(int newAge) {
        return new ImmutablePerson(this.name, newAge, 
            new ArrayList<>(this.hobbies));
    }
}

使用Builder模式创建不可变对象

ImmutablePersonBuilder.java
public final class ImmutablePersonBuilder {
    private final String name;
    private final int age;
    private final List<String> hobbies;
    
    private ImmutablePersonBuilder(Builder builder) {
        this.name = builder.name;
        this.age = builder.age;
        this.hobbies = Collections.unmodifiableList(builder.hobbies);
    }
    
    public static class Builder {
        private String name;
        private int age;
        private List<String> hobbies = new ArrayList<>();
        
        public Builder setName(String name) {
            this.name = name;
            return this;
        }
        
        public Builder setAge(int age) {
            this.age = age;
            return this;
        }
        
        public Builder addHobby(String hobby) {
            this.hobbies.add(hobby);
            return this;
        }
        
        public ImmutablePersonBuilder build() {
            return new ImmutablePersonBuilder(this);
        }
    }
    
    // getter方法...
}

线程特有存储模式

线程特有存储模式通过为每个线程提供独立的存储空间来避免共享状态,ThreadLocal是这种模式的典型实现。

ThreadLocal基本使用

ThreadLocalExample.java
public class ThreadLocalExample {
    // 为每个线程提供独立的SimpleDateFormat实例
    private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT = 
        ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    
    // 用户上下文信息
    private static final ThreadLocal<UserContext> USER_CONTEXT = new ThreadLocal<>();
    
    public static String formatDate(Date date) {
        return DATE_FORMAT.get().format(date);
    }
    
    public static void setUserContext(UserContext context) {
        USER_CONTEXT.set(context);
    }
    
    public static UserContext getUserContext() {
        return USER_CONTEXT.get();
    }
    
    public static void clearUserContext() {
        USER_CONTEXT.remove();
    }
    
    static class UserContext {
        private final String userId;
        private final String userName;
        
        public UserContext(String userId, String userName) {
            this.userId = userId;
            this.userName = userName;
        }
        
        // getter方法...
    }
}

自定义ThreadLocal实现

CustomThreadLocal.java
public class CustomThreadLocal<T> {
    private final Map<Thread, T> storage = new ConcurrentHashMap<>();
    
    public void set(T value) {
        storage.put(Thread.currentThread(), value);
    }
    
    public T get() {
        return storage.get(Thread.currentThread());
    }
    
    public void remove() {
        storage.remove(Thread.currentThread());
    }
    
    // 清理已终止线程的数据
    public void cleanup() {
        storage.entrySet().removeIf(entry -> !entry.getKey().isAlive());
    }
}
ThreadLocal使用注意事项
  • 内存泄漏:使用完毕后要调用remove()方法清理
  • 线程池环境:线程复用可能导致数据污染
  • 父子线程:普通ThreadLocal不会传递给子线程
  • 性能考虑:过多的ThreadLocal变量会影响性能

Master-Worker模式

Master-Worker模式将一个大任务分解为多个小任务,由Master负责任务分发和结果汇总,Worker负责具体的任务执行。

基本实现

MasterWorkerPattern.java
public class MasterWorkerPattern {
    
    // 任务接口
    public interface Task<T, R> {
        R execute(T input);
    }
    
    // Worker类
    public static class Worker<T, R> implements Runnable {
        private final BlockingQueue<T> taskQueue;
        private final BlockingQueue<R> resultQueue;
        private final Task<T, R> task;
        private volatile boolean running = true;
        
        public Worker(BlockingQueue<T> taskQueue, 
                     BlockingQueue<R> resultQueue, 
                     Task<T, R> task) {
            this.taskQueue = taskQueue;
            this.resultQueue = resultQueue;
            this.task = task;
        }
        
        @Override
        public void run() {
            while (running) {
                try {
                    T input = taskQueue.poll(1, TimeUnit.SECONDS);
                    if (input != null) {
                        R result = task.execute(input);
                        resultQueue.offer(result);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        
        public void stop() {
            running = false;
        }
    }
    
    // Master类
    public static class Master<T, R> {
        private final BlockingQueue<T> taskQueue = new LinkedBlockingQueue<>();
        private final BlockingQueue<R> resultQueue = new LinkedBlockingQueue<>();
        private final List<Worker<T, R>> workers = new ArrayList<>();
        private final ExecutorService executor;
        
        public Master(int workerCount, Task<T, R> task) {
            this.executor = Executors.newFixedThreadPool(workerCount);
            
            // 创建并启动Worker
            for (int i = 0; i < workerCount; i++) {
                Worker<T, R> worker = new Worker<>(taskQueue, resultQueue, task);
                workers.add(worker);
                executor.submit(worker);
            }
        }
        
        public void submitTask(T task) {
            taskQueue.offer(task);
        }
        
        public List<R> getResults(int expectedCount, long timeout, TimeUnit unit) 
                throws InterruptedException {
            List<R> results = new ArrayList<>();
            long deadline = System.nanoTime() + unit.toNanos(timeout);
            
            while (results.size() < expectedCount && System.nanoTime() < deadline) {
                R result = resultQueue.poll(100, TimeUnit.MILLISECONDS);
                if (result != null) {
                    results.add(result);
                }
            }
            
            return results;
        }
        
        public void shutdown() {
            workers.forEach(Worker::stop);
            executor.shutdown();
        }
    }
}

使用示例

MasterWorkerExample.java
public class MasterWorkerExample {
    public static void main(String[] args) throws InterruptedException {
        // 创建计算平方的任务
        Task<Integer, Integer> squareTask = input -> {
            try {
                Thread.sleep(100); // 模拟计算时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return input * input;
        };
        
        // 创建Master,使用4个Worker
        Master<Integer, Integer> master = new Master<>(4, squareTask);
        
        // 提交任务
        for (int i = 1; i <= 10; i++) {
            master.submitTask(i);
        }
        
        // 获取结果
        List<Integer> results = master.getResults(10, 5, TimeUnit.SECONDS);
        System.out.println("计算结果: " + results);
        
        // 关闭Master
        master.shutdown();
    }
}

Actor模式

Actor模式是一种并发计算模型,每个Actor都是独立的计算单元,通过消息传递进行通信,避免了共享状态的并发问题。

Actor模式的特点

消息传递
Actor之间只通过异步消息进行通信,不共享状态。
状态封装
每个Actor封装自己的状态,外部无法直接访问。
并发处理
多个Actor可以并发执行,提高系统吞吐量。

简单Actor实现

SimpleActor.java
public abstract class SimpleActor {
    private final BlockingQueue<Object> mailbox = new LinkedBlockingQueue<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private volatile boolean running = true;
    
    public SimpleActor() {
        executor.submit(this::messageLoop);
    }
    
    // 发送消息
    public void tell(Object message) {
        if (running) {
            mailbox.offer(message);
        }
    }
    
    // 消息处理循环
    private void messageLoop() {
        while (running) {
            try {
                Object message = mailbox.poll(1, TimeUnit.SECONDS);
                if (message != null) {
                    receive(message);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    
    // 子类实现具体的消息处理逻辑
    protected abstract void receive(Object message);
    
    // 停止Actor
    public void stop() {
        running = false;
        executor.shutdown();
    }
}

计数器Actor示例

CounterActor.java
public class CounterActor extends SimpleActor {
    private int count = 0;
    
    // 消息类型
    public static class Increment {}
    public static class Decrement {}
    public static class GetCount {
        private final CompletableFuture<Integer> future;
        
        public GetCount(CompletableFuture<Integer> future) {
            this.future = future;
        }
        
        public CompletableFuture<Integer> getFuture() {
            return future;
        }
    }
    
    @Override
    protected void receive(Object message) {
        if (message instanceof Increment) {
            count++;
            System.out.println("计数器递增,当前值: " + count);
        } else if (message instanceof Decrement) {
            count--;
            System.out.println("计数器递减,当前值: " + count);
        } else if (message instanceof GetCount) {
            GetCount getCount = (GetCount) message;
            getCount.getFuture().complete(count);
        }
    }
    
    // 便捷方法
    public void increment() {
        tell(new Increment());
    }
    
    public void decrement() {
        tell(new Decrement());
    }
    
    public CompletableFuture<Integer> getCount() {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        tell(new GetCount(future));
        return future;
    }
}

使用示例

ActorExample.java
public class ActorExample {
    public static void main(String[] args) throws Exception {
        CounterActor counter = new CounterActor();
        
        // 并发操作计数器
        ExecutorService executor = Executors.newFixedThreadPool(10);
        
        // 提交多个递增任务
        for (int i = 0; i < 100; i++) {
            executor.submit(counter::increment);
        }
        
        // 提交多个递减任务
        for (int i = 0; i < 50; i++) {
            executor.submit(counter::decrement);
        }
        
        // 等待一段时间让所有消息处理完成
        Thread.sleep(1000);
        
        // 获取最终计数
        CompletableFuture<Integer> finalCount = counter.getCount();
        System.out.println("最终计数: " + finalCount.get());
        
        // 清理资源
        counter.stop();
        executor.shutdown();
    }
}

模式对比和选择

单例模式
适用场景:全局唯一实例
优点:节省内存,全局访问
缺点:可能成为性能瓶颈
不变性模式
适用场景:值对象,配置信息
优点:天然线程安全
缺点:创建对象开销
ThreadLocal模式
适用场景:线程上下文,工具类
优点:避免同步开销
缺点:内存泄漏风险
Master-Worker模式
适用场景:大任务分解
优点:充分利用多核
缺点:任务分解复杂度
Actor模式
适用场景:高并发系统
优点:无共享状态
缺点:消息传递开销
选择建议
  • 性能要求高:优先考虑不变性模式和ThreadLocal模式
  • 任务可分解:使用Master-Worker模式提高并行度
  • 状态管理复杂:考虑Actor模式简化设计
  • 全局共享:使用线程安全的单例模式
  • 组合使用:根据具体场景组合多种模式
上一章:AQS抽象队列同步器 返回目录 下一章:并发性能调优