weijianghai 2 жил өмнө
parent
commit
b58178fb75

+ 5 - 0
pom.xml

@@ -42,6 +42,11 @@
             <artifactId>lombok</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.xuxueli</groupId>
+            <artifactId>xxl-job-core</artifactId>
+            <version>2.3.1</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-csv</artifactId>

+ 2 - 1
src/main/java/com/nokia/common/gpload/GploadUtil.java

@@ -1,6 +1,7 @@
 package com.nokia.common.gpload;
 
 import com.nokia.common.gpload.entity.GploadResult;
+import com.xxl.job.core.context.XxlJobHelper;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.BufferedReader;
@@ -63,7 +64,7 @@ public class GploadUtil {
             result.setTaskStatus(false);
             result.setMessage(e.getMessage());
         } finally {
-            log.debug("gpload的结果为: {}", result);
+            XxlJobHelper.log("gpload的结果为: {}", result);
             destroy();
         }
         return result;

+ 14 - 10
src/main/java/com/nokia/common/ssh/SSHUtil.java

@@ -5,13 +5,17 @@ import com.nokia.common.ssh.entity.SSHServer;
 import com.nokia.common.ssh.entity.UserInfoImpl;
 import com.nokia.common.ssh.exception.SSHUtilException;
 import com.nokia.common.ssh.exception.ScpAckErrorException;
