第10章

⚡ Executors框架

掌握Java并发工具类Executors的使用,学习预定义线程池、Future和Callable等高级特性

学习目标

Executors工具类概述

Executors是Java并发包中的一个工具类,提供了创建各种类型线程池的便捷方法。它采用工厂模式,封装了ThreadPoolExecutor的复杂配置,让开发者能够快速创建适合不同场景的线程池。

核心理解

Executors虽然提供了便捷的线程池创建方法,但在生产环境中建议根据具体需求手动配置ThreadPoolExecutor,以获得更好的控制和性能。

主要功能

线程池创建
提供多种预定义的线程池类型,如固定大小线程池、缓存线程池、单线程池等。
调度服务
创建支持定时和周期性任务执行的调度线程池。
线程工厂
提供默认的线程工厂实现,支持自定义线程创建逻辑。

基本使用示例

import java.util.concurrent.*;

public class ExecutorsExample {
    public static void main(String[] args) {
        // 创建固定大小的线程池
        ExecutorService fixedPool = Executors.newFixedThreadPool(5);
        
        // 提交任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            fixedPool.submit(() -> {
                System.out.println("Task " + taskId + " executed by " + 
                    Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        // 关闭线程池
        fixedPool.shutdown();
    }
}

预定义线程池类型

Executors提供了几种常用的预定义线程池,每种都有其特定的使用场景和特点。了解这些线程池的特性有助于选择合适的线程池类型。

newFixedThreadPool

特点说明
  • 固定线程数:线程池大小固定,不会动态调整
  • 无界队列:使用LinkedBlockingQueue,理论上可以无限排队
  • 适用场景:负载比较均匀,需要限制并发线程数的场景
  • 注意事项:队列可能无限增长,导致内存溢出
// 创建固定大小的线程池
ExecutorService fixedPool = Executors.newFixedThreadPool(10);

// 等价的ThreadPoolExecutor配置
ExecutorService equivalentPool = new ThreadPoolExecutor(
    10, 10, 0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue()
);

newCachedThreadPool

特点说明
  • 动态调整:根据需要创建新线程,空闲线程会被回收
  • 同步队列:使用SynchronousQueue,不存储任务
  • 适用场景:执行大量短期异步任务的场景
  • 注意事项:线程数可能无限增长,需要控制任务提交速度
// 创建缓存线程池
ExecutorService cachedPool = Executors.newCachedThreadPool();

// 等价的ThreadPoolExecutor配置
ExecutorService equivalentPool = new ThreadPoolExecutor(
    0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
    new SynchronousQueue()
);

newSingleThreadExecutor

特点说明
  • 单线程执行:只有一个工作线程执行任务
  • 顺序执行:任务按照提交顺序依次执行
  • 适用场景:需要保证任务执行顺序的场景
  • 容错性:如果线程异常终止,会创建新线程继续执行
// 创建单线程执行器
ExecutorService singlePool = Executors.newSingleThreadExecutor();

// 示例:顺序处理任务
for (int i = 0; i < 5; i++) {
    final int taskId = i;
    singlePool.submit(() -> {
        System.out.println("Processing task " + taskId);
        // 任务处理逻辑
    });
}

newScheduledThreadPool

特点说明
  • 定时执行:支持延迟和周期性任务执行
  • 固定线程数:核心线程数固定,最大线程数无限
  • 适用场景:需要定时执行任务的场景
  • 高精度:基于时间轮算法,支持高精度定时
// 创建调度线程池
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(5);

// 延迟执行
scheduledPool.schedule(() -> {
    System.out.println("Delayed task executed");
}, 5, TimeUnit.SECONDS);

// 周期性执行
scheduledPool.scheduleAtFixedRate(() -> {
    System.out.println("Periodic task executed");
}, 0, 10, TimeUnit.SECONDS);

Future和Callable

Future和Callable是Java并发编程中处理异步任务结果的重要接口。Callable允许任务返回结果或抛出异常,而Future提供了获取异步计算结果的方法。

Callable接口

接口定义

Callable接口类似于Runnable,但可以返回结果并抛出异常。

import java.util.concurrent.*;
import java.util.Random;

public class CallableExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 创建Callable任务
        Callable task = () -> {
            Thread.sleep(2000); // 模拟耗时操作
            return new Random().nextInt(100);
        };
        
        // 提交任务并获取Future
        Future future = executor.submit(task);
        
        // 执行其他操作
        System.out.println("Task submitted, doing other work...");
        
        // 获取结果(阻塞直到任务完成)
        Integer result = future.get();
        System.out.println("Task result: " + result);
        
        executor.shutdown();
    }
}

Future接口方法

get()方法
  • get():阻塞等待结果
  • get(timeout, unit):超时等待
cancel()方法
  • cancel(true):中断执行
  • cancel(false):不中断执行
状态查询
  • isDone():是否完成
  • isCancelled():是否取消

批量任务处理

import java.util.*;
import java.util.concurrent.*;

