第10章
⚙️ Executors工具类
掌握线程池工厂方法,学会选择和自定义合适的线程池
学习目标
- 掌握Executors工具类的使用方法
- 了解各种预定义线程池的特点和适用场景
- 学会选择合适的线程池类型
- 掌握定时任务的执行方式
- 学会自定义线程池的最佳实践
Executors详解
Executors是Java并发包中的一个工具类,它提供了一系列静态工厂方法来创建不同类型的线程池。这个类采用了工厂方法模式,为开发者提供了便利的线程池创建方式,同时也隐藏了线程池的复杂配置细节。
核心理解
Executors提供便利性,但在生产环境中,建议使用ThreadPoolExecutor直接创建线程池以获得更好的控制力。
工厂方法模式的优势
简化创建
通过简单的方法调用就能创建常用的线程池,无需了解复杂的构造参数。
最佳实践
内置了经过验证的线程池配置,减少了配置错误的可能性。
代码简洁
减少样板代码,让开发者专注于业务逻辑而非线程池配置。
便利性vs灵活性
虽然Executors提供了便利性,但在某些场景下可能限制了灵活性。阿里巴巴Java开发手册建议避免使用Executors创建线程池,主要原因是:
- 资源控制:某些方法创建的线程池可能导致OOM
- 参数透明:隐藏了重要的配置参数,不利于性能调优
- 监控困难:缺少自定义的线程命名和监控机制
- 异常处理:默认的异常处理策略可能不符合业务需求
FixedThreadPool
FixedThreadPool创建一个固定大小的线程池,线程数量始终保持不变。当所有线程都在执行任务时,新任务会在队列中等待。
创建FixedThreadPool
// 创建固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(5);
// 提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " executed by " +
Thread.currentThread().getName());
try {
Thread.sleep(2000); // 模拟任务执行
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 关闭线程池
executor.shutdown();
特点分析
固定大小
线程数量固定,不会因为任务增加而无限制创建线程。
无界队列
使用LinkedBlockingQueue,理论上可以存储无限数量的任务。
线程复用
线程会被重复使用,减少了线程创建和销毁的开销。
适用场景
- CPU密集型任务:线程数量通常设置为CPU核心数
- 负载可预测:任务数量相对稳定的场景
- 资源控制:需要严格控制并发线程数量的场景
- 长期运行:适合长期运行的服务
注意事项
由于使用无界队列,在高负载情况下可能导致内存溢出。建议在生产环境中使用有界队列。
CachedThreadPool
CachedThreadPool创建一个可缓存的线程池,线程数量会根据需要动态调整。空闲线程会被缓存60秒,超时后会被回收。
创建CachedThreadPool
// 创建缓存线程池
ExecutorService executor = Executors.newCachedThreadPool();
// 提交大量短期任务
for (int i = 0; i < 100; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " executed by " +
Thread.currentThread().getName());
try {
Thread.sleep(100); // 短期任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 关闭线程池
executor.shutdown();
动态调整机制
弹性扩展
根据任务数量动态创建线程,理论上没有上限。
自动回收
空闲线程60秒后自动回收,节省系统资源。
快速响应
新任务能够快速得到执行,适合突发性负载。
适用场景
- 短期任务:执行时间较短的任务
- 突发负载:任务数量变化较大的场景
- I/O密集型:大量I/O操作,线程经常阻塞
- 快速响应:对响应时间要求较高的场景
风险提醒
在高并发场景下可能创建大量线程,导致系统资源耗尽。建议设置合理的线程数量上限。
ScheduledThreadPool
ScheduledThreadPool用于执行定时任务和周期性任务,支持延迟执行和固定频率执行。
创建ScheduledThreadPool
// 创建定时任务线程池
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);
// 延迟执行任务
scheduler.schedule(() -> {
System.out.println("延迟5秒执行的任务");
}, 5, TimeUnit.SECONDS);
// 固定频率执行任务
scheduler.scheduleAtFixedRate(() -> {
System.out.println("每3秒执行一次: " + new Date());
}, 0, 3, TimeUnit.SECONDS);
// 固定延迟执行任务
scheduler.scheduleWithFixedDelay(() -> {
System.out.println("上次执行完成后延迟2秒再执行");
try {
Thread.sleep(1000); // 模拟任务执行时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 0, 2, TimeUnit.SECONDS);
定时任务类型
延迟执行
schedule()方法可以延迟指定时间后执行任务。
固定频率
scheduleAtFixedRate()按固定频率执行,不考虑任务执行时间。
固定延迟
scheduleWithFixedDelay()在上次任务完成后延迟指定时间。
实际应用示例
数据清理任务
public class DataCleanupTask {
private final ScheduledExecutorService scheduler =
Executors.newScheduledThreadPool(2);
public void startCleanupTasks() {
// 每天凌晨2点执行数据清理
long initialDelay = calculateInitialDelay();
scheduler.scheduleAtFixedRate(
this::cleanupOldData,
initialDelay,
TimeUnit.DAYS.toSeconds(1),
TimeUnit.SECONDS
);
// 每小时执行缓存清理
scheduler.scheduleAtFixedRate(
this::cleanupCache,
0,
1,
TimeUnit.HOURS
);
}
private void cleanupOldData() {
System.out.println("执行数据清理任务: " + new Date());
// 清理逻辑
}
private void cleanupCache() {
System.out.println("执行缓存清理任务: " + new Date());
// 缓存清理逻辑
}
private long calculateInitialDelay() {
// 计算到下一个凌晨2点的延迟时间
// 实现省略...
return 0;
}
}
自定义线程池最佳实践
在生产环境中,建议使用ThreadPoolExecutor直接创建线程池,以获得更好的控制力和可观测性。
自定义线程池示例
public class CustomThreadPoolFactory {
public static ThreadPoolExecutor createCustomThreadPool(
String poolName,
int corePoolSize,
int maximumPoolSize) {
return new ThreadPoolExecutor(
corePoolSize, // 核心线程数
maximumPoolSize, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(1000), // 有界队列
new CustomThreadFactory(poolName), // 自定义线程工厂
new CustomRejectedExecutionHandler() // 自定义拒绝策略
);
}
// 自定义线程工厂
static class CustomThreadFactory implements ThreadFactory {
private final String namePrefix;
private final AtomicInteger threadNumber = new AtomicInteger(1);
CustomThreadFactory(String namePrefix) {
this.namePrefix = namePrefix + "-thread-";
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
}
// 自定义拒绝策略
static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 记录日志
System.err.println("Task rejected: " + r.toString());
// 可以选择其他处理方式:
// 1. 抛出异常(默认)
// 2. 在调用者线程中执行
// 3. 丢弃任务
// 4. 丢弃队列中最老的任务
// 这里选择在调用者线程中执行
if (!executor.isShutdown()) {
r.run();
}
}
}
}
参数选择指南
CPU密集型
核心线程数 = CPU核心数 + 1,最大线程数 = CPU核心数 * 2
I/O密集型
核心线程数 = CPU核心数 * 2,根据I/O等待时间调整
混合型
根据任务特点和性能测试结果进行调优
监控和调优
线程池监控
public class ThreadPoolMonitor {
public static void monitorThreadPool(ThreadPoolExecutor executor, String poolName) {
ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
monitor.scheduleAtFixedRate(() -> {
System.out.println("=== " + poolName + " 线程池监控 ===");
System.out.println("核心线程数: " + executor.getCorePoolSize());
System.out.println("最大线程数: " + executor.getMaximumPoolSize());
System.out.println("当前线程数: " + executor.getPoolSize());
System.out.println("活跃线程数: " + executor.getActiveCount());
System.out.println("队列大小: " + executor.getQueue().size());
System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
System.out.println("总任务数: " + executor.getTaskCount());
System.out.println();
}, 0, 10, TimeUnit.SECONDS);
}
}