第9章
⚙️ ThreadPoolExecutor详解
深入理解ThreadPoolExecutor的工作原理、核心参数配置和自定义线程池的创建
学习目标
- 深入理解ThreadPoolExecutor的工作原理
- 掌握线程池的核心参数配置
- 学会自定义线程池的创建
- 理解任务队列的选择策略
- 掌握拒绝策略的使用
ThreadPoolExecutor工作原理
ThreadPoolExecutor是Java并发包中最核心的线程池实现类,它提供了完整的线程池功能,包括线程的创建、管理、任务调度和资源回收。理解其工作原理是掌握线程池技术的关键。
核心理解
ThreadPoolExecutor通过维护一个工作线程池和一个任务队列,实现了任务的异步执行和线程的复用,从而提高了系统的性能和资源利用率。
内部结构组成
工作线程池
维护一组工作线程,负责执行提交的任务。线程数量根据配置参数动态调整。
任务队列
存储等待执行的任务,当所有核心线程都在忙碌时,新任务会被放入队列中等待。
线程工厂
负责创建新的工作线程,可以自定义线程的名称、优先级等属性。
拒绝策略
当线程池无法处理新任务时的处理策略,如抛出异常、丢弃任务等。
任务执行流程
// ThreadPoolExecutor任务执行流程示例
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1. 如果当前线程数小于核心线程数,创建新线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 如果核心线程都在忙碌,尝试将任务加入队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3. 如果队列已满,尝试创建新线程(直到最大线程数)
else if (!addWorker(command, false))
reject(command); // 4. 如果无法创建新线程,执行拒绝策略
}
- 步骤1:检查当前线程数是否小于核心线程数,如果是则创建新线程执行任务
- 步骤2:如果核心线程都在忙碌,尝试将任务放入任务队列
- 步骤3:如果队列已满,尝试创建新线程(直到达到最大线程数)
- 步骤4:如果无法创建新线程,执行拒绝策略
核心参数详解
ThreadPoolExecutor的行为完全由其构造参数决定。理解这些参数的含义和相互关系,是正确配置线程池的基础。
构造函数参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数名称 | 类型 | 说明 | 建议值 |
---|---|---|---|
corePoolSize | int | 核心线程数,即使空闲也会保留的线程数量 | CPU密集型:CPU核数+1 IO密集型:2*CPU核数 |
maximumPoolSize | int | 最大线程数,线程池允许创建的最大线程数量 | 根据系统资源和业务需求确定 |
keepAliveTime | long | 非核心线程的空闲存活时间 | 30-60秒(根据业务特点调整) |
unit | TimeUnit | keepAliveTime的时间单位 | TimeUnit.SECONDS |
workQueue | BlockingQueue | 任务队列,存储等待执行的任务 | 根据任务特点选择合适的队列类型 |
threadFactory | ThreadFactory | 线程工厂,用于创建新线程 | 自定义工厂,设置有意义的线程名 |
handler | RejectedExecutionHandler | 拒绝策略,处理无法执行的任务 | 根据业务需求选择合适的策略 |
参数配置注意事项
- corePoolSize和maximumPoolSize的关系:corePoolSize ≤ maximumPoolSize
- 当使用无界队列时,maximumPoolSize参数实际上不起作用
- keepAliveTime只对超过corePoolSize的线程生效
- 合理的参数配置需要根据具体的业务场景和性能测试结果来确定
任务队列选择策略
任务队列是线程池的重要组成部分,不同类型的队列会显著影响线程池的行为和性能。选择合适的队列类型对于线程池的正确运行至关重要。
常用队列类型
ArrayBlockingQueue
有界阻塞队列
• 基于数组实现,容量固定
• 支持公平和非公平访问
• 适用于资源有限的场景
• 基于数组实现,容量固定
• 支持公平和非公平访问
• 适用于资源有限的场景
LinkedBlockingQueue
可选有界阻塞队列
• 基于链表实现,可指定容量
• 默认容量为Integer.MAX_VALUE
• 适用于任务数量不确定的场景
• 基于链表实现,可指定容量
• 默认容量为Integer.MAX_VALUE
• 适用于任务数量不确定的场景
SynchronousQueue
同步队列
• 不存储元素,直接传递
• 每个插入操作必须等待移除操作
• 适用于任务处理速度快的场景
• 不存储元素,直接传递
• 每个插入操作必须等待移除操作
• 适用于任务处理速度快的场景
PriorityBlockingQueue
优先级阻塞队列
• 支持优先级排序,无界队列
• 元素必须实现Comparable接口
• 适用于有优先级要求的任务
• 支持优先级排序,无界队列
• 元素必须实现Comparable接口
• 适用于有优先级要求的任务
队列选择示例
// 1. 使用ArrayBlockingQueue - 有界队列
ThreadPoolExecutor executor1 = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), // 最多缓存100个任务
new ThreadFactoryBuilder().setNameFormat("worker-%d").build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 2. 使用LinkedBlockingQueue - 可选有界队列
ThreadPoolExecutor executor2 = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200), // 指定容量为200
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
// 3. 使用SynchronousQueue - 直接传递
ThreadPoolExecutor executor3 = new ThreadPoolExecutor(
0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue<>(), // 不缓存任务,直接传递
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
// 4. 使用PriorityBlockingQueue - 优先级队列
ThreadPoolExecutor executor4 = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new PriorityBlockingQueue<>(), // 支持优先级排序
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy()
);
队列选择建议
- CPU密集型任务:使用SynchronousQueue或小容量的ArrayBlockingQueue
- IO密集型任务:使用LinkedBlockingQueue,容量根据内存情况设定
- 有优先级要求:使用PriorityBlockingQueue
- 资源受限环境:使用ArrayBlockingQueue控制内存使用
拒绝策略详解
当线程池无法处理新提交的任务时(线程池已关闭或达到最大容量),就会触发拒绝策略。Java提供了四种内置的拒绝策略,也支持自定义拒绝策略。
内置拒绝策略
AbortPolicy(默认)
抛出RejectedExecutionException异常,阻止系统正常运行。适用于关键任务,不允许任务丢失的场景。
CallerRunsPolicy
由调用线程直接执行该任务。这种策略提供简单的反馈控制机制,能够减缓新任务的提交速度。
DiscardPolicy
静默丢弃无法处理的任务,不抛出异常。适用于允许任务丢失的场景,如日志记录等。
DiscardOldestPolicy
丢弃队列中最老的任务,然后重新尝试执行新任务。适用于任务有时效性的场景。
拒绝策略使用示例
// 1. AbortPolicy - 抛出异常(默认策略)
ThreadPoolExecutor executor1 = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy() // 抛出异常
);
// 2. CallerRunsPolicy - 调用者执行
ThreadPoolExecutor executor2 = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy() // 调用者执行
);
// 3. DiscardPolicy - 静默丢弃
ThreadPoolExecutor executor3 = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardPolicy() // 静默丢弃
);
// 4. DiscardOldestPolicy - 丢弃最老任务
ThreadPoolExecutor executor4 = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.DiscardOldestPolicy() // 丢弃最老任务
);
自定义拒绝策略
// 自定义拒绝策略示例
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
private static final Logger logger = LoggerFactory.getLogger(CustomRejectedExecutionHandler.class);
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录被拒绝的任务信息
logger.warn("Task {} rejected from {}", r.toString(), executor.toString());
// 可以选择不同的处理策略:
// 1. 记录到数据库或消息队列,稍后重试
// 2. 发送告警通知
// 3. 降级处理
// 示例:尝试放入备用队列
if (!backupQueue.offer(r)) {
logger.error("Backup queue is also full, task {} will be discarded", r.toString());
}
}
private final BlockingQueue backupQueue = new LinkedBlockingQueue<>(1000);
}
// 使用自定义拒绝策略
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(10),
Executors.defaultThreadFactory(),
new CustomRejectedExecutionHandler() // 使用自定义策略
);
自定义线程池最佳实践
虽然Java提供了Executors工厂类来创建常用的线程池,但在生产环境中,我们通常需要根据具体的业务需求来自定义线程池配置,以获得最佳的性能和可控性。
自定义ThreadFactory
// 自定义线程工厂
public class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
private final boolean daemon;
private final int priority;
public CustomThreadFactory(String namePrefix, boolean daemon, int priority) {
this.namePrefix = namePrefix;
this.daemon = daemon;
this.priority = priority;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
thread.setDaemon(daemon);
thread.setPriority(priority);
// 设置未捕获异常处理器
thread.setUncaughtExceptionHandler((t, e) -> {
logger.error("Thread {} threw exception", t.getName(), e);
});
return thread;
}
}
// 使用自定义线程工厂
ThreadFactory threadFactory = new CustomThreadFactory(
"business-pool", false, Thread.NORM_PRIORITY
);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy()
);
线程池监控和钩子方法
// 带监控功能的自定义线程池
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
private static final Logger logger = LoggerFactory.getLogger(MonitoredThreadPoolExecutor.class);
private final AtomicLong totalTasks = new AtomicLong();
private final AtomicLong completedTasks = new AtomicLong();
private final AtomicLong totalTime = new AtomicLong();
public MonitoredThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
totalTasks.incrementAndGet();
logger.debug("Thread {} start executing task {}", t.getName(), r.toString());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
try {
long endTime = System.nanoTime();
long taskTime = endTime - startTime.get();
totalTime.addAndGet(taskTime);
completedTasks.incrementAndGet();
if (t != null) {
logger.error("Task {} execution failed", r.toString(), t);
} else {
logger.debug("Task {} completed in {} ns", r.toString(), taskTime);
}
} finally {
super.afterExecute(r, t);
}
}
@Override
protected void terminated() {
try {
logger.info("ThreadPool terminated. Total tasks: {}, Completed: {}, Average time: {} ns",
totalTasks.get(), completedTasks.get(),
totalTime.get() / Math.max(completedTasks.get(), 1));
} finally {
super.terminated();
}
}
// 获取监控指标
public long getTotalTasks() { return totalTasks.get(); }
public long getCompletedTasks() { return completedTasks.get(); }
public double getAverageTaskTime() {
return totalTime.get() / (double) Math.max(completedTasks.get(), 1);
}
private final ThreadLocal startTime = new ThreadLocal() {
@Override
protected Long initialValue() {
return System.nanoTime();
}
};
}
完整的自定义线程池配置
@Configuration
public class ThreadPoolConfig {
@Bean("businessThreadPool")
public ThreadPoolExecutor businessThreadPool() {
// 根据CPU核数计算线程数
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maximumPoolSize = corePoolSize * 2;
// 自定义线程工厂
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("business-pool-%d")
.setDaemon(false)
.setPriority(Thread.NORM_PRIORITY)
.setUncaughtExceptionHandler((t, e) -> {
logger.error("Thread {} threw exception", t.getName(), e);
})
.build();
// 创建线程池
ThreadPoolExecutor executor = new MonitoredThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 允许核心线程超时
executor.allowCoreThreadTimeOut(true);
return executor;
}
@Bean("ioThreadPool")
public ThreadPoolExecutor ioThreadPool() {
// IO密集型任务配置
int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
int maximumPoolSize = corePoolSize * 2;
return new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(500),
new CustomThreadFactory("io-pool", false, Thread.NORM_PRIORITY),
new ThreadPoolExecutor.AbortPolicy()
);
}
}
生产环境注意事项
- 避免使用Executors创建线程池,应该手动创建ThreadPoolExecutor
- 合理设置线程池参数,避免资源浪费或系统过载
- 为线程设置有意义的名称,便于问题排查
- 实现适当的监控和告警机制
- 在应用关闭时正确关闭线程池