第11章

⏰ 定时任务线程池

掌握ScheduledThreadPoolExecutor的使用方法、定时任务类型、调度机制和异常处理

学习目标

ScheduledThreadPoolExecutor概述

ScheduledThreadPoolExecutor是Java并发包中提供的定时任务线程池,它继承自ThreadPoolExecutor,专门用于执行定时任务和周期性任务。相比Timer类,它提供了更强大的功能和更好的性能。

核心特性

ScheduledThreadPoolExecutor支持多线程执行定时任务,避免了Timer单线程的局限性,提供了更好的并发性能和异常处理能力。

主要优势

多线程支持
支持多个线程同时执行定时任务,避免任务阻塞影响其他任务的执行。
异常隔离
单个任务的异常不会影响其他任务的执行,提供更好的稳定性。
精确调度
提供更精确的时间调度机制,支持多种调度策略。

创建ScheduledThreadPoolExecutor

// 方式1:直接创建
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(5);

// 方式2:通过Executors工厂方法
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(5);

// 方式3:单线程定时任务执行器
ScheduledExecutorService singleScheduled = Executors.newSingleThreadScheduledExecutor();

定时任务类型

ScheduledThreadPoolExecutor支持多种类型的定时任务,包括延迟执行、固定频率执行和固定延迟执行。

延迟执行任务

schedule方法用于在指定延迟后执行一次任务。

ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);

// 延迟3秒执行任务
ScheduledFuture future = executor.schedule(() -> {
    System.out.println("延迟任务执行:" + new Date());
}, 3, TimeUnit.SECONDS);

// 延迟执行有返回值的任务
ScheduledFuture futureWithResult = executor.schedule(() -> {
    return "任务执行完成:" + new Date();
}, 5, TimeUnit.SECONDS);

固定频率执行

scheduleAtFixedRate方法按固定频率执行任务,不管上次任务是否完成。

// 初始延迟1秒,然后每2秒执行一次
ScheduledFuture fixedRateTask = executor.scheduleAtFixedRate(() -> {
    System.out.println("固定频率任务:" + new Date());
    // 模拟任务执行时间
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}, 1, 2, TimeUnit.SECONDS);

固定延迟执行

scheduleWithFixedDelay方法在上次任务完成后等待固定时间再执行下次任务。

// 初始延迟1秒,任务完成后延迟2秒再执行下次任务
ScheduledFuture fixedDelayTask = executor.scheduleWithFixedDelay(() -> {
    System.out.println("固定延迟任务:" + new Date());
    // 模拟任务执行时间
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}, 1, 2, TimeUnit.SECONDS);
区别说明
  • scheduleAtFixedRate:按固定频率执行,如果任务执行时间超过间隔时间,下次任务会立即执行
  • scheduleWithFixedDelay:固定延迟执行,确保任务之间有固定的间隔时间

调度机制

ScheduledThreadPoolExecutor内部使用DelayQueue来管理定时任务,通过ScheduledFutureTask包装任务并计算执行时间。

内部实现原理

DelayQueue
使用DelayQueue存储待执行的任务,按执行时间排序,确保最早执行的任务在队列头部。
ScheduledFutureTask
包装原始任务,添加执行时间、周期等信息,实现Delayed接口支持延迟队列。
周期任务处理
周期性任务执行完成后会重新计算下次执行时间并重新加入队列。

时间精度考虑

时间精度
  • ScheduledThreadPoolExecutor的时间精度依赖于系统时钟
  • 在高负载情况下,任务可能会有轻微的延迟
  • 对于需要高精度定时的场景,需要考虑其他解决方案

异常处理

定时任务中的异常处理非常重要,未捕获的异常会导致周期性任务停止执行。

异常处理策略

ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);

// 方式1:在任务内部处理异常
executor.scheduleAtFixedRate(() -> {
    try {
        // 可能抛出异常的业务逻辑
        performBusinessLogic();
    } catch (Exception e) {
        // 记录异常日志
        logger.error("定时任务执行异常", e);
        // 可以选择是否重新抛出异常
    }
}, 0, 1, TimeUnit.MINUTES);

// 方式2:自定义线程工厂处理未捕获异常
ThreadFactory threadFactory = new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setUncaughtExceptionHandler((t, e) -> {
            logger.error("线程 {} 发生未捕获异常", t.getName(), e);
        });
        return thread;
    }
};

ScheduledThreadPoolExecutor customExecutor = 
    new ScheduledThreadPoolExecutor(3, threadFactory);

任务失败恢复

public class RobustScheduledTask {
    private final ScheduledExecutorService executor;
    private final AtomicInteger failureCount = new AtomicInteger(0);
    private final int maxRetries = 3;
    
