定时任务的实现方式,手写代码实现(延迟队列与调度逻辑的结合)
  1. 定义任务累,实现 Delayed 接口以支持延迟队列
  2. 定义调度器类,管理任务的调度和执行
  3. 测试
    源代码如下:
package org.pt.schedule;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger; 

class MyScheduledTask implements Delayed {
    private final Runnable task; // 定义任务的具体执行逻辑
    private final long period; // 定义周期时间(毫秒),0 表示非周期任务
    private long nextExecutionTime; // 定义下次执行的绝对时间(毫秒)

    // 构造函数:初始化任务、首次执行时间和周期
    public MyScheduledTask(Runnable task, LocalDateTime executionTime, long period, TimeUnit unit) {
        this.task = task; // 初始化任务逻辑
        this.period = period > 0 ? unit.toMillis(period) : 0; // 将周期时间转换为毫秒,若无周期则为 0
        this.nextExecutionTime = executionTime.atZone(ZoneId.systemDefault()) // 将 LocalDateTime 转换为带时区的时间
                .toInstant().toEpochMilli(); // 转换为毫秒时间戳
    }

    // 执行任务的方法
    public void run() {
        System.out.println("任务开始执行: " + System.currentTimeMillis()); // 打印任务开始时间
        task.run(); // 执行用户定义的任务逻辑
        if (period > 0) { // 如果是周期任务
            nextExecutionTime = System.currentTimeMillis() + period; // 更新下次执行时间
        }
    }

    // 判断任务是否为周期任务
    public boolean isPeriodic() {
        return period > 0; // period 大于 0 表示周期任务
    }

    // 获取任务距离到期的时间(实现 Delayed 接口)
    @Override
    public long getDelay(TimeUnit unit) {
        long delay = nextExecutionTime - System.currentTimeMillis(); // 计算剩余延迟时间
        return unit.convert(delay, TimeUnit.MILLISECONDS); // 转换为指定时间单位
    }

    // 比较两个任务的到期时间(实现 Delayed 接口,用于队列排序)
    @Override
    public int compareTo(Delayed o) {
        return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); // 按延迟时间排序
    }

    // 获取下次执行时间
    public long getNextExecutionTime() {
        return nextExecutionTime; // 返回下次执行的绝对时间
    }
}

// 定义调度器类,管理任务的调度和执行
class MyScheduler {
    private final DelayQueue<MyScheduledTask> taskQueue; // 定义延迟队列,存储待执行任务
    private final ThreadPoolExecutor executor; // 定义线程池,用于并发执行任务
    private volatile boolean running; // 定义运行状态,volatile 确保线程可见性