public class BatchTaskExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        // 创建多个任务
        List> tasks = Arrays.asList(
            () -> { Thread.sleep(1000); return "Task 1"; },
            () -> { Thread.sleep(2000); return "Task 2"; },
            () -> { Thread.sleep(1500); return "Task 3"; }
        );
        
        // 方式1:invokeAll - 等待所有任务完成
        List> futures = executor.invokeAll(tasks);
        for (Future future : futures) {
            System.out.println("Result: " + future.get());
        }
        
        // 方式2:invokeAny - 返回第一个完成的任务结果
        String firstResult = executor.invokeAny(tasks);
        System.out.println("First completed: " + firstResult);
        
        executor.shutdown();
    }
}

CompletionService

CompletionService将Executor和BlockingQueue的功能融合在一起,可以按照任务完成的顺序获取结果,而不是按照任务提交的顺序。这在处理批量异步任务时非常有用。

基本使用

import java.util.concurrent.*;
import java.util.Random;

public class CompletionServiceExample {
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        CompletionService completionService = 
            new ExecutorCompletionService<>(executor);
        
        // 提交多个任务
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            completionService.submit(() -> {
                // 模拟不同的执行时间
                Thread.sleep(new Random().nextInt(3000));
                return "Task " + taskId + " completed";
            });
        }
        
        // 按完成顺序获取结果
        for (int i = 0; i < 5; i++) {
            Future future = completionService.take(); // 阻塞等待
            String result = future.get();
            System.out.println("Received: " + result);
        }
        
        executor.shutdown();
    }
}

实际应用场景

搜索聚合
从多个数据源搜索数据,按照返回速度展示结果,提升用户体验。
文件下载
并行下载多个文件,按完成顺序处理下载结果。
数据处理
批量处理数据,按处理完成顺序进行后续操作。

高级用法示例

public class AdvancedCompletionServiceExample {
    private static final int TIMEOUT_SECONDS = 5;
    
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        CompletionService completionService = 
            new ExecutorCompletionService<>(executor);
        
        // 提交任务
        int taskCount = 10;
        for (int i = 0; i < taskCount; i++) {
            final int taskId = i;
            completionService.submit(() -> processData(taskId));
        }
        
        // 处理结果,支持超时
        int completedTasks = 0;
        while (completedTasks < taskCount) {
            try {
                Future future = completionService.poll(
                    TIMEOUT_SECONDS, TimeUnit.SECONDS);
                
                if (future != null) {
                    String result = future.get();
                    System.out.println("Processed: " + result);
                    completedTasks++;
                } else {
                    System.out.println("Timeout waiting for task completion");
                    break;
                }
            } catch (Exception e) {
                System.err.println("Task failed: " + e.getMessage());
                completedTasks++;
            }
        }
        
        executor.shutdown();
    }
    
    private static String processData(int taskId) throws Exception {
        // 模拟数据处理
        Thread.sleep(new Random().nextInt(4000));
        if (taskId == 7) {
            throw new RuntimeException("Task " + taskId + " failed");
        }
        return "Data " + taskId;
    }
}

ExecutorService最佳实践

正确使用ExecutorService需要注意资源管理、异常处理、性能优化等多个方面。以下是一些重要的最佳实践。

资源管理

重要提醒

ExecutorService必须正确关闭,否则会导致应用程序无法正常退出。

public class ExecutorShutdownExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        
        try {
            // 提交任务
            for (int i = 0; i < 10; i++) {
                executor.submit(() -> {
                    // 任务逻辑
                });
            }
        } finally {
            // 正确的关闭方式
            shutdownExecutor(executor);
        }
    }
    
    private static void shutdownExecutor(ExecutorService executor) {
        executor.shutdown(); // 不再接受新任务
        
        try {
            // 等待已提交任务完成
            if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // 强制关闭
                
                // 再次等待
                if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Executor did not terminate");
                }
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

异常处理

public class ExceptionHandlingExample {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 方式1:通过Future获取异常
        Future future = executor.submit(() -> {
            if (Math.random() > 0.5) {
                throw new RuntimeException("Task failed");
            }
            return "Success";
        });
        
        try {
            String result = future.get();
            System.out.println("Result: " + result);
        } catch (ExecutionException e) {
            System.err.println("Task failed: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        // 方式2:自定义ThreadFactory处理未捕获异常
        ThreadFactory threadFactory = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> {
                System.err.println("Uncaught exception in thread " + 
                    thread.getName() + ": " + ex.getMessage());
            });
            return t;
        };
        
        ExecutorService customExecutor = new ThreadPoolExecutor(
            5, 5, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(),
            threadFactory
        );
        
        executor.shutdown();
        customExecutor.shutdown();
    }
}

性能优化建议

线程池大小
  • CPU密集型:线程数 = CPU核心数 + 1
  • IO密集型:线程数 = CPU核心数 × (1 + IO等待时间/CPU计算时间)
  • 混合型:根据实际测试调优
队列选择
  • ArrayBlockingQueue:有界队列,防止内存溢出
  • LinkedBlockingQueue:可选有界,性能较好
  • SynchronousQueue:直接交换,适合缓存线程池
监控指标
  • 活跃线程数
  • 队列长度
  • 任务完成数
  • 拒绝任务数
上一章:ThreadPoolExecutor详解 返回目录 下一章:定时任务线程池