    public void scheduleRobustTask() {
        executor.scheduleAtFixedRate(() -> {
            try {
                executeTask();
                // 任务成功,重置失败计数
                failureCount.set(0);
            } catch (Exception e) {
                int failures = failureCount.incrementAndGet();
                logger.warn("任务执行失败,失败次数: {}", failures, e);
                
                if (failures >= maxRetries) {
                    logger.error("任务连续失败{}次,停止执行", maxRetries);
                    // 可以选择停止任务或采取其他措施
                    throw new RuntimeException("任务失败次数过多", e);
                }
            }
        }, 0, 1, TimeUnit.MINUTES);
    }
}

实际应用场景

定时任务线程池在实际开发中有很多应用场景,以下是一些常见的使用案例。

数据清理任务

@Component
public class DataCleanupScheduler {
    
    private final ScheduledExecutorService scheduler = 
        Executors.newScheduledThreadPool(2);
    
    @PostConstruct
    public void startCleanupTasks() {
        // 每天凌晨2点清理过期数据
        scheduleDaily(() -> {
            cleanupExpiredData();
        }, 2, 0); // 2点0分
        
        // 每小时清理临时文件
        scheduler.scheduleAtFixedRate(() -> {
            cleanupTempFiles();
        }, 0, 1, TimeUnit.HOURS);
    }
    
    private void scheduleDaily(Runnable task, int hour, int minute) {
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime nextRun = now.withHour(hour).withMinute(minute).withSecond(0);
        if (now.isAfter(nextRun)) {
            nextRun = nextRun.plusDays(1);
        }
        
        long initialDelay = Duration.between(now, nextRun).getSeconds();
        scheduler.scheduleAtFixedRate(task, initialDelay, 
            TimeUnit.DAYS.toSeconds(1), TimeUnit.SECONDS);
    }
}

监控检查任务

@Service
public class HealthCheckScheduler {
    
    private final ScheduledExecutorService scheduler = 
        Executors.newScheduledThreadPool(3);
    
    public void startHealthChecks() {
        // 数据库连接检查
        scheduler.scheduleAtFixedRate(() -> {
            checkDatabaseConnection();
        }, 0, 30, TimeUnit.SECONDS);
        
        // 外部服务检查
        scheduler.scheduleAtFixedRate(() -> {
            checkExternalServices();
        }, 0, 1, TimeUnit.MINUTES);
        
        // 系统资源检查
        scheduler.scheduleAtFixedRate(() -> {
            checkSystemResources();
        }, 0, 5, TimeUnit.MINUTES);
    }
    
    private void checkDatabaseConnection() {
        try {
            // 执行简单查询验证数据库连接
            dataSource.getConnection().isValid(5);
            logger.debug("数据库连接正常");
        } catch (Exception e) {
            logger.error("数据库连接异常", e);
            // 发送告警
            alertService.sendAlert("数据库连接异常", e.getMessage());
        }
    }
}

定期报告生成

@Service
public class ReportScheduler {
    
    private final ScheduledExecutorService scheduler = 
        Executors.newScheduledThreadPool(2);
    
    public void scheduleReports() {
        // 每周一生成周报
        scheduleWeekly(() -> {
            generateWeeklyReport();
        }, DayOfWeek.MONDAY, 9, 0); // 周一9点
        
        // 每月1号生成月报
        scheduleMonthly(() -> {
            generateMonthlyReport();
        }, 1, 10, 0); // 每月1号10点
    }
    
    private void scheduleWeekly(Runnable task, DayOfWeek dayOfWeek, 
                               int hour, int minute) {
        LocalDateTime now = LocalDateTime.now();
        LocalDateTime nextRun = now.with(TemporalAdjusters.nextOrSame(dayOfWeek))
                                  .withHour(hour).withMinute(minute).withSecond(0);
        
        if (now.isAfter(nextRun)) {
            nextRun = nextRun.plusWeeks(1);
        }
        
        long initialDelay = Duration.between(now, nextRun).getSeconds();
        scheduler.scheduleAtFixedRate(task, initialDelay, 
            TimeUnit.DAYS.toSeconds(7), TimeUnit.SECONDS);
    }
}

最佳实践

线程池配置

配置建议
  • 核心线程数:根据定时任务的数量和执行频率合理设置
  • 线程命名:使用有意义的线程名称便于问题排查
  • 异常处理:确保所有任务都有适当的异常处理机制
  • 资源清理:应用关闭时正确关闭线程池

性能优化

注意事项

重要提醒
  • 周期性任务中的未捕获异常会导致任务停止执行
  • scheduleAtFixedRate可能导致任务堆积,需要监控任务执行时间
  • 应用关闭时需要正确关闭线程池,避免资源泄露
  • 对于需要高精度定时的场景,考虑使用专门的定时框架
上一章:Executors工具类 返回目录 下一章:性能优化