    // 构造函数:初始化调度器
    public MyScheduler(int poolSize) {
        taskQueue = new DelayQueue<>(); // 初始化延迟队列
        executor = new ThreadPoolExecutor( // 初始化线程池
                poolSize, // 核心线程数
                poolSize, // 最大线程数
                0L, TimeUnit.MILLISECONDS, // 空闲线程存活时间
                new LinkedBlockingQueue<>(), // 任务队列,使用无界队列
                new ThreadFactory() { // 自定义线程工厂
                    private final AtomicInteger threadCount = new AtomicInteger(1); // 线程计数器

                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "MyScheduler-Thread-" + threadCount.getAndIncrement()); // 创建并命名线程
                    }
                }
        );
        running = true; // 设置初始运行状态为 true
        startScheduler(); // 启动调度器
    }

    // 提交单次任务,指定执行时间
    public void schedule(Runnable task, LocalDateTime executionTime) {
        schedule(task, executionTime, 0, TimeUnit.MILLISECONDS); // 调用内部方法,period 为 0 表示非周期任务
    }

    // 提交周期任务,指定首次执行时间和周期
    public void scheduleAtFixedRate(Runnable task, LocalDateTime initialExecutionTime, long period, TimeUnit unit) {
        schedule(task, initialExecutionTime, period, unit); // 调用内部方法
    }

    // 内部方法:提交任务到队列
    private void schedule(Runnable task, LocalDateTime executionTime, long period, TimeUnit unit) {
        if (!running) { // 检查调度器是否已关闭
            throw new IllegalStateException("Scheduler is shut down"); // 抛出异常
        }
        MyScheduledTask scheduledTask = new MyScheduledTask(task, executionTime, period, unit); // 创建任务对象
        System.out.println("任务已提交,预计执行时间: " + scheduledTask.getNextExecutionTime()); // 打印任务提交信息
        taskQueue.offer(scheduledTask); // 将任务加入延迟队列
    }

    // 启动调度线程,使用异步回调模型
    private void startScheduler() {
        // 定义调度逻辑为一个递归异步任务
        Runnable schedulerRunnable = new Runnable() {
            @Override
            public void run() {
                if (!running) { // 检查调度器是否运行
                    System.out.println("调度线程已停止"); // 打印停止信息
                    return; // 退出方法
                }
                System.out.println("等待任务到期..."); // 提示等待任务到期
                CompletableFuture.supplyAsync(() -> { // 异步获取到期任务
                    try {
                        return taskQueue.take(); // 阻塞取出到期任务
                    } catch (InterruptedException e) { // 捕获中断异常
                        Thread.currentThread().interrupt(); // 恢复线程中断状态
                        System.out.println("调度线程被中断"); // 打印中断信息
                        return null; // 返回 null 表示异常退出
                    }
                }, executor).thenAccept(task -> { // 处理取出的任务
                    if (task != null && running) { // 如果任务有效且调度器运行
                        System.out.println("任务到期,准备执行: " + System.currentTimeMillis()); // 打印到期时间
                        // 执行任务
                        CompletableFuture.runAsync(() -> { // 异步执行任务
                            try {
                                task.run(); // 执行任务逻辑
                                if (task.isPeriodic() && running) { // 如果是周期任务且调度器未关闭
                                    System.out.println("周期任务,重新加入队列,下一执行时间: " + task.getNextExecutionTime()); // 打印重新入队信息
                                    taskQueue.offer(task); // 将周期任务重新加入队列
                                }
                            } catch (Exception e) { // 捕获任务执行异常
                                e.printStackTrace(); // 打印异常堆栈
                            }
                        }, executor).thenRun(this); // 任务执行完后递归调用当前 Runnable
                    } else if (running) { // 如果任务为空但调度器仍运行
                        // 如果队列为空,继续调度
                        CompletableFuture.runAsync(this, executor); // 异步递归调用调度逻辑
                    }
                });
            }
        };

        // 启动首次调度
        CompletableFuture.runAsync(schedulerRunnable, executor); // 使用 CompletableFuture 异步启动调度
    }

    // 关闭调度器
    public void shutdown() {
        running = false; // 设置运行状态为 false
        taskQueue.clear(); // 清空任务队列
        executor.shutdown(); // 开始关闭线程池,等待任务完成
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { // 等待 5 秒
                executor.shutdownNow(); // 如果未完成,强制关闭线程池
            }
        } catch (InterruptedException e) { // 捕获等待时的中断异常
            executor.shutdownNow(); // 强制关闭线程池
        }
        System.out.println("调度器已关闭"); // 打印关闭信息
    }
}


class Main {
    public static void main(String[] args) { // 主方法入口
        MyScheduler scheduler = new MyScheduler(2); // 创建调度器,线程池大小为 2

        LocalDateTime singleTaskTime = LocalDateTime.now().plusSeconds(2); // 设置单次任务执行时间为当前时间 + 2 秒
        scheduler.schedule( // 提交单次任务
                () -> System.out.println("单次任务执行完成: " + System.currentTimeMillis()), // 任务逻辑
                singleTaskTime // 执行时间
        );

        LocalDateTime periodicTaskTime = LocalDateTime.now().plusSeconds(1); // 设置周期任务首次执行时间为当前时间 + 1 秒
        scheduler.scheduleAtFixedRate( // 提交周期任务
                () -> System.out.println("周期任务执行完成: " + System.currentTimeMillis()), // 任务逻辑
                periodicTaskTime, 5, TimeUnit.SECONDS // 首次执行时间和周期(5 秒)
        );

        try {
            Thread.sleep(15000); // 主线程休眠 15 秒,观察任务执行
            scheduler.shutdown(); // 关闭调度器
        } catch (InterruptedException e) { // 捕获休眠中断异常
            e.printStackTrace(); // 打印异常堆栈
        }
    }
}