🚀 Future和CompletableFuture
掌握Java异步编程的核心技术,学会使用Future和CompletableFuture进行高效的异步处理
学习目标
- 掌握Future接口的使用方法和核心概念
- 理解FutureTask的实现原理和应用场景
- 熟练使用CompletableFuture进行异步编程
- 学会异步编程模式和最佳实践
- 掌握组合式异步编程技术
Future接口详解
Future接口是Java并发编程中处理异步计算结果的核心接口。它代表一个异步计算的结果,提供了检查计算是否完成、等待计算完成、获取计算结果等方法。
Future就像是一张"提货单",当你提交一个异步任务后,立即得到一个Future对象,可以用它在将来某个时候获取任务的执行结果。
Future接口的主要方法
基本使用示例
import java.util.concurrent.*;
public class FutureExample {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交Callable任务
Future<String> future = executor.submit(() -> {
Thread.sleep(2000); // 模拟耗时操作
return "异步任务执行完成";
});
System.out.println("任务已提交,继续执行其他操作...");
// 检查任务是否完成
if (!future.isDone()) {
System.out.println("任务还在执行中...");
}
// 获取结果(阻塞等待)
String result = future.get();
System.out.println("任务结果: " + result);
executor.shutdown();
}
}
FutureTask详解
FutureTask是Future接口的具体实现,同时也实现了Runnable接口。它可以包装Callable或Runnable任务,提供了Future的所有功能,是连接异步任务和Future结果的桥梁。
FutureTask的特点
- 双重身份:既是Future又是Runnable,可以直接提交给线程执行
- 状态管理:内部维护任务的执行状态(NEW、COMPLETING、NORMAL等)
- 结果缓存:任务只会执行一次,多次调用get()返回相同结果
- 异常处理:能够捕获并传播任务执行过程中的异常
import java.util.concurrent.*;
public class FutureTaskExample {
public static void main(String[] args) throws Exception {
// 创建FutureTask
FutureTask<Integer> futureTask = new FutureTask<>(() -> {
System.out.println("开始计算...");
Thread.sleep(3000);
return 42;
});
// 启动线程执行任务
Thread thread = new Thread(futureTask);
thread.start();
System.out.println("主线程继续执行其他任务...");
// 获取计算结果
Integer result = futureTask.get();
System.out.println("计算结果: " + result);
}
}
CompletableFuture详解
CompletableFuture是Java 8引入的强大异步编程工具,它不仅实现了Future接口,还提供了丰富的组合、链式调用和异常处理功能,是现代Java异步编程的首选。
CompletableFuture支持函数式编程风格,可以轻松构建复杂的异步处理流水线,避免了传统回调地狱的问题。
创建CompletableFuture
import java.util.concurrent.CompletableFuture;
public class CompletableFutureBasic {
public static void main(String[] args) throws Exception {
// 创建有返回值的异步任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello";
});
// 创建无返回值的异步任务
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("执行异步任务");
});
// 创建已完成的Future
CompletableFuture<String> future3 = CompletableFuture.completedFuture("World");
System.out.println(future1.get()); // Hello
future2.get(); // 等待任务完成
System.out.println(future3.get()); // World
}
}
链式调用和组合操作
CompletableFuture的强大之处在于其丰富的链式调用方法,可以构建复杂的异步处理流水线。
常用链式方法
import java.util.concurrent.CompletableFuture;
public class CompletableFutureChain {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World")
.thenApply(String::toUpperCase)
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "!"));
System.out.println(future.get()); // HELLO WORLD!
// 组合两个独立的Future
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<String> combined = future1.thenCombine(future2,
(s1, s2) -> s1 + " " + s2);
System.out.println(combined.get()); // Hello World
}
}
异常处理
CompletableFuture提供了完善的异常处理机制,可以优雅地处理异步操作中的异常情况。
异常处理方法
- exceptionally():当发生异常时提供默认值或替代逻辑
- handle():同时处理正常结果和异常情况
- whenComplete():无论成功还是失败都会执行的回调
import java.util.concurrent.CompletableFuture;
public class CompletableFutureException {
public static void main(String[] args) throws Exception {
// 使用exceptionally处理异常
CompletableFuture<String> future1 = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) {
throw new RuntimeException("随机异常");
}
return "成功结果";
})
.exceptionally(throwable -> {
System.out.println("发生异常: " + throwable.getMessage());
return "默认值";
});
System.out.println(future1.get());
// 使用handle同时处理成功和异常
CompletableFuture<String> future2 = CompletableFuture
.supplyAsync(() -> {
throw new RuntimeException("测试异常");
})
.handle((result, throwable) -> {
if (throwable != null) {
return "处理异常: " + throwable.getMessage();
}
return result;
});
System.out.println(future2.get());
}
}
组合式异步编程
CompletableFuture提供了强大的组合功能,可以处理多个异步任务的协调和组合。
多任务组合方法
import java.util.concurrent.CompletableFuture;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class CompletableFutureCombination {
public static void main(String[] args) throws Exception {
// 创建多个异步任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "任务1完成";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(2000);
return "任务2完成";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
sleep(1500);
return "任务3完成";
});
// 等待所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
CompletableFuture<List<String>> allResults = allFutures.thenApply(v ->
Arrays.asList(future1, future2, future3)
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
List<String> results = allResults.get();
results.forEach(System.out::println);
// 等待任意一个任务完成
CompletableFuture<Object> anyResult = CompletableFuture.anyOf(future1, future2, future3);
System.out.println("最先完成的任务: " + anyResult.get());
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
实战案例:异步文件处理系统
通过一个完整的异步文件处理系统案例,展示CompletableFuture在实际项目中的应用。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.List;
import java.util.Arrays;
public class AsyncFileProcessor {
private final ExecutorService executor = Executors.newFixedThreadPool(4);
public CompletableFuture<String> processFile(String fileName) {
return CompletableFuture
.supplyAsync(() -> readFile(fileName), executor)
.thenApply(this::validateContent)
.thenApply(this::processContent)
.thenCompose(this::saveToDatabase)
.exceptionally(this::handleError);
}
public CompletableFuture<List<String>> processBatch(List<String> fileNames) {
List<CompletableFuture<String>> futures = fileNames.stream()
.map(this::processFile)
.collect(Collectors.toList());
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}
private String readFile(String fileName) {
// 模拟文件读取
System.out.println("读取文件: " + fileName);
sleep(1000);
return "文件内容: " + fileName;
}
private String validateContent(String content) {
// 模拟内容验证
System.out.println("验证内容: " + content);
if (content.contains("error")) {
throw new RuntimeException("内容验证失败");
}
return content;
}
private String processContent(String content) {
// 模拟内容处理
System.out.println("处理内容: " + content);
sleep(500);
return "已处理: " + content;
}
private CompletableFuture<String> saveToDatabase(String processedContent) {
// 模拟数据库保存
return CompletableFuture.supplyAsync(() -> {
System.out.println("保存到数据库: " + processedContent);
sleep(800);
return "保存成功: " + processedContent;
}, executor);
}
private String handleError(Throwable throwable) {
System.err.println("处理失败: " + throwable.getMessage());
return "处理失败,使用默认值";
}
private void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) throws Exception {
AsyncFileProcessor processor = new AsyncFileProcessor();
// 处理单个文件
CompletableFuture<String> singleResult = processor.processFile("test.txt");
System.out.println(singleResult.get());
// 批量处理文件
List<String> fileNames = Arrays.asList("file1.txt", "file2.txt", "file3.txt");
CompletableFuture<List<String>> batchResult = processor.processBatch(fileNames);
List<String> results = batchResult.get();
results.forEach(System.out::println);
processor.executor.shutdown();
}
}
最佳实践和注意事项
- 合理使用线程池:为CompletableFuture指定合适的线程池,避免使用默认的ForkJoinPool
- 异常处理:始终为异步操作添加异常处理逻辑
- 避免阻塞:尽量使用非阻塞的组合方法,避免在异步回调中调用get()
- 资源管理:及时关闭自定义的线程池,避免资源泄露
- 超时控制:为长时间运行的任务设置合理的超时时间
- 内存泄露:长时间运行的CompletableFuture可能导致内存泄露
- 异常传播:未处理的异常会被包装在CompletionException中
- 线程安全:CompletableFuture本身是线程安全的,但回调中的操作需要注意线程安全
- 性能考虑:过度使用异步可能导致上下文切换开销增大