فهرست منبع

2024-04-25更新

gtj 9 ماه پیش
والد
کامیت
6bf939c0c0

+ 2 - 2
other/gpload/pm_lte_gpload.sh

@@ -6,13 +6,13 @@ new_date=$1
 
 # 修改gpload配置文件
 
-sed -i 's/\/data1\/pm\/pm_4g\/distinct\/pm_4g_hour_[0-9]\{10\}.csv/\/data1\/pm\/distinct\/pm_4g_hour_'${new_date}'.csv/' /data1/pm/pm_4g/pm_lte_gpload2.yml
+sed -i 's/\/data\/pm\/pm_4g\/distinct\/pm_4g_hour_[0-9]\{10\}.csv/\/data\/pm\/distinct\/pm_4g_hour_'${new_date}'.csv/' /data/pm/pm_4g/pm_lte_gpload2.yml
 # 由于gpload需要输入密码,这里需要使用expect执行
 password=sqmdb_1QAZ
 
 expect -c "
 set timeout 300
-spawn gpload -f /data1/pm/pm_4g/pm_lte_gpload2.yml
+spawn gpload -f /data/pm/pm_4g/pm_lte_gpload2.yml
 expect {
 \"connecting (yes/no)?\" { send \"yes\n\";exp_continue }
 \"Password:\" { send \"${password}\n\"; exp_continue}

+ 2 - 2
other/gpload/pm_lte_gpload2.yml

@@ -1,7 +1,7 @@
 VERSION: 1.0.0.1
 DATABASE: sqmmt
 USER: sqmdb
-HOST: 192.168.50.5
+HOST: 192.168.70.109
 PORT: 5432
 GPLOAD:
   INPUT:
@@ -10,7 +10,7 @@ GPLOAD:
           - 192.168.10.7
         PORT: 54321
         FILE:
-          - /data1/pm/pm_4g/distinct/pm_4g_hour_2022052409.csv
+          - data/pm/pm_4g/distinct/pm_4g_hour_2022052409.csv
     - FORMAT: csv
     - DELIMITER: ','
     - HEADER: true

+ 5 - 0
pom.xml

@@ -62,6 +62,11 @@
             <artifactId>spring-boot-autoconfigure</artifactId>
             <version>2.3.0.RELEASE</version>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

+ 38 - 0
src/main/java/com/nokia/common/controller/LtePmTaskController.java

@@ -0,0 +1,38 @@
+package com.nokia.common.controller;
+
+import com.nokia.task.LtePmTask;
+import lombok.extern.log4j.Log4j2;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.web.bind.annotation.*;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+@Log4j2
+@RestController
+@RequestMapping("/ltePmTask")
+public class LtePmTaskController {
+
+    @Autowired
+    LtePmTask ltePmTask;
+
+    @GetMapping("/ltePmTaskReissue")
+    public String reissue(@RequestParam String hostString) {
+
+        // 空值检查
+        if (ltePmTask == null || hostString == null) {
+            return "参数不能为空";
+        }
+        try {
+            // 执行任务
+            ltePmTask.gpload(hostString);
+            return "补发成功";
+        } catch (IOException e) {
+            // 异常处理,可以记录日志或者返回特定错误信息
+            return "补发失败,发生IO异常:" + e.getMessage();
+        }
+    }
+}

+ 0 - 86
src/main/java/com/nokia/common/gpload/GploadUtil.java

@@ -1,86 +0,0 @@
-package com.nokia.common.gpload;
-
-import com.nokia.common.gpload.entity.GploadResult;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.List;
-
-@Slf4j
-public class GploadUtil {
-
-    private static Process process = null;
-    private static BufferedReader reader = null;
-
-    public static GploadResult gpload(String gploadCommand) {
-        GploadResult result = new GploadResult();
-        try {
-            Process process = Runtime.getRuntime().exec(gploadCommand);
-            int exitCode = process.waitFor();
-            result.setTaskStatus(exitCode == 0);
-            reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
-            List<String> lines = new ArrayList<>();
-            String line;
-            while ((line = reader.readLine()) != null) {
-                lines.add(line);
-            }
-            for (int i = 0; i < lines.size(); i++) {
-                if (lines.get(i).contains("|INFO|rows Inserted")) {
-                    // 找到第一行 对应的是插入数据的数量
-                    // s4可以判断gpload是否成功
-                    String s4 = lines.get(i + 3);
-                    if (s4.endsWith("succeeded")) {
-                        // 插入数量
-                        String s1 = lines.get(i);
-                        s1 = s1.substring(s1.indexOf('=') + 2);
-                        // 更新数量
-                        String s2 = lines.get(i + 1);
-                        s2 = s2.substring(s2.indexOf('=') + 2);
-                        // 错误数量
-                        String s3 = lines.get(i + 2);
-                        s3 = s3.substring(s3.indexOf('=') + 2);
-                        result.setInsertedCount(Integer.parseInt(s1));
-                        result.setUpdatedCount(Integer.parseInt(s2));
-                        result.setErrorCount(Integer.parseInt(s3));
-                    } else {
-                        StringBuilder sb = new StringBuilder();
-                        for (int j = i - 1; j > 0; j--) {
-                            sb.insert(0, lines.get(i));
-                            if (lines.get(i).contains("|ERROR|ERROR:")) {
-                                sb.insert(0, lines.get(i));
-                                break;
-                            }
-                        }
-                        result.setMessage(sb.toString());
-                    }
-                }
-            }
-        } catch (IOException | InterruptedException e) {
-            e.printStackTrace();
-            result.setTaskStatus(false);
-            result.setMessage(e.getMessage());
-        } finally {
-            log.debug("gpload的结果为: {}", result);
-            destroy();
-        }
-        return result;
-    }
-
-    private static void destroy() {
-        if (reader != null) {
-            try {
-                reader.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-            reader = null;
-        }
-        if (process != null) {
-            process.destroy();
-            process = null;
-        }
-    }
-}

+ 88 - 0
src/main/java/com/nokia/common/gpload/entity/GploadUtil.java

@@ -0,0 +1,88 @@
+package com.nokia.common.gpload.entity;
+
+import com.xxl.job.core.context.XxlJobHelper;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@Slf4j
+public class GploadUtil {
+    public static final Pattern PATTERN = Pattern.compile("= (\\d+)");
+    private static Process process = null;
+    private static BufferedReader reader = null;
+
+    public static GploadResult gpload(String gploadCommand) {
+        GploadResult result = new GploadResult();
+        result.setTaskStatus(true);
+        try {
+            process = Runtime.getRuntime().exec(gploadCommand);
+            reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            List<String> lines = new ArrayList<>();
+            String line;
+            while ((line = reader.readLine()) != null) {
+                lines.add(line);
+            }
+            log.info("gpload result: " + lines);
+            XxlJobHelper.log("gpload result: " + lines);
+            for (String s : lines) {
+                if (s.contains("|rows Inserted")) {
+                    result.setInsertedCount(getNum(s));
+                }
+                if (s.contains("|rows Updated")) {
+                    result.setUpdatedCount(getNum(s));
+                }
+                if (s.contains("|data formatting errors")) {
+                    result.setErrorCount(getNum(s));
+                }
+                if (s.contains("ERROR") || s.contains("gpload failed")) {
+                    result.setTaskStatus(false);
+                }
+                if (s.contains("ERROR")) {
+                    result.setMessage(s);
+                }
+            }
+        } catch (IOException e) {
+            log.error("gpload发生异常: {}", e.getMessage(), e);
+            XxlJobHelper.log("gpload发生异常: {}", e.getMessage(), e);
+            result.setTaskStatus(false);
+            result.setMessage(e.getMessage());
+        } finally {
+            log.debug("gpload的结果为: {}", result);
+            XxlJobHelper.log("gpload的结果为: {}", result);
+            destroy();
+        }
+        return result;
+    }
+
+    private static void destroy() {
+        if (reader != null) {
+            try {
+                reader.close();
+            } catch (IOException e) {
+                log.error("close reader error: {}", e.getMessage(), e);
+                XxlJobHelper.log("close reader error: {}", e.getMessage(), e);
+            }
+            reader = null;
+        }
+        if (process != null) {
+            process.destroy();
+            process = null;
+        }
+    }
+    /**
+     * 获取gpload统计条数
+     */
+    private static int getNum(String s) {
+        Matcher matcher = PATTERN.matcher(s);
+        if (matcher.find()) {
+            return Integer.parseInt(matcher.group(1));
+        }
+        return 0;
+    }
+}

+ 217 - 198
src/main/java/com/nokia/common/ssh/SSHUtil.java

@@ -5,16 +5,20 @@ import com.nokia.common.ssh.entity.SSHServer;
 import com.nokia.common.ssh.entity.UserInfoImpl;
 import com.nokia.common.ssh.exception.SSHUtilException;
 import com.nokia.common.ssh.exception.ScpAckErrorException;
+import com.xxl.job.core.context.XxlJobHelper;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.*;
+import java.nio.file.Files;
 import java.nio.file.NoSuchFileException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 import java.util.Vector;
+import java.util.stream.Collectors;
 
 /**
  * 使用jsch库实现的ssh的工具类
@@ -51,41 +55,41 @@ public class SSHUtil {
     /**
      * 获取文件列表
      */
+    @SuppressWarnings("rawtypes")
     public List<String> ls(String path) throws JSchException, SftpException {
-        log.info("ls1");
-        session = getConnectSession();
-        log.info("ls2");
-        channelSftp = (ChannelSftp) session.openChannel("sftp");
-        log.info("ls3");
-        channelSftp.connect();
-        log.info("ls4");
+        getConnectSession();
+        channelSftpConnect();
         List<String> fileNameList = new ArrayList<>();
-        log.info("ls5");
         Vector fileList = channelSftp.ls(path);
         for (Object o : fileList) {
             String fileName = ((ChannelSftp.LsEntry) o).getFilename();
             if (".".equals(fileName) || "..".equals(fileName)) {
                 continue;
             }
-
             fileNameList.add(fileName);
-            channelSftp.quit();
-            session.disconnect();
         }
-        return fileNameList;
+
+        return fileNameList.stream().sorted().collect(Collectors.toList());
+    }
+
+    /**
+     * 下载文件
+     */
+    public void get(String src, String dst) throws JSchException, SftpException, IOException {
+        try (OutputStream out = Files.newOutputStream(Paths.get(dst))) {
+            getConnectSession();
+            channelSftpConnect();
+            channelSftp.get(src, out);
+        }
     }
 
     /**
      * 删除文件
      */
-    public void delete(String fileName) throws JSchException, SftpException {
-        session = getConnectSession();
-        channelSftp = (ChannelSftp) session.openChannel("sftp");
-        channelSftp.connect();
-        System.out.println(fileName);
-        channelSftp.rm(fileName);
-        channelSftp.quit();
-        session.disconnect();
+    public void rm(String path) throws JSchException, SftpException {
+        getConnectSession();
+        channelSftpConnect();
+        channelSftp.rm(path);
     }
 
     /**
@@ -93,41 +97,37 @@ public class SSHUtil {
      */
     public String exec(String command) throws JSchException, IOException {
         StringBuilder stringBuilder = new StringBuilder();
-        try {
-            session = getConnectSession();
-            channel = session.openChannel("exec");
-            // jsch的登陆是无环境登陆即非login状态登陆,因此是没有环境变量的,
-            String execCommand;
-            // 在命令前添加 bash --login -c "command"以获取环境变量
-            // source .bashrc && command 也可以解决问题, 但是可能环境加载不全
-            if (command.startsWith("bash --login -c")) {
-                execCommand = command;
-            } else {
-                execCommand = String.format("bash --login -c \"%s\"", command);
-            }
-            ((ChannelExec) channel).setCommand(execCommand);
-            channel.setInputStream(null);
-            ((ChannelExec) channel).setErrStream(System.err);
-            InputStream in = channel.getInputStream();
-            channel.connect();
-            byte[] tmp = new byte[1024];
-            while (true) {
-                while (in.available() > 0) {
-                    int i = in.read(tmp, 0, 1024);
-                    if (i < 0) {
-                        break;
-                    }
-                    stringBuilder.append(new String(tmp, 0, i));
-                }
-                if (channel.isClosed()) {
-                    if (in.available() > 0) {
-                        continue;
-                    }
+        getConnectSession();
+        channel = session.openChannel("exec");
+        // jsch的登陆是无环境登陆即非login状态登陆,因此是没有环境变量的,
+        String execCommand;
+        // 在命令前添加 bash --login -c "command"以获取环境变量
+        // source .bashrc && command 也可以解决问题, 但是可能环境加载不全
+        if (command.startsWith("bash --login -c")) {
+            execCommand = command;
+        } else {
+            execCommand = String.format("bash --login -c \"%s\"", command);
+        }
+        ((ChannelExec) channel).setCommand(execCommand);
+        channel.setInputStream(null);
+        ((ChannelExec) channel).setErrStream(System.err);
+        InputStream in = channel.getInputStream();
+        channel.connect();
+        byte[] tmp = new byte[1024];
+        while (true) {
+            while (in.available() > 0) {
+                int i = in.read(tmp, 0, 1024);
+                if (i < 0) {
                     break;
                 }
+                stringBuilder.append(new String(tmp, 0, i));
+            }
+            if (channel.isClosed()) {
+                if (in.available() > 0) {
+                    continue;
+                }
+                break;
             }
-        } finally {
-            disconnect();
         }
         return stringBuilder.toString();
     }
@@ -138,171 +138,185 @@ public class SSHUtil {
      * 注意,文件名不能包含中文
      */
     public boolean scpTo(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
-        try {
-            session = getConnectSession();
-            // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
-            // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去
-            // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
-            // -t 指定为to 也就是目的端模式 指定的对象就是session对应的连接对象targetServer
-            String command = "scp " + "-t " + targetPath;
-            channel = session.openChannel("exec");
-            ((ChannelExec) channel).setCommand(command);
-            outputStream = channel.getOutputStream();
-            inputStream = channel.getInputStream();
-            channel.connect();
-            if (checkAck(inputStream) != 0) {
-                log.error("scpTo 执行失败");
-                return false;
-            }
-            File sourceFile = new File(sourceFilePath);
-            if (sourceFile.isDirectory()) {
-                log.error("sourceFilePath 必须是文件");
-                return false;
-            }
-            long fileSize = sourceFile.length();
-            command = "C0644 " + fileSize + " " + sourceFile.getName() + "\n";
-            outputStream.write(command.getBytes());
-            outputStream.flush();
-            if (checkAck(inputStream) != 0) {
-                log.error("scpTo 执行失败");
-                return false;
-            }
-            fileInputStream = new FileInputStream(sourceFile);
-            byte[] buffer = new byte[1024];
-            while (true) {
-                int len = fileInputStream.read(buffer, 0, buffer.length);
-                if (len <= 0) {
-                    break;
-                }
-                outputStream.write(buffer, 0, len);
+        getConnectSession();
+        // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
+        // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
+        // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的
+        // -t 指定为to 也就是目的端模式 指定的对象就是session对应的连接对象targetServer
+        String command = "scp " + "-t " + targetPath;
+        channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+        outputStream = channel.getOutputStream();
+        inputStream = channel.getInputStream();
+        channel.connect();
+        if (checkAck(inputStream) != 0) {
+            log.error("scpTo 执行失败");
+            XxlJobHelper.log("scpTo 执行失败");
+            return false;
+        }
+        File sourceFile = new File(sourceFilePath);
+        if (sourceFile.isDirectory()) {
+            log.error("sourceFilePath 必须是文件");
+            XxlJobHelper.log("sourceFilePath 必须是文件");
+            return false;
+        }
+        long fileSize = sourceFile.length();
+        command = "C0644 " + fileSize + " " + sourceFile.getName() + "\n";
+        outputStream.write(command.getBytes());
+        outputStream.flush();
+        if (checkAck(inputStream) != 0) {
+            log.error("scpTo 执行失败");
+            XxlJobHelper.log("scpTo 执行失败");
+            return false;
+        }
+        fileInputStream = new FileInputStream(sourceFile);
+        byte[] buffer = new byte[1024];
+        while (true) {
+            int len = fileInputStream.read(buffer, 0, buffer.length);
+            if (len <= 0) {
+                break;
             }
-            buffer[0] = 0;
-            outputStream.write(buffer, 0, 1);
-            outputStream.flush();
-            return checkAck(inputStream) == 0;
-        } finally {
-            disconnect();
+            outputStream.write(buffer, 0, len);
         }
+        buffer[0] = 0;
+        outputStream.write(buffer, 0, 1);
+        outputStream.flush();
+        return checkAck(inputStream) == 0;
     }
 
     /**
      * 使用scp把targetServer目录下的文件复制到本地
      */
     public boolean scpFrom(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
-        try {
-            log.debug(sourceFilePath);
-            session = getConnectSession();
-            // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
-            // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
-            // 需要说明的是,。
-            // -f 指定对端为from 也就是源端模式 指定的对象就是se这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的ssion对应的连接对象targetServer
-            String command = "scp -f " + sourceFilePath;
-            Channel channel = session.openChannel("exec");
-            ((ChannelExec) channel).setCommand(command);
-            outputStream = channel.getOutputStream();
-            inputStream = channel.getInputStream();
-            channel.connect();
-            byte[] buf = new byte[1024];
-            // 发送指令 '0'
-            // 源端会一直等宿端的回应, 直到等到回应才会传输下一条协议文本.
-            // 在送出最后一条协议文本后, 源端会传出一个大小为零的字符'0'来表示真正文件传输的开始.
-            // 当文件接收完成后, 宿端会给源端发送一个'0'
-            buf[0] = 0;
-            outputStream.write(buf, 0, 1);
-            outputStream.flush();
-            // 接收C0644 这条消息携带了文件的信息
-            while (true) {
-                int c = checkAck(inputStream);
-                // 遇到C时跳出循环
-                if (c == 'C') {
-                    break;
-                }
+        log.info(sourceFilePath);
+        XxlJobHelper.log(sourceFilePath);
+        getConnectSession();
+        // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
+        // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
+        // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
+        // -f 指定对端为from 也就是源端模式 指定的对象就是session对应的连接对象targetServer
+        String command = "scp -f " + sourceFilePath;
+        Channel channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+        outputStream = channel.getOutputStream();
+        inputStream = channel.getInputStream();
+        channel.connect();
+        byte[] buf = new byte[1024];
+        // 发送指令 '0'
+        // 源端会一直等宿端的回应, 直到等到回应才会传输下一条协议文本.
+        // 在送出最后一条协议文本后, 源端会传出一个大小为零的字符'0'来表示真正文件传输的开始.
+        // 当文件接收完成后, 宿端会给源端发送一个'0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        // 接收C0644 这条消息携带了文件的信息
+        while (true) {
+            int c = checkAck(inputStream);
+            // 遇到C时跳出循环
+            if (c == 'C') {
+                break;
             }
-            // 接收 '0644 ' 这段字符表示文件的权限
-            inputStream.read(buf, 0, 5);
-            // 获取filesize
-            long filesize = 0L;
-            while (true) {
-                if (inputStream.read(buf, 0, 1) < 0) {
-                    break;
-                }
-                if (buf[0] == ' ') {
-                    break;
-                }
-                filesize = filesize * 10L + (long) (buf[0] - '0');
+        }
+        // 接收 '0644 ' 这段字符表示文件的权限
+        inputStream.read(buf, 0, 5);
+        // 获取filesize
+        long filesize = 0L;
+        while (true) {
+            if (inputStream.read(buf, 0, 1) < 0) {
+                break;
             }
-            // 从 C0644命令读取文件名,命令中的文件名是不带路径的
-            String file = null;
-            for (int i = 0; ; i++) {
-                inputStream.read(buf, i, 1);
-                // 0x0a 是LF 换行符
-                if (buf[i] == (byte) 0x0a) {
-                    file = new String(buf, 0, i);
-                    break;
-                }
+            if (buf[0] == ' ') {
+                break;
             }
-            log.debug("filesize={}, file={}", filesize, file);
-            // 发送 '0'
-            buf[0] = 0;
-            outputStream.write(buf, 0, 1);
-            outputStream.flush();
-            // 如果目标是目录,则需要加上文件名
-            File target = new File(targetPath);
-            if (target.isDirectory()) {
-                log.debug("{} 是目录,需要添加文件名", target.getAbsolutePath());
-                target = new File(targetPath + File.separator + file);
+            filesize = filesize * 10L + (long) (buf[0] - '0');
+        }
+        // 从 C0644命令读取文件名,命令中的文件名是不带路径的
+        String file = null;
+        for (int i = 0; ; i++) {
+            inputStream.read(buf, i, 1);
+            // 0x0a 是LF 换行符
+            if (buf[i] == (byte) 0x0a) {
+                file = new String(buf, 0, i);
+                break;
             }
+        }
+        log.info("filesize={}, file={}", filesize, file);
+        XxlJobHelper.log("filesize={}, file={}", filesize, file);
+        // 发送 '0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        // 如果目标是目录,则需要加上文件名
+        File target = new File(targetPath);
+        if (target.isDirectory()) {
+            log.info("{} 是目录,需要添加文件名", target.getAbsolutePath());
+            XxlJobHelper.log("{} 是目录,需要添加文件名", target.getAbsolutePath());
+            target = new File(targetPath + File.separator + file);
+        }
 
-            fileOutputStream = new FileOutputStream(target);
-            int foo;
-            while (true) {
-                if (buf.length < filesize) {
-                    foo = buf.length;
-                } else {
-                    foo = (int) filesize;
-                }
-                foo = inputStream.read(buf, 0, foo);
-                if (foo < 0) {
-                    break;
-                }
-                fileOutputStream.write(buf, 0, foo);
-                filesize -= foo;
-                if (filesize == 0L) {
-                    break;
-                }
+        fileOutputStream = new FileOutputStream(target);
+        int foo;
+        while (true) {
+            if (buf.length < filesize) {
+                foo = buf.length;
+            } else {
+                foo = (int) filesize;
+            }
+            foo = inputStream.read(buf, 0, foo);
+            if (foo < 0) {
+                break;
             }
-            if (checkAck(inputStream) != 0) {
-                return false;
+            fileOutputStream.write(buf, 0, foo);
+            filesize -= foo;
+            if (filesize == 0L) {
+                break;
             }
-            // 发送 '0'
-            buf[0] = 0;
-            outputStream.write(buf, 0, 1);
-            outputStream.flush();
-            log.debug("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
-            return true;
-        } finally {
-            disconnect();
         }
+        if (checkAck(inputStream) != 0) {
+            return false;
+        }
+        // 发送 '0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        log.info("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+        XxlJobHelper.log("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+        return true;
     }
 
-    private Session getConnectSession() throws JSchException {
-        log.info("user: {}, host: {}, port: {}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
-        jSch = new JSch();
-        session = jSch.getSession(targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
-        log.info("session {}",session);
-        session.setPassword(targetServer.getPassword());
-        session.setUserInfo(new UserInfoImpl());
-        // 不需要输入保存ssh安全密钥的yes或no
-        Properties properties = new Properties();
-        properties.put("StrictHostKeyChecking", "no");
-        session.setConfig(properties);
-        log.info("session {}",session);
-        session.connect();
-        log.debug("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
-        return session;
+    public void getConnectSession() throws JSchException {
+        if (jSch == null) {
+            jSch = new JSch();
+        }
+
+        if (session == null) {
+            session = jSch.getSession(targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+            session.setPassword(targetServer.getPassword());
+            session.setUserInfo(new UserInfoImpl());
+            // 不需要输入保存ssh安全密钥的yes或no
+            Properties properties = new Properties();
+            properties.put("StrictHostKeyChecking", "no");
+            session.setConfig(properties);
+        }
+
+        if (!session.isConnected()) {
+            session.connect();
+            log.info("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+            XxlJobHelper.log("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+        }
     }
 
-    private void disconnect() throws IOException {
+    public void channelSftpConnect() throws JSchException {
+        if (channelSftp == null) {
+            channelSftp = (ChannelSftp) session.openChannel("sftp");
+        }
+
+        if (!channelSftp.isConnected()) {
+            channelSftp.connect();
+        }
+    }
+
+    public void disconnect() throws IOException {
         if (fileOutputStream != null) {
             fileOutputStream.close();
             fileOutputStream = null;
@@ -319,11 +333,15 @@ public class SSHUtil {
             channel.disconnect();
             channel = null;
         }
+        if (channelSftp != null) {
+            channelSftp.quit();
+        }
         if (session != null) {
             session.disconnect();
             session = null;
         }
         jSch = null;
+        log.info("jsch disconnected");
     }
 
     /**
@@ -352,7 +370,8 @@ public class SSHUtil {
                 c = in.read();
                 sb.append((char) c);
             } while (c != '\n');
-            log.debug("checkAck发现错误消息: ack={}-msg={}", b, sb);
+            log.info("checkAck发现错误消息: ack={}-msg={}", b, sb);
+            XxlJobHelper.log("checkAck发现错误消息: ack={}-msg={}", b, sb);
             if (b == 1 && sb.toString().endsWith("No such file or directory")) {
                 throw new NoSuchFileException(sb.toString());
             } else {

+ 65 - 43
src/main/java/com/nokia/task/LtePmTask.java

@@ -2,8 +2,9 @@ package com.nokia.task;
 
 import com.jcraft.jsch.JSchException;
 import com.jcraft.jsch.SftpException;
-import com.nokia.common.gpload.GploadUtil;
+
 import com.nokia.common.gpload.entity.GploadResult;
+import com.nokia.common.gpload.entity.GploadUtil;
 import com.nokia.common.ssh.SSHUtil;
 import com.nokia.common.ssh.exception.SSHUtilException;
 import com.xxl.job.core.context.XxlJobHelper;
@@ -16,12 +17,12 @@ import org.apache.commons.csv.CSVRecord;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.List;
@@ -47,32 +48,38 @@ public class LtePmTask {
     private String filePrefix;
     @Value("${lte.pm.distinct.targetDir:distinct/}")
     private String distinctTargetDir;
-
- /*   @XxlJob("execHandler")
+    SSHUtil sshUtil =null;
      public void Test(){
-        System.out.println("java------------------");
+        System.out.println(distinctTargetDir+filePrefix);
     }
-*/
 
     @XxlJob("execHandlerPm4G")
-    public void cronTask() throws JSchException, SSHUtilException, IOException, SftpException {
-        // 创建文件夹
-        Files.createDirectories(Paths.get(downloadTargetDir));
-        Files.createDirectories(Paths.get(distinctTargetDir));
-        SSHUtil sshUtil = new SSHUtil(host, port, username, password);
-        // 获取文件列表
-        List<String> list = sshUtil.ls(sourceDir);
-        if (CollectionUtils.isEmpty(list)) {
-            return;
-        }
-        log.debug("扫描到的文件: {}", list);
-        XxlJobHelper.log("扫描到的文件: {}",list);
-        for (String s : list) {
-            String[] s1 = s.split("_");
-            String[] split = s1[3].split("\\.");
-            if (split[1].equals("csv")){
-                singleTask(split[0]);
+    public void cronTask() throws IOException {
+        try {
+            // 创建文件夹
+            Files.createDirectories(Paths.get(downloadTargetDir));
+            Files.createDirectories(Paths.get(distinctTargetDir));
+             sshUtil = new SSHUtil(host, port, username, password);
+            // 获取文件列表
+            List<String> list = sshUtil.ls(sourceDir);
+            if (CollectionUtils.isEmpty(list)) {
+                return;
+            }
+            log.debug("扫描到的文件: {}", list);
+            XxlJobHelper.log("扫描到的文件: {}",list);
+            for (String s : list) {
+                String[] s1 = s.split("_");
+                String[] split = s1[3].split("\\.");
+                if (split[1].equals("csv")){
+                    singleTask(split[0]);
+                }
             }
+        }catch (Exception e){
+            log.error("发生异常了: {}", e.getMessage(), e);
+            XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
+            XxlJobHelper.handleFail(e.getMessage());
+        } finally {
+            sshUtil.disconnect();
         }
     }
 
@@ -85,13 +92,13 @@ public class LtePmTask {
 
     /*@AllowedTaskType({TaskType.TIMING, TaskType.IMMEDIATE})*/
     public void singleTask(String hourString) throws JSchException, SSHUtilException, IOException, SftpException {
+        log.info("----------------------------------------hourString {} ",hourString);
         download(hourString);
         distinct(hourString);
         gpload(hourString);
     }
 
     public void download(String hourString) throws JSchException, SSHUtilException, IOException, SftpException {
-        SSHUtil sshUtil = new SSHUtil(host, port, username, password);
         String downloadFileName = filePrefix + hourString + ".csv";
         String sourceFilePath = sourceDir + "/" + downloadFileName;
         File targetFile = new File(downloadTargetDir + downloadFileName);
@@ -103,41 +110,56 @@ public class LtePmTask {
         if (b) {
             log.debug("文件 {} 下载成功...", targetPath);
             XxlJobHelper.log("文件 {} 下载成功...",targetPath);
-           sshUtil.delete(sourceFilePath);
+            sshUtil.rm(sourceFilePath);
         }
     }
-
-    public void distinct(String hourString) throws IOException {
+    /**
+     * 去重
+     *
+     * @param hourString 文件名
+     */
+   public void distinct(String hourString) throws IOException {
         String fileName = filePrefix + hourString + ".csv";
         String inputFilePath = downloadTargetDir + fileName;
         String outputFilePath = distinctTargetDir + fileName;
+        Path path = Paths.get(hourString);
         CSVFormat format = CSVFormat.DEFAULT.builder().build();
-        CSVParser records = format.parse(new InputStreamReader(Files.newInputStream(Paths.get(inputFilePath)), StandardCharsets.UTF_8));
-        Map<String, CSVRecord> recordMap = new HashMap<>();
-        int count = 0;
-        for (CSVRecord record : records) {
-            recordMap.put(record.get(1), record);
-            count++;
-        }
-        CSVPrinter printer = format.print(new File(outputFilePath), StandardCharsets.UTF_8);
-        for (CSVRecord record : recordMap.values()) {
-            printer.printRecord(record);
+        try (CSVParser records = format.parse(new InputStreamReader(Files.newInputStream(Paths.get(inputFilePath)), StandardCharsets.UTF_8));
+             CSVPrinter printer = format.print(new File(outputFilePath), StandardCharsets.UTF_8)
+        ) {
+            Map<String, CSVRecord> recordMap = new HashMap<>();
+            int count = 0;
+            for (CSVRecord record : records) {
+                recordMap.put(record.get(2), record);
+                count++;
+            }
+            for (CSVRecord record : recordMap.values()) {
+                printer.printRecord(record);
+            }
+            //删除本地源文件
+            Files.deleteIfExists(path);
+            log.debug("去重完成,原文件{}条数据,去重后{}条数据", count, recordMap.size());
+            XxlJobHelper.log("去重完成,原文件{}条数据,去重后{}条数据",count, recordMap.size());
         }
-        printer.flush();
-        printer.close();
-        log.debug("去重完成,原文件{}条数据,去重后{}条数据", count, recordMap.size());
-        XxlJobHelper.log("去重完成,原文件{}条数据,去重后{}条数据",count, recordMap.size());
     }
 
-    public void gpload(String hourString) {
-        String gploadCommand = "sh /data1/pm/pm_4g/pm_lte_gpload.sh " + hourString;
+    public void gpload(String hourString) throws IOException {
+        String fileName = filePrefix + hourString + ".csv";
+        String gploadCommand = "sh /data/pm/pm_4g/pm_lte_gpload.sh " + hourString;
         GploadResult gpload = GploadUtil.gpload(gploadCommand);
         if (Boolean.TRUE.equals(gpload.getTaskStatus())) {
             log.debug("gpload 完成: {}", gpload);
             XxlJobHelper.log("gpload 完成: {}", gpload);
+            File file = new File("/data/pm/pm_4g/" + distinctTargetDir + fileName);
+            File file1 = new File("/data/pm/pm_4g/file/" + fileName);
+            if (file.renameTo(file1)){
+                file.delete();
+            }
+            // Files.deleteIfExists(Paths.get(distinctTargetDir + fileName));
         } else {
             log.error("gpload 失败: {}", gpload.getMessage());
             XxlJobHelper.log("gpload 失败: {}", gpload);
+            throw new RuntimeException(gpload.getMessage());
         }
     }
 }

+ 3 - 2
src/main/resources/application.properties

@@ -16,7 +16,8 @@ scheduling.scheduler.pool.size=3
 # pm下载相关配置
 
 ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
-xxl.job.admin.addresses=http://192.168.10.7:8087/xxl-job-admin
+##xxl.job.admin.addresses=http://192.168.10.7:8087/xxl-job-admin
+xxl.job.admin.addresses=http://127.0.0.1:8087/xxl-job-admin
 ### xxl-job, access token
 xxl.job.accessToken=
 
@@ -28,6 +29,6 @@ xxl.job.executor.address=
 xxl.job.executor.ip=
 xxl.job.executor.port=9090
 ### xxl-job executor log-path
-xxl.job.executor.logpath=/data1/pm/pm_4g/log
+xxl.job.executor.logpath=/data/pm/pm_4g/log
 ### xxl-job executor log-retention-days
 xxl.job.executor.logretentiondays=30

+ 22 - 7
src/test/java/com/nokia/task/LtePmTaskTest.java

@@ -5,13 +5,18 @@ import com.jcraft.jsch.SftpException;
 import com.nokia.PmInterfaceApplication;
 import com.nokia.common.ssh.exception.SSHUtilException;
 import org.junit.jupiter.api.Test;
+import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
 import java.io.IOException;
-
-import static org.junit.jupiter.api.Assertions.*;
-
+import java.text.SimpleDateFormat;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.util.Date;
+import java.util.TimeZone;
+@RunWith(SpringJUnit4ClassRunner.class)
 @SpringBootTest(classes = {PmInterfaceApplication.class})
 class LtePmTaskTest {
 
@@ -23,14 +28,24 @@ class LtePmTaskTest {
         task.download("2022053007");
     }
 
+  */
+
     @Test
     void distinct() throws IOException {
-        task.distinct("2022053007");
+        //task.distinct("C:\\Users\\DELL\\Desktop\\pm_4g_hour_2022102422.csv");
+        //获取当前日期和时间
+        LocalDateTime currentDateTime = LocalDateTime.now();
+        //定义想要的时间格式
+        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMddHH");
+        //格式化当前日期和时间
+        String formattedDateTime = currentDateTime.format(formatter);
+        //输出格式化后的时间
+        System.out.println("Formatted DateTime: " + formattedDateTime);
+
     }
-    */
 
     @Test
-    public void Test4g() throws SftpException, SSHUtilException, JSchException, IOException {
-        task.cronTask();
+    public void Test4g(){
+
     }
 }