第9章

⚙️ ThreadPoolExecutor详解

深入理解ThreadPoolExecutor的工作原理、核心参数配置和自定义线程池的创建

学习目标

ThreadPoolExecutor工作原理

ThreadPoolExecutor是Java并发包中最核心的线程池实现类,它提供了完整的线程池功能,包括线程的创建、管理、任务调度和资源回收。理解其工作原理是掌握线程池技术的关键。

核心理解

ThreadPoolExecutor通过维护一个工作线程池和一个任务队列,实现了任务的异步执行和线程的复用,从而提高了系统的性能和资源利用率。

内部结构组成

工作线程池
维护一组工作线程,负责执行提交的任务。线程数量根据配置参数动态调整。
任务队列
存储等待执行的任务,当所有核心线程都在忙碌时,新任务会被放入队列中等待。
线程工厂
负责创建新的工作线程,可以自定义线程的名称、优先级等属性。
拒绝策略
当线程池无法处理新任务时的处理策略,如抛出异常、丢弃任务等。

任务执行流程

// ThreadPoolExecutor任务执行流程示例
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    
    // 1. 如果当前线程数小于核心线程数,创建新线程执行任务
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    
    // 2. 如果核心线程都在忙碌,尝试将任务加入队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (!isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. 如果队列已满,尝试创建新线程(直到最大线程数)
    else if (!addWorker(command, false))
        reject(command); // 4. 如果无法创建新线程,执行拒绝策略
}

核心参数详解

ThreadPoolExecutor的行为完全由其构造参数决定。理解这些参数的含义和相互关系,是正确配置线程池的基础。

构造函数参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
参数名称 类型 说明 建议值
corePoolSize int 核心线程数,即使空闲也会保留的线程数量 CPU密集型:CPU核数+1
IO密集型:2*CPU核数
maximumPoolSize int 最大线程数,线程池允许创建的最大线程数量 根据系统资源和业务需求确定
keepAliveTime long 非核心线程的空闲存活时间 30-60秒(根据业务特点调整)
unit TimeUnit keepAliveTime的时间单位 TimeUnit.SECONDS
workQueue BlockingQueue 任务队列,存储等待执行的任务 根据任务特点选择合适的队列类型
threadFactory ThreadFactory 线程工厂,用于创建新线程 自定义工厂,设置有意义的线程名
handler RejectedExecutionHandler 拒绝策略,处理无法执行的任务 根据业务需求选择合适的策略
参数配置注意事项
  • corePoolSize和maximumPoolSize的关系:corePoolSize ≤ maximumPoolSize
  • 当使用无界队列时,maximumPoolSize参数实际上不起作用
  • keepAliveTime只对超过corePoolSize的线程生效
  • 合理的参数配置需要根据具体的业务场景和性能测试结果来确定

任务队列选择策略

任务队列是线程池的重要组成部分,不同类型的队列会显著影响线程池的行为和性能。选择合适的队列类型对于线程池的正确运行至关重要。

常用队列类型

ArrayBlockingQueue
有界阻塞队列
• 基于数组实现,容量固定
• 支持公平和非公平访问
• 适用于资源有限的场景
LinkedBlockingQueue
可选有界阻塞队列
• 基于链表实现,可指定容量
• 默认容量为Integer.MAX_VALUE
• 适用于任务数量不确定的场景
SynchronousQueue
同步队列
• 不存储元素,直接传递
• 每个插入操作必须等待移除操作
• 适用于任务处理速度快的场景
PriorityBlockingQueue
优先级阻塞队列
• 支持优先级排序,无界队列
• 元素必须实现Comparable接口
• 适用于有优先级要求的任务

队列选择示例

// 1. 使用ArrayBlockingQueue - 有界队列
ThreadPoolExecutor executor1 = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(100), // 最多缓存100个任务
    new ThreadFactoryBuilder().setNameFormat("worker-%d").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()
);

// 2. 使用LinkedBlockingQueue - 可选有界队列
ThreadPoolExecutor executor2 = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(200), // 指定容量为200
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.AbortPolicy()
);

// 3. 使用SynchronousQueue - 直接传递
ThreadPoolExecutor executor3 = new ThreadPoolExecutor(
    0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
    new SynchronousQueue<>(), // 不缓存任务,直接传递
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.AbortPolicy()
);

// 4. 使用PriorityBlockingQueue - 优先级队列
ThreadPoolExecutor executor4 = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    new PriorityBlockingQueue<>(), // 支持优先级排序
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.DiscardPolicy()
);
队列选择建议
  • CPU密集型任务:使用SynchronousQueue或小容量的ArrayBlockingQueue
  • IO密集型任务:使用LinkedBlockingQueue,容量根据内存情况设定
  • 有优先级要求:使用PriorityBlockingQueue
  • 资源受限环境:使用ArrayBlockingQueue控制内存使用

拒绝策略详解

当线程池无法处理新提交的任务时(线程池已关闭或达到最大容量),就会触发拒绝策略。Java提供了四种内置的拒绝策略,也支持自定义拒绝策略。

内置拒绝策略

AbortPolicy(默认)
抛出RejectedExecutionException异常,阻止系统正常运行。适用于关键任务,不允许任务丢失的场景。
CallerRunsPolicy
由调用线程直接执行该任务。这种策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
DiscardPolicy
静默丢弃无法处理的任务,不抛出异常。适用于允许任务丢失的场景,如日志记录等。
DiscardOldestPolicy
丢弃队列中最老的任务,然后重新尝试执行新任务。适用于任务有时效性的场景。

拒绝策略使用示例

// 1. AbortPolicy - 抛出异常(默认策略)
ThreadPoolExecutor executor1 = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(2),
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.AbortPolicy() // 抛出异常
);

