第10章

⚙️ Executors工具类

掌握线程池工厂方法,学会选择和自定义合适的线程池

学习目标

Executors详解

Executors是Java并发包中的一个工具类,它提供了一系列静态工厂方法来创建不同类型的线程池。这个类采用了工厂方法模式,为开发者提供了便利的线程池创建方式,同时也隐藏了线程池的复杂配置细节。

核心理解

Executors提供便利性,但在生产环境中,建议使用ThreadPoolExecutor直接创建线程池以获得更好的控制力。

工厂方法模式的优势

简化创建
通过简单的方法调用就能创建常用的线程池,无需了解复杂的构造参数。
最佳实践
内置了经过验证的线程池配置,减少了配置错误的可能性。
代码简洁
减少样板代码,让开发者专注于业务逻辑而非线程池配置。

便利性vs灵活性

虽然Executors提供了便利性,但在某些场景下可能限制了灵活性。阿里巴巴Java开发手册建议避免使用Executors创建线程池,主要原因是:

FixedThreadPool

FixedThreadPool创建一个固定大小的线程池,线程数量始终保持不变。当所有线程都在执行任务时,新任务会在队列中等待。

创建FixedThreadPool
// 创建固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(5);

// 提交任务
for (int i = 0; i < 10; i++) {
    final int taskId = i;
    executor.submit(() -> {
        System.out.println("Task " + taskId + " executed by " + 
                          Thread.currentThread().getName());
        try {
            Thread.sleep(2000); // 模拟任务执行
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// 关闭线程池
executor.shutdown();

特点分析

固定大小
线程数量固定,不会因为任务增加而无限制创建线程。
无界队列
使用LinkedBlockingQueue,理论上可以存储无限数量的任务。
线程复用
线程会被重复使用,减少了线程创建和销毁的开销。

适用场景

注意事项

由于使用无界队列,在高负载情况下可能导致内存溢出。建议在生产环境中使用有界队列。

CachedThreadPool

CachedThreadPool创建一个可缓存的线程池,线程数量会根据需要动态调整。空闲线程会被缓存60秒,超时后会被回收。

创建CachedThreadPool
// 创建缓存线程池
ExecutorService executor = Executors.newCachedThreadPool();

// 提交大量短期任务
for (int i = 0; i < 100; i++) {
    final int taskId = i;
    executor.submit(() -> {
        System.out.println("Task " + taskId + " executed by " + 
                          Thread.currentThread().getName());
        try {
            Thread.sleep(100); // 短期任务
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
}

// 关闭线程池
executor.shutdown();

动态调整机制

弹性扩展
根据任务数量动态创建线程,理论上没有上限。
自动回收
空闲线程60秒后自动回收,节省系统资源。
快速响应
新任务能够快速得到执行,适合突发性负载。

适用场景

风险提醒

在高并发场景下可能创建大量线程,导致系统资源耗尽。建议设置合理的线程数量上限。

ScheduledThreadPool

ScheduledThreadPool用于执行定时任务和周期性任务,支持延迟执行和固定频率执行。

创建ScheduledThreadPool
// 创建定时任务线程池
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3);

// 延迟执行任务
scheduler.schedule(() -> {
    System.out.println("延迟5秒执行的任务");
}, 5, TimeUnit.SECONDS);

// 固定频率执行任务
scheduler.scheduleAtFixedRate(() -> {
    System.out.println("每3秒执行一次: " + new Date());
}, 0, 3, TimeUnit.SECONDS);

// 固定延迟执行任务
scheduler.scheduleWithFixedDelay(() -> {
    System.out.println("上次执行完成后延迟2秒再执行");
    try {
        Thread.sleep(1000); // 模拟任务执行时间
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}, 0, 2, TimeUnit.SECONDS);

定时任务类型

延迟执行
schedule()方法可以延迟指定时间后执行任务。
固定频率
scheduleAtFixedRate()按固定频率执行,不考虑任务执行时间。
固定延迟
scheduleWithFixedDelay()在上次任务完成后延迟指定时间。

实际应用示例

数据清理任务
public class DataCleanupTask {
    private final ScheduledExecutorService scheduler = 
        Executors.newScheduledThreadPool(2);
    
    public void startCleanupTasks() {
        // 每天凌晨2点执行数据清理
        long initialDelay = calculateInitialDelay();
        scheduler.scheduleAtFixedRate(
            this::cleanupOldData,
            initialDelay,
            TimeUnit.DAYS.toSeconds(1),
            TimeUnit.SECONDS
        );
        
        // 每小时执行缓存清理
        scheduler.scheduleAtFixedRate(
            this::cleanupCache,
            0,
            1,
            TimeUnit.HOURS
        );
    }
    
    private void cleanupOldData() {
        System.out.println("执行数据清理任务: " + new Date());
        // 清理逻辑
    }
    
    private void cleanupCache() {
        System.out.println("执行缓存清理任务: " + new Date());
        // 缓存清理逻辑
    }
    
    private long calculateInitialDelay() {
        // 计算到下一个凌晨2点的延迟时间
        // 实现省略...
        return 0;
    }
}

自定义线程池最佳实践

在生产环境中,建议使用ThreadPoolExecutor直接创建线程池,以获得更好的控制力和可观测性。

自定义线程池示例
public class CustomThreadPoolFactory {
    
    public static ThreadPoolExecutor createCustomThreadPool(
            String poolName, 
            int corePoolSize, 
            int maximumPoolSize) {
        
        return new ThreadPoolExecutor(
            corePoolSize,                    // 核心线程数
            maximumPoolSize,                 // 最大线程数
            60L,                            // 空闲线程存活时间
            TimeUnit.SECONDS,               // 时间单位
            new LinkedBlockingQueue<>(1000), // 有界队列
            new CustomThreadFactory(poolName), // 自定义线程工厂
            new CustomRejectedExecutionHandler() // 自定义拒绝策略
        );
    }
    
    // 自定义线程工厂
    static class CustomThreadFactory implements ThreadFactory {
        private final String namePrefix;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        
        CustomThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix + "-thread-";
        }
        
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
            thread.setDaemon(false);
            thread.setPriority(Thread.NORM_PRIORITY);
            return thread;
        }
    }
    
    // 自定义拒绝策略
    static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // 记录日志
            System.err.println("Task rejected: " + r.toString());
            
            // 可以选择其他处理方式:
            // 1. 抛出异常(默认)
            // 2. 在调用者线程中执行
            // 3. 丢弃任务
            // 4. 丢弃队列中最老的任务
            
            // 这里选择在调用者线程中执行
            if (!executor.isShutdown()) {
                r.run();
            }
        }
    }
}

参数选择指南

CPU密集型
核心线程数 = CPU核心数 + 1,最大线程数 = CPU核心数 * 2
I/O密集型
核心线程数 = CPU核心数 * 2,根据I/O等待时间调整
混合型
根据任务特点和性能测试结果进行调优

监控和调优

线程池监控
public class ThreadPoolMonitor {
    
    public static void monitorThreadPool(ThreadPoolExecutor executor, String poolName) {
        ScheduledExecutorService monitor = Executors.newScheduledThreadPool(1);
        
        monitor.scheduleAtFixedRate(() -> {
            System.out.println("=== " + poolName + " 线程池监控 ===");
            System.out.println("核心线程数: " + executor.getCorePoolSize());
            System.out.println("最大线程数: " + executor.getMaximumPoolSize());
            System.out.println("当前线程数: " + executor.getPoolSize());
            System.out.println("活跃线程数: " + executor.getActiveCount());
            System.out.println("队列大小: " + executor.getQueue().size());
            System.out.println("已完成任务数: " + executor.getCompletedTaskCount());
            System.out.println("总任务数: " + executor.getTaskCount());
            System.out.println();
        }, 0, 10, TimeUnit.SECONDS);
    }
}
上一章:线程池详解 返回目录 下一章:Future和CompletableFuture