第10章
⚡ Executors框架
掌握Java并发工具类Executors的使用,学习预定义线程池、Future和Callable等高级特性
学习目标
- 掌握Executors工具类的使用方法
- 理解各种预定义线程池的特点和适用场景
- 学会Future和Callable的使用技巧
- 掌握CompletionService的应用场景
- 了解ExecutorService的最佳实践
Executors工具类概述
Executors是Java并发包中的一个工具类,提供了创建各种类型线程池的便捷方法。它采用工厂模式,封装了ThreadPoolExecutor的复杂配置,让开发者能够快速创建适合不同场景的线程池。
核心理解
Executors虽然提供了便捷的线程池创建方法,但在生产环境中建议根据具体需求手动配置ThreadPoolExecutor,以获得更好的控制和性能。
主要功能
线程池创建
提供多种预定义的线程池类型,如固定大小线程池、缓存线程池、单线程池等。
调度服务
创建支持定时和周期性任务执行的调度线程池。
线程工厂
提供默认的线程工厂实现,支持自定义线程创建逻辑。
基本使用示例
import java.util.concurrent.*;
public class ExecutorsExample {
public static void main(String[] args) {
// 创建固定大小的线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(5);
// 提交任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
fixedPool.submit(() -> {
System.out.println("Task " + taskId + " executed by " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 关闭线程池
fixedPool.shutdown();
}
}
预定义线程池类型
Executors提供了几种常用的预定义线程池,每种都有其特定的使用场景和特点。了解这些线程池的特性有助于选择合适的线程池类型。
newFixedThreadPool
特点说明
- 固定线程数:线程池大小固定,不会动态调整
- 无界队列:使用LinkedBlockingQueue,理论上可以无限排队
- 适用场景:负载比较均匀,需要限制并发线程数的场景
- 注意事项:队列可能无限增长,导致内存溢出
// 创建固定大小的线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(10);
// 等价的ThreadPoolExecutor配置
ExecutorService equivalentPool = new ThreadPoolExecutor(
10, 10, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue()
);
newCachedThreadPool
特点说明
- 动态调整:根据需要创建新线程,空闲线程会被回收
- 同步队列:使用SynchronousQueue,不存储任务
- 适用场景:执行大量短期异步任务的场景
- 注意事项:线程数可能无限增长,需要控制任务提交速度
// 创建缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();
// 等价的ThreadPoolExecutor配置
ExecutorService equivalentPool = new ThreadPoolExecutor(
0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
new SynchronousQueue()
);
newSingleThreadExecutor
特点说明
- 单线程执行:只有一个工作线程执行任务
- 顺序执行:任务按照提交顺序依次执行
- 适用场景:需要保证任务执行顺序的场景
- 容错性:如果线程异常终止,会创建新线程继续执行
// 创建单线程执行器
ExecutorService singlePool = Executors.newSingleThreadExecutor();
// 示例:顺序处理任务
for (int i = 0; i < 5; i++) {
final int taskId = i;
singlePool.submit(() -> {
System.out.println("Processing task " + taskId);
// 任务处理逻辑
});
}
newScheduledThreadPool
特点说明
- 定时执行:支持延迟和周期性任务执行
- 固定线程数:核心线程数固定,最大线程数无限
- 适用场景:需要定时执行任务的场景
- 高精度:基于时间轮算法,支持高精度定时
// 创建调度线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);
// 延迟执行
scheduledPool.schedule(() -> {
System.out.println("Delayed task executed");
}, 5, TimeUnit.SECONDS);
// 周期性执行
scheduledPool.scheduleAtFixedRate(() -> {
System.out.println("Periodic task executed");
}, 0, 10, TimeUnit.SECONDS);
Future和Callable
Future和Callable是Java并发编程中处理异步任务结果的重要接口。Callable允许任务返回结果或抛出异常,而Future提供了获取异步计算结果的方法。
Callable接口
接口定义
Callable接口类似于Runnable,但可以返回结果并抛出异常。
import java.util.concurrent.*;
import java.util.Random;
public class CallableExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(3);
// 创建Callable任务
Callable task = () -> {
Thread.sleep(2000); // 模拟耗时操作
return new Random().nextInt(100);
};
// 提交任务并获取Future
Future future = executor.submit(task);
// 执行其他操作
System.out.println("Task submitted, doing other work...");
// 获取结果(阻塞直到任务完成)
Integer result = future.get();
System.out.println("Task result: " + result);
executor.shutdown();
}
}
Future接口方法
get()方法
- get():阻塞等待结果
- get(timeout, unit):超时等待
cancel()方法
- cancel(true):中断执行
- cancel(false):不中断执行
状态查询
- isDone():是否完成
- isCancelled():是否取消
批量任务处理
import java.util.*;
import java.util.concurrent.*;
public class BatchTaskExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(5);
// 创建多个任务
List> tasks = Arrays.asList(
() -> { Thread.sleep(1000); return "Task 1"; },
() -> { Thread.sleep(2000); return "Task 2"; },
() -> { Thread.sleep(1500); return "Task 3"; }
);
// 方式1:invokeAll - 等待所有任务完成
List> futures = executor.invokeAll(tasks);
for (Future future : futures) {
System.out.println("Result: " + future.get());
}
// 方式2:invokeAny - 返回第一个完成的任务结果
String firstResult = executor.invokeAny(tasks);
System.out.println("First completed: " + firstResult);
executor.shutdown();
}
}
CompletionService
CompletionService将Executor和BlockingQueue的功能融合在一起,可以按照任务完成的顺序获取结果,而不是按照任务提交的顺序。这在处理批量异步任务时非常有用。
基本使用
import java.util.concurrent.*;
import java.util.Random;
public class CompletionServiceExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(5);
CompletionService completionService =
new ExecutorCompletionService<>(executor);
// 提交多个任务
for (int i = 0; i < 5; i++) {
final int taskId = i;
completionService.submit(() -> {
// 模拟不同的执行时间
Thread.sleep(new Random().nextInt(3000));
return "Task " + taskId + " completed";
});
}
// 按完成顺序获取结果
for (int i = 0; i < 5; i++) {
Future future = completionService.take(); // 阻塞等待
String result = future.get();
System.out.println("Received: " + result);
}
executor.shutdown();
}
}
实际应用场景
搜索聚合
从多个数据源搜索数据,按照返回速度展示结果,提升用户体验。
文件下载
并行下载多个文件,按完成顺序处理下载结果。
数据处理
批量处理数据,按处理完成顺序进行后续操作。
高级用法示例
public class AdvancedCompletionServiceExample {
private static final int TIMEOUT_SECONDS = 5;
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletionService completionService =
new ExecutorCompletionService<>(executor);
// 提交任务
int taskCount = 10;
for (int i = 0; i < taskCount; i++) {
final int taskId = i;
completionService.submit(() -> processData(taskId));
}
// 处理结果,支持超时
int completedTasks = 0;
while (completedTasks < taskCount) {
try {
Future future = completionService.poll(
TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (future != null) {
String result = future.get();
System.out.println("Processed: " + result);
completedTasks++;
} else {
System.out.println("Timeout waiting for task completion");
break;
}
} catch (Exception e) {
System.err.println("Task failed: " + e.getMessage());
completedTasks++;
}
}
executor.shutdown();
}
private static String processData(int taskId) throws Exception {
// 模拟数据处理
Thread.sleep(new Random().nextInt(4000));
if (taskId == 7) {
throw new RuntimeException("Task " + taskId + " failed");
}
return "Data " + taskId;
}
}
ExecutorService最佳实践
正确使用ExecutorService需要注意资源管理、异常处理、性能优化等多个方面。以下是一些重要的最佳实践。
资源管理
重要提醒
ExecutorService必须正确关闭,否则会导致应用程序无法正常退出。
public class ExecutorShutdownExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(5);
try {
// 提交任务
for (int i = 0; i < 10; i++) {
executor.submit(() -> {
// 任务逻辑
});
}
} finally {
// 正确的关闭方式
shutdownExecutor(executor);
}
}
private static void shutdownExecutor(ExecutorService executor) {
executor.shutdown(); // 不再接受新任务
try {
// 等待已提交任务完成
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow(); // 强制关闭
// 再次等待
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Executor did not terminate");
}
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
异常处理
public class ExceptionHandlingExample {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
// 方式1:通过Future获取异常
Future future = executor.submit(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("Task failed");
}
return "Success";
});
try {
String result = future.get();
System.out.println("Result: " + result);
} catch (ExecutionException e) {
System.err.println("Task failed: " + e.getCause().getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 方式2:自定义ThreadFactory处理未捕获异常
ThreadFactory threadFactory = r -> {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler((thread, ex) -> {
System.err.println("Uncaught exception in thread " +
thread.getName() + ": " + ex.getMessage());
});
return t;
};
ExecutorService customExecutor = new ThreadPoolExecutor(
5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
threadFactory
);
executor.shutdown();
customExecutor.shutdown();
}
}
性能优化建议
线程池大小
- CPU密集型:线程数 = CPU核心数 + 1
- IO密集型:线程数 = CPU核心数 × (1 + IO等待时间/CPU计算时间)
- 混合型:根据实际测试调优
队列选择
- ArrayBlockingQueue:有界队列,防止内存溢出
- LinkedBlockingQueue:可选有界,性能较好
- SynchronousQueue:直接交换,适合缓存线程池
监控指标
- 活跃线程数
- 队列长度
- 任务完成数
- 拒绝任务数