// 2. CallerRunsPolicy - 调用者执行
ThreadPoolExecutor executor2 = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(2),
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.CallerRunsPolicy() // 调用者执行
);

// 3. DiscardPolicy - 静默丢弃
ThreadPoolExecutor executor3 = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(2),
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.DiscardPolicy() // 静默丢弃
);

// 4. DiscardOldestPolicy - 丢弃最老任务
ThreadPoolExecutor executor4 = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(2),
    Executors.defaultThreadFactory(),
    new ThreadPoolExecutor.DiscardOldestPolicy() // 丢弃最老任务
);

自定义拒绝策略

// 自定义拒绝策略示例
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
    private static final Logger logger = LoggerFactory.getLogger(CustomRejectedExecutionHandler.class);
    
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 记录被拒绝的任务信息
        logger.warn("Task {} rejected from {}", r.toString(), executor.toString());
        
        // 可以选择不同的处理策略:
        // 1. 记录到数据库或消息队列,稍后重试
        // 2. 发送告警通知
        // 3. 降级处理
        
        // 示例:尝试放入备用队列
        if (!backupQueue.offer(r)) {
            logger.error("Backup queue is also full, task {} will be discarded", r.toString());
        }
    }
    
    private final BlockingQueue backupQueue = new LinkedBlockingQueue<>(1000);
}

// 使用自定义拒绝策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    2, 4, 60L, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(10),
    Executors.defaultThreadFactory(),
    new CustomRejectedExecutionHandler() // 使用自定义策略
);

自定义线程池最佳实践

虽然Java提供了Executors工厂类来创建常用的线程池,但在生产环境中,我们通常需要根据具体的业务需求来自定义线程池配置,以获得最佳的性能和可控性。

自定义ThreadFactory

// 自定义线程工厂
public class CustomThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private final boolean daemon;
    private final int priority;
    
    public CustomThreadFactory(String namePrefix, boolean daemon, int priority) {
        this.namePrefix = namePrefix;
        this.daemon = daemon;
        this.priority = priority;
    }
    
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
        thread.setDaemon(daemon);
        thread.setPriority(priority);
        
        // 设置未捕获异常处理器
        thread.setUncaughtExceptionHandler((t, e) -> {
            logger.error("Thread {} threw exception", t.getName(), e);
        });
        
        return thread;
    }
}

// 使用自定义线程工厂
ThreadFactory threadFactory = new CustomThreadFactory(
    "business-pool", false, Thread.NORM_PRIORITY
);

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    5, 10, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100),
    threadFactory,
    new ThreadPoolExecutor.CallerRunsPolicy()
);

线程池监控和钩子方法

// 带监控功能的自定义线程池
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
    private static final Logger logger = LoggerFactory.getLogger(MonitoredThreadPoolExecutor.class);
    
    private final AtomicLong totalTasks = new AtomicLong();
    private final AtomicLong completedTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();
    
    public MonitoredThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                     long keepAliveTime, TimeUnit unit,
                                     BlockingQueue workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
    
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        totalTasks.incrementAndGet();
        logger.debug("Thread {} start executing task {}", t.getName(), r.toString());
    }
    
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            totalTime.addAndGet(taskTime);
            completedTasks.incrementAndGet();
            
            if (t != null) {
                logger.error("Task {} execution failed", r.toString(), t);
            } else {
                logger.debug("Task {} completed in {} ns", r.toString(), taskTime);
            }
        } finally {
            super.afterExecute(r, t);
        }
    }
    
    @Override
    protected void terminated() {
        try {
            logger.info("ThreadPool terminated. Total tasks: {}, Completed: {}, Average time: {} ns",
                       totalTasks.get(), completedTasks.get(), 
                       totalTime.get() / Math.max(completedTasks.get(), 1));
        } finally {
            super.terminated();
        }
    }
    
    // 获取监控指标
    public long getTotalTasks() { return totalTasks.get(); }
    public long getCompletedTasks() { return completedTasks.get(); }
    public double getAverageTaskTime() { 
        return totalTime.get() / (double) Math.max(completedTasks.get(), 1); 
    }
    
    private final ThreadLocal startTime = new ThreadLocal() {
        @Override
        protected Long initialValue() {
            return System.nanoTime();
        }
    };
}

完整的自定义线程池配置

@Configuration
public class ThreadPoolConfig {
    
    @Bean("businessThreadPool")
    public ThreadPoolExecutor businessThreadPool() {
        // 根据CPU核数计算线程数
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        int maximumPoolSize = corePoolSize * 2;
        
        // 自定义线程工厂
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
            .setNameFormat("business-pool-%d")
            .setDaemon(false)
            .setPriority(Thread.NORM_PRIORITY)
            .setUncaughtExceptionHandler((t, e) -> {
                logger.error("Thread {} threw exception", t.getName(), e);
            })
            .build();
        
        // 创建线程池
        ThreadPoolExecutor executor = new MonitoredThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(200),
            threadFactory,
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
        
        // 允许核心线程超时
        executor.allowCoreThreadTimeOut(true);
        
        return executor;
    }
    
    @Bean("ioThreadPool")
    public ThreadPoolExecutor ioThreadPool() {
        // IO密集型任务配置
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        int maximumPoolSize = corePoolSize * 2;
        
        return new ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(500),
            new CustomThreadFactory("io-pool", false, Thread.NORM_PRIORITY),
            new ThreadPoolExecutor.AbortPolicy()
        );
    }
}
生产环境注意事项
  • 避免使用Executors创建线程池,应该手动创建ThreadPoolExecutor
  • 合理设置线程池参数,避免资源浪费或系统过载
  • 为线程设置有意义的名称,便于问题排查
  • 实现适当的监控和告警机制
  • 在应用关闭时正确关闭线程池
上一章:线程池基础 返回目录 下一章:Executors工具类