第11章

🚀 Future和CompletableFuture

掌握Java异步编程的核心技术,学会使用Future和CompletableFuture进行高效的异步处理

学习目标

Future接口详解

Future接口是Java并发编程中处理异步计算结果的核心接口。它代表一个异步计算的结果,提供了检查计算是否完成、等待计算完成、获取计算结果等方法。

核心概念

Future就像是一张"提货单",当你提交一个异步任务后,立即得到一个Future对象,可以用它在将来某个时候获取任务的执行结果。

Future接口的主要方法

get()方法
阻塞等待任务完成并返回结果,如果任务执行异常则抛出ExecutionException。
get(timeout, unit)
带超时的获取结果,如果在指定时间内未完成则抛出TimeoutException。
isDone()
检查任务是否已经完成,无论是正常完成、异常结束还是被取消。
cancel()
尝试取消任务的执行,参数决定是否中断正在执行的任务。

基本使用示例

Future基本用法
import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        
        // 提交Callable任务
        Future<String> future = executor.submit(() -> {
            Thread.sleep(2000); // 模拟耗时操作
            return "异步任务执行完成";
        });
        
        System.out.println("任务已提交,继续执行其他操作...");
        
        // 检查任务是否完成
        if (!future.isDone()) {
            System.out.println("任务还在执行中...");
        }
        
        // 获取结果(阻塞等待)
        String result = future.get();
        System.out.println("任务结果: " + result);
        
        executor.shutdown();
    }
}

FutureTask详解

FutureTask是Future接口的具体实现,同时也实现了Runnable接口。它可以包装Callable或Runnable任务,提供了Future的所有功能,是连接异步任务和Future结果的桥梁。

FutureTask的特点

FutureTask使用示例
import java.util.concurrent.*;

public class FutureTaskExample {
    public static void main(String[] args) throws Exception {
        // 创建FutureTask
        FutureTask<Integer> futureTask = new FutureTask<>(() -> {
            System.out.println("开始计算...");
            Thread.sleep(3000);
            return 42;
        });
        
        // 启动线程执行任务
        Thread thread = new Thread(futureTask);
        thread.start();
        
        System.out.println("主线程继续执行其他任务...");
        
        // 获取计算结果
        Integer result = futureTask.get();
        System.out.println("计算结果: " + result);
    }
}

CompletableFuture详解

CompletableFuture是Java 8引入的强大异步编程工具,它不仅实现了Future接口,还提供了丰富的组合、链式调用和异常处理功能,是现代Java异步编程的首选。

核心优势

CompletableFuture支持函数式编程风格,可以轻松构建复杂的异步处理流水线,避免了传统回调地狱的问题。

创建CompletableFuture

supplyAsync()
创建有返回值的异步任务,接受Supplier函数式接口。
runAsync()
创建无返回值的异步任务,接受Runnable函数式接口。
completedFuture()
创建一个已经完成的CompletableFuture,常用于测试。
CompletableFuture创建示例
import java.util.concurrent.CompletableFuture;

public class CompletableFutureBasic {
    public static void main(String[] args) throws Exception {
        // 创建有返回值的异步任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Hello";
        });
        
        // 创建无返回值的异步任务
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            System.out.println("执行异步任务");
        });
        
        // 创建已完成的Future
        CompletableFuture<String> future3 = CompletableFuture.completedFuture("World");
        
        System.out.println(future1.get()); // Hello
        future2.get(); // 等待任务完成
        System.out.println(future3.get()); // World
    }
}

链式调用和组合操作

CompletableFuture的强大之处在于其丰富的链式调用方法,可以构建复杂的异步处理流水线。

常用链式方法

thenApply()
对结果进行转换,类似于Stream的map操作。
thenAccept()
消费结果但不返回新值,类似于Stream的forEach。
thenCompose()
用于连接两个CompletableFuture,避免嵌套。
thenCombine()
组合两个独立的CompletableFuture的结果。
链式调用示例
import java.util.concurrent.CompletableFuture;

public class CompletableFutureChain {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> "Hello")
            .thenApply(s -> s + " World")
            .thenApply(String::toUpperCase)
            .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "!"));
        
        System.out.println(future.get()); // HELLO WORLD!
        
        // 组合两个独立的Future
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");
        
        CompletableFuture<String> combined = future1.thenCombine(future2, 
            (s1, s2) -> s1 + " " + s2);
        
        System.out.println(combined.get()); // Hello World
    }
}

异常处理

CompletableFuture提供了完善的异常处理机制,可以优雅地处理异步操作中的异常情况。

异常处理方法

异常处理示例
import java.util.concurrent.CompletableFuture;

public class CompletableFutureException {
    public static void main(String[] args) throws Exception {
        // 使用exceptionally处理异常
        CompletableFuture<String> future1 = CompletableFuture
            .supplyAsync(() -> {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("随机异常");
                }
                return "成功结果";
            })
            .exceptionally(throwable -> {
                System.out.println("发生异常: " + throwable.getMessage());
                return "默认值";
            });
        
