Ver Fonte

perf: 优化sshutil

weijianghai há 2 anos atrás
pai
commit
05afe78c8c

+ 197 - 198
sms_blk_interface/src/main/java/com/nokia/common/ssh/SSHUtil.java

@@ -57,9 +57,8 @@ public class SSHUtil {
      */
     @SuppressWarnings("rawtypes")
     public List<String> ls(String path) throws JSchException, SftpException {
-        session = getConnectSession();
-        channelSftp = (ChannelSftp) session.openChannel("sftp");
-        channelSftp.connect();
+        getConnectSession();
+        channelSftpConnect();
         List<String> fileNameList = new ArrayList<>();
         Vector fileList = channelSftp.ls(path);
         for (Object o : fileList) {
@@ -68,8 +67,6 @@ public class SSHUtil {
                 continue;
             }
             fileNameList.add(fileName);
-            channelSftp.quit();
-            session.disconnect();
         }
 
         return fileNameList.stream().sorted().collect(Collectors.toList());
@@ -79,14 +76,10 @@ public class SSHUtil {
      * 下载文件
      */
     public void get(String src, String dst) throws JSchException, SftpException, IOException {
-        session = getConnectSession();
-        channelSftp = (ChannelSftp) session.openChannel("sftp");
-        channelSftp.connect();
         try (OutputStream out = Files.newOutputStream(Paths.get(dst))) {
+            getConnectSession();
+            channelSftpConnect();
             channelSftp.get(src, out);
-        } finally {
-            channelSftp.quit();
-            session.disconnect();
         }
     }
 
@@ -94,12 +87,9 @@ public class SSHUtil {
      * 删除文件
      */
     public void rm(String path) throws JSchException, SftpException {
-        session = getConnectSession();
-        channelSftp = (ChannelSftp) session.openChannel("sftp");
-        channelSftp.connect();
+        getConnectSession();
+        channelSftpConnect();
         channelSftp.rm(path);
-        channelSftp.quit();
-        session.disconnect();
     }
 
     /**
@@ -107,41 +97,37 @@ public class SSHUtil {
      */
     public String exec(String command) throws JSchException, IOException {
         StringBuilder stringBuilder = new StringBuilder();
-        try {
-            session = getConnectSession();
-            channel = session.openChannel("exec");
-            // jsch的登陆是无环境登陆即非login状态登陆,因此是没有环境变量的,
-            String execCommand;
-            // 在命令前添加 bash --login -c "command"以获取环境变量
-            // source .bashrc && command 也可以解决问题, 但是可能环境加载不全
-            if (command.startsWith("bash --login -c")) {
-                execCommand = command;
-            } else {
-                execCommand = String.format("bash --login -c \"%s\"", command);
-            }
-            ((ChannelExec) channel).setCommand(execCommand);
-            channel.setInputStream(null);
-            ((ChannelExec) channel).setErrStream(System.err);
-            InputStream in = channel.getInputStream();
-            channel.connect();
-            byte[] tmp = new byte[1024];
-            while (true) {
-                while (in.available() > 0) {
-                    int i = in.read(tmp, 0, 1024);
-                    if (i < 0) {
-                        break;
-                    }
-                    stringBuilder.append(new String(tmp, 0, i));
-                }
-                if (channel.isClosed()) {
-                    if (in.available() > 0) {
-                        continue;
-                    }
+        getConnectSession();
+        channel = session.openChannel("exec");
+        // jsch的登陆是无环境登陆即非login状态登陆,因此是没有环境变量的,
+        String execCommand;
+        // 在命令前添加 bash --login -c "command"以获取环境变量
+        // source .bashrc && command 也可以解决问题, 但是可能环境加载不全
+        if (command.startsWith("bash --login -c")) {
+            execCommand = command;
+        } else {
+            execCommand = String.format("bash --login -c \"%s\"", command);
+        }
+        ((ChannelExec) channel).setCommand(execCommand);
+        channel.setInputStream(null);
+        ((ChannelExec) channel).setErrStream(System.err);
+        InputStream in = channel.getInputStream();
+        channel.connect();
+        byte[] tmp = new byte[1024];
+        while (true) {
+            while (in.available() > 0) {
+                int i = in.read(tmp, 0, 1024);
+                if (i < 0) {
                     break;
                 }
+                stringBuilder.append(new String(tmp, 0, i));
+            }
+            if (channel.isClosed()) {
+                if (in.available() > 0) {
+                    continue;
+                }
+                break;
             }
-        } finally {
-            disconnect();
         }
         return stringBuilder.toString();
     }
@@ -152,176 +138,185 @@ public class SSHUtil {
      * 注意,文件名不能包含中文
      */
     public boolean scpTo(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
-        try {
-            session = getConnectSession();
-            // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
-            // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
-            // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
-            // -t 指定为to 也就是目的端模式 指定的对象就是session对应的连接对象targetServer
-            String command = "scp " + "-t " + targetPath;
-            channel = session.openChannel("exec");
-            ((ChannelExec) channel).setCommand(command);
-            outputStream = channel.getOutputStream();
-            inputStream = channel.getInputStream();
-            channel.connect();
-            if (checkAck(inputStream) != 0) {
-                log.error("scpTo 执行失败");
-                XxlJobHelper.log("scpTo 执行失败");
-                return false;
-            }
-            File sourceFile = new File(sourceFilePath);
-            if (sourceFile.isDirectory()) {
-                log.error("sourceFilePath 必须是文件");
-                XxlJobHelper.log("sourceFilePath 必须是文件");
-                return false;
-            }
-            long fileSize = sourceFile.length();
-            command = "C0644 " + fileSize + " " + sourceFile.getName() + "\n";
-            outputStream.write(command.getBytes());
-            outputStream.flush();
-            if (checkAck(inputStream) != 0) {
-                log.error("scpTo 执行失败");
-                XxlJobHelper.log("scpTo 执行失败");
-                return false;
-            }
-            fileInputStream = new FileInputStream(sourceFile);
-            byte[] buffer = new byte[1024];
-            while (true) {
-                int len = fileInputStream.read(buffer, 0, buffer.length);
-                if (len <= 0) {
-                    break;
-                }
-                outputStream.write(buffer, 0, len);
+        getConnectSession();
+        // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
+        // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
+        // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
+        // -t 指定为to 也就是目的端模式 指定的对象就是session对应的连接对象targetServer
+        String command = "scp " + "-t " + targetPath;
+        channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+        outputStream = channel.getOutputStream();
+        inputStream = channel.getInputStream();
+        channel.connect();
+        if (checkAck(inputStream) != 0) {
+            log.error("scpTo 执行失败");
+            XxlJobHelper.log("scpTo 执行失败");
+            return false;
+        }
+        File sourceFile = new File(sourceFilePath);
+        if (sourceFile.isDirectory()) {
+            log.error("sourceFilePath 必须是文件");
+            XxlJobHelper.log("sourceFilePath 必须是文件");
+            return false;
+        }
+        long fileSize = sourceFile.length();
+        command = "C0644 " + fileSize + " " + sourceFile.getName() + "\n";
+        outputStream.write(command.getBytes());
+        outputStream.flush();
+        if (checkAck(inputStream) != 0) {
+            log.error("scpTo 执行失败");
+            XxlJobHelper.log("scpTo 执行失败");
+            return false;
+        }
+        fileInputStream = new FileInputStream(sourceFile);
+        byte[] buffer = new byte[1024];
+        while (true) {
+            int len = fileInputStream.read(buffer, 0, buffer.length);
+            if (len <= 0) {
+                break;
             }
-            buffer[0] = 0;
-            outputStream.write(buffer, 0, 1);
-            outputStream.flush();
-            return checkAck(inputStream) == 0;
-        } finally {
-            disconnect();
+            outputStream.write(buffer, 0, len);
         }
+        buffer[0] = 0;
+        outputStream.write(buffer, 0, 1);
+        outputStream.flush();
+        return checkAck(inputStream) == 0;
     }
 
     /**
      * 使用scp把targetServer目录下的文件复制到本地
      */
     public boolean scpFrom(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
-        try {
-            log.info(sourceFilePath);
-            XxlJobHelper.log(sourceFilePath);
-            session = getConnectSession();
-            // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
-            // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
-            // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
-            // -f 指定对端为from 也就是源端模式 指定的对象就是session对应的连接对象targetServer
-            String command = "scp -f " + sourceFilePath;
-            Channel channel = session.openChannel("exec");
-            ((ChannelExec) channel).setCommand(command);
-            outputStream = channel.getOutputStream();
-            inputStream = channel.getInputStream();
-            channel.connect();
-            byte[] buf = new byte[1024];
-            // 发送指令 '0'
-            // 源端会一直等宿端的回应, 直到等到回应才会传输下一条协议文本.
-            // 在送出最后一条协议文本后, 源端会传出一个大小为零的字符'0'来表示真正文件传输的开始.
-            // 当文件接收完成后, 宿端会给源端发送一个'0'
-            buf[0] = 0;
-            outputStream.write(buf, 0, 1);
-            outputStream.flush();
-            // 接收C0644 这条消息携带了文件的信息
-            while (true) {
-                int c = checkAck(inputStream);
-                // 遇到C时跳出循环
-                if (c == 'C') {
-                    break;
-                }
+        log.info(sourceFilePath);
+        XxlJobHelper.log(sourceFilePath);
+        getConnectSession();
+        // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
+        // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
+        // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
+        // -f 指定对端为from 也就是源端模式 指定的对象就是session对应的连接对象targetServer
+        String command = "scp -f " + sourceFilePath;
+        Channel channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+        outputStream = channel.getOutputStream();
+        inputStream = channel.getInputStream();
+        channel.connect();
+        byte[] buf = new byte[1024];
+        // 发送指令 '0'
+        // 源端会一直等宿端的回应, 直到等到回应才会传输下一条协议文本.
+        // 在送出最后一条协议文本后, 源端会传出一个大小为零的字符'0'来表示真正文件传输的开始.
+        // 当文件接收完成后, 宿端会给源端发送一个'0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        // 接收C0644 这条消息携带了文件的信息
+        while (true) {
+            int c = checkAck(inputStream);
+            // 遇到C时跳出循环
+            if (c == 'C') {
+                break;
             }
-            // 接收 '0644 ' 这段字符表示文件的权限
-            inputStream.read(buf, 0, 5);
-            // 获取filesize
-            long filesize = 0L;
-            while (true) {
-                if (inputStream.read(buf, 0, 1) < 0) {
-                    break;
-                }
-                if (buf[0] == ' ') {
-                    break;
-                }
-                filesize = filesize * 10L + (long) (buf[0] - '0');
+        }
+        // 接收 '0644 ' 这段字符表示文件的权限
+        inputStream.read(buf, 0, 5);
+        // 获取filesize
+        long filesize = 0L;
+        while (true) {
+            if (inputStream.read(buf, 0, 1) < 0) {
+                break;
             }
-            // 从 C0644命令读取文件名,命令中的文件名是不带路径的
-            String file = null;
-            for (int i = 0; ; i++) {
-                inputStream.read(buf, i, 1);
-                // 0x0a 是LF 换行符
-                if (buf[i] == (byte) 0x0a) {
-                    file = new String(buf, 0, i);
-                    break;
-                }
+            if (buf[0] == ' ') {
+                break;
             }
-            log.info("filesize={}, file={}", filesize, file);
-            XxlJobHelper.log("filesize={}, file={}", filesize, file);
-            // 发送 '0'
-            buf[0] = 0;
-            outputStream.write(buf, 0, 1);
-            outputStream.flush();
-            // 如果目标是目录,则需要加上文件名
-            File target = new File(targetPath);
-            if (target.isDirectory()) {
-                log.info("{} 是目录,需要添加文件名", target.getAbsolutePath());
-                XxlJobHelper.log("{} 是目录,需要添加文件名", target.getAbsolutePath());
-                target = new File(targetPath + File.separator + file);
+            filesize = filesize * 10L + (long) (buf[0] - '0');
+        }
+        // 从 C0644命令读取文件名,命令中的文件名是不带路径的
+        String file = null;
+        for (int i = 0; ; i++) {
+            inputStream.read(buf, i, 1);
+            // 0x0a 是LF 换行符
+            if (buf[i] == (byte) 0x0a) {
+                file = new String(buf, 0, i);
+                break;
             }
+        }
+        log.info("filesize={}, file={}", filesize, file);
+        XxlJobHelper.log("filesize={}, file={}", filesize, file);
+        // 发送 '0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        // 如果目标是目录,则需要加上文件名
+        File target = new File(targetPath);
+        if (target.isDirectory()) {
+            log.info("{} 是目录,需要添加文件名", target.getAbsolutePath());
+            XxlJobHelper.log("{} 是目录,需要添加文件名", target.getAbsolutePath());
+            target = new File(targetPath + File.separator + file);
+        }
 
-            fileOutputStream = new FileOutputStream(target);
-            int foo;
-            while (true) {
-                if (buf.length < filesize) {
-                    foo = buf.length;
-                } else {
-                    foo = (int) filesize;
-                }
-                foo = inputStream.read(buf, 0, foo);
-                if (foo < 0) {
-                    break;
-                }
-                fileOutputStream.write(buf, 0, foo);
-                filesize -= foo;
-                if (filesize == 0L) {
-                    break;
-                }
+        fileOutputStream = new FileOutputStream(target);
+        int foo;
+        while (true) {
+            if (buf.length < filesize) {
+                foo = buf.length;
+            } else {
+                foo = (int) filesize;
+            }
+            foo = inputStream.read(buf, 0, foo);
+            if (foo < 0) {
+                break;
             }
-            if (checkAck(inputStream) != 0) {
-                return false;
+            fileOutputStream.write(buf, 0, foo);
+            filesize -= foo;
+            if (filesize == 0L) {
+                break;
             }
-            // 发送 '0'
-            buf[0] = 0;
-            outputStream.write(buf, 0, 1);
-            outputStream.flush();
-            log.info("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
-            XxlJobHelper.log("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
-            return true;
-        } finally {
-            disconnect();
         }
+        if (checkAck(inputStream) != 0) {
+            return false;
+        }
+        // 发送 '0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        log.info("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+        XxlJobHelper.log("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+        return true;
     }
 
-    private Session getConnectSession() throws JSchException {
-        jSch = new JSch();
-        session = jSch.getSession(targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
-        session.setPassword(targetServer.getPassword());
-        session.setUserInfo(new UserInfoImpl());
-        // 不需要输入保存ssh安全密钥的yes或no
-        Properties properties = new Properties();
-        properties.put("StrictHostKeyChecking", "no");
-        session.setConfig(properties);
-        session.connect();
-        log.info("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
-        XxlJobHelper.log("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
-        return session;
+    public void getConnectSession() throws JSchException {
+        if (jSch == null) {
+            jSch = new JSch();
+        }
+
+        if (session == null) {
+            session = jSch.getSession(targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+            session.setPassword(targetServer.getPassword());
+            session.setUserInfo(new UserInfoImpl());
+            // 不需要输入保存ssh安全密钥的yes或no
+            Properties properties = new Properties();
+            properties.put("StrictHostKeyChecking", "no");
+            session.setConfig(properties);
+        }
+
+        if (!session.isConnected()) {
+            session.connect();
+            log.info("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+            XxlJobHelper.log("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+        }
+    }
+
+    public void channelSftpConnect() throws JSchException {
+        if (channelSftp == null) {
+            channelSftp = (ChannelSftp) session.openChannel("sftp");
+        }
+
+        if (!channelSftp.isConnected()) {
+            channelSftp.connect();
+        }
     }
 
-    private void disconnect() throws IOException {
+    public void disconnect() throws IOException {
         if (fileOutputStream != null) {
             fileOutputStream.close();
             fileOutputStream = null;
@@ -338,11 +333,15 @@ public class SSHUtil {
             channel.disconnect();
             channel = null;
         }
+        if (channelSftp != null) {
+            channelSftp.quit();
+        }
         if (session != null) {
             session.disconnect();
             session = null;
         }
         jSch = null;
+        log.info("jsch disconnected");
     }
 
     /**

+ 37 - 40
sms_blk_interface/src/main/java/com/nokia/sms/task/SyncTask.java

@@ -21,27 +21,33 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.time.LocalDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 @Component
 @Slf4j
 public class SyncTask {
+    /**
+     * 无文件次数
+     */
+    public static final AtomicInteger NO_FILE_COUNT = new AtomicInteger(0);
     /**
      * 记录上次时间的文件路径
      */
     public static final String LAST_FILE = "/data1/sms_blk/sms_blk_interface/last.txt";
     /**
-     * 记录无文件次数的文件
+     * 无文件次数告警值
      */
-    public static final String COUNT_FILE = "/data1/sms_blk/sms_blk_interface/count.txt";
+    public static final int ALERT_COUNT = 15;
     /**
-     * 无文件次数告警
+     * 两个文件时间间隔告警阈
      */
-    public static final int ALERT_COUNT = 10;
+    public static final int INTERVAL = 6;
     public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
     @Value("${sms.blk.ssh.host:10.17.182.24}")
     private String host;
@@ -55,6 +61,7 @@ public class SyncTask {
     private String sourceDir;
     @Value("${sms.blk.ssh.targetDir:download/}")
     private String downloadTargetDir;
+    private SSHUtil sshUtil;
 
     private final JdbcTemplate jdbcTemplate;
 
@@ -67,33 +74,35 @@ public class SyncTask {
         try {
             // 创建文件夹
             Files.createDirectories(Paths.get(downloadTargetDir));
-            SSHUtil sshUtil = new SSHUtil(host, port, username, password);
+            sshUtil = new SSHUtil(host, port, username, password);
             // 获取文件列表
             List<String> list = sshUtil.ls(sourceDir);
             if (CollectionUtils.isEmpty(list)) {
-                log.info("没有文件");
-                XxlJobHelper.log("没有文件");
-                String countString = readCount();
-                int count = StringUtils.hasText(countString) ? Integer.parseInt(countString) : 0;
+                int count = NO_FILE_COUNT.incrementAndGet();
+                log.info("{}次没有文件", count);
+                XxlJobHelper.log("{}次没有文件", count);
+                // 无文件次数告警
                 if (count > ALERT_COUNT) {
                     log.error("超过{}分钟没有文件", ALERT_COUNT);
                     XxlJobHelper.log("超过{}分钟没有文件", ALERT_COUNT);
-                    updateCount(0);
-                } else {
-                    updateCount(count + 1);
+                    // 重置无文件计数
+                    NO_FILE_COUNT.set(0);
                 }
                 return;
             }
 
+            // 重置无文件计数
+            NO_FILE_COUNT.set(0);
             log.info("扫描到的文件: {}", list);
             XxlJobHelper.log("扫描到的文件: {}", list);
-            // 过滤临时文件
-            List<String> newList = list.stream().filter(t -> !t.endsWith(".tmp")).collect(Collectors.toList());
+            // 过滤文件
+            List<String> newList = list.stream().filter(t -> t.startsWith("Blacklist_SASS_") && !t.endsWith(".tmp"))
+                    .collect(Collectors.toList());
             // 检查文件
             String lastString = readLast();
             log.info("last: {}", lastString);
             LocalDateTime last = StringUtils.hasText(lastString)
-                    ? getLocalDateTime(lastString).plusMinutes(5)
+                    ? getLocalDateTime(lastString)
                     : getLocalDateTime(getTimeString(newList.get(0)));
             int size = list.size();
             checkFile(last, list, size, 0);
@@ -113,6 +122,14 @@ public class SyncTask {
             log.error("发生异常了: {}", e.getMessage(), e);
             XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
             XxlJobHelper.handleFail(e.getMessage());
+        } finally {
+            try {
+                sshUtil.disconnect();
+            } catch (IOException e) {
+                log.error("发生异常了: {}", e.getMessage(), e);
+                XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
+                XxlJobHelper.handleFail(e.getMessage());
+            }
         }
     }
 
@@ -130,20 +147,6 @@ public class SyncTask {
         return readFile(LAST_FILE);
     }
 
-    /**
-     * 更新无文件次数
-     */
-    public static void updateCount(int count) {
-        writeFile(COUNT_FILE, String.valueOf(count));
-    }
-
-    /**
-     * 获取无文件次数
-     */
-    public static String readCount() {
-        return readFile(COUNT_FILE);
-    }
-
     /**
      * 写文件
      * @param filename 文件名
@@ -190,17 +193,12 @@ public class SyncTask {
         }
 
         LocalDateTime t2 = getLocalDateTime(getTimeString(list.get(index)));
-        if (t1.isEqual(t2)) {
-            checkFile(t1.plusMinutes(5), list, size, index + 1);
-        } else if (t1.isAfter(t2)) {
-            log.error("{} 文件重复上传", getTimeString(t2));
-            XxlJobHelper.log("{} 文件重复上传", getTimeString(t2));
-            checkFile(t1, list, size, index + 1);
-        } else {
-            log.error("{} 文件未上传", getTimeString(t1));
-            XxlJobHelper.log("{} 文件未上传", getTimeString(t1));
-            checkFile(t1.plusMinutes(5), list, size, index);
+        long diff = Duration.between(t1, t2).toMinutes();
+        if (diff > INTERVAL) {
+            log.error("{} ~ {} 之间缺少文件", getTimeString(t1), getTimeString(t2));
         }
+
+        checkFile(t2, list, size, index + 1);
     }
 
     /**
@@ -242,7 +240,6 @@ public class SyncTask {
     public void download(String filename) throws JSchException, IOException, SftpException {
         log.info("下载文件: {}", filename);
         XxlJobHelper.log("下载文件: {}", filename);
-        SSHUtil sshUtil = new SSHUtil(host, port, username, password);
         String src = sourceDir + filename;
         String dst = downloadTargetDir + filename;
         sshUtil.get(src, dst);

+ 3 - 3
sms_blk_interface/src/main/resources/logback-spring.xml

@@ -3,7 +3,7 @@
     <property name="PATH" value="./log"/>
     <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
         <encoder>
-            <Pattern>%d{HH:mm:ss.SSS} %highlight(%-5level) %blue(%logger:%line) %msg%n</Pattern>
+            <Pattern>%d{HH:mm:ss.SSS} %highlight(%-5level) %yellow(%X{traceId}) %cyan(%logger:%line) %msg%n</Pattern>
         </encoder>
     </appender>
     <appender name="TRACE_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
@@ -17,7 +17,7 @@
             <totalSizeCap>20GB</totalSizeCap>
         </rollingPolicy>
         <encoder>
-            <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %blue(%logger:%line) %msg%n</Pattern>
+            <Pattern>%d %highlight(%-5level) %yellow(%X{traceId}) %cyan(%logger:%line) %msg%n</Pattern>
         </encoder>
     </appender>
     <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
@@ -31,7 +31,7 @@
             <totalSizeCap>20GB</totalSizeCap>
         </rollingPolicy>
         <encoder>
-            <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %blue(%logger:%line) %msg%n</Pattern>
+            <Pattern>%d %highlight(%-5level) %yellow(%X{traceId}) %cyan(%logger:%line) %msg%n</Pattern>
         </encoder>
         <filter class="ch.qos.logback.classic.filter.LevelFilter">
             <level>ERROR</level>

+ 95 - 16
sms_blk_interface/src/test/java/com/nokia/sms/SyncTest.java

@@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.dao.EmptyResultDataAccessException;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
@@ -43,7 +44,7 @@ class SyncTest {
         Map<String, AtomicLong> phoneCount = new HashMap<>();
         try (CsvReader csvReader = CsvReader.builder().build(Paths.get(p1), StandardCharsets.UTF_8);
              OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(Paths.get(phoneFile)),
-                StandardCharsets.UTF_8);
+                     StandardCharsets.UTF_8);
              CsvWriter csvWriter = CsvWriter.builder().build(osw);) {
             for (CsvRow row : csvReader) {
                 if (row.getOriginalLineNumber() == 1) {
@@ -68,7 +69,7 @@ class SyncTest {
                      StandardCharsets.UTF_8);
              CsvWriter csvWriter1 = CsvWriter.builder().build(osw1);
              OutputStreamWriter osw2 = new OutputStreamWriter(Files.newOutputStream(Paths.get(r2)),
-                StandardCharsets.UTF_8);
+                     StandardCharsets.UTF_8);
              CsvWriter csvWriter2 = CsvWriter.builder().build(osw2);) {
             Map<String, List<String>> map = new HashMap<>();
             for (CsvRow csvRow : csvReader) {
@@ -129,6 +130,8 @@ class SyncTest {
         String r2 = TEST_DIR + "result2.csv";
         String d1 = TEST_DIR + "diff1.csv";
         String d2 = TEST_DIR + "diff2.csv";
+        String o1 = TEST_DIR + "output1.csv";
+        String o2 = TEST_DIR + "output2.csv";
         try {
             try (CsvReader csvReader = CsvReader.builder().build(Paths.get(p1), StandardCharsets.UTF_8)) {
                 for (CsvRow row : csvReader) {
@@ -168,13 +171,18 @@ class SyncTest {
                  OutputStreamWriter osw2 = new OutputStreamWriter(Files.newOutputStream(Paths.get(d1)),
                          StandardCharsets.UTF_8);
                  CsvWriter csvWriter2 = CsvWriter.builder().build(osw2);
-                 ) {
-                csvWriter1.writeRow("黑名单号码", "入黑名单原因", "开始时间", "结束时间", "解黑建议", "是否一致", "不一致原因", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "命中策略说明", "操作类型");
+                 OutputStreamWriter osw3 = new OutputStreamWriter(Files.newOutputStream(Paths.get(o1)),
+                         StandardCharsets.UTF_8);
+                 CsvWriter csvWriter3 = CsvWriter.builder().build(osw3);
+            ) {
+                csvWriter1.writeRow("黑名单号码", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "命中策略说明", "操作类型", "是否一致", "不一致原因");
+                csvWriter3.writeRow("不一致情况", "黑名单号码", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "入黑名单原因", "解黑建议", "开始时间", "结束时间");
                 for (Map<String, String> v1 : m1.values()) {
                     String phone = v1.get("phone");
                     Map<String, String> v2 = m2.getOrDefault(phone, new HashMap<>());
                     String flag = "是";
                     String diff = "";
+                    String situation = "";
                     if (!v1.get("reason").equals(v2.get("reason"))) {
                         diff = "入黑名单原因、";
                     }
@@ -189,23 +197,33 @@ class SyncTest {
                     }
                     if (CollectionUtils.isEmpty(v2)) {
                         diff = "数据库中没有记录";
+                        csvWriter3.writeRow(situation, phone, v2.get("reason"), v2.get("suggestion"), v2.get("start_time"), v2.get("end_time"), v1.get("reason"), v1.get("suggestion"), v1.get("start_time"), v1.get("end_time"));
                     }
                     if (StringUtils.hasText(diff)) {
                         flag = "否";
-                        csvWriter2.writeRow("黑名单号码", "入黑名单原因", "开始时间", "结束时间", "解黑建议", "是否一致", "不一致原因", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "命中策略说明", "操作类型");
-                        csvWriter2.writeRow(phone, v1.get("reason"), v1.get("start_time"), v1.get("end_time"), v1.get("suggestion"), flag, diff, v2.get("reason"), v2.get("suggestion"), v2.get("start_time"), v2.get("end_time"), v2.get("hit_desc"), v2.get("operation_type"));
-                        csvWriter2.writeRow("黑名单号码", "入黑名单原因", "开始时间", "结束时间", "命中策略说明", "操作类型", "解黑建议", "同步文件名");
+                        csvWriter2.writeRow("黑名单号码", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "命中策略说明", "操作类型", "是否一致", "不一致原因");
+                        csvWriter2.writeRow(phone, v2.get("reason"), v2.get("suggestion"), v2.get("start_time"), v2.get("end_time"), v1.get("reason"), v1.get("suggestion"), v1.get("start_time"), v1.get("end_time"), v2.get("hit_desc"), v2.get("operation_type"), flag, diff);
+                        csvWriter2.writeRow("黑名单号码", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "命中策略说明", "操作类型", "同步文件名");
                         try (CsvReader csvReader = CsvReader.builder().build(Paths.get(record), StandardCharsets.UTF_8)) {
                             for (CsvRow row : csvReader) {
                                 if (row.getOriginalLineNumber() == 1 || !phone.equals(row.getField(0))) {
                                     continue;
                                 }
-                                csvWriter2.writeRow(row.getFields());
+                                csvWriter2.writeRow(
+                                        row.getField(0),
+                                        row.getField(1),
+                                        row.getField(6),
+                                        row.getField(2),
+                                        row.getField(3),
+                                        row.getField(4),
+                                        row.getField(5),
+                                        row.getField(7)
+                                );
                             }
                         }
                         csvWriter2.writeRow();
                     }
-                    csvWriter1.writeRow(phone, v1.get("reason"), v1.get("start_time"), v1.get("end_time"), v1.get("suggestion"), flag, diff, v2.get("reason"), v2.get("suggestion"), v2.get("start_time"), v2.get("end_time"), v2.get("hit_desc"), v2.get("operation_type"));
+                    csvWriter1.writeRow(phone, v1.get("reason"), v1.get("suggestion"), v1.get("start_time"), v1.get("end_time"), v2.get("reason"), v2.get("suggestion"), v2.get("start_time"), v2.get("end_time"), v2.get("hit_desc"), v2.get("operation_type"), flag, diff);
                 }
             }
             try (OutputStreamWriter osw1 = new OutputStreamWriter(Files.newOutputStream(Paths.get(r2)),
@@ -214,8 +232,12 @@ class SyncTest {
                  OutputStreamWriter osw2 = new OutputStreamWriter(Files.newOutputStream(Paths.get(d2)),
                          StandardCharsets.UTF_8);
                  CsvWriter csvWriter2 = CsvWriter.builder().build(osw2);
-                 ) {
-                csvWriter1.writeRow("黑名单号码", "入黑名单原因", "开始时间", "结束时间", "解黑建议", "是否一致", "不一致原因", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "命中策略说明", "操作类型");
+                 OutputStreamWriter osw3 = new OutputStreamWriter(Files.newOutputStream(Paths.get(o2)),
+                         StandardCharsets.UTF_8);
+                 CsvWriter csvWriter3 = CsvWriter.builder().build(osw3);
+            ) {
+                csvWriter1.writeRow("黑名单号码", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "命中策略说明", "操作类型", "是否一致", "不一致原因");
+                csvWriter3.writeRow("不一致情况", "黑名单号码", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "入黑名单原因", "解黑建议", "开始时间", "结束时间");
                 for (Map<String, String> v2 : m2.values()) {
                     String phone = v2.get("phone");
                     Map<String, String> v1 = m1.getOrDefault(phone, new HashMap<>());
@@ -225,37 +247,53 @@ class SyncTest {
 
                     String flag = "是";
                     String diff = "";
+                    String situation = "";
                     if (!v2.get("reason").equals(v1.get("reason"))) {
                         diff = "入黑名单原因、";
+                        situation = "有同步记录但是数据不一致";
                     }
                     if (!v2.get("start_time").equals(v1.get("start_time"))) {
                         diff = diff + "开始时间、";
+                        situation = "有同步记录但是数据不一致";
                     }
                     if (!v2.get("end_time").equals(v1.get("end_time"))) {
                         diff = diff + "结束时间、";
+                        situation = "有同步记录但是数据不一致";
                     }
                     if (!v2.get("suggestion").equals(v1.get("suggestion"))) {
                         diff = diff + "解黑建议、";
+                        situation = "有同步记录但是数据不一致";
                     }
                     if (CollectionUtils.isEmpty(v1)) {
                         diff = "在黑数据中没有记录";
+                        situation = "数据库中存在但是在黑数据不存在";
                     }
                     if (StringUtils.hasText(diff)) {
                         flag = "否";
-                        csvWriter2.writeRow("黑名单号码", "入黑名单原因", "开始时间", "结束时间", "解黑建议", "是否一致", "不一致原因", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "命中策略说明", "操作类型");
-                        csvWriter2.writeRow(phone, v1.get("reason"), v1.get("start_time"), v1.get("end_time"), v1.get("suggestion"), flag, diff, v2.get("reason"), v2.get("suggestion"), v2.get("start_time"), v2.get("end_time"), v2.get("hit_desc"), v2.get("operation_type"));
-                        csvWriter2.writeRow("黑名单号码", "入黑名单原因", "开始时间", "结束时间", "命中策略说明", "操作类型", "解黑建议", "同步文件名");
+                        csvWriter3.writeRow(situation, phone, v2.get("reason"), v2.get("suggestion"), v2.get("start_time"), v2.get("end_time"), v1.get("reason"), v1.get("suggestion"), v1.get("start_time"), v1.get("end_time"));
+                        csvWriter2.writeRow("黑名单号码", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "命中策略说明", "操作类型", "是否一致", "不一致原因");
+                        csvWriter2.writeRow(phone, v2.get("reason"), v2.get("suggestion"), v2.get("start_time"), v2.get("end_time"), v1.get("reason"), v1.get("suggestion"), v1.get("start_time"), v1.get("end_time"), v2.get("hit_desc"), v2.get("operation_type"), flag, diff);
+                        csvWriter2.writeRow("黑名单号码", "入黑名单原因", "解黑建议", "开始时间", "结束时间", "命中策略说明", "操作类型", "同步文件名");
                         try (CsvReader csvReader = CsvReader.builder().build(Paths.get(record), StandardCharsets.UTF_8)) {
                             for (CsvRow row : csvReader) {
                                 if (row.getOriginalLineNumber() == 1 || !phone.equals(row.getField(0))) {
                                     continue;
                                 }
-                                csvWriter2.writeRow(row.getFields());
+                                csvWriter2.writeRow(
+                                        row.getField(0),
+                                        row.getField(1),
+                                        row.getField(6),
+                                        row.getField(2),
+                                        row.getField(3),
+                                        row.getField(4),
+                                        row.getField(5),
+                                        row.getField(7)
+                                );
                             }
                         }
                         csvWriter2.writeRow();
                     }
-                    csvWriter1.writeRow(phone, v1.get("reason"), v1.get("start_time"), v1.get("end_time"), v1.get("suggestion"), flag, diff, v2.get("reason"), v2.get("suggestion"), v2.get("start_time"), v2.get("end_time"), v2.get("hit_desc"), v2.get("operation_type"));
+                    csvWriter1.writeRow(phone, v1.get("reason"), v1.get("suggestion"), v1.get("start_time"), v1.get("end_time"), v2.get("reason"), v2.get("suggestion"), v2.get("start_time"), v2.get("end_time"), v2.get("hit_desc"), v2.get("operation_type"), flag, diff);
                 }
             }
         } catch (Exception e) {
@@ -321,4 +359,45 @@ class SyncTest {
             e.printStackTrace();
         }
     }
+
+    @Test
+    void testUpdate() {
+        Set<String> notExists = new HashSet<>();
+        try (CsvReader csvReader = CsvReader.builder().build(Paths.get(TEST_DIR + "data.csv"), StandardCharsets.UTF_8)) {
+            for (CsvRow row : csvReader) {
+                log.info("{} row: {}", row.getOriginalLineNumber(), row.getFields());
+                if (!StringUtils.hasText(row.getField(0))) {
+                    log.warn("跳过空行");
+                    continue;
+                }
+                String phone = StringUtils.trimAllWhitespace(row.getField(0));
+                Integer reason = Integer.valueOf(StringUtils.trimAllWhitespace(row.getField(1)));
+                String startTime = StringUtils.trimAllWhitespace(row.getField(2));
+                String endTime = StringUtils.trimAllWhitespace(row.getField(3));
+                Integer suggestion = Integer.valueOf(StringUtils.trimAllWhitespace(row.getField(4)));
+                log.info("{},{},{},{},{}", phone, reason, startTime, endTime, suggestion);
+                String sql = "select * from sms_blk.blacklist where phone = ?";
+                Map<String, Object> map = null;
+                try {
+                    map = jdbcTemplate.queryForMap(sql, phone);
+                } catch (EmptyResultDataAccessException e) {
+                    log.warn("{} not exist", phone);
+                    notExists.add(phone);
+                }
+
+                if (!CollectionUtils.isEmpty(map)) {
+                    if (startTime.equals(map.get("start_time"))
+                            || (Long.parseLong(startTime) > Long.parseLong(map.get("start_time").toString()))) {
+                        log.info("map: {}", map);
+                        sql = "update sms_blk.blacklist set reason = ?, start_time = ?, end_time = ?, suggestion = ?, operation_type = 1 where phone = ?";
+                        jdbcTemplate.update(sql, reason, startTime, endTime, suggestion, phone);
+                    }
+                }
+            }
+
+            log.info("{} notExists: {}", notExists.size(), notExists);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
 }