Explorar o código

feat: 优化gploadutil和sshutil

weijianghai %!s(int64=2) %!d(string=hai) anos
pai
achega
d7cff5489d

+ 14 - 0
doc/readme.md

@@ -15,3 +15,17 @@ nokia   Nokia*123
 
 admin
 Richr00t#
+
+## 统计各小时数据
+
+```sql
+select
+	rpt_time,
+	count(*)
+from
+	tsfx.dw_sa_omc_ci_h
+group by
+	rpt_time
+order by
+	rpt_time desc
+```

+ 54 - 40
src/main/java/com/nokia/common/gpload/GploadUtil.java

@@ -3,6 +3,7 @@ package com.nokia.common.gpload;
 import com.nokia.common.gpload.entity.GploadResult;
 import com.xxl.job.core.context.XxlJobHelper;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.CollectionUtils;
 
 import java.io.BufferedReader;
 import java.io.IOException;
@@ -15,66 +16,79 @@ import java.util.regex.Pattern;
 @Slf4j
 public class GploadUtil {
     public static final Pattern PATTERN = Pattern.compile("= (\\d+)");
-    private static Process process = null;
-    private static BufferedReader reader = null;
 
     public static GploadResult gpload(String gploadCommand) {
+        Process process = null;
         GploadResult result = new GploadResult();
         result.setTaskStatus(true);
         try {
             process = Runtime.getRuntime().exec(gploadCommand);
-            reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
-            List<String> lines = new ArrayList<>();
-            String line;
-            while ((line = reader.readLine()) != null) {
-                lines.add(line);
-            }
-            log.info("gpload result: " + lines);
-            XxlJobHelper.log("gpload result: " + lines);
-            for (String s : lines) {
-                if (s.contains("|rows Inserted")) {
-                    result.setInsertedCount(getNum(s));
-                }
-                if (s.contains("|rows Updated")) {
-                    result.setUpdatedCount(getNum(s));
-                }
-                if (s.contains("|data formatting errors")) {
-                    result.setErrorCount(getNum(s));
+            try (BufferedReader inputReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+                 BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
+                String line;
+                // 读取错误流
+                List<String> error = new ArrayList<>();
+                while ((line = errorReader.readLine()) != null) {
+                    error.add(line);
                 }
-                if (s.contains("ERROR") || s.contains("gpload failed")) {
+                if (!CollectionUtils.isEmpty(error)) {
+                    log.info("gpload errorStream: " + error);
+                    XxlJobHelper.log("gpload errorStream: " + error);
                     result.setTaskStatus(false);
+                    result.setMessage(error.toString());
+                }
+
+                // 读取标准流
+                List<String> input = new ArrayList<>();
+                while ((line = inputReader.readLine()) != null) {
+                    input.add(line);
                 }
-                if (s.contains("ERROR")) {
-                    result.setMessage(s);
+                if (!CollectionUtils.isEmpty(input)) {
+                    log.info("gpload inputStream: " + input);
+                    XxlJobHelper.log("gpload inputStream: " + input);
+                    for (String s : input) {
+                        if (s.contains("|rows Inserted")) {
+                            result.setInsertedCount(getNum(s));
+                        }
+                        if (s.contains("|rows Updated")) {
+                            result.setUpdatedCount(getNum(s));
+                        }
+                        if (s.contains("|data formatting errors")) {
+                            result.setErrorCount(getNum(s));
+                        }
+                        if (s.contains("ERROR") || s.contains("gpload failed")) {
+                            result.setTaskStatus(false);
+                        }
+                        if (s.contains("ERROR")) {
+                            result.setMessage(result.getMessage() + "\n" + s);
+                        }
+                    }
                 }
             }
+
+            int code = process.waitFor();
+            if (code != 0) {
+                result.setTaskStatus(false);
+            }
         } catch (IOException e) {
             log.error("gpload发生异常: {}", e.getMessage(), e);
             XxlJobHelper.log("gpload发生异常: {}", e.getMessage(), e);
             result.setTaskStatus(false);
             result.setMessage(e.getMessage());
+        } catch (InterruptedException e) {
+            log.error("gpload发生异常: {}", e.getMessage(), e);
+            XxlJobHelper.log("gpload发生异常: {}", e.getMessage(), e);
+            result.setTaskStatus(false);
+            result.setMessage(e.getMessage());
+            Thread.currentThread().interrupt();
         } finally {
-            log.debug("gpload的结果为: {}", result);
+            log.info("gpload的结果为: {}", result);
             XxlJobHelper.log("gpload的结果为: {}", result);
-            destroy();
-        }
-        return result;
-    }
-
-    private static void destroy() {
-        if (reader != null) {
-            try {
-                reader.close();
-            } catch (IOException e) {
-                log.error("close reader error: {}", e.getMessage(), e);
-                XxlJobHelper.log("close reader error: {}", e.getMessage(), e);
+            if (process != null) {
+                process.destroy();
             }
-            reader = null;
-        }
-        if (process != null) {
-            process.destroy();
-            process = null;
         }
+        return result;
     }
 
     /**

+ 1 - 1
src/main/java/com/nokia/common/gpload/entity/GploadResult.java

@@ -8,5 +8,5 @@ public class GploadResult {
     private Integer insertedCount;
     private Integer updatedCount;
     private Integer errorCount;
-    private String message;
+    private String message = "";
 }

+ 212 - 196
src/main/java/com/nokia/common/ssh/SSHUtil.java

@@ -11,7 +11,9 @@ import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.*;
+import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
@@ -53,10 +55,10 @@ public class SSHUtil {
     /**
      * 获取文件列表
      */
+    @SuppressWarnings("rawtypes")
     public List<String> ls(String path) throws JSchException, SftpException {
-        session = getConnectSession();
-        channelSftp = (ChannelSftp) session.openChannel("sftp");
-        channelSftp.connect();
+        getConnectSession();
+        channelSftpConnect();
         List<String> fileNameList = new ArrayList<>();
         Vector fileList = channelSftp.ls(path);
         for (Object o : fileList) {
@@ -64,25 +66,30 @@ public class SSHUtil {
             if (".".equals(fileName) || "..".equals(fileName)) {
                 continue;
             }
-
             fileNameList.add(fileName);
-            channelSftp.quit();
-            session.disconnect();
         }
 
         return fileNameList.stream().sorted().collect(Collectors.toList());
     }
 
+    /**
+     * 下载文件
+     */
+    public void get(String src, String dst) throws JSchException, SftpException, IOException {
+        try (OutputStream out = Files.newOutputStream(Paths.get(dst))) {
+            getConnectSession();
+            channelSftpConnect();
+            channelSftp.get(src, out);
+        }
+    }
+
     /**
      * 删除文件
      */
-    public void delete(String fileName) throws JSchException, SftpException {
-        session = getConnectSession();
-        channelSftp = (ChannelSftp) session.openChannel("sftp");
-        channelSftp.connect();
-        channelSftp.rm(fileName);
-        channelSftp.quit();
-        session.disconnect();
+    public void rm(String path) throws JSchException, SftpException {
+        getConnectSession();
+        channelSftpConnect();
+        channelSftp.rm(path);
     }
 
     /**
@@ -90,41 +97,37 @@ public class SSHUtil {
      */
     public String exec(String command) throws JSchException, IOException {
         StringBuilder stringBuilder = new StringBuilder();
-        try {
-            session = getConnectSession();
-            channel = session.openChannel("exec");
-            // jsch的登陆是无环境登陆即非login状态登陆,因此是没有环境变量的,
-            String execCommand;
-            // 在命令前添加 bash --login -c "command"以获取环境变量
-            // source .bashrc && command 也可以解决问题, 但是可能环境加载不全
-            if (command.startsWith("bash --login -c")) {
-                execCommand = command;
-            } else {
-                execCommand = String.format("bash --login -c \"%s\"", command);
-            }
-            ((ChannelExec) channel).setCommand(execCommand);
-            channel.setInputStream(null);
-            ((ChannelExec) channel).setErrStream(System.err);
-            InputStream in = channel.getInputStream();
-            channel.connect();
-            byte[] tmp = new byte[1024];
-            while (true) {
-                while (in.available() > 0) {
-                    int i = in.read(tmp, 0, 1024);
-                    if (i < 0) {
-                        break;
-                    }
-                    stringBuilder.append(new String(tmp, 0, i));
-                }
-                if (channel.isClosed()) {
-                    if (in.available() > 0) {
-                        continue;
-                    }
+        getConnectSession();
+        channel = session.openChannel("exec");
+        // jsch的登陆是无环境登陆即非login状态登陆,因此是没有环境变量的,
+        String execCommand;
+        // 在命令前添加 bash --login -c "command"以获取环境变量
+        // source .bashrc && command 也可以解决问题, 但是可能环境加载不全
+        if (command.startsWith("bash --login -c")) {
+            execCommand = command;
+        } else {
+            execCommand = String.format("bash --login -c \"%s\"", command);
+        }
+        ((ChannelExec) channel).setCommand(execCommand);
+        channel.setInputStream(null);
+        ((ChannelExec) channel).setErrStream(System.err);
+        InputStream in = channel.getInputStream();
+        channel.connect();
+        byte[] tmp = new byte[1024];
+        while (true) {
+            while (in.available() > 0) {
+                int i = in.read(tmp, 0, 1024);
+                if (i < 0) {
                     break;
                 }
+                stringBuilder.append(new String(tmp, 0, i));
+            }
+            if (channel.isClosed()) {
+                if (in.available() > 0) {
+                    continue;
+                }
+                break;
             }
-        } finally {
-            disconnect();
         }
         return stringBuilder.toString();
     }
@@ -135,176 +138,185 @@ public class SSHUtil {
      * 注意,文件名不能包含中文
      */
     public boolean scpTo(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
-        try {
-            session = getConnectSession();
-            // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
-            // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
-            // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
-            // -t 指定为to 也就是目的端模式 指定的对象就是session对应的连接对象targetServer
-            String command = "scp " + "-t " + targetPath;
-            channel = session.openChannel("exec");
-            ((ChannelExec) channel).setCommand(command);
-            outputStream = channel.getOutputStream();
-            inputStream = channel.getInputStream();
-            channel.connect();
-            if (checkAck(inputStream) != 0) {
-                log.error("scpTo 执行失败");
-                XxlJobHelper.log("scpTo 执行失败");
-                return false;
-            }
-            File sourceFile = new File(sourceFilePath);
-            if (sourceFile.isDirectory()) {
-                log.error("sourceFilePath 必须是文件");
-                XxlJobHelper.log("sourceFilePath 必须是文件");
-                return false;
-            }
-            long fileSize = sourceFile.length();
-            command = "C0644 " + fileSize + " " + sourceFile.getName() + "\n";
-            outputStream.write(command.getBytes());
-            outputStream.flush();
-            if (checkAck(inputStream) != 0) {
-                log.error("scpTo 执行失败");
-                XxlJobHelper.log("scpTo 执行失败");
-                return false;
-            }
-            fileInputStream = new FileInputStream(sourceFile);
-            byte[] buffer = new byte[1024];
-            while (true) {
-                int len = fileInputStream.read(buffer, 0, buffer.length);
-                if (len <= 0) {
-                    break;
-                }
-                outputStream.write(buffer, 0, len);
+        getConnectSession();
+        // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
+        // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
+        // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
+        // -t 指定为to 也就是目的端模式 指定的对象就是session对应的连接对象targetServer
+        String command = "scp " + "-t " + targetPath;
+        channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+        outputStream = channel.getOutputStream();
+        inputStream = channel.getInputStream();
+        channel.connect();
+        if (checkAck(inputStream) != 0) {
+            log.error("scpTo 执行失败");
+            XxlJobHelper.log("scpTo 执行失败");
+            return false;
+        }
+        File sourceFile = new File(sourceFilePath);
+        if (sourceFile.isDirectory()) {
+            log.error("sourceFilePath 必须是文件");
+            XxlJobHelper.log("sourceFilePath 必须是文件");
+            return false;
+        }
+        long fileSize = sourceFile.length();
+        command = "C0644 " + fileSize + " " + sourceFile.getName() + "\n";
+        outputStream.write(command.getBytes());
+        outputStream.flush();
+        if (checkAck(inputStream) != 0) {
+            log.error("scpTo 执行失败");
+            XxlJobHelper.log("scpTo 执行失败");
+            return false;
+        }
+        fileInputStream = new FileInputStream(sourceFile);
+        byte[] buffer = new byte[1024];
+        while (true) {
+            int len = fileInputStream.read(buffer, 0, buffer.length);
+            if (len <= 0) {
+                break;
             }
-            buffer[0] = 0;
-            outputStream.write(buffer, 0, 1);
-            outputStream.flush();
-            return checkAck(inputStream) == 0;
-        } finally {
-            disconnect();
+            outputStream.write(buffer, 0, len);
         }
+        buffer[0] = 0;
+        outputStream.write(buffer, 0, 1);
+        outputStream.flush();
+        return checkAck(inputStream) == 0;
     }
 
     /**
      * 使用scp把targetServer目录下的文件复制到本地
      */
     public boolean scpFrom(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
-        try {
-            log.debug(sourceFilePath);
-            XxlJobHelper.log(sourceFilePath);
-            session = getConnectSession();
-            // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
-            // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
-            // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
-            // -f 指定对端为from 也就是源端模式 指定的对象就是session对应的连接对象targetServer
-            String command = "scp -f " + sourceFilePath;
-            Channel channel = session.openChannel("exec");
-            ((ChannelExec) channel).setCommand(command);
-            outputStream = channel.getOutputStream();
-            inputStream = channel.getInputStream();
-            channel.connect();
-            byte[] buf = new byte[1024];
-            // 发送指令 '0'
-            // 源端会一直等宿端的回应, 直到等到回应才会传输下一条协议文本.
-            // 在送出最后一条协议文本后, 源端会传出一个大小为零的字符'0'来表示真正文件传输的开始.
-            // 当文件接收完成后, 宿端会给源端发送一个'0'
-            buf[0] = 0;
-            outputStream.write(buf, 0, 1);
-            outputStream.flush();
-            // 接收C0644 这条消息携带了文件的信息
-            while (true) {
-                int c = checkAck(inputStream);
-                // 遇到C时跳出循环
-                if (c == 'C') {
-                    break;
-                }
+        log.info(sourceFilePath);
+        XxlJobHelper.log(sourceFilePath);
+        getConnectSession();
+        // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
+        // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
+        // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
+        // -f 指定对端为from 也就是源端模式 指定的对象就是session对应的连接对象targetServer
+        String command = "scp -f " + sourceFilePath;
+        Channel channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+        outputStream = channel.getOutputStream();
+        inputStream = channel.getInputStream();
+        channel.connect();
+        byte[] buf = new byte[1024];
+        // 发送指令 '0'
+        // 源端会一直等宿端的回应, 直到等到回应才会传输下一条协议文本.
+        // 在送出最后一条协议文本后, 源端会传出一个大小为零的字符'0'来表示真正文件传输的开始.
+        // 当文件接收完成后, 宿端会给源端发送一个'0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        // 接收C0644 这条消息携带了文件的信息
+        while (true) {
+            int c = checkAck(inputStream);
+            // 遇到C时跳出循环
+            if (c == 'C') {
+                break;
             }
-            // 接收 '0644 ' 这段字符表示文件的权限
-            inputStream.read(buf, 0, 5);
-            // 获取filesize
-            long filesize = 0L;
-            while (true) {
-                if (inputStream.read(buf, 0, 1) < 0) {
-                    break;
-                }
-                if (buf[0] == ' ') {
-                    break;
-                }
-                filesize = filesize * 10L + (long) (buf[0] - '0');
+        }
+        // 接收 '0644 ' 这段字符表示文件的权限
+        inputStream.read(buf, 0, 5);
+        // 获取filesize
+        long filesize = 0L;
+        while (true) {
+            if (inputStream.read(buf, 0, 1) < 0) {
+                break;
             }
-            // 从 C0644命令读取文件名,命令中的文件名是不带路径的
-            String file = null;
-            for (int i = 0; ; i++) {
-                inputStream.read(buf, i, 1);
-                // 0x0a 是LF 换行符
-                if (buf[i] == (byte) 0x0a) {
-                    file = new String(buf, 0, i);
-                    break;
-                }
+            if (buf[0] == ' ') {
+                break;
             }
-            log.debug("filesize={}, file={}", filesize, file);
-            XxlJobHelper.log("filesize={}, file={}", filesize, file);
-            // 发送 '0'
-            buf[0] = 0;
-            outputStream.write(buf, 0, 1);
-            outputStream.flush();
-            // 如果目标是目录,则需要加上文件名
-            File target = new File(targetPath);
-            if (target.isDirectory()) {
-                log.debug("{} 是目录,需要添加文件名", target.getAbsolutePath());
-                XxlJobHelper.log("{} 是目录,需要添加文件名", target.getAbsolutePath());
-                target = new File(targetPath + File.separator + file);
+            filesize = filesize * 10L + (long) (buf[0] - '0');
+        }
+        // 从 C0644命令读取文件名,命令中的文件名是不带路径的
+        String file = null;
+        for (int i = 0; ; i++) {
+            inputStream.read(buf, i, 1);
+            // 0x0a 是LF 换行符
+            if (buf[i] == (byte) 0x0a) {
+                file = new String(buf, 0, i);
+                break;
             }
+        }
+        log.info("filesize={}, file={}", filesize, file);
+        XxlJobHelper.log("filesize={}, file={}", filesize, file);
+        // 发送 '0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        // 如果目标是目录,则需要加上文件名
+        File target = new File(targetPath);
+        if (target.isDirectory()) {
+            log.info("{} 是目录,需要添加文件名", target.getAbsolutePath());
+            XxlJobHelper.log("{} 是目录,需要添加文件名", target.getAbsolutePath());
+            target = new File(targetPath + File.separator + file);
+        }
 
-            fileOutputStream = new FileOutputStream(target);
-            int foo;
-            while (true) {
-                if (buf.length < filesize) {
-                    foo = buf.length;
-                } else {
-                    foo = (int) filesize;
-                }
-                foo = inputStream.read(buf, 0, foo);
-                if (foo < 0) {
-                    break;
-                }
-                fileOutputStream.write(buf, 0, foo);
-                filesize -= foo;
-                if (filesize == 0L) {
-                    break;
-                }
+        fileOutputStream = new FileOutputStream(target);
+        int foo;
+        while (true) {
+            if (buf.length < filesize) {
+                foo = buf.length;
+            } else {
+                foo = (int) filesize;
+            }
+            foo = inputStream.read(buf, 0, foo);
+            if (foo < 0) {
+                break;
             }
-            if (checkAck(inputStream) != 0) {
-                return false;
+            fileOutputStream.write(buf, 0, foo);
+            filesize -= foo;
+            if (filesize == 0L) {
+                break;
             }
-            // 发送 '0'
-            buf[0] = 0;
-            outputStream.write(buf, 0, 1);
-            outputStream.flush();
-            log.debug("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
-            XxlJobHelper.log("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
-            return true;
-        } finally {
-            disconnect();
         }
+        if (checkAck(inputStream) != 0) {
+            return false;
+        }
+        // 发送 '0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        log.info("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+        XxlJobHelper.log("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+        return true;
     }
 
-    private Session getConnectSession() throws JSchException {
-        jSch = new JSch();
-        session = jSch.getSession(targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
-        session.setPassword(targetServer.getPassword());
-        session.setUserInfo(new UserInfoImpl());
-        // 不需要输入保存ssh安全密钥的yes或no
-        Properties properties = new Properties();
-        properties.put("StrictHostKeyChecking", "no");
-        session.setConfig(properties);
-        session.connect();
-        log.debug("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
-        XxlJobHelper.log("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
-        return session;
+    public void getConnectSession() throws JSchException {
+        if (jSch == null) {
+            jSch = new JSch();
+        }
+
+        if (session == null) {
+            session = jSch.getSession(targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+            session.setPassword(targetServer.getPassword());
+            session.setUserInfo(new UserInfoImpl());
+            // 不需要输入保存ssh安全密钥的yes或no
+            Properties properties = new Properties();
+            properties.put("StrictHostKeyChecking", "no");
+            session.setConfig(properties);
+        }
+
+        if (!session.isConnected()) {
+            session.connect();
+            log.info("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+            XxlJobHelper.log("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+        }
     }
 
-    private void disconnect() throws IOException {
+    public void channelSftpConnect() throws JSchException {
+        if (channelSftp == null) {
+            channelSftp = (ChannelSftp) session.openChannel("sftp");
+        }
+
+        if (!channelSftp.isConnected()) {
+            channelSftp.connect();
+        }
+    }
+
+    public void disconnect() throws IOException {
         if (fileOutputStream != null) {
             fileOutputStream.close();
             fileOutputStream = null;
@@ -321,11 +333,15 @@ public class SSHUtil {
             channel.disconnect();
             channel = null;
         }
+        if (channelSftp != null) {
+            channelSftp.quit();
+        }
         if (session != null) {
             session.disconnect();
             session = null;
         }
         jSch = null;
+        log.info("jsch disconnected");
     }
 
     /**
@@ -354,7 +370,7 @@ public class SSHUtil {
                 c = in.read();
                 sb.append((char) c);
             } while (c != '\n');
-            log.debug("checkAck发现错误消息: ack={}-msg={}", b, sb);
+            log.info("checkAck发现错误消息: ack={}-msg={}", b, sb);
             XxlJobHelper.log("checkAck发现错误消息: ack={}-msg={}", b, sb);
             if (b == 1 && sb.toString().endsWith("No such file or directory")) {
                 throw new NoSuchFileException(sb.toString());

+ 27 - 24
src/main/java/com/nokia/pm_interface_5g/task/FiveGPmTask.java

@@ -19,7 +19,6 @@ import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
@@ -50,7 +49,7 @@ public class FiveGPmTask {
     private String username;
     @Value("${fiveg.pm.download.password:Nokia*123}")
     private String password;
-    @Value("${fiveg.pm.download.sourceDir:/data/out2/pm_5g_hour}")
+    @Value("${fiveg.pm.download.sourceDir:/data/out2/pm_5g_hour/}")
     private String sourceDir;
     @Value("${fiveg.pm.download.targetDir:download/}")
     private String downloadTargetDir;
@@ -59,6 +58,7 @@ public class FiveGPmTask {
     @Value("${fiveg.pm.distinct.targetDir:distinct/}")
     private String distinctTargetDir;
     private final DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHH");
+    private SSHUtil sshUtil;
 
     /**
      * 定时任务
@@ -71,16 +71,16 @@ public class FiveGPmTask {
             // 创建文件夹
             Files.createDirectories(Paths.get(downloadTargetDir));
             Files.createDirectories(Paths.get(distinctTargetDir));
-            SSHUtil sshUtil = new SSHUtil(host, port, username, password);
+            sshUtil = new SSHUtil(host, port, username, password);
             // 获取文件列表
             List<String> list = sshUtil.ls(sourceDir);
             if (CollectionUtils.isEmpty(list)) {
-                log.debug("没有文件");
+                log.info("没有文件");
                 XxlJobHelper.log("没有文件");
                 return;
             }
 
-            log.debug("扫描到的文件: {}", list);
+            log.info("扫描到的文件: {}", list);
             XxlJobHelper.log("扫描到的文件: {}", list);
             for (String t : list) {
                 singleTask(t);
@@ -89,6 +89,14 @@ public class FiveGPmTask {
             log.error("发生异常了: {}", e.getMessage(), e);
             XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
             XxlJobHelper.handleFail(e.getMessage());
+        } finally {
+            try {
+                sshUtil.disconnect();
+            } catch (IOException e) {
+                log.error("发生异常了: {}", e.getMessage(), e);
+                XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
+                XxlJobHelper.handleFail(e.getMessage());
+            }
         }
     }
 
@@ -129,20 +137,15 @@ public class FiveGPmTask {
      * @param filename 文件名
      */
     public void download(String filename) throws JSchException, SSHUtilException, IOException, SftpException {
-        SSHUtil sshUtil = new SSHUtil(host, port, username, password);
-        String sourceFilePath = sourceDir + "/" + filename;
-        File targetFile = new File(downloadTargetDir + filename);
-        if (!targetFile.exists()) {
-            targetFile.getParentFile().mkdirs();
-        }
-        String targetPath = targetFile.getAbsolutePath();
-        boolean b = sshUtil.scpFrom(sourceFilePath, targetPath);
-        if (b) {
-            log.debug("文件 {} 下载成功...", targetPath);
-            XxlJobHelper.log("文件 {} 下载成功...", targetPath);
-            //删除远程文件
-            sshUtil.delete(sourceFilePath);
-        }
+        log.info("下载文件: {}", filename);
+        XxlJobHelper.log("下载文件: {}", filename);
+        String src = sourceDir + filename;
+        String dst = downloadTargetDir + filename;
+        sshUtil.get(src, dst);
+        // 删除远程文件
+        log.info("删除远程文件: {}", filename);
+        XxlJobHelper.log("删除远程文件: {}", filename);
+        sshUtil.rm(src);
     }
 
     /**
@@ -151,7 +154,7 @@ public class FiveGPmTask {
      * @param filename 文件名
      */
     public void distinct(String filename) throws IOException {
-        log.debug("文件 {} 去重...", filename);
+        log.info("文件 {} 去重...", filename);
         XxlJobHelper.log("文件 {} 去重...", filename);
         String inputFilePath = downloadTargetDir + filename;
         String outputFilePath = distinctTargetDir + filename;
@@ -170,11 +173,11 @@ public class FiveGPmTask {
             rerrange(printer, map);
         }
 
-        log.debug("文件 {} 去重完成...", filename);
+        log.info("文件 {} 去重完成...", filename);
         XxlJobHelper.log("文件 {} 去重完成...", filename);
         // 删除本地源文件
         Files.deleteIfExists(inputPath);
-        log.debug("删除本地源文件 {}...", filename);
+        log.info("删除本地源文件 {}...", filename);
         XxlJobHelper.log("删除本地源文件 {}...", filename);
     }
 
@@ -182,12 +185,12 @@ public class FiveGPmTask {
         String gploadCommand = "sh /data1/pm_5g/gpload/pm_nr_gpload.sh " + filename;
         GploadResult gpload = GploadUtil.gpload(gploadCommand);
         if (Boolean.TRUE.equals(gpload.getTaskStatus())) {
-            log.debug("gpload完成: {}", gpload);
+            log.info("gpload完成: {}", gpload);
             XxlJobHelper.log("gpload完成: {}", gpload);
             // 删除重排文件
             Files.deleteIfExists(Paths.get(distinctTargetDir + filename));
         } else {
-            throw new RuntimeException(gpload.getMessage());
+            throw new RuntimeException("gpload失败: " + gpload.getMessage());
         }
     }
 

+ 1 - 1
src/main/resources/application.properties

@@ -1,6 +1,6 @@
 server.port=12092
 spring.application.name=pm_interface_5g
-logging.level.com.nokia=debug
+logging.level.com.nokia=info
 ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
 xxl.job.admin.addresses=http://192.168.10.7:8087/xxl-job-admin
 ### xxl-job, access token

+ 3 - 3
src/main/resources/logback-spring.xml

@@ -3,7 +3,7 @@
     <property name="PATH" value="./log"/>
     <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
         <encoder>
-            <Pattern>%d{HH:mm:ss.SSS} %highlight(%-5level) %blue(%logger:%line) %msg%n</Pattern>
+            <Pattern>%d{HH:mm:ss.SSS} %highlight(%-5level) %yellow(%X{traceId}) %cyan(%logger:%line) %msg%n</Pattern>
         </encoder>
     </appender>
     <appender name="TRACE_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
@@ -17,7 +17,7 @@
             <totalSizeCap>20GB</totalSizeCap>
         </rollingPolicy>
         <encoder>
-            <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %blue(%logger:%line) %msg%n</Pattern>
+            <Pattern>%d %highlight(%-5level) %yellow(%X{traceId}) %cyan(%logger:%line) %msg%n</Pattern>
         </encoder>
     </appender>
     <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
@@ -31,7 +31,7 @@
             <totalSizeCap>20GB</totalSizeCap>
         </rollingPolicy>
         <encoder>
-            <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %blue(%logger:%line) %msg%n</Pattern>
+            <Pattern>%d %highlight(%-5level) %yellow(%X{traceId}) %cyan(%logger:%line) %msg%n</Pattern>
         </encoder>
         <filter class="ch.qos.logback.classic.filter.LevelFilter">
             <level>ERROR</level>