        System.out.println(future1.get());
        
        // 使用handle同时处理成功和异常
        CompletableFuture<String> future2 = CompletableFuture
            .supplyAsync(() -> {
                throw new RuntimeException("测试异常");
            })
            .handle((result, throwable) -> {
                if (throwable != null) {
                    return "处理异常: " + throwable.getMessage();
                }
                return result;
            });
        
        System.out.println(future2.get());
    }
}

组合式异步编程

CompletableFuture提供了强大的组合功能,可以处理多个异步任务的协调和组合。

多任务组合方法

allOf()
等待所有CompletableFuture完成,返回Void类型的Future。
anyOf()
等待任意一个CompletableFuture完成,返回最先完成的结果。
多任务组合示例
import java.util.concurrent.CompletableFuture;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CompletableFutureCombination {
    public static void main(String[] args) throws Exception {
        // 创建多个异步任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            sleep(1000);
            return "任务1完成";
        });
        
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            sleep(2000);
            return "任务2完成";
        });
        
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            sleep(1500);
            return "任务3完成";
        });
        
        // 等待所有任务完成
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
        
        CompletableFuture<List<String>> allResults = allFutures.thenApply(v -> 
            Arrays.asList(future1, future2, future3)
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );
        
        List<String> results = allResults.get();
        results.forEach(System.out::println);
        
        // 等待任意一个任务完成
        CompletableFuture<Object> anyResult = CompletableFuture.anyOf(future1, future2, future3);
        System.out.println("最先完成的任务: " + anyResult.get());
    }
    
    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

实战案例:异步文件处理系统

通过一个完整的异步文件处理系统案例,展示CompletableFuture在实际项目中的应用。

异步文件处理系统
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.List;
import java.util.Arrays;

public class AsyncFileProcessor {
    private final ExecutorService executor = Executors.newFixedThreadPool(4);
    
    public CompletableFuture<String> processFile(String fileName) {
        return CompletableFuture
            .supplyAsync(() -> readFile(fileName), executor)
            .thenApply(this::validateContent)
            .thenApply(this::processContent)
            .thenCompose(this::saveToDatabase)
            .exceptionally(this::handleError);
    }
    
    public CompletableFuture<List<String>> processBatch(List<String> fileNames) {
        List<CompletableFuture<String>> futures = fileNames.stream()
            .map(this::processFile)
            .collect(Collectors.toList());
        
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
            .thenApply(v -> futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList()));
    }
    
    private String readFile(String fileName) {
        // 模拟文件读取
        System.out.println("读取文件: " + fileName);
        sleep(1000);
        return "文件内容: " + fileName;
    }
    
    private String validateContent(String content) {
        // 模拟内容验证
        System.out.println("验证内容: " + content);
        if (content.contains("error")) {
            throw new RuntimeException("内容验证失败");
        }
        return content;
    }
    
    private String processContent(String content) {
        // 模拟内容处理
        System.out.println("处理内容: " + content);
        sleep(500);
        return "已处理: " + content;
    }
    
    private CompletableFuture<String> saveToDatabase(String processedContent) {
        // 模拟数据库保存
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("保存到数据库: " + processedContent);
            sleep(800);
            return "保存成功: " + processedContent;
        }, executor);
    }
    
    private String handleError(Throwable throwable) {
        System.err.println("处理失败: " + throwable.getMessage());
        return "处理失败,使用默认值";
    }
    
    private void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    public static void main(String[] args) throws Exception {
        AsyncFileProcessor processor = new AsyncFileProcessor();
        
        // 处理单个文件
        CompletableFuture<String> singleResult = processor.processFile("test.txt");
        System.out.println(singleResult.get());
        
        // 批量处理文件
        List<String> fileNames = Arrays.asList("file1.txt", "file2.txt", "file3.txt");
        CompletableFuture<List<String>> batchResult = processor.processBatch(fileNames);
        
        List<String> results = batchResult.get();
        results.forEach(System.out::println);
        
        processor.executor.shutdown();
    }
}

最佳实践和注意事项

最佳实践
  • 合理使用线程池:为CompletableFuture指定合适的线程池,避免使用默认的ForkJoinPool
  • 异常处理:始终为异步操作添加异常处理逻辑
  • 避免阻塞:尽量使用非阻塞的组合方法,避免在异步回调中调用get()
  • 资源管理:及时关闭自定义的线程池,避免资源泄露
  • 超时控制:为长时间运行的任务设置合理的超时时间
注意事项
  • 内存泄露:长时间运行的CompletableFuture可能导致内存泄露
  • 异常传播:未处理的异常会被包装在CompletionException中
  • 线程安全:CompletableFuture本身是线程安全的,但回调中的操作需要注意线程安全
  • 性能考虑:过度使用异步可能导致上下文切换开销增大
上一章:Executors工具类 返回目录 下一章:并发集合