Browse Source

fix: 修复任务回调失败

weijianghai 2 years ago
parent
commit
b3c87c6e69

+ 1 - 1
pom.xml

@@ -45,7 +45,7 @@
         <dependency>
             <groupId>com.xuxueli</groupId>
             <artifactId>xxl-job-core</artifactId>
-            <version>2.3.1</version>
+            <version>2.2.0</version>
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>

+ 2 - 2
src/main/java/com/nokia/common/gpload/GploadUtil.java

@@ -1,7 +1,7 @@
 package com.nokia.common.gpload;
 
 import com.nokia.common.gpload.entity.GploadResult;
-import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.log.XxlJobLogger;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.BufferedReader;
@@ -65,7 +65,7 @@ public class GploadUtil {
             result.setMessage(e.getMessage());
         } finally {
             log.debug("gpload的结果为: {}", result);
-            XxlJobHelper.log("gpload的结果为: {}", result);
+            XxlJobLogger.log("gpload的结果为: {}", result);
             destroy();
         }
         return result;

+ 10 - 10
src/main/java/com/nokia/common/ssh/SSHUtil.java

@@ -5,7 +5,7 @@ import com.nokia.common.ssh.entity.SSHServer;
 import com.nokia.common.ssh.entity.UserInfoImpl;
 import com.nokia.common.ssh.exception.SSHUtilException;
 import com.nokia.common.ssh.exception.ScpAckErrorException;
