package com.nokia.task; import com.jcraft.jsch.JSchException; import com.jcraft.jsch.SftpException; import com.nokia.common.exception.MyRuntimeException; import com.nokia.common.gpload.GploadUtil; import com.nokia.common.gpload.entity.GploadResult; import com.nokia.common.ssh.SSHUtil; import com.nokia.dao.AreaDao; import com.nokia.pojo.Area; import com.xxl.job.core.context.XxlJobHelper; import com.xxl.job.core.handler.annotation.XxlJob; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.csv.CSVFormat; import org.apache.commons.csv.CSVParser; import org.apache.commons.csv.CSVPrinter; import org.apache.commons.csv.CSVRecord; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @Data @Component @Slf4j @ConfigurationProperties("task") public class SyncTask { private String host; private Integer port; private String username; private String password; private String sourceDir; private String downloadDir; private String distinctFilename; private final DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHH"); private SSHUtil sshUtil; private final AreaDao areaDao; public SyncTask(AreaDao areaDao) { this.areaDao = areaDao; } /** * 同步top用户信息定时任务 * */ @XxlJob("syncTopUser") public void syncTopUser() { try { // 创建文件夹 Files.createDirectories(Paths.get(downloadDir)); sshUtil = new SSHUtil(host, port, username, password); // 获取文件列表 List list = sshUtil.ls(sourceDir); if (CollectionUtils.isEmpty(list)) { log.error("没有文件"); XxlJobHelper.log("没有文件"); XxlJobHelper.handleFail("没有文件"); return; } String filename = list.get(list.size() - 1); log.info("同步的文件: {}", filename); XxlJobHelper.log("同步的文件: {}", filename); singleTask(filename); } catch (Exception e) { log.error("发生异常了: {}", e.getMessage(), e); XxlJobHelper.log("发生异常了: {}", e.getMessage(), e); XxlJobHelper.handleFail(e.getMessage()); } finally { try { sshUtil.disconnect(); } catch (IOException e) { log.error("发生异常了: {}", e.getMessage(), e); XxlJobHelper.log("发生异常了: {}", e.getMessage(), e); XxlJobHelper.handleFail(e.getMessage()); } } } /** * 单一任务 * * @param filename 文件名 */ public void singleTask(String filename) throws JSchException, IOException, SftpException { download(filename); distinct(filename); gpload(); } /** * 下载文件 * * @param filename 文件名 */ public void download(String filename) throws JSchException, IOException, SftpException { log.info("下载文件: {}", filename); XxlJobHelper.log("下载文件: {}", filename); String src = sourceDir + filename; String dst = downloadDir + filename; sshUtil.get(src, dst); } /** * 去重 * * @param filename 文件名 */ public void distinct(String filename) throws IOException { log.info("文件 {} 去重", filename); XxlJobHelper.log("文件 {} 去重", filename); String inputFilePath = downloadDir + filename; Path inputPath = Paths.get(inputFilePath); try (CSVParser parser = CSVFormat.DEFAULT.builder().build() .parse(new InputStreamReader(Files.newInputStream(inputPath), "gbk")); OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(Paths.get(distinctFilename)), StandardCharsets.UTF_8); CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT)) { Map map = new HashMap<>(); // 去重 for (CSVRecord t : parser) { if (t.getRecordNumber() == 1) { continue; } map.put(t.get(4), t); } log.info("{} 条数据", map.size()); XxlJobHelper.log("{} 条数据", map.size()); if (CollectionUtils.isEmpty(map)) { throw new MyRuntimeException("数据为空"); } // 查询地区 List areas = areaDao.getAll(); Map cityMap = new HashMap<>(); Map> areaMap = new HashMap<>(); for (Area t : areas) { if (t.getTypeCode().equals(2)) { cityMap.put(t.getAreaName(), t); areaMap.putIfAbsent(t.getAreaId(), new ArrayList<>()); for (Area tt : areas) { if (tt.getTypeCode().equals(3) && tt.getParentId().equals(t.getAreaId())) { areaMap.get(t.getAreaId()).add(tt); } } } } for (CSVRecord t : map.values()) { String orgId = t.get(0); String orgName = t.get(1); String userId = t.get(2); String userName = t.get(3); String loginName = t.get(4); String phone = t.get(5); String employeeCode = t.get(6); Integer provinceId = -1; Integer cityId = null; Integer areaId = null; for (Map.Entry tt : cityMap.entrySet()) { // 匹配地市 if (orgName.contains(tt.getKey())) { Area area = tt.getValue(); cityId = area.getAreaId(); List areaList = areaMap.get(area.getAreaId()); for (Area ttt : areaList) { // 匹配区县 if (orgName.contains(ttt.getAreaName())) { areaId = ttt.getAreaId(); break; } } break; } } printer.printRecord(loginName, orgId, orgName, userId, userName, phone, employeeCode, provinceId, cityId, areaId); } } log.info("文件 {} 去重完成", filename); XxlJobHelper.log("文件 {} 去重完成", filename); // 删除本地源文件 Files.deleteIfExists(inputPath); log.info("删除本地源文件 {}", filename); XxlJobHelper.log("删除本地源文件 {}", filename); } public void gpload() throws IOException { String gploadCommand = "sh /data1/acl/gpload/gpload.sh"; GploadResult gpload = GploadUtil.gpload(gploadCommand); if (Boolean.TRUE.equals(gpload.getTaskStatus())) { log.info("gpload完成: {}", gpload); XxlJobHelper.log("gpload完成: {}", gpload); // 删除文件 Files.deleteIfExists(Paths.get(distinctFilename)); } else { throw new MyRuntimeException("gpload失败: " + gpload.getMessage()); } } }