|
@@ -10,10 +10,12 @@ import com.google.gson.Gson;
|
|
import com.nokia.dingtalk_api.common.R;
|
|
import com.nokia.dingtalk_api.common.R;
|
|
import com.nokia.dingtalk_api.common.exception.BizException;
|
|
import com.nokia.dingtalk_api.common.exception.BizException;
|
|
import com.nokia.dingtalk_api.common.exception.MyRuntimeException;
|
|
import com.nokia.dingtalk_api.common.exception.MyRuntimeException;
|
|
-import com.nokia.dingtalk_api.config.AppConfig;
|
|
|
|
|
|
+import com.nokia.dingtalk_api.dao.IAlertConfigService;
|
|
import com.nokia.dingtalk_api.dao.IAppTaskLogService;
|
|
import com.nokia.dingtalk_api.dao.IAppTaskLogService;
|
|
import com.nokia.dingtalk_api.dao.IAppTaskService;
|
|
import com.nokia.dingtalk_api.dao.IAppTaskService;
|
|
|
|
+import com.nokia.dingtalk_api.dao.IRetryAppTaskService;
|
|
import com.nokia.dingtalk_api.mapper.AppTaskMapper;
|
|
import com.nokia.dingtalk_api.mapper.AppTaskMapper;
|
|
|
|
+import com.nokia.dingtalk_api.mapper.RetryAppTaskMapper;
|
|
import com.nokia.dingtalk_api.pojos.bo.AppTaskBo;
|
|
import com.nokia.dingtalk_api.pojos.bo.AppTaskBo;
|
|
import com.nokia.dingtalk_api.pojos.dto.cms.AddAppTaskDto;
|
|
import com.nokia.dingtalk_api.pojos.dto.cms.AddAppTaskDto;
|
|
import com.nokia.dingtalk_api.pojos.dto.cms.DeleteAppTaskDto;
|
|
import com.nokia.dingtalk_api.pojos.dto.cms.DeleteAppTaskDto;
|
|
@@ -24,18 +26,23 @@ import com.nokia.dingtalk_api.pojos.dto.cms.RunAppTaskDto;
|
|
import com.nokia.dingtalk_api.pojos.dto.cms.StopAppTaskDto;
|
|
import com.nokia.dingtalk_api.pojos.dto.cms.StopAppTaskDto;
|
|
import com.nokia.dingtalk_api.pojos.dto.cms.UpdateAppTaskDto;
|
|
import com.nokia.dingtalk_api.pojos.dto.cms.UpdateAppTaskDto;
|
|
import com.nokia.dingtalk_api.pojos.enums.AppTaskStatusEnum;
|
|
import com.nokia.dingtalk_api.pojos.enums.AppTaskStatusEnum;
|
|
|
|
+import com.nokia.dingtalk_api.pojos.enums.FileMethodEnum;
|
|
import com.nokia.dingtalk_api.pojos.enums.SortEnum;
|
|
import com.nokia.dingtalk_api.pojos.enums.SortEnum;
|
|
|
|
+import com.nokia.dingtalk_api.pojos.enums.SubdirectoryMethodEnum;
|
|
|
|
+import com.nokia.dingtalk_api.pojos.enums.TaskAlertTypeEnum;
|
|
|
|
+import com.nokia.dingtalk_api.pojos.po.AlertConfigPo;
|
|
import com.nokia.dingtalk_api.pojos.po.AppTaskLogPo;
|
|
import com.nokia.dingtalk_api.pojos.po.AppTaskLogPo;
|
|
import com.nokia.dingtalk_api.pojos.po.AppTaskPo;
|
|
import com.nokia.dingtalk_api.pojos.po.AppTaskPo;
|
|
|
|
+import com.nokia.dingtalk_api.pojos.po.RetryAppTaskPo;
|
|
import com.nokia.dingtalk_api.pojos.vo.PageVo;
|
|
import com.nokia.dingtalk_api.pojos.vo.PageVo;
|
|
import com.nokia.dingtalk_api.pojos.vo.cms.ListAppTaskVo;
|
|
import com.nokia.dingtalk_api.pojos.vo.cms.ListAppTaskVo;
|
|
import com.nokia.dingtalk_api.util.DingTalkApiUtil;
|
|
import com.nokia.dingtalk_api.util.DingTalkApiUtil;
|
|
import com.taobao.api.FileItem;
|
|
import com.taobao.api.FileItem;
|
|
import jakarta.annotation.PostConstruct;
|
|
import jakarta.annotation.PostConstruct;
|
|
-import lombok.Data;
|
|
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.RequiredArgsConstructor;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import org.apache.sshd.sftp.client.SftpClient;
|
|
import org.apache.sshd.sftp.client.SftpClient;
|
|
|
|
+import org.slf4j.MDC;
|
|
import org.springframework.integration.file.remote.SessionCallbackWithoutResult;
|
|
import org.springframework.integration.file.remote.SessionCallbackWithoutResult;
|
|
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
|
|
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
|
|
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
|
|
import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
|
|
@@ -48,8 +55,10 @@ import org.springframework.util.StringUtils;
|
|
|
|
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
-import java.nio.file.attribute.AclEntry;
|
|
|
|
-import java.nio.file.attribute.FileTime;
|
|
|
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
|
+import java.time.Instant;
|
|
|
|
+import java.time.LocalDate;
|
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
import java.util.Comparator;
|
|
import java.util.Comparator;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
@@ -63,6 +72,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ConcurrentMap;
|
|
import java.util.concurrent.ScheduledFuture;
|
|
import java.util.concurrent.ScheduledFuture;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
import java.util.regex.Matcher;
|
|
import java.util.regex.Matcher;
|
|
import java.util.regex.Pattern;
|
|
import java.util.regex.Pattern;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
@@ -79,25 +89,56 @@ public class AppTaskService {
|
|
* 英文逗号分割的userId列表
|
|
* 英文逗号分割的userId列表
|
|
*/
|
|
*/
|
|
static final Pattern USER_ID_LIST_PATTERN = Pattern.compile("\\w+(,\\w+)*");
|
|
static final Pattern USER_ID_LIST_PATTERN = Pattern.compile("\\w+(,\\w+)*");
|
|
|
|
+ /**
|
|
|
|
+ * 运行中的定时任务
|
|
|
|
+ */
|
|
static final ConcurrentMap<String, ScheduledFuture<?>> TASKS = new ConcurrentHashMap<>();
|
|
static final ConcurrentMap<String, ScheduledFuture<?>> TASKS = new ConcurrentHashMap<>();
|
|
|
|
+ /**
|
|
|
|
+ * 重试定时任务
|
|
|
|
+ */
|
|
|
|
+ static final ConcurrentMap<String, ScheduledFuture<?>> RETRY_TASKS = new ConcurrentHashMap<>();
|
|
private final ThreadPoolTaskScheduler taskScheduler;
|
|
private final ThreadPoolTaskScheduler taskScheduler;
|
|
private final AppTaskMapper appTaskMapper;
|
|
private final AppTaskMapper appTaskMapper;
|
|
private final IAppTaskLogService iAppTaskLogService;
|
|
private final IAppTaskLogService iAppTaskLogService;
|
|
private final OpenService openService;
|
|
private final OpenService openService;
|
|
private final AesService aesService;
|
|
private final AesService aesService;
|
|
- private final AppConfig appConfig;
|
|
|
|
private final IAppTaskService iAppTaskService;
|
|
private final IAppTaskService iAppTaskService;
|
|
|
|
+ private final IAlertConfigService iAlertConfigService;
|
|
|
|
+ private final IRetryAppTaskService iRetryAppTaskService;
|
|
|
|
+ private final RetryAppTaskMapper retryAppTaskMapper;
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 应用定时任务初始化
|
|
|
|
+ */
|
|
|
|
+ @PostConstruct
|
|
|
|
+ public void init() {
|
|
|
|
+ List<AppTaskBo> l1 = appTaskMapper.getRunningAppTasks(AppTaskStatusEnum.RUNNING.value);
|
|
|
|
+ l1.forEach(this::addTask);
|
|
|
|
+ List<AppTaskBo> l2 = retryAppTaskMapper.getRetryAppTasks(AppTaskStatusEnum.RUNNING.value);
|
|
|
|
+ l2.forEach(t -> {
|
|
|
|
+ ScheduledFuture<?> future = taskScheduler.schedule(() -> runTask(t), t.getRetryTime());
|
|
|
|
+ RETRY_TASKS.put(t.getTaskId(), future);
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* 删除任务
|
|
* 删除任务
|
|
* @param id 任务id
|
|
* @param id 任务id
|
|
*/
|
|
*/
|
|
public void removeTask(String id) {
|
|
public void removeTask(String id) {
|
|
- ScheduledFuture<?> future = TASKS.get(id);
|
|
|
|
- if (future != null && !future.isCancelled()) {
|
|
|
|
- future.cancel(false);
|
|
|
|
|
|
+ // 删除定时任务
|
|
|
|
+ ScheduledFuture<?> f1 = TASKS.get(id);
|
|
|
|
+ if (f1 != null && !f1.isCancelled()) {
|
|
|
|
+ f1.cancel(false);
|
|
}
|
|
}
|
|
TASKS.remove(id);
|
|
TASKS.remove(id);
|
|
|
|
+ // 删除重试任务
|
|
|
|
+ ScheduledFuture<?> f2 = RETRY_TASKS.get(id);
|
|
|
|
+ if (f2 != null && !f2.isCancelled()) {
|
|
|
|
+ f2.cancel(false);
|
|
|
|
+ }
|
|
|
|
+ iRetryAppTaskService.removeById(id);
|
|
|
|
+ RETRY_TASKS.remove(id);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -117,10 +158,14 @@ public class AppTaskService {
|
|
*/
|
|
*/
|
|
public void runTask(AppTaskBo t) {
|
|
public void runTask(AppTaskBo t) {
|
|
AppTaskLogPo appTaskLogPo = new AppTaskLogPo(t);
|
|
AppTaskLogPo appTaskLogPo = new AppTaskLogPo(t);
|
|
|
|
+ MDC.put("traceId", t.getTaskId());
|
|
|
|
+ Set<String> fileList = new HashSet<>();
|
|
|
|
+ Set<String> processQueryKeys = new HashSet<>();
|
|
Gson gson = new Gson();
|
|
Gson gson = new Gson();
|
|
try {
|
|
try {
|
|
- CompletableFuture.runAsync(() -> {
|
|
|
|
- log.info("开始执行应用任务: {}", appTaskLogPo);
|
|
|
|
|
|
+ Runnable runnable = () -> {
|
|
|
|
+ MDC.put("traceId", t.getTaskId());
|
|
|
|
+ log.info("开始执行应用任务: {}", t);
|
|
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
|
|
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
|
|
factory.setHost(t.getHost());
|
|
factory.setHost(t.getHost());
|
|
factory.setPort(t.getPort());
|
|
factory.setPort(t.getPort());
|
|
@@ -128,104 +173,325 @@ public class AppTaskService {
|
|
factory.setPassword(aesService.decrypt(t.getPassword()));
|
|
factory.setPassword(aesService.decrypt(t.getPassword()));
|
|
factory.setAllowUnknownKeys(true);
|
|
factory.setAllowUnknownKeys(true);
|
|
SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(factory);
|
|
SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(factory);
|
|
- // 查找数据目录匹配的最新文件
|
|
|
|
- Pattern pattern = Pattern.compile(t.getFilePattern());
|
|
|
|
- Optional<DirEntryBo> dirEntryBoOptional = Arrays.stream(template.list(t.getDataDir()))
|
|
|
|
- .filter(tt -> pattern.matcher(tt.getFilename()).find())
|
|
|
|
- .map(DirEntryBo::new).max(Comparator.comparing(DirEntryBo::getModifyTime));
|
|
|
|
- DirEntryBo dirEntryBo = dirEntryBoOptional.orElseThrow(() -> new BizException("没有找到匹配的文件"));
|
|
|
|
- String filename = dirEntryBo.getFilename();
|
|
|
|
- // 读取文件
|
|
|
|
- String filePath = t.getDataDir() + (t.getDataDir().endsWith("/") ? "" : "/") + filename;
|
|
|
|
- appTaskLogPo.setFilePath(filePath);
|
|
|
|
- byte[] bytes;
|
|
|
|
- try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
|
|
|
|
- template.execute((SessionCallbackWithoutResult<SftpClient.DirEntry>) session
|
|
|
|
- -> session.read(filePath, os));
|
|
|
|
- bytes = os.toByteArray();
|
|
|
|
- } catch (IOException e) {
|
|
|
|
- throw new MyRuntimeException(e);
|
|
|
|
- }
|
|
|
|
- // 检查文件大小
|
|
|
|
- OpenService.checkFileSize(bytes.length);
|
|
|
|
- String fileExtension = OpenService.getFileExtension(filename);
|
|
|
|
- String type = OpenService.getUploadType(fileExtension);
|
|
|
|
- FileItem fileItem = new FileItem(filename, bytes);
|
|
|
|
- String robotCode = t.getRobotCode();
|
|
|
|
- String openConversationId = t.getOpenConversationId();
|
|
|
|
- String accessToken = openService.getAccessToken(robotCode, t.getRobotSecret());
|
|
|
|
- // 上传文件
|
|
|
|
- OapiMediaUploadResponse uploadResponse = DingTalkApiUtil.upload(type, fileItem, accessToken);
|
|
|
|
- String mediaId = uploadResponse.getMediaId();
|
|
|
|
- // 发送单聊消息
|
|
|
|
- if (t.getConversationType().equals(1)) {
|
|
|
|
- Set<String> phones = new HashSet<>();
|
|
|
|
- if (StringUtils.hasText(t.getPhones())) {
|
|
|
|
- phones = Arrays.stream(t.getPhones().split(",")).collect(Collectors.toSet());
|
|
|
|
- }
|
|
|
|
- Set<String> userIds = new HashSet<>();
|
|
|
|
- if (StringUtils.hasText(t.getUserIds())) {
|
|
|
|
- userIds = Arrays.stream(t.getUserIds().split(",")).collect(Collectors.toSet());
|
|
|
|
- }
|
|
|
|
- openService.checkPhonesUserIds(phones, userIds);
|
|
|
|
- Map<String, String> failPhones = new HashMap<>();
|
|
|
|
- List<String> userIdList = openService.getUserIds(userIds, phones, accessToken, failPhones);
|
|
|
|
- // 发送图片
|
|
|
|
- if ("image".equals(type)) {
|
|
|
|
- BatchSendOTOResponse response = DingTalkApiUtil.batchSendOtoSampleImageMsg(mediaId,
|
|
|
|
- accessToken, robotCode, userIdList);
|
|
|
|
- appTaskLogPo.setDetail(gson.toJson(response));
|
|
|
|
- } else {
|
|
|
|
- // 发送文件
|
|
|
|
- BatchSendOTOResponse response = DingTalkApiUtil.batchSendOtoSampleFile(mediaId, filename,
|
|
|
|
- fileExtension, accessToken, robotCode, userIdList);
|
|
|
|
- appTaskLogPo.setDetail(gson.toJson(response));
|
|
|
|
- }
|
|
|
|
|
|
+ // 没有子文件夹
|
|
|
|
+ if (t.getHasSubdirectory().equals(0)) {
|
|
|
|
+ findFile(t, template, t.getMasterDir(), fileList, processQueryKeys);
|
|
} else {
|
|
} else {
|
|
- // 发送群聊消息
|
|
|
|
- if ("image".equals(type)) {
|
|
|
|
- // 发送图片
|
|
|
|
- OrgGroupSendResponse response = DingTalkApiUtil.groupSendSampleImageMsg(mediaId,
|
|
|
|
- accessToken, openConversationId, robotCode);
|
|
|
|
- appTaskLogPo.setDetail(gson.toJson(response));
|
|
|
|
- } else {
|
|
|
|
- // 发送文件
|
|
|
|
- OrgGroupSendResponse response = DingTalkApiUtil.groupSendSampleFile(mediaId, filename,
|
|
|
|
- fileExtension, accessToken, openConversationId, robotCode);
|
|
|
|
- appTaskLogPo.setDetail(gson.toJson(response));
|
|
|
|
|
|
+ if (!template.exists(t.getMasterDir())) {
|
|
|
|
+ throw new BizException("远端服务器找不到路径" + t.getMasterDir());
|
|
|
|
+ }
|
|
|
|
+ SftpClient.DirEntry[] dirEntries = template.list(t.getMasterDir());
|
|
|
|
+ log.debug("dirEntries: {}", Arrays.stream(dirEntries)
|
|
|
|
+ .map(SftpClient.DirEntry::getFilename).sorted().toList());
|
|
|
|
+ // 正则表达式匹配最新子文件夹
|
|
|
|
+ if (SubdirectoryMethodEnum.REG_EXP.name().equals(t.getSubdirectoryMethod())) {
|
|
|
|
+ Pattern pattern = Pattern.compile(t.getSubdirectoryPattern());
|
|
|
|
+ Optional<SftpClient.DirEntry> dirEntryOptional = Arrays.stream(dirEntries)
|
|
|
|
+ .filter(tt -> !".".equals(tt.getFilename()) && !"..".equals(tt.getFilename())
|
|
|
|
+ && tt.getAttributes().isDirectory() && pattern.matcher(tt.getFilename()).find())
|
|
|
|
+ .max(Comparator.comparing(tt -> tt.getAttributes().getModifyTime()));
|
|
|
|
+ SftpClient.DirEntry dirEntry = dirEntryOptional.orElseThrow(() ->
|
|
|
|
+ new BizException("远端服务器没有找到匹配的子文件夹"));
|
|
|
|
+ String dirPath = t.getMasterDir() + (t.getMasterDir().endsWith("/") ? "" : "/")
|
|
|
|
+ + dirEntry.getFilename();
|
|
|
|
+ findFile(t, template, dirPath, fileList, processQueryKeys);
|
|
|
|
+ } else if (SubdirectoryMethodEnum.MONTH.name().equals(t.getSubdirectoryMethod())) {
|
|
|
|
+ // 按月匹配文件夹
|
|
|
|
+ LocalDate localDate = LocalDate.now().plusMonths(t.getDirTimeDelay());
|
|
|
|
+ String month = localDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
|
|
|
|
+ String dirPath = t.getMasterDir() + (t.getMasterDir().endsWith("/") ? "" : "/") + month;
|
|
|
|
+ findFile(t, template, dirPath, fileList, processQueryKeys);
|
|
|
|
+ } else if (SubdirectoryMethodEnum.DAY.name().equals(t.getSubdirectoryMethod())) {
|
|
|
|
+ // 按天匹配文件夹
|
|
|
|
+ LocalDate localDate = LocalDate.now().plusDays(t.getDirTimeDelay());
|
|
|
|
+ String month = localDate.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
|
|
|
|
+ String dirPath = t.getMasterDir() + (t.getMasterDir().endsWith("/") ? "" : "/") + month;
|
|
|
|
+ findFile(t, template, dirPath, fileList, processQueryKeys);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- log.info("结束应用任务: {}", appTaskLogPo);
|
|
|
|
- }).get(appConfig.getTaskTimeout(), TimeUnit.MINUTES);
|
|
|
|
|
|
+ log.info("结束应用任务: {}", t);
|
|
|
|
+ String content = t.getAppName() + " " + t.getTaskName() + " 执行成功" + " : "
|
|
|
|
+ + gson.toJson(processQueryKeys);
|
|
|
|
+ sendMessage(t, content);
|
|
|
|
+ };
|
|
|
|
+ if (t.getTaskTimeout() > 0) {
|
|
|
|
+ CompletableFuture.runAsync(runnable).get(t.getTaskTimeout(), TimeUnit.SECONDS);
|
|
|
|
+ } else {
|
|
|
|
+ CompletableFuture.runAsync(runnable).join();
|
|
|
|
+ }
|
|
} catch (InterruptedException e) {
|
|
} catch (InterruptedException e) {
|
|
appTaskLogPo.setStatus(0);
|
|
appTaskLogPo.setStatus(0);
|
|
appTaskLogPo.setDetail(e.toString());
|
|
appTaskLogPo.setDetail(e.toString());
|
|
- log.error("线程中断: {}, {}", appTaskLogPo, e, e);
|
|
|
|
|
|
+ log.error("线程中断: {}, {}", t, e, e);
|
|
Thread.currentThread().interrupt();
|
|
Thread.currentThread().interrupt();
|
|
|
|
+ retry(t);
|
|
|
|
+ alert(t, "任务意外中断");
|
|
|
|
+ } catch (TimeoutException e) {
|
|
|
|
+ appTaskLogPo.setStatus(0);
|
|
|
|
+ appTaskLogPo.setDetail(e.toString());
|
|
|
|
+ log.error("应用任务执行超时: {}, {}", t, e, e);
|
|
|
|
+ retry(t);
|
|
|
|
+ alert(t, "任务执行超时");
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
appTaskLogPo.setStatus(0);
|
|
appTaskLogPo.setStatus(0);
|
|
appTaskLogPo.setDetail(e.toString());
|
|
appTaskLogPo.setDetail(e.toString());
|
|
- log.error("应用任务执行失败: {}, {}", appTaskLogPo, e, e);
|
|
|
|
|
|
+ log.error("应用任务执行失败: {}, {}", t, e, e);
|
|
|
|
+ retry(t);
|
|
|
|
+ Throwable rootCause = e;
|
|
|
|
+ while (rootCause.getCause() != null && rootCause.getCause() != rootCause) {
|
|
|
|
+ rootCause = rootCause.getCause();
|
|
|
|
+ }
|
|
|
|
+ alert(t, rootCause.getMessage());
|
|
} finally {
|
|
} finally {
|
|
// 记录日志
|
|
// 记录日志
|
|
|
|
+ appTaskLogPo.setFiles(gson.toJson(fileList));
|
|
|
|
+ appTaskLogPo.setProcessQueryKeys(gson.toJson(processQueryKeys));
|
|
iAppTaskLogService.save(appTaskLogPo);
|
|
iAppTaskLogService.save(appTaskLogPo);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * 应用定时任务初始化
|
|
|
|
|
|
+ * 重试
|
|
|
|
+ * @param t 任务信息
|
|
*/
|
|
*/
|
|
- @PostConstruct
|
|
|
|
- public void init() {
|
|
|
|
- List<AppTaskBo> list = appTaskMapper.getRunningAppTasks(AppTaskStatusEnum.RUNNING.value);
|
|
|
|
- list.forEach(this::addTask);
|
|
|
|
|
|
+ public void retry(AppTaskBo t) {
|
|
|
|
+ try {
|
|
|
|
+ if (t.getRetryTimes() < t.getMaxRetryTimes()) {
|
|
|
|
+ Instant instant = Instant.now().plusSeconds(t.getRetryInterval());
|
|
|
|
+ RetryAppTaskPo retryAppTaskPo = new RetryAppTaskPo();
|
|
|
|
+ retryAppTaskPo.setRetryTime(instant);
|
|
|
|
+ retryAppTaskPo.setTaskId(t.getTaskId());
|
|
|
|
+ retryAppTaskPo.setRetryTimes(t.getRetryTimes());
|
|
|
|
+ iRetryAppTaskService.saveOrUpdate(retryAppTaskPo);
|
|
|
|
+ t.setRetryTimes(t.getRetryTimes() + 1);
|
|
|
|
+ ScheduledFuture<?> future = taskScheduler.schedule(() -> runTask(t), instant);
|
|
|
|
+ RETRY_TASKS.put(t.getTaskId(), future);
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ log.error(e.toString(), e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 告警
|
|
|
|
+ *
|
|
|
|
+ * @param t 任务信息
|
|
|
|
+ * @param message 错误消息
|
|
|
|
+ */
|
|
|
|
+ public void alert(AppTaskBo t, String message) {
|
|
|
|
+ String content = t.getAppName() + " " + t.getTaskName() + " 执行异常" + " : " + message;
|
|
|
|
+ sendMessage(t, content);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 发送消息
|
|
|
|
+ * @param t 任务信息
|
|
|
|
+ * @param content 内容
|
|
|
|
+ */
|
|
|
|
+ public void sendMessage(AppTaskBo t, String content) {
|
|
|
|
+ String robotCode = t.getRobotCode();
|
|
|
|
+ String robotSecret = t.getRobotSecret();
|
|
|
|
+ String openConversationId = t.getOpenConversationId();
|
|
|
|
+ String phones = t.getPhones();
|
|
|
|
+ String userIds = t.getUserIds();
|
|
|
|
+ if (TaskAlertTypeEnum.DEFAULT.name().equals(t.getAlertType())) {
|
|
|
|
+ AlertConfigPo alertConfigPo = iAlertConfigService.getById(1);
|
|
|
|
+ if (alertConfigPo == null) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ robotCode = alertConfigPo.getRobotCode();
|
|
|
|
+ robotSecret = alertConfigPo.getRobotSecret();
|
|
|
|
+ openConversationId = alertConfigPo.getOpenConversationId();
|
|
|
|
+ phones = alertConfigPo.getPhones();
|
|
|
|
+ userIds = alertConfigPo.getUserIds();
|
|
|
|
+ } else if (TaskAlertTypeEnum.CUSTOM.name().equals(t.getAlertType())) {
|
|
|
|
+ robotCode = t.getAlertRobotCode();
|
|
|
|
+ robotSecret = t.getAlertRobotSecret();
|
|
|
|
+ openConversationId = t.getAlertOpenConversationId();
|
|
|
|
+ phones = t.getAlertPhones();
|
|
|
|
+ userIds = t.getAlertUserIds();
|
|
|
|
+ }
|
|
|
|
+ if (!StringUtils.hasText(robotCode)
|
|
|
|
+ || !StringUtils.hasText(robotSecret)
|
|
|
|
+ || (!StringUtils.hasText(openConversationId)
|
|
|
|
+ && !StringUtils.hasText(phones)
|
|
|
|
+ && !StringUtils.hasText(userIds))) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ String accessToken = openService.getAccessToken(robotCode, robotSecret);
|
|
|
|
+ if (StringUtils.hasText(openConversationId)) {
|
|
|
|
+ DingTalkApiUtil.groupSendSampleText(content, accessToken, openConversationId,
|
|
|
|
+ robotCode);
|
|
|
|
+ }
|
|
|
|
+ if (StringUtils.hasText(phones) || StringUtils.hasText(userIds)) {
|
|
|
|
+ List<String> userIdList = getUserIdList(phones, userIds,
|
|
|
|
+ accessToken);
|
|
|
|
+ DingTalkApiUtil.batchSendOtoSampleText(content, accessToken, robotCode, userIdList);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 匹配文件
|
|
|
|
+ *
|
|
|
|
+ * @param t 任务信息
|
|
|
|
+ * @param template sftp客户端
|
|
|
|
+ * @param dirPath 文件夹
|
|
|
|
+ * @param fileList 文件列表
|
|
|
|
+ * @param processQueryKeys 消息id列表
|
|
|
|
+ */
|
|
|
|
+ public void findFile(AppTaskBo t, SftpRemoteFileTemplate template, String dirPath, Set<String> fileList,
|
|
|
|
+ Set<String> processQueryKeys) {
|
|
|
|
+ if (!template.exists(dirPath)) {
|
|
|
|
+ throw new BizException("远端服务器找不到路径" + dirPath);
|
|
|
|
+ }
|
|
|
|
+ SftpClient.DirEntry[] dirEntries = template.list(dirPath);
|
|
|
|
+ log.debug("dirEntries: {}", Arrays.stream(dirEntries).map(SftpClient.DirEntry::getFilename).sorted().toList());
|
|
|
|
+ // 文件夹下所有文件
|
|
|
|
+ if (FileMethodEnum.ALL.name().equals(t.getFileMethod())) {
|
|
|
|
+ List<SftpClient.DirEntry> dirEntryList = Arrays.stream(dirEntries)
|
|
|
|
+ .filter(tt -> !".".equals(tt.getFilename()) && !"..".equals(tt.getFilename())
|
|
|
|
+ && tt.getAttributes().isRegularFile()).toList();
|
|
|
|
+ if (CollectionUtils.isEmpty(dirEntryList)) {
|
|
|
|
+ throw new BizException("远端服务器文件夹" + dirPath + "为空");
|
|
|
|
+ }
|
|
|
|
+ dirEntryList.forEach(tt -> sendFile(t, tt.getFilename(), dirPath, template, fileList, processQueryKeys));
|
|
|
|
+ } else if (FileMethodEnum.REG_EXP.name().equals(t.getFileMethod())) {
|
|
|
|
+ // 正则表达式匹配最新文件
|
|
|
|
+ Pattern pattern = Pattern.compile(t.getFilePattern());
|
|
|
|
+ Optional<SftpClient.DirEntry> dirEntryOptional = Arrays.stream(dirEntries)
|
|
|
|
+ .filter(tt -> !".".equals(tt.getFilename()) && !"..".equals(tt.getFilename())
|
|
|
|
+ && tt.getAttributes().isRegularFile() && pattern.matcher(tt.getFilename()).find())
|
|
|
|
+ .max(Comparator.comparing(tt -> tt.getAttributes().getModifyTime()));
|
|
|
|
+ SftpClient.DirEntry dirEntry = dirEntryOptional.orElseThrow(() -> new BizException("远端服务器文件夹"
|
|
|
|
+ + dirPath + "下没有找到匹配的文件"));
|
|
|
|
+ sendFile(t, dirEntry.getFilename(), dirPath, template, fileList, processQueryKeys);
|
|
|
|
+ } else if (FileMethodEnum.MONTH.name().equals(t.getFileMethod())) {
|
|
|
|
+ // 按月匹配文件
|
|
|
|
+ LocalDate localDate = LocalDate.now().plusMonths(t.getFileTimeDelay());
|
|
|
|
+ String month = localDate.format(DateTimeFormatter.ofPattern("yyyyMM"));
|
|
|
|
+ String filename = t.getFilePrefix() + month + "." + t.getFileExtension();
|
|
|
|
+ sendFile(t, filename, dirPath, template, fileList, processQueryKeys);
|
|
|
|
+ } else if (FileMethodEnum.DAY.name().equals(t.getFileMethod())) {
|
|
|
|
+ // 按日匹配文件
|
|
|
|
+ LocalDate localDate = LocalDate.now().plusDays(t.getFileTimeDelay());
|
|
|
|
+ String day = localDate.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
|
|
|
|
+ String filename = t.getFilePrefix() + day + "." + t.getFileExtension();
|
|
|
|
+ sendFile(t, filename, dirPath, template, fileList, processQueryKeys);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 发送文件
|
|
|
|
+ *
|
|
|
|
+ * @param t 任务信息
|
|
|
|
+ * @param filename 文件名
|
|
|
|
+ * @param dirPath 文件夹
|
|
|
|
+ * @param template sftp客户端
|
|
|
|
+ * @param fileList 文件列表
|
|
|
|
+ * @param processQueryKeys 消息id列表
|
|
|
|
+ */
|
|
|
|
+ public void sendFile(AppTaskBo t, String filename, String dirPath, SftpRemoteFileTemplate template,
|
|
|
|
+ Set<String> fileList, Set<String> processQueryKeys) {
|
|
|
|
+ String filePath = dirPath + (dirPath.endsWith("/") ? "" : "/") + filename;
|
|
|
|
+ fileList.add(filePath);
|
|
|
|
+ if (!template.exists(filePath)) {
|
|
|
|
+ throw new BizException("远端服务器找不到文件" + filePath);
|
|
|
|
+ }
|
|
|
|
+ // 检查文件大小
|
|
|
|
+ long size = template.list(filePath)[0].getAttributes().getSize();
|
|
|
|
+ OpenService.checkFileSize(size);
|
|
|
|
+ byte[] bytes;
|
|
|
|
+ try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
|
|
|
|
+ template.execute((SessionCallbackWithoutResult<SftpClient.DirEntry>) session
|
|
|
|
+ -> session.read(filePath, os));
|
|
|
|
+ bytes = os.toByteArray();
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ throw new MyRuntimeException(e);
|
|
|
|
+ }
|
|
|
|
+ String fileExtension = OpenService.getFileExtension(filename);
|
|
|
|
+ String type = OpenService.getUploadType(fileExtension);
|
|
|
|
+ String robotCode = t.getRobotCode();
|
|
|
|
+ String openConversationId = t.getOpenConversationId();
|
|
|
|
+ String accessToken = openService.getAccessToken(robotCode, t.getRobotSecret());
|
|
|
|
+ // 发送markdown消息
|
|
|
|
+ if (t.getFileToText().equals(1) && ("txt".equalsIgnoreCase(fileExtension) || "md".equalsIgnoreCase(fileExtension))) {
|
|
|
|
+ String content = new String(bytes, StandardCharsets.UTF_8);
|
|
|
|
+ // 发送单聊消息
|
|
|
|
+ if (t.getConversationType().equals(1)) {
|
|
|
|
+ List<String> userIdList = getUserIdList(t.getPhones(), t.getUserIds(), accessToken);
|
|
|
|
+ BatchSendOTOResponse r = DingTalkApiUtil.batchSendOtoSampleMarkdown(
|
|
|
|
+ org.apache.commons.lang3.StringUtils.substring(content, 0, 30), content,
|
|
|
|
+ accessToken, robotCode, userIdList);
|
|
|
|
+ processQueryKeys.add(r.body.processQueryKey);
|
|
|
|
+ } else {
|
|
|
|
+ // 发送群聊消息
|
|
|
|
+ OrgGroupSendResponse r = DingTalkApiUtil.groupSendSampleMarkdown(
|
|
|
|
+ org.apache.commons.lang3.StringUtils.substring(content, 0, 30), content,
|
|
|
|
+ accessToken, openConversationId, robotCode);
|
|
|
|
+ processQueryKeys.add(r.body.processQueryKey);
|
|
|
|
+ }
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ // 上传文件
|
|
|
|
+ FileItem fileItem = new FileItem(filename, bytes);
|
|
|
|
+ OapiMediaUploadResponse uploadResponse = DingTalkApiUtil.upload(type, fileItem, accessToken);
|
|
|
|
+ String mediaId = uploadResponse.getMediaId();
|
|
|
|
+ // 发送单聊消息
|
|
|
|
+ if (t.getConversationType().equals(1)) {
|
|
|
|
+ List<String> userIdList = getUserIdList(t.getPhones(), t.getUserIds(), accessToken);
|
|
|
|
+ if ("image".equals(type)) {
|
|
|
|
+ // 发送图片
|
|
|
|
+ BatchSendOTOResponse r = DingTalkApiUtil.batchSendOtoSampleImageMsg(mediaId, accessToken, robotCode,
|
|
|
|
+ userIdList);
|
|
|
|
+ processQueryKeys.add(r.body.processQueryKey);
|
|
|
|
+ } else {
|
|
|
|
+ // 发送文件
|
|
|
|
+ BatchSendOTOResponse r = DingTalkApiUtil.batchSendOtoSampleFile(mediaId, filename, fileExtension, accessToken, robotCode,
|
|
|
|
+ userIdList);
|
|
|
|
+ processQueryKeys.add(r.body.processQueryKey);
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ // 发送群聊消息
|
|
|
|
+ if ("image".equals(type)) {
|
|
|
|
+ // 发送图片
|
|
|
|
+ OrgGroupSendResponse r = DingTalkApiUtil.groupSendSampleImageMsg(mediaId, accessToken,
|
|
|
|
+ openConversationId, robotCode);
|
|
|
|
+ processQueryKeys.add(r.body.processQueryKey);
|
|
|
|
+ } else {
|
|
|
|
+ // 发送文件
|
|
|
|
+ OrgGroupSendResponse r = DingTalkApiUtil.groupSendSampleFile(mediaId, filename, fileExtension,
|
|
|
|
+ accessToken, openConversationId, robotCode);
|
|
|
|
+ processQueryKeys.add(r.body.processQueryKey);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 获取钉钉用户id
|
|
|
|
+ *
|
|
|
|
+ * @param phones 手机号
|
|
|
|
+ * @param userIds 用户id
|
|
|
|
+ * @param accessToken 钉钉接口访问凭证
|
|
|
|
+ */
|
|
|
|
+ public List<String> getUserIdList(String phones, String userIds, String accessToken) {
|
|
|
|
+ Set<String> phonesSet = new HashSet<>();
|
|
|
|
+ if (StringUtils.hasText(phones)) {
|
|
|
|
+ phonesSet = Arrays.stream(phones.split(",")).collect(Collectors.toSet());
|
|
|
|
+ }
|
|
|
|
+ Set<String> userIdsSet = new HashSet<>();
|
|
|
|
+ if (StringUtils.hasText(userIds)) {
|
|
|
|
+ userIdsSet = Arrays.stream(userIds.split(",")).collect(Collectors.toSet());
|
|
|
|
+ }
|
|
|
|
+ openService.checkPhonesUserIds(phonesSet, userIdsSet);
|
|
|
|
+ Map<String, String> failPhones = new HashMap<>();
|
|
|
|
+ return openService.getUserIds(userIdsSet, phonesSet, accessToken, failPhones);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* 检查任务参数
|
|
* 检查任务参数
|
|
* @param dto 任务参数
|
|
* @param dto 任务参数
|
|
*/
|
|
*/
|
|
- private static void checkAppTaskDto(AddAppTaskDto dto) {
|
|
|
|
|
|
+ public static void checkAppTaskDto(AddAppTaskDto dto) {
|
|
if (dto.getConversationType().equals(1)) {
|
|
if (dto.getConversationType().equals(1)) {
|
|
if (!StringUtils.hasText(dto.getPhones()) && !StringUtils.hasText(dto.getUserIds())) {
|
|
if (!StringUtils.hasText(dto.getPhones()) && !StringUtils.hasText(dto.getUserIds())) {
|
|
throw new BizException("phones或userIds必须有一个不能为空");
|
|
throw new BizException("phones或userIds必须有一个不能为空");
|
|
@@ -247,14 +513,79 @@ public class AppTaskService {
|
|
dto.setUserIds("");
|
|
dto.setUserIds("");
|
|
}
|
|
}
|
|
dto.setOpenConversationId("");
|
|
dto.setOpenConversationId("");
|
|
- }
|
|
|
|
- if (dto.getConversationType().equals(2)) {
|
|
|
|
|
|
+ } else {
|
|
if (!StringUtils.hasText(dto.getOpenConversationId())) {
|
|
if (!StringUtils.hasText(dto.getOpenConversationId())) {
|
|
throw new BizException("openConversationId不能为空");
|
|
throw new BizException("openConversationId不能为空");
|
|
}
|
|
}
|
|
dto.setPhones("");
|
|
dto.setPhones("");
|
|
dto.setUserIds("");
|
|
dto.setUserIds("");
|
|
}
|
|
}
|
|
|
|
+ if (dto.getHasSubdirectory().equals(1)) {
|
|
|
|
+ if (dto.getSubdirectoryMethod() == null) {
|
|
|
|
+ throw new BizException("subdirectoryMethod不能为空");
|
|
|
|
+ }
|
|
|
|
+ if (SubdirectoryMethodEnum.REG_EXP.equals(dto.getSubdirectoryMethod())) {
|
|
|
|
+ if (!StringUtils.hasText(dto.getSubdirectoryPattern())) {
|
|
|
|
+ throw new BizException("subdirectoryPattern不能为空");
|
|
|
|
+ }
|
|
|
|
+ dto.setDirTimeDelay(0);
|
|
|
|
+ } else {
|
|
|
|
+ dto.setSubdirectoryPattern("");
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ dto.setSubdirectoryMethod(null);
|
|
|
|
+ dto.setSubdirectoryPattern("");
|
|
|
|
+ dto.setDirTimeDelay(0);
|
|
|
|
+ }
|
|
|
|
+ if (FileMethodEnum.DAY.equals(dto.getFileMethod()) || FileMethodEnum.MONTH.equals(dto.getFileMethod())) {
|
|
|
|
+ if (dto.getFileTimeDelay() == null) {
|
|
|
|
+ throw new BizException("fileMinusTime不能为空");
|
|
|
|
+ }
|
|
|
|
+ if (!StringUtils.hasText(dto.getFilePrefix())) {
|
|
|
|
+ throw new BizException("filePrefix不能为空");
|
|
|
|
+ }
|
|
|
|
+ if (!StringUtils.hasText(dto.getFileExtension())) {
|
|
|
|
+ throw new BizException("fileExtension不能为空");
|
|
|
|
+ }
|
|
|
|
+ dto.setFilePattern("");
|
|
|
|
+ }
|
|
|
|
+ if (FileMethodEnum.REG_EXP.equals(dto.getFileMethod())) {
|
|
|
|
+ if (!StringUtils.hasText(dto.getFilePattern())) {
|
|
|
|
+ throw new BizException("filePattern不能为空");
|
|
|
|
+ }
|
|
|
|
+ dto.setFilePrefix("");
|
|
|
|
+ dto.setFileExtension("");
|
|
|
|
+ dto.setFileTimeDelay(0);
|
|
|
|
+ }
|
|
|
|
+ if (TaskAlertTypeEnum.CUSTOM.equals(dto.getAlertType())) {
|
|
|
|
+ if (StringUtils.hasText(dto.getAlertPhones())) {
|
|
|
|
+ Matcher matcher = PHONE_LIST_PATTERN.matcher(dto.getPhones());
|
|
|
|
+ if (!matcher.matches()) {
|
|
|
|
+ throw new BizException("alertPhones格式不正确");
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ dto.setAlertPhones("");
|
|
|
|
+ }
|
|
|
|
+ if (StringUtils.hasText(dto.getAlertUserIds())) {
|
|
|
|
+ Matcher matcher = USER_ID_LIST_PATTERN.matcher(dto.getUserIds());
|
|
|
|
+ if (!matcher.matches()) {
|
|
|
|
+ throw new BizException("alertUserIds格式不正确");
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ dto.setAlertUserIds("");
|
|
|
|
+ }
|
|
|
|
+ if (!StringUtils.hasText(dto.getAlertRobotCode())) {
|
|
|
|
+ dto.setAlertRobotCode("");
|
|
|
|
+ }
|
|
|
|
+ if (!StringUtils.hasText(dto.getAlertOpenConversationId())) {
|
|
|
|
+ dto.setAlertOpenConversationId("");
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ dto.setAlertRobotCode("");
|
|
|
|
+ dto.setAlertOpenConversationId("");
|
|
|
|
+ dto.setAlertPhones("");
|
|
|
|
+ dto.setAlertUserIds("");
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
public R<PageVo<ListAppTaskVo>> listAppTask(ListAppTaskDto dto) {
|
|
public R<PageVo<ListAppTaskVo>> listAppTask(ListAppTaskDto dto) {
|
|
@@ -410,41 +741,4 @@ public class AppTaskService {
|
|
List<AppTaskLogPo> vos = iAppTaskLogService.list(page, wrapper);
|
|
List<AppTaskLogPo> vos = iAppTaskLogService.list(page, wrapper);
|
|
return R.ok(new PageVo<>(page.getTotal(), vos));
|
|
return R.ok(new PageVo<>(page.getTotal(), vos));
|
|
}
|
|
}
|
|
-
|
|
|
|
- @Data
|
|
|
|
- public static class DirEntryBo {
|
|
|
|
- private String filename;
|
|
|
|
- private String longFilename;
|
|
|
|
- private Set<SftpClient.Attribute> flags;
|
|
|
|
- private int type;
|
|
|
|
- private int perms;
|
|
|
|
- private int uid;
|
|
|
|
- private int gid;
|
|
|
|
- private String owner;
|
|
|
|
- private String group;
|
|
|
|
- private long size;
|
|
|
|
- private FileTime accessTime;
|
|
|
|
- private FileTime createTime;
|
|
|
|
- private FileTime modifyTime;
|
|
|
|
- private List<AclEntry> acl;
|
|
|
|
- private Map<String, byte[]> extensions;
|
|
|
|
-
|
|
|
|
- public DirEntryBo(SftpClient.DirEntry t) {
|
|
|
|
- this.filename = t.getFilename();
|
|
|
|
- this.longFilename = t.getLongFilename();
|
|
|
|
- this.flags = t.getAttributes().getFlags();
|
|
|
|
- this.type = t.getAttributes().getType();
|
|
|
|
- this.perms = t.getAttributes().getPermissions();
|
|
|
|
- this.uid = t.getAttributes().getUserId();
|
|
|
|
- this.gid = t.getAttributes().getGroupId();
|
|
|
|
- this.owner = t.getAttributes().getOwner();
|
|
|
|
- this.group = t.getAttributes().getGroup();
|
|
|
|
- this.size = t.getAttributes().getSize();
|
|
|
|
- this.accessTime = t.getAttributes().getAccessTime();
|
|
|
|
- this.createTime = t.getAttributes().getCreateTime();
|
|
|
|
- this.modifyTime = t.getAttributes().getModifyTime();
|
|
|
|
- this.acl = t.getAttributes().getAcl();
|
|
|
|
- this.extensions = t.getAttributes().getExtensions();
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
}
|
|
}
|