Quellcode durchsuchen

feat: 实现同步黑名单号码

weijianghai vor 2 Jahren
Ursprung
Commit
bddeeccdf2

+ 1 - 1
sms_blk_api/src/test/java/com/nokia/sms/ConcurrentTest.java

@@ -70,7 +70,7 @@ class ConcurrentTest {
             dto.setPhone(t);
             dto.setFromSystem("test");
             dto.setOperator("test");
-            threadPoolTaskScheduler.submit(new Task(restTemplate, t, url, dto, singlePath));
+            threadPoolTaskScheduler.scheduleWithFixedDelay(new Task(restTemplate, t, url, dto, singlePath), 10000);
         }
         // 等待所有任务执行结束
         while (threadPoolTaskScheduler.getActiveCount() > 0) {

+ 1 - 0
sms_blk_interface/build-dir/run.sh

@@ -0,0 +1 @@
+nohup java -jar /data1/sms_blk/sms_blk_interface/sms_blk_interface-exec.jar >/dev/null 2>&1 &

+ 5 - 0
sms_blk_interface/build-dir/stop.sh

@@ -0,0 +1,5 @@
+#!/bin/bash
+
+for i in $(ps -ef | grep sms_blk_interface-exec.jar | grep -v grep | awk '{print $2}'); do
+  kill -9 "$i"
+done

+ 23 - 3
sms_blk_interface/pom.xml

@@ -33,13 +33,33 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>fastjson</artifactId>
-            <scope>test</scope>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-jdbc</artifactId>
         </dependency>
         <dependency>
             <groupId>de.siegmar</groupId>
             <artifactId>fastcsv</artifactId>
+            <scope>compile</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.postgresql/postgresql -->
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <version>42.4.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+            <version>0.1.55</version>
+        </dependency>
+        <dependency>
+            <groupId>com.xuxueli</groupId>
+            <artifactId>xxl-job-core</artifactId>
+            <version>2.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
             <scope>test</scope>
         </dependency>
     </dependencies>

+ 383 - 0
sms_blk_interface/src/main/java/com/nokia/common/ssh/SSHUtil.java

@@ -0,0 +1,383 @@
+package com.nokia.common.ssh;
+
+import com.jcraft.jsch.*;
+import com.nokia.common.ssh.entity.SSHServer;
+import com.nokia.common.ssh.entity.UserInfoImpl;
+import com.nokia.common.ssh.exception.SSHUtilException;
+import com.nokia.common.ssh.exception.ScpAckErrorException;
+import com.xxl.job.core.context.XxlJobHelper;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Vector;
+
+/**
+ * 使用jsch库实现的ssh的工具类
+ * <p>
+ * todo: scpTo和scpFrom 在本机和targetServer默认编码不一致的时候,文件名中的中文会乱码,但是不会影响到文件内容,
+ */
+
+@Slf4j
+public class SSHUtil {
+
+    @Getter
+    @Setter
+    private SSHServer targetServer = new SSHServer();
+    private Session session = null;
+    private Channel channel = null;
+    private JSch jSch = null;
+    private FileInputStream fileInputStream = null;
+    private FileOutputStream fileOutputStream = null;
+    private OutputStream outputStream = null;
+    private InputStream inputStream = null;
+    private ChannelSftp channelSftp = null;
+
+    public SSHUtil() {
+    }
+
+    public SSHUtil(String host, String user, String password) {
+        targetServer = new SSHServer(host, 22, user, password);
+    }
+
+    public SSHUtil(String host, Integer port, String user, String password) {
+        targetServer = new SSHServer(host, port, user, password);
+    }
+
+    /**
+     * 获取文件列表
+     */
+    public List<String> ls(String path) throws JSchException, SftpException {
+        session = getConnectSession();
+        channelSftp = (ChannelSftp) session.openChannel("sftp");
+        channelSftp.connect();
+        List<String> fileNameList = new ArrayList<>();
+        Vector fileList = channelSftp.ls(path);
+        for (Object o : fileList) {
+            String fileName = ((ChannelSftp.LsEntry) o).getFilename();
+            if (".".equals(fileName) || "..".equals(fileName)) {
+                continue;
+            }
+
+            fileNameList.add(fileName);
+            channelSftp.quit();
+            session.disconnect();
+        }
+
+        return fileNameList;
+    }
+
+    /**
+     * 下载文件
+     */
+    public void get(String src, String dst) throws JSchException, SftpException, IOException {
+        session = getConnectSession();
+        channelSftp = (ChannelSftp) session.openChannel("sftp");
+        channelSftp.connect();
+        try (OutputStream out = Files.newOutputStream(Paths.get(dst))) {
+            channelSftp.get(src, out);
+        } finally {
+            channelSftp.quit();
+            session.disconnect();
+        }
+    }
+
+    /**
+     * 删除文件
+     */
+    public void rm(String path) throws JSchException, SftpException {
+        session = getConnectSession();
+        channelSftp = (ChannelSftp) session.openChannel("sftp");
+        channelSftp.connect();
+        channelSftp.rm(path);
+        channelSftp.quit();
+        session.disconnect();
+    }
+
+    /**
+     * 远程执行指令
+     */
+    public String exec(String command) throws JSchException, IOException {
+        StringBuilder stringBuilder = new StringBuilder();
+        try {
+            session = getConnectSession();
+            channel = session.openChannel("exec");
+            // jsch的登陆是无环境登陆即非login状态登陆,因此是没有环境变量的,
+            String execCommand;
+            // 在命令前添加 bash --login -c "command"以获取环境变量
+            // source .bashrc && command 也可以解决问题, 但是可能环境加载不全
+            if (command.startsWith("bash --login -c")) {
+                execCommand = command;
+            } else {
+                execCommand = String.format("bash --login -c \"%s\"", command);
+            }
+            ((ChannelExec) channel).setCommand(execCommand);
+            channel.setInputStream(null);
+            ((ChannelExec) channel).setErrStream(System.err);
+            InputStream in = channel.getInputStream();
+            channel.connect();
+            byte[] tmp = new byte[1024];
+            while (true) {
+                while (in.available() > 0) {
+                    int i = in.read(tmp, 0, 1024);
+                    if (i < 0) {
+                        break;
+                    }
+                    stringBuilder.append(new String(tmp, 0, i));
+                }
+                if (channel.isClosed()) {
+                    if (in.available() > 0) {
+                        continue;
+                    }
+                    break;
+                }
+            }
+        } finally {
+            disconnect();
+        }
+        return stringBuilder.toString();
+    }
+
+    /**
+     * 使用SCP把本地文件推送到targetServer目录下
+     * <p>
+     * 注意,文件名不能包含中文
+     */
+    public boolean scpTo(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
+        try {
+            session = getConnectSession();
+            // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
+            // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
+            // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
+            // -t 指定为to 也就是目的端模式 指定的对象就是session对应的连接对象targetServer
+            String command = "scp " + "-t " + targetPath;
+            channel = session.openChannel("exec");
+            ((ChannelExec) channel).setCommand(command);
+            outputStream = channel.getOutputStream();
+            inputStream = channel.getInputStream();
+            channel.connect();
+            if (checkAck(inputStream) != 0) {
+                log.error("scpTo 执行失败");
+                XxlJobHelper.log("scpTo 执行失败");
+                return false;
+            }
+            File sourceFile = new File(sourceFilePath);
+            if (sourceFile.isDirectory()) {
+                log.error("sourceFilePath 必须是文件");
+                XxlJobHelper.log("sourceFilePath 必须是文件");
+                return false;
+            }
+            long fileSize = sourceFile.length();
+            command = "C0644 " + fileSize + " " + sourceFile.getName() + "\n";
+            outputStream.write(command.getBytes());
+            outputStream.flush();
+            if (checkAck(inputStream) != 0) {
+                log.error("scpTo 执行失败");
+                XxlJobHelper.log("scpTo 执行失败");
+                return false;
+            }
+            fileInputStream = new FileInputStream(sourceFile);
+            byte[] buffer = new byte[1024];
+            while (true) {
+                int len = fileInputStream.read(buffer, 0, buffer.length);
+                if (len <= 0) {
+                    break;
+                }
+                outputStream.write(buffer, 0, len);
+            }
+            buffer[0] = 0;
+            outputStream.write(buffer, 0, 1);
+            outputStream.flush();
+            return checkAck(inputStream) == 0;
+        } finally {
+            disconnect();
+        }
+    }
+
+    /**
+     * 使用scp把targetServer目录下的文件复制到本地
+     */
+    public boolean scpFrom(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
+        try {
+            log.info(sourceFilePath);
+            XxlJobHelper.log(sourceFilePath);
+            session = getConnectSession();
+            // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
+            // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
+            // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
+            // -f 指定对端为from 也就是源端模式 指定的对象就是session对应的连接对象targetServer
+            String command = "scp -f " + sourceFilePath;
+            Channel channel = session.openChannel("exec");
+            ((ChannelExec) channel).setCommand(command);
+            outputStream = channel.getOutputStream();
+            inputStream = channel.getInputStream();
+            channel.connect();
+            byte[] buf = new byte[1024];
+            // 发送指令 '0'
+            // 源端会一直等宿端的回应, 直到等到回应才会传输下一条协议文本.
+            // 在送出最后一条协议文本后, 源端会传出一个大小为零的字符'0'来表示真正文件传输的开始.
+            // 当文件接收完成后, 宿端会给源端发送一个'0'
+            buf[0] = 0;
+            outputStream.write(buf, 0, 1);
+            outputStream.flush();
+            // 接收C0644 这条消息携带了文件的信息
+            while (true) {
+                int c = checkAck(inputStream);
+                // 遇到C时跳出循环
+                if (c == 'C') {
+                    break;
+                }
+            }
+            // 接收 '0644 ' 这段字符表示文件的权限
+            inputStream.read(buf, 0, 5);
+            // 获取filesize
+            long filesize = 0L;
+            while (true) {
+                if (inputStream.read(buf, 0, 1) < 0) {
+                    break;
+                }
+                if (buf[0] == ' ') {
+                    break;
+                }
+                filesize = filesize * 10L + (long) (buf[0] - '0');
+            }
+            // 从 C0644命令读取文件名,命令中的文件名是不带路径的
+            String file = null;
+            for (int i = 0; ; i++) {
+                inputStream.read(buf, i, 1);
+                // 0x0a 是LF 换行符
+                if (buf[i] == (byte) 0x0a) {
+                    file = new String(buf, 0, i);
+                    break;
+                }
+            }
+            log.info("filesize={}, file={}", filesize, file);
+            XxlJobHelper.log("filesize={}, file={}", filesize, file);
+            // 发送 '0'
+            buf[0] = 0;
+            outputStream.write(buf, 0, 1);
+            outputStream.flush();
+            // 如果目标是目录,则需要加上文件名
+            File target = new File(targetPath);
+            if (target.isDirectory()) {
+                log.info("{} 是目录,需要添加文件名", target.getAbsolutePath());
+                XxlJobHelper.log("{} 是目录,需要添加文件名", target.getAbsolutePath());
+                target = new File(targetPath + File.separator + file);
+            }
+
+            fileOutputStream = new FileOutputStream(target);
+            int foo;
+            while (true) {
+                if (buf.length < filesize) {
+                    foo = buf.length;
+                } else {
+                    foo = (int) filesize;
+                }
+                foo = inputStream.read(buf, 0, foo);
+                if (foo < 0) {
+                    break;
+                }
+                fileOutputStream.write(buf, 0, foo);
+                filesize -= foo;
+                if (filesize == 0L) {
+                    break;
+                }
+            }
+            if (checkAck(inputStream) != 0) {
+                return false;
+            }
+            // 发送 '0'
+            buf[0] = 0;
+            outputStream.write(buf, 0, 1);
+            outputStream.flush();
+            log.info("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+            XxlJobHelper.log("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+            return true;
+        } finally {
+            disconnect();
+        }
+    }
+
+    private Session getConnectSession() throws JSchException {
+        jSch = new JSch();
+        session = jSch.getSession(targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+        session.setPassword(targetServer.getPassword());
+        session.setUserInfo(new UserInfoImpl());
+        // 不需要输入保存ssh安全密钥的yes或no
+        Properties properties = new Properties();
+        properties.put("StrictHostKeyChecking", "no");
+        session.setConfig(properties);
+        session.connect();
+        log.info("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+        XxlJobHelper.log("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+        return session;
+    }
+
+    private void disconnect() throws IOException {
+        if (fileOutputStream != null) {
+            fileOutputStream.close();
+            fileOutputStream = null;
+        }
+        if (fileInputStream != null) {
+            fileInputStream.close();
+            fileInputStream = null;
+        }
+        if (outputStream != null) {
+            outputStream.close();
+            outputStream = null;
+        }
+        if (channel != null) {
+            channel.disconnect();
+            channel = null;
+        }
+        if (session != null) {
+            session.disconnect();
+            session = null;
+        }
+        jSch = null;
+    }
+
+    /**
+     * 来自源端的每条消息和每个传输完毕的文件都需要宿端的确认和响应.
+     * 宿端会返回三种确认消息: 0(正常), 1(警告)或2(严重错误, 将中断连接).
+     * 消息1和2可以跟一个字符串和一个换行符, 这个字符串将显示在scp的源端. 无论这个字符串是否为空, 换行符都是不可缺少的.
+     */
+    private static int checkAck(InputStream in) throws IOException, SSHUtilException {
+        int b = in.read();
+        // b 取值为0表示成功
+        if (b == 0) {
+            return b;
+        }
+        if (b == -1) {
+            return b;
+        }
+
+        // 1表示警告 2表示严重错误,将中断连接
+        // 1和2 后面会携带一条错误信息,以\n结尾
+        if (b == 1 || b == 2) {
+            // 打印消息后面跟的字符串
+            StringBuilder sb = new StringBuilder();
+            int c;
+            do {
+                // 读取字符串直到遇到换行符
+                c = in.read();
+                sb.append((char) c);
+            } while (c != '\n');
+            log.info("checkAck发现错误消息: ack={}-msg={}", b, sb);
+            XxlJobHelper.log("checkAck发现错误消息: ack={}-msg={}", b, sb);
+            if (b == 1 && sb.toString().endsWith("No such file or directory")) {
+                throw new NoSuchFileException(sb.toString());
+            } else {
+                throw new ScpAckErrorException(sb.toString());
+            }
+        }
+        return b;
+    }
+}

+ 15 - 0
sms_blk_interface/src/main/java/com/nokia/common/ssh/entity/SSHServer.java

@@ -0,0 +1,15 @@
+package com.nokia.common.ssh.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class SSHServer {
+    private String host;
+    private int port = 22;
+    private String user;
+    private String password;
+}

+ 35 - 0
sms_blk_interface/src/main/java/com/nokia/common/ssh/entity/UserInfoImpl.java

@@ -0,0 +1,35 @@
+package com.nokia.common.ssh.entity;
+
+import com.jcraft.jsch.UserInfo;
+
+public class UserInfoImpl implements UserInfo {
+    @Override
+    public String getPassphrase() {
+        return null;
+    }
+
+    @Override
+    public String getPassword() {
+        return null;
+    }
+
+    @Override
+    public boolean promptPassword(String s) {
+        return false;
+    }
+
+    @Override
+    public boolean promptPassphrase(String s) {
+        return false;
+    }
+
+    @Override
+    public boolean promptYesNo(String s) {
+        return false;
+    }
+
+    @Override
+    public void showMessage(String s) {
+
+    }
+}

+ 8 - 0
sms_blk_interface/src/main/java/com/nokia/common/ssh/exception/SSHUtilException.java

@@ -0,0 +1,8 @@
+package com.nokia.common.ssh.exception;
+
+public class SSHUtilException extends Exception{
+
+    public SSHUtilException(String message) {
+        super(message);
+    }
+}

+ 7 - 0
sms_blk_interface/src/main/java/com/nokia/common/ssh/exception/ScpAckErrorException.java

@@ -0,0 +1,7 @@
+package com.nokia.common.ssh.exception;
+
+public class ScpAckErrorException extends SSHUtilException {
+    public ScpAckErrorException(String message) {
+        super(message);
+    }
+}

+ 12 - 0
sms_blk_interface/src/main/java/com/nokia/sms/SmsBlkInterfaceApplication.java

@@ -0,0 +1,12 @@
+package com.nokia.sms;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class SmsBlkInterfaceApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(SmsBlkInterfaceApplication.class, args);
+    }
+}

+ 78 - 0
sms_blk_interface/src/main/java/com/nokia/sms/config/XxlJobConfig.java

@@ -0,0 +1,78 @@
+package com.nokia.sms.config;
+
+import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * xxl-job config
+ *
+ * @author xuxueli 2017-04-28
+ */
+@Configuration
+public class XxlJobConfig {
+    private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
+
+    @Value("${xxl.job.admin.addresses}")
+    private String adminAddresses;
+
+    @Value("${xxl.job.accessToken}")
+    private String accessToken;
+
+    @Value("${xxl.job.executor.appname}")
+    private String appname;
+
+    @Value("${xxl.job.executor.address}")
+    private String address;
+
+    @Value("${xxl.job.executor.ip}")
+    private String ip;
+
+    @Value("${xxl.job.executor.port}")
+    private int port;
+
+    @Value("${xxl.job.executor.logpath}")
+    private String logPath;
+
+    @Value("${xxl.job.executor.logretentiondays}")
+    private int logRetentionDays;
+
+
+    @Bean
+    public XxlJobSpringExecutor xxlJobExecutor() {
+        logger.info(">>>>>>>>>>> xxl-job config init.");
+        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
+        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
+        xxlJobSpringExecutor.setAppname(appname);
+        xxlJobSpringExecutor.setAddress(address);
+        xxlJobSpringExecutor.setIp(ip);
+        xxlJobSpringExecutor.setPort(port);
+        xxlJobSpringExecutor.setAccessToken(accessToken);
+        xxlJobSpringExecutor.setLogPath(logPath);
+        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
+
+        return xxlJobSpringExecutor;
+    }
+
+    /**
+     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
+     *
+     *      1、引入依赖:
+     *          <dependency>
+     *             <groupId>org.springframework.cloud</groupId>
+     *             <artifactId>spring-cloud-commons</artifactId>
+     *             <version>${version}</version>
+     *         </dependency>
+     *
+     *      2、配置文件,或者容器启动变量
+     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
+     *
+     *      3、获取IP
+     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
+     */
+
+
+}

+ 165 - 0
sms_blk_interface/src/main/java/com/nokia/sms/task/SyncTask.java

@@ -0,0 +1,165 @@
+package com.nokia.sms.task;
+
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.SftpException;
+import com.nokia.common.ssh.SSHUtil;
+import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import de.siegmar.fastcsv.reader.CsvReader;
+import de.siegmar.fastcsv.reader.CsvRow;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+@Component
+@Slf4j
+public class SyncTask {
+    @Value("${sms.blk.ssh.host:10.17.182.24}")
+    private String host;
+    @Value("${sms.blk.ssh.port:22}")
+    private Integer port;
+    @Value("${sms.blk.ssh.username:sassftp}")
+    private String username;
+    @Value("${sms.blk.ssh.password:Flzx3qc!}")
+    private String password;
+    @Value("${sms.blk.ssh.sourceDir:files/black_list/}")
+    private String sourceDir;
+    @Value("${sms.blk.ssh.targetDir:download/}")
+    private String downloadTargetDir;
+
+    private final JdbcTemplate jdbcTemplate;
+
+    public SyncTask(JdbcTemplate jdbcTemplate) {
+        this.jdbcTemplate = jdbcTemplate;
+    }
+
+    @XxlJob("smsBlackListSync")
+    public void smsBlackListSync() {
+        try {
+            // 创建文件夹
+            Files.createDirectories(Paths.get(downloadTargetDir));
+            SSHUtil sshUtil = new SSHUtil(host, port, username, password);
+            // 获取文件列表
+            List<String> list = sshUtil.ls(sourceDir);
+            if (CollectionUtils.isEmpty(list)) {
+                log.info("没有文件");
+                XxlJobHelper.log("没有文件");
+                return;
+            }
+
+            log.info("扫描到的文件: {}", list);
+            XxlJobHelper.log("扫描到的文件: {}", list);
+            for (String t : list) {
+                // 跳过临时文件
+                if (t.endsWith(".tmp")) {
+                    continue;
+                }
+
+                // 删除空文件
+                if (t.endsWith("_0")) {
+                    log.info("删除空文件: {}", t);
+                    XxlJobHelper.log("删除空文件: {}", t);
+                    sshUtil.rm(sourceDir + t);
+                    continue;
+                }
+                singleTask(t);
+            }
+        } catch (Exception e) {
+            log.error("发生异常了: {}", e.getMessage(), e);
+            XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
+            XxlJobHelper.handleFail(e.getMessage());
+        }
+    }
+
+    /**
+     * 单一任务
+     *
+     * @param filename 文件名
+     */
+    public void singleTask(String filename) throws JSchException, IOException, SftpException {
+        download(filename);
+        importData(filename);
+    }
+
+    /**
+     * 下载文件
+     *
+     * @param filename 文件名
+     */
+    public void download(String filename) throws JSchException, IOException, SftpException {
+        log.info("下载文件: {}", filename);
+        XxlJobHelper.log("下载文件: {}", filename);
+        SSHUtil sshUtil = new SSHUtil(host, port, username, password);
+        String src = sourceDir + filename;
+        String dst = downloadTargetDir + filename;
+        sshUtil.get(src, dst);
+        // 删除远程文件
+        log.info("删除远程文件: {}", filename);
+        XxlJobHelper.log("删除远程文件: {}", filename);
+        sshUtil.rm(src);
+    }
+
+    /**
+     * 导入数据
+     *
+     * @param filename 文件名
+     * @throws IOException ioexception
+     */
+    public void importData(String filename) throws IOException {
+        List<Object[]> l1 = new ArrayList<>();
+        List<Object[]> l2 = new ArrayList<>();
+        log.info("导入: {}", filename);
+        XxlJobHelper.log("导入: {}", filename);
+        Path path = Paths.get(downloadTargetDir + filename);
+        try (CsvReader csvReader = CsvReader.builder().build(path, StandardCharsets.UTF_8)) {
+            for (CsvRow row : csvReader) {
+                Object[] o1 = new Object[]{
+                        row.getField(0),
+                        Integer.valueOf(row.getField(1)),
+                        row.getField(2),
+                        row.getField(3),
+                        row.getField(4),
+                        Integer.valueOf(row.getField(5)),
+                        Integer.valueOf(row.getField(6)),
+                };
+                l1.add(o1);
+                Object[] o2 = new Object[]{
+                        row.getField(0),
+                        Integer.valueOf(row.getField(1)),
+                        row.getField(2),
+                        row.getField(3),
+                        row.getField(4),
+                        Integer.valueOf(row.getField(5)),
+                        Integer.valueOf(row.getField(6)),
+                        filename
+                };
+                l2.add(o2);
+            }
+        }
+        String sql = "insert into sms_blk.blacklist " +
+                "(phone, reason, start_time, end_time, hit_desc, operation_type, suggestion) " +
+                "values (?, ?, ?, ?, ?, ?, ?) " +
+                "on conflict (phone) " +
+                "do update set (reason, start_time, end_time, hit_desc, operation_type, suggestion) = " +
+                "(excluded.reason, excluded.start_time, excluded.end_time, excluded.hit_desc, " +
+                "excluded.operation_type, excluded.suggestion)";
+        jdbcTemplate.batchUpdate(sql, l1);
+        sql = "insert into sms_blk.blacklist_file " +
+                "(phone, reason, start_time, end_time, hit_desc, operation_type, suggestion, filename) " +
+                "values (?, ?, ?, ?, ?, ?, ?, ?)";
+        jdbcTemplate.batchUpdate(sql, l2);
+        // todo: 删除本地文件
+//        Files.deleteIfExists(path);
+//        log.info("删除本地文件: {}", filename);
+    }
+}

+ 21 - 0
sms_blk_interface/src/main/resources/application.properties

@@ -0,0 +1,21 @@
+server.port=12121
+logging.level.com.nokia=info
+spring.datasource.url=jdbc:postgresql://192.168.50.4:5432/sqmmt
+spring.datasource.username=do
+spring.datasource.password=Richr00t
+spring.datasource.driverClassName=org.postgresql.Driver
+### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
+xxl.job.admin.addresses=http://192.168.10.7:8087/xxl-job-admin
+### xxl-job, access token
+xxl.job.accessToken=
+### xxl-job executor appname
+xxl.job.executor.appname=sms-blk-interface
+### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
+xxl.job.executor.address=
+### xxl-job executor server-info
+xxl.job.executor.ip=
+xxl.job.executor.port=9999
+### xxl-job executor log-path
+xxl.job.executor.logpath=/data1/sms_blk/sms_blk_interface/xxl/
+### xxl-job executor log-retention-days
+xxl.job.executor.logretentiondays=30

+ 51 - 0
sms_blk_interface/src/main/resources/logback-spring.xml

@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <property name="PATH" value="./log"/>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <Pattern>%d{HH:mm:ss.SSS} %highlight(%-5level) %blue(%logger:%line) %msg%n</Pattern>
+        </encoder>
+    </appender>
+    <appender name="TRACE_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${PATH}/trace.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <!-- rollover daily -->
+            <fileNamePattern>${PATH}/trace.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+            <!-- each file should be at most 100MB, keep 60 days worth of history, but at most 20GB -->
+            <maxFileSize>10MB</maxFileSize>
+            <maxHistory>60</maxHistory>
+            <totalSizeCap>20GB</totalSizeCap>
+        </rollingPolicy>
+        <encoder>
+            <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %blue(%logger:%line) %msg%n</Pattern>
+        </encoder>
+    </appender>
+    <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${PATH}/error.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <!-- rollover daily -->
+            <fileNamePattern>${PATH}/error.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+            <!-- each file should be at most 100MB, keep 60 days worth of history, but at most 20GB -->
+            <maxFileSize>10MB</maxFileSize>
+            <maxHistory>60</maxHistory>
+            <totalSizeCap>20GB</totalSizeCap>
+        </rollingPolicy>
+        <encoder>
+            <Pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %blue(%logger:%line) %msg%n</Pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <level>ERROR</level>
+            <onMatch>ACCEPT</onMatch>
+            <onMismatch>DENY</onMismatch>
+        </filter>
+    </appender>
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <root level="TRACE">
+        <appender-ref ref="TRACE_FILE"/>
+    </root>
+    <root level="ERROR">
+        <appender-ref ref="ERROR_FILE"/>
+    </root>
+</configuration>

+ 86 - 0
sms_blk_interface/src/test/java/com/nokia/sms/SyncTest.java

@@ -0,0 +1,86 @@
+package com.nokia.sms;
+
+import com.nokia.sms.task.SyncTask;
+import de.siegmar.fastcsv.reader.CsvReader;
+import de.siegmar.fastcsv.reader.CsvRow;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@Slf4j
+@SpringBootTest
+class SyncTest {
+    @Autowired
+    private JdbcTemplate jdbcTemplate;
+    @Autowired
+    private SyncTask syncTask;
+
+    @Test
+    void test() {}
+
+    @Test
+    void testImport() {
+        try(Stream<Path> stream = Files.list(Paths.get("z:/5k"))) {
+            List<Path> pathList = stream.collect(Collectors.toList());
+            log.info("pathList: {}", pathList);
+            for (Path t : pathList) {
+                List<Object[]> l1 = new ArrayList<>();
+                List<Object[]> l2 = new ArrayList<>();
+                log.info("file: {}", t.getFileName().toString());
+                try (CsvReader csvReader = CsvReader.builder().build(t, StandardCharsets.UTF_8)) {
+                    for (CsvRow row : csvReader) {
+                        Object[] o1 = new Object[]{
+                                row.getField(0),
+                                Integer.valueOf(row.getField(1)),
+                                row.getField(2),
+                                row.getField(3),
+                                row.getField(4),
+                                Integer.valueOf(row.getField(5)),
+                                Integer.valueOf(row.getField(6)),
+                        };
+                        l1.add(o1);
+                        Object[] o2 = new Object[]{
+                                row.getField(0),
+                                Integer.valueOf(row.getField(1)),
+                                row.getField(2),
+                                row.getField(3),
+                                row.getField(4),
+                                Integer.valueOf(row.getField(5)),
+                                Integer.valueOf(row.getField(6)),
+                                t.getFileName().toString()
+                        };
+                        l2.add(o2);
+//                        log.info("{}: {}", row.getOriginalLineNumber(), o1);
+                    }
+                }
+                log.info("blacklist");
+                String sql = "insert into sms_blk.blacklist " +
+                        "(phone, reason, start_time, end_time, hit_desc, operation_type, suggestion) " +
+                        "values (?, ?, ?, ?, ?, ?, ?) " +
+                        "on conflict (phone) " +
+                        "do update set (reason, start_time, end_time, hit_desc, operation_type, suggestion) = " +
+                        "(excluded.reason, excluded.start_time, excluded.end_time, excluded.hit_desc, " +
+                        "excluded.operation_type, excluded.suggestion)";
+                jdbcTemplate.batchUpdate(sql, l1);
+                log.info("blacklist_file");
+                sql = "insert into sms_blk.blacklist_file " +
+                        "(phone, reason, start_time, end_time, hit_desc, operation_type, suggestion, filename) " +
+                        "values (?, ?, ?, ?, ?, ?, ?, ?)";
+                jdbcTemplate.batchUpdate(sql, l2);
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}