+import com.xxl.job.core.context.XxlJobHelper;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.*;
 import java.nio.file.NoSuchFileException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Vector;
 
 /**
  * 使用jsch库实现的ssh的工具类
@@ -143,12 +147,12 @@ public class SSHUtil {
             inputStream = channel.getInputStream();
             channel.connect();
             if (checkAck(inputStream) != 0) {
-                log.error("scpTo 执行失败");
+                XxlJobHelper.handleFail("scpTo 执行失败");
                 return false;
             }
             File sourceFile = new File(sourceFilePath);
             if (sourceFile.isDirectory()) {
-                log.error("sourceFilePath 必须是文件");
+                XxlJobHelper.handleFail("sourceFilePath 必须是文件");
                 return false;
             }
             long fileSize = sourceFile.length();
@@ -156,7 +160,7 @@ public class SSHUtil {
             outputStream.write(command.getBytes());
             outputStream.flush();
             if (checkAck(inputStream) != 0) {
-                log.error("scpTo 执行失败");
+                XxlJobHelper.handleFail("scpTo 执行失败");
                 return false;
             }
             fileInputStream = new FileInputStream(sourceFile);
@@ -182,7 +186,7 @@ public class SSHUtil {
      */
     public boolean scpFrom(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
         try {
-            log.debug(sourceFilePath);
+            XxlJobHelper.log(sourceFilePath);
             session = getConnectSession();
             // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
             // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
@@ -233,7 +237,7 @@ public class SSHUtil {
                     break;
                 }
             }
-            log.debug("filesize={}, file={}", filesize, file);
+            XxlJobHelper.log("filesize={}, file={}", filesize, file);
             // 发送 '0'
             buf[0] = 0;
             outputStream.write(buf, 0, 1);
@@ -241,7 +245,7 @@ public class SSHUtil {
             // 如果目标是目录,则需要加上文件名
             File target = new File(targetPath);
             if (target.isDirectory()) {
-                log.debug("{} 是目录,需要添加文件名", target.getAbsolutePath());
+                XxlJobHelper.log("{} 是目录,需要添加文件名", target.getAbsolutePath());
                 target = new File(targetPath + File.separator + file);
             }
 
@@ -270,7 +274,7 @@ public class SSHUtil {
             buf[0] = 0;
             outputStream.write(buf, 0, 1);
             outputStream.flush();
-            log.debug("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+            XxlJobHelper.log("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
             return true;
         } finally {
             disconnect();
@@ -287,7 +291,7 @@ public class SSHUtil {
         properties.put("StrictHostKeyChecking", "no");
         session.setConfig(properties);
         session.connect();
-        log.debug("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+        XxlJobHelper.log("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
         return session;
     }
 
@@ -341,7 +345,7 @@ public class SSHUtil {
                 c = in.read();
                 sb.append((char) c);
             } while (c != '\n');
-            log.debug("checkAck发现错误消息: ack={}-msg={}", b, sb);
+            XxlJobHelper.log("checkAck发现错误消息: ack={}-msg={}", b, sb);
             if (b == 1 && sb.toString().endsWith("No such file or directory")) {
                 throw new NoSuchFileException(sb.toString());
             } else {

+ 1 - 2
src/main/java/com/nokia/pm_interface_5g/PmInterface5gApplication.java

@@ -2,9 +2,8 @@ package com.nokia.pm_interface_5g;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.scheduling.annotation.EnableScheduling;
 
-@EnableScheduling
+//@EnableScheduling
 @SpringBootApplication
 public class PmInterface5gApplication {
 

+ 78 - 0
src/main/java/com/nokia/pm_interface_5g/config/XxlJobConfig.java

@@ -0,0 +1,78 @@
+package com.nokia.pm_interface_5g.config;
+
+import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * xxl-job config
+ *
+ * @author xuxueli 2017-04-28
+ */
+@Configuration
+public class XxlJobConfig {
+    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
+
+    @Value("${xxl.job.admin.addresses}")
+    private String adminAddresses;
+
+    @Value("${xxl.job.accessToken}")
+    private String accessToken;
+
+    @Value("${xxl.job.executor.appname}")
+    private String appname;
+
+    @Value("${xxl.job.executor.address}")
+    private String address;
+
+    @Value("${xxl.job.executor.ip}")
+    private String ip;
+
+    @Value("${xxl.job.executor.port}")
+    private int port;
+
+    @Value("${xxl.job.executor.logpath}")
+    private String logPath;
+
+    @Value("${xxl.job.executor.logretentiondays}")
+    private int logRetentionDays;
+
+
+    @Bean
+    public XxlJobSpringExecutor xxlJobExecutor() {
+        logger.info(">>>>>>>>>>> xxl-job config init.");
+        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
+        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
+        xxlJobSpringExecutor.setAppname(appname);
+        xxlJobSpringExecutor.setAddress(address);
+        xxlJobSpringExecutor.setIp(ip);
+        xxlJobSpringExecutor.setPort(port);
+        xxlJobSpringExecutor.setAccessToken(accessToken);
+        xxlJobSpringExecutor.setLogPath(logPath);
+        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
+
+        return xxlJobSpringExecutor;
+    }
+
+    /**
+     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
+     *
+     *      1、引入依赖:
+     *          <dependency>
+     *             <groupId>org.springframework.cloud</groupId>
+     *             <artifactId>spring-cloud-commons</artifactId>
+     *             <version>${version}</version>
+     *         </dependency>
+     *
+     *      2、配置文件,或者容器启动变量
+     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
+     *
+     *      3、获取IP
+     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
+     */
+
+
+}

+ 8 - 6
src/main/java/com/nokia/pm_interface_5g/task/FiveGPmTask.java

@@ -6,6 +6,8 @@ import com.nokia.common.gpload.GploadUtil;
 import com.nokia.common.gpload.entity.GploadResult;
 import com.nokia.common.ssh.SSHUtil;
 import com.nokia.common.ssh.exception.SSHUtilException;
+import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.handler.annotation.XxlJob;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.csv.CSVFormat;
@@ -13,7 +15,6 @@ import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVPrinter;
 import org.apache.commons.csv.CSVRecord;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
@@ -61,7 +62,8 @@ public class FiveGPmTask {
      * 定时任务
      *
      */
-    @Scheduled(cron = "0 */10 * * * ?")
+//    @Scheduled(cron = "0 */10 * * * ?")
+    @XxlJob("exeHandPm5G")
     public void cronTask() throws JSchException, SSHUtilException, IOException, SftpException {
         // 创建文件夹
         Files.createDirectories(Paths.get(downloadTargetDir));
@@ -73,7 +75,7 @@ public class FiveGPmTask {
             return;
         }
 
-        log.debug("扫描到的文件: {}", list);
+        XxlJobHelper.log("扫描到的文件: {}", list);
         for (String t : list) {
             singleTask(t);
         }
@@ -105,7 +107,7 @@ public class FiveGPmTask {
         String targetPath = targetFile.getAbsolutePath();
         boolean b = sshUtil.scpFrom(sourceFilePath, targetPath);
         if (b) {
-            log.debug("文件 {} 下载成功...", targetPath);
+            XxlJobHelper.log("文件 {} 下载成功...", targetPath);
             //删除远程文件
             sshUtil.delete(sourceFilePath);
         }
@@ -142,11 +144,11 @@ public class FiveGPmTask {
         String gploadCommand = "sh /data1/pm_5g/gpload/pm_nr_gpload.sh " + filename;
         GploadResult gpload = GploadUtil.gpload(gploadCommand);
         if (Boolean.TRUE.equals(gpload.getTaskStatus())) {
-            log.debug("gpload完成: {}", gpload);
+            XxlJobHelper.log("gpload完成: {}", gpload);
             // 删除重排文件
             Files.deleteIfExists(Paths.get(distinctTargetDir + filename));
         } else {
-            log.error("gpload 失败: {}", gpload.getMessage());
+            XxlJobHelper.handleFail("gpload 失败: " + gpload.getMessage());
         }
     }
 

+ 17 - 0
src/main/resources/application.properties

@@ -1,3 +1,20 @@
 server.port=12092
 spring.application.name=pm_interface_5g
 logging.level.com.nokia=debug
+### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
+xxl.job.admin.addresses=http://192.168.10.7:8087/xxl-job-admin
+xxl.job.springboot.
+### xxl-job, access token
+xxl.job.accessToken=
+
+### xxl-job executor appname
+xxl.job.executor.appname=xxl-job-executor-sample2
+### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
+xxl.job.executor.address=
+### xxl-job executor server-info
+xxl.job.executor.ip=
+xxl.job.executor.port=9999
+### xxl-job executor log-path
+xxl.job.executor.logpath=/data1/pm_5g/pm5Glog/
+### xxl-job executor log-retention-days
+xxl.job.executor.logretentiondays=30