weijianghai 2 jaren geleden
bovenliggende
commit
e502edf47f

+ 3 - 3
src/main/java/com/nokia/common/gpload/GploadUtil.java

@@ -19,6 +19,8 @@ public class GploadUtil {
         GploadResult result = new GploadResult();
         try {
             Process process = Runtime.getRuntime().exec(gploadCommand);
+            int exitCode = process.waitFor();
+            result.setTaskStatus(exitCode == 0);
             reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
             List<String> lines = new ArrayList<>();
             String line;
@@ -43,9 +45,7 @@ public class GploadUtil {
                         result.setInsertedCount(Integer.parseInt(s1));
                         result.setUpdatedCount(Integer.parseInt(s2));
                         result.setErrorCount(Integer.parseInt(s3));
-                        result.setTaskStatus(true);
                     } else {
-                        result.setTaskStatus(false);
                         StringBuilder sb = new StringBuilder();
                         for (int j = i - 1; j > 0; j--) {
                             sb.insert(0, lines.get(i));
@@ -58,7 +58,7 @@ public class GploadUtil {
                     }
                 }
             }
-        } catch (IOException e) {
+        } catch (IOException | InterruptedException e) {
             e.printStackTrace();
             result.setTaskStatus(false);
             result.setMessage(e.getMessage());

+ 62 - 9
src/main/java/com/nokia/common/ssh/SSHUtil.java

@@ -11,7 +11,10 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.*;
 import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
+import java.util.Vector;
 
 /**
  * 使用jsch库实现的ssh的工具类
@@ -32,6 +35,7 @@ public class SSHUtil {
     private FileOutputStream fileOutputStream = null;
     private OutputStream outputStream = null;
     private InputStream inputStream = null;
+    private ChannelSftp channelSftp = null;
 
     public SSHUtil() {
     }
@@ -44,13 +48,48 @@ public class SSHUtil {
         targetServer = new SSHServer(host, port, user, password);
     }
 
+    /**
+     * 获取文件列表
+     */
+    public List<String> ls(String path) throws JSchException, SftpException {
+        session = getConnectSession();
+        channelSftp = (ChannelSftp) session.openChannel("sftp");
+        channelSftp.connect();
+        List<String> fileNameList = new ArrayList<>();
+        Vector fileList = channelSftp.ls(path);
+        for (Object o : fileList) {
+            String fileName = ((ChannelSftp.LsEntry) o).getFilename();
+            if (".".equals(fileName) || "..".equals(fileName)) {
+                continue;
+            }
+
+            fileNameList.add(fileName);
+            channelSftp.quit();
+            session.disconnect();
+        }
+
+        return fileNameList;
+    }
+
+    /**
+     * 删除文件
+     */
+    public void delete(String fileName) throws JSchException, SftpException {
+        session = getConnectSession();
+        channelSftp = (ChannelSftp) session.openChannel("sftp");
+        channelSftp.connect();
+        channelSftp.rm(fileName);
+        channelSftp.quit();
+        session.disconnect();
+    }
+
     /**
      * 远程执行指令
      */
     public String exec(String command) throws JSchException, IOException {
         StringBuilder stringBuilder = new StringBuilder();
         try {
-            Session session = getConnectSession();
+            session = getConnectSession();
             channel = session.openChannel("exec");
             // jsch的登陆是无环境登陆即非login状态登陆,因此是没有环境变量的,
             String execCommand;
@@ -70,11 +109,15 @@ public class SSHUtil {
             while (true) {
                 while (in.available() > 0) {
                     int i = in.read(tmp, 0, 1024);
-                    if (i < 0) break;
+                    if (i < 0) {
+                        break;
+                    }
                     stringBuilder.append(new String(tmp, 0, i));
                 }
                 if (channel.isClosed()) {
-                    if (in.available() > 0) continue;
+                    if (in.available() > 0) {
+                        continue;
+                    }
                     break;
                 }
             }
@@ -123,7 +166,9 @@ public class SSHUtil {
             byte[] buffer = new byte[1024];
             while (true) {
                 int len = fileInputStream.read(buffer, 0, buffer.length);
-                if (len <= 0) break;
+                if (len <= 0) {
+                    break;
+                }
                 outputStream.write(buffer, 0, len);
             }
             buffer[0] = 0;
@@ -176,7 +221,9 @@ public class SSHUtil {
                 if (inputStream.read(buf, 0, 1) < 0) {
                     break;
                 }
-                if (buf[0] == ' ') break;
+                if (buf[0] == ' ') {
+                    break;
+                }
                 filesize = filesize * 10L + (long) (buf[0] - '0');
             }
             // 从 C0644命令读取文件名,命令中的文件名是不带路径的
@@ -215,7 +262,9 @@ public class SSHUtil {
                 }
                 fileOutputStream.write(buf, 0, foo);
                 filesize -= foo;
-                if (filesize == 0L) break;
+                if (filesize == 0L) {
+                    break;
+                }
             }
             if (checkAck(inputStream) != 0) {
                 return false;
@@ -277,8 +326,12 @@ public class SSHUtil {
     private static int checkAck(InputStream in) throws IOException, SSHUtilException {
         int b = in.read();
         // b 取值为0表示成功
-        if (b == 0) return b;
-        if (b == -1) return b;
+        if (b == 0) {
+            return b;
+        }
+        if (b == -1) {
+            return b;
+        }
 
         // 1表示警告 2表示严重错误,将中断连接
         // 1和2 后面会携带一条错误信息,以\n结尾
@@ -291,7 +344,7 @@ public class SSHUtil {
                 c = in.read();
                 sb.append((char) c);
             } while (c != '\n');
-            log.debug("checkAck发现错误消息: ack={}-msg={}", b, sb.toString());
+            log.debug("checkAck发现错误消息: ack={}-msg={}", b, sb);
             if (b == 1 && sb.toString().endsWith("No such file or directory")) {
                 throw new NoSuchFileException(sb.toString());
             } else {

+ 48 - 17
src/main/java/com/nokia/task/LtePmTask.java

@@ -1,5 +1,7 @@
 package com.nokia.task;
+
 import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.SftpException;
 import com.nokia.common.gpload.GploadUtil;
 import com.nokia.common.gpload.entity.GploadResult;
 import com.nokia.common.ssh.SSHUtil;
@@ -12,18 +14,24 @@ import org.apache.commons.csv.CSVPrinter;
 import org.apache.commons.csv.CSVRecord;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 @Slf4j
 /*@Task*/
 @Component
 public class LtePmTask {
-
     @Value("${lte.pm.download.host:10.17.180.55}")
     private String host;
     @Value("${lte.pm.download.port:22}")
@@ -42,24 +50,47 @@ public class LtePmTask {
     private String distinctTargetDir;
 
 
-    @XxlJob("execHandler")
-    public void xxlJobCall(){
-        System.out.println("java-----------");
-    }
+//    @XxlJob("execHandler")
+//    public void xxlJobCall(){
+//        System.out.println("java-----------");
+//    }
 
     /**
      * 扫描文件
      */
-    @XxlJob("")
-    public void scan() throws JSchException, SSHUtilException, IOException {
+//    @XxlJob("")
+//    public void scan() throws JSchException, SSHUtilException, IOException {
+//        SSHUtil sshUtil = new SSHUtil(host, port, username, password);
+//        String exec = sshUtil.exec(sourceDir);
+//        File targetFile = new File(exec);
+//        String[] list = targetFile.list();
+//        for (String s : list) {
+//            String[] split = s.split("\\.");
+//            if (split[1].equals("csv")){
+//                singleTask(split[0]);
+//            }
+//        }
+//    }
+
+    @XxlJob("execHandler")
+    public void cronTask() throws JSchException, SSHUtilException, IOException, SftpException {
+        final Pattern p = Pattern.compile("(?<=" + filePrefix + ")\\d{10}(?=.csv)");
+        // 创建文件夹
+        Files.createDirectories(Paths.get(downloadTargetDir));
+        Files.createDirectories(Paths.get(distinctTargetDir));
         SSHUtil sshUtil = new SSHUtil(host, port, username, password);
-        String exec = sshUtil.exec(sourceDir);
-        File targetFile = new File(exec);
-        String[] list = targetFile.list();
-        for (String s : list) {
-            String[] split = s.split("\\.");
-            if (split[1].equals("csv")){
-                singleTask(split[0]);
+        // 获取文件列表
+        List<String> list = sshUtil.ls(sourceDir);
+        if (CollectionUtils.isEmpty(list)) {
+            return;
+        }
+
+        log.debug("扫描到的文件: {}", list);
+        for (String t : list) {
+            Matcher m = p.matcher(t);
+            if (m.find()) {
+                String hourString = m.group(0);
+                singleTask(hourString);
             }
         }
     }
@@ -99,7 +130,7 @@ public class LtePmTask {
         String inputFilePath = downloadTargetDir ;
         String outputFilePath = distinctTargetDir;
         CSVFormat format = CSVFormat.DEFAULT.builder().build();
-        CSVParser records = format.parse(new InputStreamReader(new FileInputStream(inputFilePath), StandardCharsets.UTF_8));
+        CSVParser records = format.parse(new InputStreamReader(Files.newInputStream(Paths.get(inputFilePath)), StandardCharsets.UTF_8));
         Map<String, CSVRecord> recordMap = new HashMap<>();
         int count = 0;
         for (CSVRecord record : records) {
@@ -118,7 +149,7 @@ public class LtePmTask {
     public void gpload(String hourString) {
         String gploadCommand = "sh /data1/pm/gpload/pm_lte_gpload.sh " + hourString;
         GploadResult gpload = GploadUtil.gpload(gploadCommand);
-        if (gpload.getTaskStatus()) {
+        if (Boolean.TRUE.equals(gpload.getTaskStatus())) {
             log.debug("gpload完成: {}", gpload);
         } else {
             log.error("gpload 失败: {}", gpload.getMessage());