定时任务的实现方式,手写代码实现(延迟队列与调度逻辑的结合)
- 定义任务累,实现 Delayed 接口以支持延迟队列
- 定义调度器类,管理任务的调度和执行
- 测试
源代码如下:
package org.pt.schedule;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
class MyScheduledTask implements Delayed {
private final Runnable task; // 定义任务的具体执行逻辑
private final long period; // 定义周期时间(毫秒),0 表示非周期任务
private long nextExecutionTime; // 定义下次执行的绝对时间(毫秒)
// 构造函数:初始化任务、首次执行时间和周期
public MyScheduledTask(Runnable task, LocalDateTime executionTime, long period, TimeUnit unit) {
this.task = task; // 初始化任务逻辑
this.period = period > 0 ? unit.toMillis(period) : 0; // 将周期时间转换为毫秒,若无周期则为 0
this.nextExecutionTime = executionTime.atZone(ZoneId.systemDefault()) // 将 LocalDateTime 转换为带时区的时间
.toInstant().toEpochMilli(); // 转换为毫秒时间戳
}
// 执行任务的方法
public void run() {
System.out.println("任务开始执行: " + System.currentTimeMillis()); // 打印任务开始时间
task.run(); // 执行用户定义的任务逻辑
if (period > 0) { // 如果是周期任务
nextExecutionTime = System.currentTimeMillis() + period; // 更新下次执行时间
}
}
// 判断任务是否为周期任务
public boolean isPeriodic() {
return period > 0; // period 大于 0 表示周期任务
}
// 获取任务距离到期的时间(实现 Delayed 接口)
@Override
public long getDelay(TimeUnit unit) {
long delay = nextExecutionTime - System.currentTimeMillis(); // 计算剩余延迟时间
return unit.convert(delay, TimeUnit.MILLISECONDS); // 转换为指定时间单位
}
// 比较两个任务的到期时间(实现 Delayed 接口,用于队列排序)
@Override
public int compareTo(Delayed o) {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); // 按延迟时间排序
}
// 获取下次执行时间
public long getNextExecutionTime() {
return nextExecutionTime; // 返回下次执行的绝对时间
}
}
// 定义调度器类,管理任务的调度和执行
class MyScheduler {
private final DelayQueue<MyScheduledTask> taskQueue; // 定义延迟队列,存储待执行任务
private final ThreadPoolExecutor executor; // 定义线程池,用于并发执行任务
private volatile boolean running; // 定义运行状态,volatile 确保线程可见性
// 构造函数:初始化调度器
public MyScheduler(int poolSize) {
taskQueue = new DelayQueue<>(); // 初始化延迟队列
executor = new ThreadPoolExecutor( // 初始化线程池
poolSize, // 核心线程数
poolSize, // 最大线程数
0L, TimeUnit.MILLISECONDS, // 空闲线程存活时间
new LinkedBlockingQueue<>(), // 任务队列,使用无界队列
new ThreadFactory() { // 自定义线程工厂
private final AtomicInteger threadCount = new AtomicInteger(1); // 线程计数器
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "MyScheduler-Thread-" + threadCount.getAndIncrement()); // 创建并命名线程
}
}
);
running = true; // 设置初始运行状态为 true
startScheduler(); // 启动调度器
}
// 提交单次任务,指定执行时间
public void schedule(Runnable task, LocalDateTime executionTime) {
schedule(task, executionTime, 0, TimeUnit.MILLISECONDS); // 调用内部方法,period 为 0 表示非周期任务
}
// 提交周期任务,指定首次执行时间和周期
public void scheduleAtFixedRate(Runnable task, LocalDateTime initialExecutionTime, long period, TimeUnit unit) {
schedule(task, initialExecutionTime, period, unit); // 调用内部方法
}
// 内部方法:提交任务到队列
private void schedule(Runnable task, LocalDateTime executionTime, long period, TimeUnit unit) {
if (!running) { // 检查调度器是否已关闭
throw new IllegalStateException("Scheduler is shut down"); // 抛出异常
}
MyScheduledTask scheduledTask = new MyScheduledTask(task, executionTime, period, unit); // 创建任务对象
System.out.println("任务已提交,预计执行时间: " + scheduledTask.getNextExecutionTime()); // 打印任务提交信息
taskQueue.offer(scheduledTask); // 将任务加入延迟队列
}
// 启动调度线程,使用异步回调模型
private void startScheduler() {
// 定义调度逻辑为一个递归异步任务
Runnable schedulerRunnable = new Runnable() {
@Override
public void run() {
if (!running) { // 检查调度器是否运行
System.out.println("调度线程已停止"); // 打印停止信息
return; // 退出方法
}
System.out.println("等待任务到期..."); // 提示等待任务到期
CompletableFuture.supplyAsync(() -> { // 异步获取到期任务
try {
return taskQueue.take(); // 阻塞取出到期任务
} catch (InterruptedException e) { // 捕获中断异常
Thread.currentThread().interrupt(); // 恢复线程中断状态
System.out.println("调度线程被中断"); // 打印中断信息
return null; // 返回 null 表示异常退出
}
}, executor).thenAccept(task -> { // 处理取出的任务
if (task != null && running) { // 如果任务有效且调度器运行
System.out.println("任务到期,准备执行: " + System.currentTimeMillis()); // 打印到期时间
// 执行任务
CompletableFuture.runAsync(() -> { // 异步执行任务
try {
task.run(); // 执行任务逻辑
if (task.isPeriodic() && running) { // 如果是周期任务且调度器未关闭
System.out.println("周期任务,重新加入队列,下一执行时间: " + task.getNextExecutionTime()); // 打印重新入队信息
taskQueue.offer(task); // 将周期任务重新加入队列
}
} catch (Exception e) { // 捕获任务执行异常
e.printStackTrace(); // 打印异常堆栈
}
}, executor).thenRun(this); // 任务执行完后递归调用当前 Runnable
} else if (running) { // 如果任务为空但调度器仍运行
// 如果队列为空,继续调度
CompletableFuture.runAsync(this, executor); // 异步递归调用调度逻辑
}
});
}
};
// 启动首次调度
CompletableFuture.runAsync(schedulerRunnable, executor); // 使用 CompletableFuture 异步启动调度
}
// 关闭调度器
public void shutdown() {
running = false; // 设置运行状态为 false
taskQueue.clear(); // 清空任务队列
executor.shutdown(); // 开始关闭线程池,等待任务完成
try {
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { // 等待 5 秒
executor.shutdownNow(); // 如果未完成,强制关闭线程池
}
} catch (InterruptedException e) { // 捕获等待时的中断异常
executor.shutdownNow(); // 强制关闭线程池
}
System.out.println("调度器已关闭"); // 打印关闭信息
}
}
class Main {
public static void main(String[] args) { // 主方法入口
MyScheduler scheduler = new MyScheduler(2); // 创建调度器,线程池大小为 2
LocalDateTime singleTaskTime = LocalDateTime.now().plusSeconds(2); // 设置单次任务执行时间为当前时间 + 2 秒
scheduler.schedule( // 提交单次任务
() -> System.out.println("单次任务执行完成: " + System.currentTimeMillis()), // 任务逻辑
singleTaskTime // 执行时间
);
LocalDateTime periodicTaskTime = LocalDateTime.now().plusSeconds(1); // 设置周期任务首次执行时间为当前时间 + 1 秒
scheduler.scheduleAtFixedRate( // 提交周期任务
() -> System.out.println("周期任务执行完成: " + System.currentTimeMillis()), // 任务逻辑
periodicTaskTime, 5, TimeUnit.SECONDS // 首次执行时间和周期(5 秒)
);
try {
Thread.sleep(15000); // 主线程休眠 15 秒,观察任务执行
scheduler.shutdown(); // 关闭调度器
} catch (InterruptedException e) { // 捕获休眠中断异常
e.printStackTrace(); // 打印异常堆栈
}
}
}