Browse Source

feat: 添加同步top用户信息定时任务

weijianghai 2 years ago
parent
commit
b69e91aee3

+ 15 - 0
doc/部署文档/测试环境/gpload/gpload.sh

@@ -0,0 +1,15 @@
+#!/bin/bash
+
+source /usr/local/greenplum-db-clients/greenplum_loaders_path.sh
+
+# 由于gpload需要输入密码,这里需要使用expect执行
+password=sqmdb_1QAZ
+expect -c "
+set timeout 300
+spawn gpload -f /data1/acl/gpload/gpload.yml
+expect {
+\"connecting (yes/no)?\" { send \"yes\n\";exp_continue }
+\"Password:\" { send \"${password}\n\"; exp_continue}
+timeout { puts \"超时\" exit 2}
+}
+"

+ 24 - 0
doc/部署文档/测试环境/gpload/gpload.yml

@@ -0,0 +1,24 @@
+VERSION: 1.0.0.1
+DATABASE: sqmmt
+USER: sqmdb
+HOST: 192.168.50.5
+PORT: 5432
+GPLOAD:
+  PRELOAD:
+    - TRUNCATE: true
+    - REUSE_TABLES: false
+  INPUT:
+    - SOURCE:
+        LOCAL_HOSTNAME:
+          - 192.168.50.3
+        PORT: 54331
+        FILE:
+          - /data1/acl/top-user.csv
+    - FORMAT: csv
+    - DELIMITER: ','
+    - ENCODING: utf-8
+    - ERROR_LIMIT: 20000000
+    - LOG_ERRORS: true
+  OUTPUT:
+    - TABLE: sqmdb_rpt.acl_top_user
+    - MODE: insert

+ 30 - 14
pom.xml

@@ -17,14 +17,6 @@
         <skipTests>true</skipTests>
     </properties>
 
-    <repositories>
-        <repository>
-            <id>nexus-maven</id>
-            <name>nexus-maven</name>
-            <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
-        </repository>
-    </repositories>
-
     <dependencyManagement>
         <dependencies>
             <dependency>
@@ -39,12 +31,26 @@
                 <version>4.10.0</version>
                 <scope>compile</scope>
             </dependency>
+            <!-- https://mvnrepository.com/artifact/org.springdoc/springdoc-openapi-ui -->
+            <dependency>
+                <groupId>org.springdoc</groupId>
+                <artifactId>springdoc-openapi-ui</artifactId>
+                <version>1.6.9</version>
+            </dependency>
             <dependency>
-                <groupId>com.github.xiaoymin</groupId>
-                <artifactId>knife4j-dependencies</artifactId>
-                <version>4.0.0-SNAPSHOT</version>
-                <type>pom</type>
-                <scope>import</scope>
+                <groupId>org.checkerframework</groupId>
+                <artifactId>checker-qual</artifactId>
+                <version>3.8.0</version>
+            </dependency>
+            <dependency>
+                <groupId>io.swagger.core.v3</groupId>
+                <artifactId>swagger-annotations</artifactId>
+                <version>2.2.2</version>
+            </dependency>
+            <dependency>
+                <groupId>io.swagger.core.v3</groupId>
+                <artifactId>swagger-models</artifactId>
+                <version>2.2.2</version>
             </dependency>
         </dependencies>
     </dependencyManagement>
@@ -103,7 +109,6 @@
         <dependency>
             <groupId>org.springdoc</groupId>
             <artifactId>springdoc-openapi-ui</artifactId>
-            <version>1.6.13</version>
         </dependency>
         <!-- https://mvnrepository.com/artifact/io.minio/minio -->
         <dependency>
@@ -117,9 +122,20 @@
             <artifactId>hutool-all</artifactId>
             <version>5.8.10</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/com.github.xiaoymin/knife4j-openapi3-spring-boot-starter -->
         <dependency>
             <groupId>com.github.xiaoymin</groupId>
             <artifactId>knife4j-openapi3-spring-boot-starter</artifactId>
+            <version>4.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.xuxueli</groupId>
+            <artifactId>xxl-job-core</artifactId>
+            <version>2.3.0</version>
         </dependency>
     </dependencies>
 

+ 105 - 0
src/main/java/com/nokia/common/gpload/GploadUtil.java

@@ -0,0 +1,105 @@
+package com.nokia.common.gpload;
+
+import com.nokia.common.gpload.entity.GploadResult;
+import com.xxl.job.core.context.XxlJobHelper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.CollectionUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@Slf4j
+public class GploadUtil {
+    public static final Pattern PATTERN = Pattern.compile("= (\\d+)");
+
+    public static GploadResult gpload(String gploadCommand) {
+        Process process = null;
+        GploadResult result = new GploadResult();
+        result.setTaskStatus(true);
+        try {
+            process = Runtime.getRuntime().exec(gploadCommand);
+            try (BufferedReader inputReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+                 BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
+                String line;
+                // 读取错误流
+                List<String> error = new ArrayList<>();
+                while ((line = errorReader.readLine()) != null) {
+                    error.add(line);
+                }
+                if (!CollectionUtils.isEmpty(error)) {
+                    log.info("gpload errorStream: " + error);
+                    XxlJobHelper.log("gpload errorStream: " + error);
+                    result.setTaskStatus(false);
+                    result.setMessage(error.toString());
+                }
+
+                // 读取标准流
+                List<String> input = new ArrayList<>();
+                while ((line = inputReader.readLine()) != null) {
+                    input.add(line);
+                }
+                if (!CollectionUtils.isEmpty(input)) {
+                    log.info("gpload inputStream: " + input);
+                    XxlJobHelper.log("gpload inputStream: " + input);
+                    for (String s : input) {
+                        if (s.contains("|rows Inserted")) {
+                            result.setInsertedCount(getNum(s));
+                        }
+                        if (s.contains("|rows Updated")) {
+                            result.setUpdatedCount(getNum(s));
+                        }
+                        if (s.contains("|data formatting errors")) {
+                            result.setErrorCount(getNum(s));
+                        }
+                        if (s.contains("ERROR") || s.contains("gpload failed")) {
+                            result.setTaskStatus(false);
+                        }
+                        if (s.contains("ERROR")) {
+                            result.setMessage(result.getMessage() + "\n" + s);
+                        }
+                    }
+                }
+            }
+
+            int code = process.waitFor();
+            if (code != 0) {
+                result.setTaskStatus(false);
+            }
+        } catch (IOException e) {
+            log.error("gpload发生异常: {}", e.getMessage(), e);
+            XxlJobHelper.log("gpload发生异常: {}", e.getMessage(), e);
+            result.setTaskStatus(false);
+            result.setMessage(e.getMessage());
+        } catch (InterruptedException e) {
+            log.error("gpload发生异常: {}", e.getMessage(), e);
+            XxlJobHelper.log("gpload发生异常: {}", e.getMessage(), e);
+            result.setTaskStatus(false);
+            result.setMessage(e.getMessage());
+            Thread.currentThread().interrupt();
+        } finally {
+            log.info("gpload的结果为: {}", result);
+            XxlJobHelper.log("gpload的结果为: {}", result);
+            if (process != null) {
+                process.destroy();
+            }
+        }
+        return result;
+    }
+
+    /**
+     * 获取gpload统计条数
+     */
+    private static int getNum(String s) {
+        Matcher matcher = PATTERN.matcher(s);
+        if (matcher.find()) {
+            return Integer.parseInt(matcher.group(1));
+        }
+
+        return 0;
+    }
+}

+ 12 - 0
src/main/java/com/nokia/common/gpload/entity/GploadResult.java

@@ -0,0 +1,12 @@
+package com.nokia.common.gpload.entity;
+
+import lombok.Data;
+
+@Data
+public class GploadResult {
+    private Boolean taskStatus;
+    private Integer insertedCount;
+    private Integer updatedCount;
+    private Integer errorCount;
+    private String message = "";
+}

+ 383 - 0
src/main/java/com/nokia/common/ssh/SSHUtil.java

@@ -0,0 +1,383 @@
+package com.nokia.common.ssh;
+
+import com.jcraft.jsch.*;
+import com.nokia.common.ssh.entity.SSHServer;
+import com.nokia.common.ssh.entity.UserInfoImpl;
+import com.nokia.common.ssh.exception.SSHUtilException;
+import com.nokia.common.ssh.exception.ScpAckErrorException;
+import com.xxl.job.core.context.XxlJobHelper;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.stream.Collectors;
+
+/**
+ * 使用jsch库实现的ssh的工具类
+ * <p>
+ * todo: scpTo和scpFrom 在本机和targetServer默认编码不一致的时候,文件名中的中文会乱码,但是不会影响到文件内容,
+ */
+
+@Slf4j
+public class SSHUtil {
+
+    @Getter
+    @Setter
+    private SSHServer targetServer = new SSHServer();
+    private Session session = null;
+    private Channel channel = null;
+    private JSch jSch = null;
+    private FileInputStream fileInputStream = null;
+    private FileOutputStream fileOutputStream = null;
+    private OutputStream outputStream = null;
+    private InputStream inputStream = null;
+    private ChannelSftp channelSftp = null;
+
+    public SSHUtil() {
+    }
+
+    public SSHUtil(String host, String user, String password) {
+        targetServer = new SSHServer(host, 22, user, password);
+    }
+
+    public SSHUtil(String host, Integer port, String user, String password) {
+        targetServer = new SSHServer(host, port, user, password);
+    }
+
+    /**
+     * 获取文件列表
+     */
+    @SuppressWarnings("rawtypes")
+    public List<String> ls(String path) throws JSchException, SftpException {
+        getConnectSession();
+        channelSftpConnect();
+        List<String> fileNameList = new ArrayList<>();
+        Vector fileList = channelSftp.ls(path);
+        for (Object o : fileList) {
+            String fileName = ((ChannelSftp.LsEntry) o).getFilename();
+            if (".".equals(fileName) || "..".equals(fileName)) {
+                continue;
+            }
+            fileNameList.add(fileName);
+        }
+
+        return fileNameList.stream().sorted().collect(Collectors.toList());
+    }
+
+    /**
+     * 下载文件
+     */
+    public void get(String src, String dst) throws JSchException, SftpException, IOException {
+        try (OutputStream out = Files.newOutputStream(Paths.get(dst))) {
+            getConnectSession();
+            channelSftpConnect();
+            channelSftp.get(src, out);
+        }
+    }
+
+    /**
+     * 删除文件
+     */
+    public void rm(String path) throws JSchException, SftpException {
+        getConnectSession();
+        channelSftpConnect();
+        channelSftp.rm(path);
+    }
+
+    /**
+     * 远程执行指令
+     */
+    public String exec(String command) throws JSchException, IOException {
+        StringBuilder stringBuilder = new StringBuilder();
+        getConnectSession();
+        channel = session.openChannel("exec");
+        // jsch的登陆是无环境登陆即非login状态登陆,因此是没有环境变量的,
+        String execCommand;
+        // 在命令前添加 bash --login -c "command"以获取环境变量
+        // source .bashrc && command 也可以解决问题, 但是可能环境加载不全
+        if (command.startsWith("bash --login -c")) {
+            execCommand = command;
+        } else {
+            execCommand = String.format("bash --login -c \"%s\"", command);
+        }
+        ((ChannelExec) channel).setCommand(execCommand);
+        channel.setInputStream(null);
+        ((ChannelExec) channel).setErrStream(System.err);
+        InputStream in = channel.getInputStream();
+        channel.connect();
+        byte[] tmp = new byte[1024];
+        while (true) {
+            while (in.available() > 0) {
+                int i = in.read(tmp, 0, 1024);
+                if (i < 0) {
+                    break;
+                }
+                stringBuilder.append(new String(tmp, 0, i));
+            }
+            if (channel.isClosed()) {
+                if (in.available() > 0) {
+                    continue;
+                }
+                break;
+            }
+        }
+        return stringBuilder.toString();
+    }
+
+    /**
+     * 使用SCP把本地文件推送到targetServer目录下
+     * <p>
+     * 注意,文件名不能包含中文
+     */
+    public boolean scpTo(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
+        getConnectSession();
+        // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
+        // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
+        // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
+        // -t 指定为to 也就是目的端模式 指定的对象就是session对应的连接对象targetServer
+        String command = "scp " + "-t " + targetPath;
+        channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+        outputStream = channel.getOutputStream();
+        inputStream = channel.getInputStream();
+        channel.connect();
+        if (checkAck(inputStream) != 0) {
+            log.error("scpTo 执行失败");
+            XxlJobHelper.log("scpTo 执行失败");
+            return false;
+        }
+        File sourceFile = new File(sourceFilePath);
+        if (sourceFile.isDirectory()) {
+            log.error("sourceFilePath 必须是文件");
+            XxlJobHelper.log("sourceFilePath 必须是文件");
+            return false;
+        }
+        long fileSize = sourceFile.length();
+        command = "C0644 " + fileSize + " " + sourceFile.getName() + "\n";
+        outputStream.write(command.getBytes());
+        outputStream.flush();
+        if (checkAck(inputStream) != 0) {
+            log.error("scpTo 执行失败");
+            XxlJobHelper.log("scpTo 执行失败");
+            return false;
+        }
+        fileInputStream = new FileInputStream(sourceFile);
+        byte[] buffer = new byte[1024];
+        while (true) {
+            int len = fileInputStream.read(buffer, 0, buffer.length);
+            if (len <= 0) {
+                break;
+            }
+            outputStream.write(buffer, 0, len);
+        }
+        buffer[0] = 0;
+        outputStream.write(buffer, 0, 1);
+        outputStream.flush();
+        return checkAck(inputStream) == 0;
+    }
+
+    /**
+     * 使用scp把targetServer目录下的文件复制到本地
+     */
+    public boolean scpFrom(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
+        log.info(sourceFilePath);
+        XxlJobHelper.log(sourceFilePath);
+        getConnectSession();
+        // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
+        // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
+        // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
+        // -f 指定对端为from 也就是源端模式 指定的对象就是session对应的连接对象targetServer
+        String command = "scp -f " + sourceFilePath;
+        Channel channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+        outputStream = channel.getOutputStream();
+        inputStream = channel.getInputStream();
+        channel.connect();
+        byte[] buf = new byte[1024];
+        // 发送指令 '0'
+        // 源端会一直等宿端的回应, 直到等到回应才会传输下一条协议文本.
+        // 在送出最后一条协议文本后, 源端会传出一个大小为零的字符'0'来表示真正文件传输的开始.
+        // 当文件接收完成后, 宿端会给源端发送一个'0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        // 接收C0644 这条消息携带了文件的信息
+        while (true) {
+            int c = checkAck(inputStream);
+            // 遇到C时跳出循环
+            if (c == 'C') {
+                break;
+            }
+        }
+        // 接收 '0644 ' 这段字符表示文件的权限
+        inputStream.read(buf, 0, 5);
+        // 获取filesize
+        long filesize = 0L;
+        while (true) {
+            if (inputStream.read(buf, 0, 1) < 0) {
+                break;
+            }
+            if (buf[0] == ' ') {
+                break;
+            }
+            filesize = filesize * 10L + (long) (buf[0] - '0');
+        }
+        // 从 C0644命令读取文件名,命令中的文件名是不带路径的
+        String file = null;
+        for (int i = 0; ; i++) {
+            inputStream.read(buf, i, 1);
+            // 0x0a 是LF 换行符
+            if (buf[i] == (byte) 0x0a) {
+                file = new String(buf, 0, i);
+                break;
+            }
+        }
+        log.info("filesize={}, file={}", filesize, file);
+        XxlJobHelper.log("filesize={}, file={}", filesize, file);
+        // 发送 '0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        // 如果目标是目录,则需要加上文件名
+        File target = new File(targetPath);
+        if (target.isDirectory()) {
+            log.info("{} 是目录,需要添加文件名", target.getAbsolutePath());
+            XxlJobHelper.log("{} 是目录,需要添加文件名", target.getAbsolutePath());
+            target = new File(targetPath + File.separator + file);
+        }
+
+        fileOutputStream = new FileOutputStream(target);
+        int foo;
+        while (true) {
+            if (buf.length < filesize) {
+                foo = buf.length;
+            } else {
+                foo = (int) filesize;
+            }
+            foo = inputStream.read(buf, 0, foo);
+            if (foo < 0) {
+                break;
+            }
+            fileOutputStream.write(buf, 0, foo);
+            filesize -= foo;
+            if (filesize == 0L) {
+                break;
+            }
+        }
+        if (checkAck(inputStream) != 0) {
+            return false;
+        }
+        // 发送 '0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        log.info("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+        XxlJobHelper.log("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+        return true;
+    }
+
+    public void getConnectSession() throws JSchException {
+        if (jSch == null) {
+            jSch = new JSch();
+        }
+
+        if (session == null) {
+            session = jSch.getSession(targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+            session.setPassword(targetServer.getPassword());
+            session.setUserInfo(new UserInfoImpl());
+            // 不需要输入保存ssh安全密钥的yes或no
+            Properties properties = new Properties();
+            properties.put("StrictHostKeyChecking", "no");
+            session.setConfig(properties);
+        }
+
+        if (!session.isConnected()) {
+            session.connect();
+            log.info("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+            XxlJobHelper.log("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+        }
+    }
+
+    public void channelSftpConnect() throws JSchException {
+        if (channelSftp == null) {
+            channelSftp = (ChannelSftp) session.openChannel("sftp");
+        }
+
+        if (!channelSftp.isConnected()) {
+            channelSftp.connect();
+        }
+    }
+
+    public void disconnect() throws IOException {
+        if (fileOutputStream != null) {
+            fileOutputStream.close();
+            fileOutputStream = null;
+        }
+        if (fileInputStream != null) {
+            fileInputStream.close();
+            fileInputStream = null;
+        }
+        if (outputStream != null) {
+            outputStream.close();
+            outputStream = null;
+        }
+        if (channel != null) {
+            channel.disconnect();
+            channel = null;
+        }
+        if (channelSftp != null) {
+            channelSftp.quit();
+        }
+        if (session != null) {
+            session.disconnect();
+            session = null;
+        }
+        jSch = null;
+        log.info("jsch disconnected");
+    }
+
+    /**
+     * 来自源端的每条消息和每个传输完毕的文件都需要宿端的确认和响应.
+     * 宿端会返回三种确认消息: 0(正常), 1(警告)或2(严重错误, 将中断连接).
+     * 消息1和2可以跟一个字符串和一个换行符, 这个字符串将显示在scp的源端. 无论这个字符串是否为空, 换行符都是不可缺少的.
+     */
+    private static int checkAck(InputStream in) throws IOException, SSHUtilException {
+        int b = in.read();
+        // b 取值为0表示成功
+        if (b == 0) {
+            return b;
+        }
+        if (b == -1) {
+            return b;
+        }
+
+        // 1表示警告 2表示严重错误,将中断连接
+        // 1和2 后面会携带一条错误信息,以\n结尾
+        if (b == 1 || b == 2) {
+            // 打印消息后面跟的字符串
+            StringBuilder sb = new StringBuilder();
+            int c;
+            do {
+                // 读取字符串直到遇到换行符
+                c = in.read();
+                sb.append((char) c);
+            } while (c != '\n');
+            log.info("checkAck发现错误消息: ack={}-msg={}", b, sb);
+            XxlJobHelper.log("checkAck发现错误消息: ack={}-msg={}", b, sb);
+            if (b == 1 && sb.toString().endsWith("No such file or directory")) {
+                throw new NoSuchFileException(sb.toString());
+            } else {
+                throw new ScpAckErrorException(sb.toString());
+            }
+        }
+        return b;
+    }
+}

+ 15 - 0
src/main/java/com/nokia/common/ssh/entity/SSHServer.java

@@ -0,0 +1,15 @@
+package com.nokia.common.ssh.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class SSHServer {
+    private String host;
+    private int port = 22;
+    private String user;
+    private String password;
+}

+ 35 - 0
src/main/java/com/nokia/common/ssh/entity/UserInfoImpl.java

@@ -0,0 +1,35 @@
+package com.nokia.common.ssh.entity;
+
+import com.jcraft.jsch.UserInfo;
+
+public class UserInfoImpl implements UserInfo {
+    @Override
+    public String getPassphrase() {
+        return null;
+    }
+
+    @Override
+    public String getPassword() {
+        return null;
+    }
+
+    @Override
+    public boolean promptPassword(String s) {
+        return false;
+    }
+
+    @Override
+    public boolean promptPassphrase(String s) {
+        return false;
+    }
+
+    @Override
+    public boolean promptYesNo(String s) {
+        return false;
+    }
+
+    @Override
+    public void showMessage(String s) {
+
+    }
+}

+ 8 - 0
src/main/java/com/nokia/common/ssh/exception/SSHUtilException.java

@@ -0,0 +1,8 @@
+package com.nokia.common.ssh.exception;
+
+public class SSHUtilException extends Exception{
+
+    public SSHUtilException(String message) {
+        super(message);
+    }
+}

+ 7 - 0
src/main/java/com/nokia/common/ssh/exception/ScpAckErrorException.java

@@ -0,0 +1,7 @@
+package com.nokia.common.ssh.exception;
+
+public class ScpAckErrorException extends SSHUtilException {
+    public ScpAckErrorException(String message) {
+        super(message);
+    }
+}

+ 76 - 0
src/main/java/com/nokia/config/XxlJobConfig.java

@@ -0,0 +1,76 @@
+package com.nokia.config;
+
+import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * xxl-job config
+ *
+ * @author xuxueli 2017-04-28
+ */
+@Slf4j
+@Configuration
+public class XxlJobConfig {
+    @Value("${xxl.job.admin.addresses}")
+    private String adminAddresses;
+
+    @Value("${xxl.job.accessToken}")
+    private String accessToken;
+
+    @Value("${xxl.job.executor.appname}")
+    private String appname;
+
+    @Value("${xxl.job.executor.address}")
+    private String address;
+
+    @Value("${xxl.job.executor.ip}")
+    private String ip;
+
+    @Value("${xxl.job.executor.port}")
+    private int port;
+
+    @Value("${xxl.job.executor.logpath}")
+    private String logPath;
+
+    @Value("${xxl.job.executor.logretentiondays}")
+    private int logRetentionDays;
+
+
+    @Bean
+    public XxlJobSpringExecutor xxlJobExecutor() {
+        log.info(">>>>>>>>>>> xxl-job config init.");
+        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
+        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
+        xxlJobSpringExecutor.setAppname(appname);
+        xxlJobSpringExecutor.setAddress(address);
+        xxlJobSpringExecutor.setIp(ip);
+        xxlJobSpringExecutor.setPort(port);
+        xxlJobSpringExecutor.setAccessToken(accessToken);
+        xxlJobSpringExecutor.setLogPath(logPath);
+        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
+
+        return xxlJobSpringExecutor;
+    }
+
+    /**
+     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
+     *
+     *      1、引入依赖:
+     *          <dependency>
+     *             <groupId>org.springframework.cloud</groupId>
+     *             <artifactId>spring-cloud-commons</artifactId>
+     *             <version>${version}</version>
+     *         </dependency>
+     *
+     *      2、配置文件,或者容器启动变量
+     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
+     *
+     *      3、获取IP
+     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
+     */
+
+
+}

+ 214 - 0
src/main/java/com/nokia/task/SyncTask.java

@@ -0,0 +1,214 @@
+package com.nokia.task;
+
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.SftpException;
+import com.nokia.common.exception.MyRuntimeException;
+import com.nokia.common.gpload.GploadUtil;
+import com.nokia.common.gpload.entity.GploadResult;
+import com.nokia.common.ssh.SSHUtil;
+import com.nokia.dao.AreaDao;
+import com.nokia.pojo.Area;
+import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.csv.CSVRecord;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@Component
+@Slf4j
+@ConfigurationProperties("task")
+public class SyncTask {
+
+    private String host;
+    private Integer port;
+    private String username;
+    private String password;
+    private String sourceDir;
+    private String downloadDir;
+    private String distinctFilename;
+    private final DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHH");
+    private SSHUtil sshUtil;
+    private final AreaDao areaDao;
+
+    public SyncTask(AreaDao areaDao) {
+        this.areaDao = areaDao;
+    }
+
+    /**
+     * 同步top用户信息定时任务
+     *
+     */
+    @XxlJob("syncTopUser")
+    public void syncTopUser() {
+        try {
+            // 创建文件夹
+            Files.createDirectories(Paths.get(downloadDir));
+            sshUtil = new SSHUtil(host, port, username, password);
+            // 获取文件列表
+            List<String> list = sshUtil.ls(sourceDir);
+            if (CollectionUtils.isEmpty(list)) {
+                log.error("没有文件");
+                XxlJobHelper.log("没有文件");
+                XxlJobHelper.handleFail("没有文件");
+                return;
+            }
+
+            String filename = list.get(list.size() - 1);
+            log.info("同步的文件: {}", filename);
+            XxlJobHelper.log("同步的文件: {}", filename);
+            singleTask(filename);
+        } catch (Exception e) {
+            log.error("发生异常了: {}", e.getMessage(), e);
+            XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
+            XxlJobHelper.handleFail(e.getMessage());
+        } finally {
+            try {
+                sshUtil.disconnect();
+            } catch (IOException e) {
+                log.error("发生异常了: {}", e.getMessage(), e);
+                XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
+                XxlJobHelper.handleFail(e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * 单一任务
+     *
+     * @param filename 文件名
+     */
+    public void singleTask(String filename) throws JSchException, IOException, SftpException {
+        download(filename);
+        distinct(filename);
+        gpload();
+    }
+
+    /**
+     * 下载文件
+     *
+     * @param filename 文件名
+     */
+    public void download(String filename) throws JSchException, IOException, SftpException {
+        log.info("下载文件: {}", filename);
+        XxlJobHelper.log("下载文件: {}", filename);
+        String src = sourceDir + filename;
+        String dst = downloadDir + filename;
+        sshUtil.get(src, dst);
+    }
+
+    /**
+     * 去重
+     *
+     * @param filename 文件名
+     */
+    public void distinct(String filename) throws IOException {
+        log.info("文件 {} 去重", filename);
+        XxlJobHelper.log("文件 {} 去重", filename);
+        String inputFilePath = downloadDir + filename;
+        Path inputPath = Paths.get(inputFilePath);
+        try (CSVParser parser = CSVFormat.DEFAULT.builder().build()
+                .parse(new InputStreamReader(Files.newInputStream(inputPath), "gbk"));
+             OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(Paths.get(distinctFilename)),
+                     StandardCharsets.UTF_8);
+             CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT)) {
+            Map<String, CSVRecord> map = new HashMap<>();
+            // 去重
+            for (CSVRecord t : parser) {
+                if (t.getRecordNumber() == 1) {
+                    continue;
+                }
+                map.put(t.get(4), t);
+            }
+            log.info("{} 条数据", map.size());
+            XxlJobHelper.log("{} 条数据", map.size());
+            if (CollectionUtils.isEmpty(map)) {
+                throw new MyRuntimeException("数据为空");
+            }
+            // 查询地区
+            List<Area> areas = areaDao.getAll();
+            Map<String, Area> cityMap = new HashMap<>();
+            Map<Integer, List<Area>> areaMap = new HashMap<>();
+            for (Area t : areas) {
+                if (t.getTypeCode().equals(2)) {
+                    cityMap.put(t.getAreaName(), t);
+                    areaMap.putIfAbsent(t.getAreaId(), new ArrayList<>());
+                    for (Area tt : areas) {
+                        if (tt.getTypeCode().equals(3) && tt.getParentId().equals(t.getAreaId())) {
+                            areaMap.get(t.getAreaId()).add(tt);
+                        }
+                    }
+                }
+            }
+            for (CSVRecord t : map.values()) {
+                String orgId = t.get(0);
+                String orgName = t.get(1);
+                String userId = t.get(2);
+                String userName = t.get(3);
+                String loginName = t.get(4);
+                String phone = t.get(5);
+                String employeeCode = t.get(6);
+                Integer provinceId = -1;
+                Integer cityId = null;
+                Integer areaId = null;
+                for (Map.Entry<String, Area> tt : cityMap.entrySet()) {
+                    // 匹配地市
+                    if (orgName.contains(tt.getKey())) {
+                        Area area = tt.getValue();
+                        cityId = area.getAreaId();
+                        List<Area> areaList = areaMap.get(area.getAreaId());
+                        for (Area ttt : areaList) {
+                            // 匹配区县
+                            if (orgName.contains(ttt.getAreaName())) {
+                                areaId = ttt.getAreaId();
+                                break;
+                            }
+                        }
+                        break;
+                    }
+                }
+                printer.printRecord(loginName, orgId, orgName, userId, userName, phone, employeeCode, provinceId, cityId, areaId);
+            }
+        }
+
+        log.info("文件 {} 去重完成", filename);
+        XxlJobHelper.log("文件 {} 去重完成", filename);
+        // 删除本地源文件
+        Files.deleteIfExists(inputPath);
+        log.info("删除本地源文件 {}", filename);
+        XxlJobHelper.log("删除本地源文件 {}", filename);
+    }
+
+    public void gpload() throws IOException {
+        String gploadCommand = "sh /data1/acl/gpload/gpload.sh";
+        GploadResult gpload = GploadUtil.gpload(gploadCommand);
+        if (Boolean.TRUE.equals(gpload.getTaskStatus())) {
+            log.info("gpload完成: {}", gpload);
+            XxlJobHelper.log("gpload完成: {}", gpload);
+            // 删除文件
+            Files.deleteIfExists(Paths.get(distinctFilename));
+        } else {
+            throw new MyRuntimeException("gpload失败: " + gpload.getMessage());
+        }
+    }
+}

+ 22 - 0
src/main/resources/application-test.properties

@@ -23,3 +23,25 @@ minio.bucket=acl-tousu-test
 minio.accessKey=QBZSEFMteKaz69Sh
 minio.secretKey=6hNzjymqo3PCX7oQRQ8ESGePzK3b52Dc
 minio.expiry=15
+task.host=192.168.70.130
+task.port=22
+task.username=do
+task.password=Richr00t
+task.sourceDir=/data1/esbftp/top_user/
+task.downloadDir=download/
+task.distinctFilename=top-user.csv
+### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
+xxl.job.admin.addresses=http://192.168.10.7:8087/xxl-job-admin
+### xxl-job, access token
+xxl.job.accessToken=
+### xxl-job executor appname
+xxl.job.executor.appname=acl-tousu-test
+### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
+xxl.job.executor.address=
+### xxl-job executor server-info
+xxl.job.executor.ip=
+xxl.job.executor.port=9998
+### xxl-job executor log-path
+xxl.job.executor.logpath=/data1/acl/xxl/
+### xxl-job executor log-retention-days
+xxl.job.executor.logretentiondays=30

+ 90 - 3
src/test/java/com/nokia/OtherTest.java

@@ -1,8 +1,12 @@
 package com.nokia;
 
+import com.alibaba.fastjson2.JSON;
 import com.google.gson.JsonObject;
 import com.google.gson.JsonParser;
+import com.nokia.dao.AreaDao;
 import com.nokia.dao.RoleDao;
+import com.nokia.pojo.Area;
+import com.nokia.task.SyncTask;
 import com.nokia.vo.TokenFlagVo;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.csv.CSVFormat;
@@ -18,14 +22,13 @@ import org.springframework.test.context.ActiveProfiles;
 import org.springframework.util.StringUtils;
 import org.springframework.web.client.RestTemplate;
 
+import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -40,6 +43,90 @@ class OtherTest {
     private JdbcTemplate jdbcTemplate;
     @Autowired
     private RoleDao roleDao;
+    @Autowired
+    private AreaDao areaDao;
+    @Autowired
+    private SyncTask syncTask;
+
+    @Test
+    void testSync() {
+        syncTask.syncTopUser();
+    }
+
+    @Test
+    void testUpdateTopUser() {
+        List<Area> areas = areaDao.getAll();
+        log.debug("areas: {}", JSON.toJSONString(areas));
+        Map<String, Area> cityMap = new HashMap<>();
+        Map<Integer, List<Area>> areaMap = new HashMap<>();
+        for (Area t : areas) {
+            if (t.getTypeCode().equals(2)) {
+                cityMap.put(t.getAreaName(), t);
+                areaMap.putIfAbsent(t.getAreaId(), new ArrayList<>());
+                for (Area tt : areas) {
+                    if (tt.getTypeCode().equals(3) && tt.getParentId().equals(t.getAreaId())) {
+                        areaMap.get(t.getAreaId()).add(tt);
+                    }
+                }
+            }
+        }
+        log.debug("cityMap: {}", JSON.toJSONString(cityMap));
+        log.debug("areaMap: {}", JSON.toJSONString(areaMap));
+        // 创建文件夹
+        try {
+            Files.createDirectories(Paths.get("test/result/"));
+        } catch (IOException e) {
+            log.error("创建文件夹失败: {}", e.getMessage(), e);
+            return;
+        }
+        String inputPath = "test/data/data.csv";
+        String outputPath = "test/result/result.csv";
+        try (CSVParser parser = CSVFormat.DEFAULT.builder().build()
+                .parse(new InputStreamReader(Files.newInputStream(Paths.get(inputPath)), "gbk"));
+             OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(Paths.get(outputPath)),
+                     StandardCharsets.UTF_8);
+             CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT)) {
+            printer.printRecord("login_name","org_id","org_name","user_id","user_name","phone","employee_code","province_id","city_id","area_id", "province_name", "city_name", "area_name");
+            for (CSVRecord t : parser) {
+                if (t.getRecordNumber() == 1) {
+                    continue;
+                }
+                String orgId = t.get(0);
+                String orgName = t.get(1);
+                String userId = t.get(2);
+                String userName = t.get(3);
+                String loginName = t.get(4);
+                String phone = t.get(5);
+                String employeeCode = t.get(6);
+                Integer provinceId = -1;
+                Integer cityId = null;
+                Integer areaId = null;
+                String provinceName = "河北省";
+                String cityName = null;
+                String areaName = null;
+                log.info("{}: {}", t.getRecordNumber(), t);
+                for (Map.Entry<String, Area> tt : cityMap.entrySet()) {
+                    if (orgName.contains(tt.getKey())) {
+                        Area area = tt.getValue();
+                        cityId = area.getAreaId();
+                        cityName = area.getAreaName();
+                        List<Area> areaList = areaMap.get(area.getAreaId());
+                        for (Area ttt : areaList) {
+                            if (orgName.contains(ttt.getAreaName())) {
+                                areaId = ttt.getAreaId();
+                                areaName = ttt.getAreaName();
+                                break;
+                            }
+                        }
+                        break;
+                    }
+                }
+                printer.printRecord(loginName, orgId, orgName, userId, userName, phone, employeeCode, provinceId, cityId, areaId, provinceName, cityName, areaName);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+    }
 
     @Test
     void testSplitAddress() {