|
@@ -1,29 +1,27 @@
|
|
|
package com.nokia.tsl_data.scheduling.service;
|
|
|
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
+import java.time.Duration;
|
|
|
+import java.time.Instant;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ScheduledFuture;
|
|
|
+
|
|
|
+import javax.annotation.PreDestroy;
|
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.context.ApplicationContext;
|
|
|
-import org.springframework.scheduling.Trigger;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
|
|
|
-import org.springframework.scheduling.support.CronTrigger;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
|
|
|
-import com.nokia.tsl_data.scheduling.dao.ScheduledTaskMapper;
|
|
|
-import com.nokia.tsl_data.scheduling.entity.ScheduledTask;
|
|
|
-import com.nokia.tsl_data.scheduling.entity._enum.ScheduledStatus;
|
|
|
-import com.nokia.tsl_data.scheduling.entity.pojo.RunnableTask;
|
|
|
-import com.nokia.tsl_data.scheduling.entity.pojo.ScheduledParameter;
|
|
|
+import com.nokia.tsl_data.scheduling.entity.RunnableTask;
|
|
|
|
|
|
-import java.time.Duration;
|
|
|
-import java.time.Instant;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-import java.util.concurrent.ScheduledFuture;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
|
/**
|
|
|
- * 任务调度的核心服务
|
|
|
+ * 马上调度或者延时调度
|
|
|
*/
|
|
|
@Slf4j
|
|
|
@Service
|
|
@@ -31,203 +29,121 @@ public class TaskScheduleService {
|
|
|
/**
|
|
|
* 存储任务调度信息
|
|
|
*/
|
|
|
- private ConcurrentHashMap<ScheduledTask, ScheduledFuture<?>> tasksScheduledMap = new ConcurrentHashMap<>();
|
|
|
+ private ConcurrentHashMap<RunnableTask, ScheduledFuture<?>> tasksSchedulingMap = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
@Autowired
|
|
|
private ThreadPoolTaskScheduler taskScheduler;
|
|
|
+
|
|
|
@Autowired
|
|
|
private ApplicationContext applicationContext;
|
|
|
- @Autowired
|
|
|
- private ScheduledTaskMapper scheduledTaskMapper;
|
|
|
|
|
|
/**
|
|
|
* 调度任务
|
|
|
*/
|
|
|
- public void schedule(ScheduledTask scheduledTask) {
|
|
|
- if (tasksScheduledMap.containsKey(scheduledTask)) {
|
|
|
- // 如果是重复调度,直接返回
|
|
|
- ScheduledFuture<?> future = tasksScheduledMap.get(scheduledTask);
|
|
|
+ public void schedule(RunnableTask task) {
|
|
|
+ // 检查任务是否合法
|
|
|
+ checkRunnableTask(task);
|
|
|
+ // 检查是否已调度过
|
|
|
+ if (tasksSchedulingMap.containsKey(task)) {
|
|
|
+ ScheduledFuture<?> future = tasksSchedulingMap.get(task);
|
|
|
if (!future.isDone()) {
|
|
|
- // 任务尚未完成
|
|
|
- log.warn("任务{}已调度但未完成,不要重复调度...", scheduledTask.getName());
|
|
|
- return;
|
|
|
+ log.error("任务 {} 已调度但未完成,不要重复调度...", task);
|
|
|
+ throw new RuntimeException(String.format("任务 %s 已调度但未完成,不要重复调度...", task));
|
|
|
+ } else {
|
|
|
+ log.debug("任务 {} 已完成,将重新调度...", task);
|
|
|
}
|
|
|
}
|
|
|
- Runnable runnable = new RunnableTask(applicationContext, scheduledTask);
|
|
|
- if (scheduledTask.getStatus().needToStartSchedule()) {
|
|
|
- ScheduledFuture<?> future = null;
|
|
|
- ScheduledParameter scheduledParameter = scheduledTask.getScheduledParameter();
|
|
|
- switch (scheduledTask.getScheduledType()) {
|
|
|
- case CRON:
|
|
|
- if (scheduledParameter == null) {
|
|
|
- throw new RuntimeException("定时任务(CRON)调度参数 scheduledParameter 必须指定 cronExpression ...");
|
|
|
- }
|
|
|
- try {
|
|
|
- // 调度定时任务 必须指定cronExpression
|
|
|
- Trigger trigger = new CronTrigger(scheduledParameter.getCronExpression());
|
|
|
- // 开始时间为空或者早于当前时间
|
|
|
- future = taskScheduler.schedule(runnable, trigger);
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- throw new RuntimeException("定时任务(CRON)调度出错: " + e.getMessage());
|
|
|
- }
|
|
|
- break;
|
|
|
- case INTERVAL:
|
|
|
- // 周期任务
|
|
|
- if (scheduledParameter == null) {
|
|
|
- throw new RuntimeException(
|
|
|
- "周期任务(INTERVAL)调度参数 scheduledParameter 必须指定 periodOfSeconds 可选 startTime ...");
|
|
|
- }
|
|
|
- if (scheduledParameter.getPeriodOfSeconds() == null
|
|
|
- || scheduledParameter.getPeriodOfSeconds() <= 0) {
|
|
|
- throw new RuntimeException("周期任务(INTERVAL)调度参数 scheduledParameter.periodOfSeconds 必须为正整数: "
|
|
|
- + scheduledParameter.getPeriodOfSeconds());
|
|
|
- }
|
|
|
- Duration period = Duration.ofSeconds(scheduledParameter.getPeriodOfSeconds());
|
|
|
- if (instantNullOrBeforeNow(scheduledParameter.getStartTime())) {
|
|
|
- // 开始时间为空或者早于当前时间
|
|
|
- future = taskScheduler.scheduleAtFixedRate(runnable, period);
|
|
|
- } else {
|
|
|
- future = taskScheduler.scheduleAtFixedRate(runnable,
|
|
|
- scheduledParameter.getStartTime(), period);
|
|
|
- }
|
|
|
- break;
|
|
|
- case FIXED_DELAY:
|
|
|
- // 延时调度
|
|
|
- if (scheduledParameter == null) {
|
|
|
- throw new RuntimeException(
|
|
|
- "固定延时任务(FIXED_DELAY)调度参数 scheduledParameter 必须指定 delayOfSeconds 可选 startTime ...");
|
|
|
- }
|
|
|
- if (scheduledParameter.getDelayOfSeconds() == null || scheduledParameter.getDelayOfSeconds() <= 0) {
|
|
|
- throw new RuntimeException("固定延时任务(FIXED_DELAY)调度参数 scheduledParameter.delayOfSeconds 必须为正整数: "
|
|
|
- + scheduledParameter.getDelayOfSeconds());
|
|
|
- }
|
|
|
- // 如果 scheduledTask.getScheduledParameter().getDelayOfSeconds() 为空会报空指针
|
|
|
- Duration delay = Duration.ofSeconds(scheduledParameter.getDelayOfSeconds());
|
|
|
- if (instantNullOrBeforeNow(scheduledParameter.getStartTime())) {
|
|
|
- // 开始时间为空或者早于当前时间
|
|
|
- future = taskScheduler.scheduleWithFixedDelay(runnable, delay);
|
|
|
- } else {
|
|
|
- future = taskScheduler.scheduleWithFixedDelay(runnable,
|
|
|
- scheduledParameter.getStartTime(), delay);
|
|
|
- }
|
|
|
- break;
|
|
|
- case ONCE:
|
|
|
- // 单次任务
|
|
|
- if (scheduledParameter == null) {
|
|
|
- throw new RuntimeException(
|
|
|
- "单次任务(ONCE)必须指定调度参数 scheduledParameter 的 startTime 或 delayOfSeconds ... ");
|
|
|
- }
|
|
|
- Instant startTimeForOnceTask = scheduledParameter.getStartTime();
|
|
|
- Long delayOfSeconds = scheduledParameter.getDelayOfSeconds();
|
|
|
- if (startTimeForOnceTask == null && delayOfSeconds == null) {
|
|
|
- throw new RuntimeException(
|
|
|
- "单次任务(ONCE)必须指定调度参数 scheduledParameter 的 startTime 或 delayOfSeconds ... ");
|
|
|
- } else if (startTimeForOnceTask != null) {
|
|
|
- // startTime 优先级高于 delay
|
|
|
- if (startTimeForOnceTask.isAfter(Instant.now())) {
|
|
|
- // 单次任务只有在启动时间大于等于当前时间时才启动调度
|
|
|
- future = taskScheduler.schedule(runnable, startTimeForOnceTask);
|
|
|
- } else {
|
|
|
- // 当启动时间早于当前时间时,更新状态为超时
|
|
|
- log.info("单次任务(ONCE) startTime: {} 早于当前时间: {} 直接跳过", startTimeForOnceTask, Instant.now());
|
|
|
- scheduledTask.setStatus(ScheduledStatus.TIMEOUT);
|
|
|
- scheduledTaskMapper.updateById(scheduledTask);
|
|
|
- }
|
|
|
- } else if (delayOfSeconds > 0) {
|
|
|
- future = taskScheduler.schedule(runnable, Instant.now().plusSeconds(delayOfSeconds));
|
|
|
- } else {
|
|
|
- throw new RuntimeException("单次任务(ONCE)指定的调度参数 scheduledParameter 中 delayOfSeconds 必须大于 0 ...");
|
|
|
- }
|
|
|
- break;
|
|
|
- case IMMEDIATELY:
|
|
|
- // 马上执行的任务
|
|
|
- default:
|
|
|
- future = taskScheduler.schedule(runnable, Instant.now());
|
|
|
- break;
|
|
|
- }
|
|
|
- if (future != null) {
|
|
|
- tasksScheduledMap.put(scheduledTask, future);
|
|
|
- }
|
|
|
+ // 调度任务
|
|
|
+ Long delay = task.getDelay();
|
|
|
+ if (delay != null) {
|
|
|
+ // 延时调度
|
|
|
+ tasksSchedulingMap.put(task, taskScheduler.schedule(task, Instant.now().plus(Duration.ofSeconds(delay))));
|
|
|
+ } else {
|
|
|
+ // 马上执行
|
|
|
+ tasksSchedulingMap.put(task, taskScheduler.schedule(task, Instant.now()));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 删除任务
|
|
|
*/
|
|
|
- public void remove(ScheduledTask scheduledTask) {
|
|
|
- if (tasksScheduledMap.containsKey(scheduledTask)) {
|
|
|
- ScheduledFuture<?> future = tasksScheduledMap.remove(scheduledTask);
|
|
|
+ public void remove(RunnableTask task) {
|
|
|
+ if (tasksSchedulingMap.contains(task)) {
|
|
|
+ ScheduledFuture<?> future = tasksSchedulingMap.remove(task);
|
|
|
if (future.isDone()) {
|
|
|
- scheduledTask.setStatus(ScheduledStatus.DONE);
|
|
|
- scheduledTaskMapper.updateById(scheduledTask);
|
|
|
+ log.debug("要删除的任务 {} 已完成...", task);
|
|
|
} else {
|
|
|
+ log.debug("要删除的任务 {} 尚未完成...", task);
|
|
|
future.cancel(true);
|
|
|
}
|
|
|
+ } else {
|
|
|
+ log.warn("尝试删除不存在的任务 {} ...", task);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 获取已调度的任务
|
|
|
+ * 获取正在调度的任务
|
|
|
*/
|
|
|
- public List<ScheduledTask> ListTasksScheduled() {
|
|
|
- refreshStatus();
|
|
|
- return new ArrayList<>(tasksScheduledMap.keySet());
|
|
|
+ public List<RunnableTask> listTasksScheduling() {
|
|
|
+ tasksSchedulingMap.forEach((task, future) -> {
|
|
|
+ if (future.isDone()) {
|
|
|
+ log.debug(" {} 任务已完成...", task);
|
|
|
+ tasksSchedulingMap.remove(task);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ return new ArrayList<>(tasksSchedulingMap.keySet());
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 结束任务
|
|
|
+ * 周期更新任务状态 每10分钟更新任务状态
|
|
|
*/
|
|
|
- public void cancelAll() {
|
|
|
- // 遍历,刷新已完成的任务
|
|
|
- tasksScheduledMap.forEach((task, future) -> {
|
|
|
+ @Scheduled(cron = "0 0/10 * * * *")
|
|
|
+ public void cronUpdate() {
|
|
|
+ log.debug("开始调度周期更新任务状态...");
|
|
|
+ tasksSchedulingMap.forEach((task, future) -> {
|
|
|
if (future.isDone()) {
|
|
|
- handleTaskIsDone(task);
|
|
|
- } else {
|
|
|
- future.cancel(true);
|
|
|
+ log.debug(" {} 任务已完成...", task);
|
|
|
+ tasksSchedulingMap.remove(task);
|
|
|
}
|
|
|
});
|
|
|
- }
|
|
|
-
|
|
|
- public boolean isTaskScheduled(ScheduledTask task) {
|
|
|
- ScheduledFuture<?> future = tasksScheduledMap.get(task);
|
|
|
- if (future == null)
|
|
|
- return false;
|
|
|
- if (future.isDone()) {
|
|
|
- handleTaskIsDone(task);
|
|
|
- return false;
|
|
|
- }
|
|
|
- return true;
|
|
|
+ log.debug("周期更新任务状态已完成...");
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 刷新任务状态
|
|
|
+ * 在退出时结束全部任务
|
|
|
+ * 这里不做记录,也就是说不支持重新加载未完成的任务
|
|
|
*/
|
|
|
- public void refreshStatus() {
|
|
|
- // 遍历,刷新已完成的任务
|
|
|
- tasksScheduledMap.forEach((task, future) -> {
|
|
|
+ @PreDestroy
|
|
|
+ public void cancelAll() {
|
|
|
+ tasksSchedulingMap.forEach((task, future) -> {
|
|
|
if (future.isDone()) {
|
|
|
- handleTaskIsDone(task);
|
|
|
+ log.debug("退出时任务 {} 已完成...", task);
|
|
|
+ } else {
|
|
|
+ log.warn("退出时任务 {} 尚未完成...", task);
|
|
|
+ future.cancel(true);
|
|
|
}
|
|
|
});
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 处理已完成任务
|
|
|
+ * 检查任务
|
|
|
*/
|
|
|
- private void handleTaskIsDone(ScheduledTask task) {
|
|
|
- if (!ScheduledStatus.FAILED.equals(task.getStatus())) {
|
|
|
- // 如果执行出现异常,会修改状态,所以如果状态未改变则认为执行未出现异常
|
|
|
- task.setStatus(ScheduledStatus.DONE);
|
|
|
+ public void checkRunnableTask(RunnableTask task) {
|
|
|
+ try {
|
|
|
+ Object bean = applicationContext.getBean(task.getBeanName());
|
|
|
+ if (bean == null) {
|
|
|
+ throw new RuntimeException(String.format("%s 不存在...", task.getBeanName()));
|
|
|
+ }
|
|
|
+ if (!StringUtils.hasLength(task.getMethodParameter())) {
|
|
|
+ bean.getClass().getMethod(task.getMethodName());
|
|
|
+ } else {
|
|
|
+ bean.getClass().getMethod(task.getMethodName(), String.class);
|
|
|
+ }
|
|
|
+ } catch (NoSuchMethodException e) {
|
|
|
+ throw new RuntimeException(String.format("未找到 %s.%s() ...", task.getBeanName(), task.getMethodName()));
|
|
|
+ } catch (SecurityException e) {
|
|
|
+ throw new RuntimeException(
|
|
|
+ String.format("%s.%s() 出错: %s", task.getBeanName(), task.getMethodName(), e.getMessage()));
|
|
|
}
|
|
|
- tasksScheduledMap.remove(task);
|
|
|
- scheduledTaskMapper.updateById(task);
|
|
|
- log.debug("任务 {}=={} 已完成...", task.getName(), task.getDescription());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 输入时间为空或者早于当前时间
|
|
|
- */
|
|
|
- private boolean instantNullOrBeforeNow(Instant instant) {
|
|
|
- return instant == null || instant.isBefore(Instant.now());
|
|
|
}
|
|
|
}
|