-import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.log.XxlJobLogger;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
@@ -148,13 +148,13 @@ public class SSHUtil {
             channel.connect();
             if (checkAck(inputStream) != 0) {
                 log.error("scpTo 执行失败");
-                XxlJobHelper.handleFail("scpTo 执行失败");
+                XxlJobLogger.log("scpTo 执行失败");
                 return false;
             }
             File sourceFile = new File(sourceFilePath);
             if (sourceFile.isDirectory()) {
                 log.error("sourceFilePath 必须是文件");
-                XxlJobHelper.handleFail("sourceFilePath 必须是文件");
+                XxlJobLogger.log("sourceFilePath 必须是文件");
                 return false;
             }
             long fileSize = sourceFile.length();
@@ -163,7 +163,7 @@ public class SSHUtil {
             outputStream.flush();
             if (checkAck(inputStream) != 0) {
                 log.error("scpTo 执行失败");
-                XxlJobHelper.handleFail("scpTo 执行失败");
+                XxlJobLogger.log("scpTo 执行失败");
                 return false;
             }
             fileInputStream = new FileInputStream(sourceFile);
@@ -190,7 +190,7 @@ public class SSHUtil {
     public boolean scpFrom(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
         try {
             log.debug(sourceFilePath);
-            XxlJobHelper.log(sourceFilePath);
+            XxlJobLogger.log(sourceFilePath);
             session = getConnectSession();
             // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
             // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
@@ -242,7 +242,7 @@ public class SSHUtil {
                 }
             }
             log.debug("filesize={}, file={}", filesize, file);
-            XxlJobHelper.log("filesize={}, file={}", filesize, file);
+            XxlJobLogger.log("filesize={}, file={}", filesize, file);
             // 发送 '0'
             buf[0] = 0;
             outputStream.write(buf, 0, 1);
@@ -251,7 +251,7 @@ public class SSHUtil {
             File target = new File(targetPath);
             if (target.isDirectory()) {
                 log.debug("{} 是目录,需要添加文件名", target.getAbsolutePath());
-                XxlJobHelper.log("{} 是目录,需要添加文件名", target.getAbsolutePath());
+                XxlJobLogger.log("{} 是目录,需要添加文件名", target.getAbsolutePath());
                 target = new File(targetPath + File.separator + file);
             }
 
@@ -281,7 +281,7 @@ public class SSHUtil {
             outputStream.write(buf, 0, 1);
             outputStream.flush();
             log.debug("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
-            XxlJobHelper.log("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+            XxlJobLogger.log("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
             return true;
         } finally {
             disconnect();
@@ -299,7 +299,7 @@ public class SSHUtil {
         session.setConfig(properties);
         session.connect();
         log.debug("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
-        XxlJobHelper.log("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+        XxlJobLogger.log("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
         return session;
     }
 
@@ -354,7 +354,7 @@ public class SSHUtil {
                 sb.append((char) c);
             } while (c != '\n');
             log.debug("checkAck发现错误消息: ack={}-msg={}", b, sb);
-            XxlJobHelper.log("checkAck发现错误消息: ack={}-msg={}", b, sb);
+            XxlJobLogger.log("checkAck发现错误消息: ack={}-msg={}", b, sb);
             if (b == 1 && sb.toString().endsWith("No such file or directory")) {
                 throw new NoSuchFileException(sb.toString());
             } else {

+ 28 - 19
src/main/java/com/nokia/pm_interface_5g/task/FiveGPmTask.java

@@ -6,8 +6,9 @@ import com.nokia.common.gpload.GploadUtil;
 import com.nokia.common.gpload.entity.GploadResult;
 import com.nokia.common.ssh.SSHUtil;
 import com.nokia.common.ssh.exception.SSHUtilException;
-import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
+import com.xxl.job.core.log.XxlJobLogger;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.csv.CSVFormat;
@@ -63,22 +64,30 @@ public class FiveGPmTask {
      *
      */
 //    @Scheduled(cron = "0 */10 * * * ?")
-    @XxlJob("exeHandPm5G")
-    public synchronized void cronTask() throws JSchException, SSHUtilException, IOException, SftpException {
-        // 创建文件夹
-        Files.createDirectories(Paths.get(downloadTargetDir));
-        Files.createDirectories(Paths.get(distinctTargetDir));
-        SSHUtil sshUtil = new SSHUtil(host, port, username, password);
-        // 获取文件列表
-        List<String> list = sshUtil.ls(sourceDir);
-        if (CollectionUtils.isEmpty(list)) {
-            return;
-        }
+    @XxlJob("pm5gJobHandler")
+    public ReturnT<String> pm5gJobHandler(String param) {
+        try {
+            // 创建文件夹
+            Files.createDirectories(Paths.get(downloadTargetDir));
+            Files.createDirectories(Paths.get(distinctTargetDir));
+            SSHUtil sshUtil = new SSHUtil(host, port, username, password);
+            // 获取文件列表
+            List<String> list = sshUtil.ls(sourceDir);
+            if (CollectionUtils.isEmpty(list)) {
+                log.debug("没有文件");
+                XxlJobLogger.log("没有文件");
+                return ReturnT.SUCCESS;
+            }
+
+            log.debug("扫描到的文件: {}", list);
+            XxlJobLogger.log("扫描到的文件: {}", list);
+            for (String t : list) {
+                singleTask(t);
+            }
 
-        log.debug("扫描到的文件: {}", list);
-        XxlJobHelper.log("扫描到的文件: {}", list);
-        for (String t : list) {
-            singleTask(t);
+            return ReturnT.SUCCESS;
+        } catch (Exception e) {
+            return ReturnT.FAIL;
         }
     }
 
@@ -109,7 +118,7 @@ public class FiveGPmTask {
         boolean b = sshUtil.scpFrom(sourceFilePath, targetPath);
         if (b) {
             log.debug("文件 {} 下载成功...", targetPath);
-            XxlJobHelper.log("文件 {} 下载成功...", targetPath);
+            XxlJobLogger.log("文件 {} 下载成功...", targetPath);
             //删除远程文件
             sshUtil.delete(sourceFilePath);
         }
@@ -147,12 +156,12 @@ public class FiveGPmTask {
         GploadResult gpload = GploadUtil.gpload(gploadCommand);
         if (Boolean.TRUE.equals(gpload.getTaskStatus())) {
             log.debug("gpload完成: {}", gpload);
-            XxlJobHelper.log("gpload完成: {}", gpload);
+            XxlJobLogger.log("gpload完成: {}", gpload);
             // 删除重排文件
             Files.deleteIfExists(Paths.get(distinctTargetDir + filename));
         } else {
             log.error("gpload 失败: {}", gpload.getMessage());
-            XxlJobHelper.handleFail("gpload 失败: " + gpload.getMessage());
+            XxlJobLogger.log("gpload 失败: {}", gpload.getMessage());
         }
     }
 

+ 0 - 1
src/main/resources/application.properties

@@ -3,7 +3,6 @@ spring.application.name=pm_interface_5g
 logging.level.com.nokia=debug
 ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
 xxl.job.admin.addresses=http://192.168.10.7:8087/xxl-job-admin
-xxl.job.springboot.
 ### xxl-job, access token
 xxl.job.accessToken=
 

+ 0 - 8
src/test/java/com/nokia/pm_interface_5g/PmInterface5gApplicationTests.java

@@ -1,8 +1,5 @@
 package com.nokia.pm_interface_5g;
 
-import com.jcraft.jsch.JSchException;
-import com.jcraft.jsch.SftpException;
-import com.nokia.common.ssh.exception.SSHUtilException;
 import com.nokia.pm_interface_5g.task.FiveGPmTask;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.csv.CSVFormat;
@@ -29,11 +26,6 @@ class PmInterface5gApplicationTests {
     @Autowired
     private FiveGPmTask fiveGPmTask;
 
-    @Test
-    void test() throws JSchException, IOException, SSHUtilException, SftpException {
-        fiveGPmTask.cronTask();
-    }
-
     @Test
     void testRerrange() {
         String readPath = "a/pm_5g_hour_2022072912.csv";