|
- package com.nokia.task;
- import com.jcraft.jsch.JSchException;
- import com.jcraft.jsch.SftpException;
- import com.nokia.common.exception.MyRuntimeException;
- import com.nokia.common.gpload.GploadUtil;
- import com.nokia.common.gpload.entity.GploadResult;
- import com.nokia.common.ssh.SSHUtil;
- import com.nokia.dao.AreaDao;
- import com.nokia.pojo.Area;
- import com.xxl.job.core.context.XxlJobHelper;
- import com.xxl.job.core.handler.annotation.XxlJob;
- import lombok.Data;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.csv.CSVFormat;
- import org.apache.commons.csv.CSVParser;
- import org.apache.commons.csv.CSVPrinter;
- import org.apache.commons.csv.CSVRecord;
- import org.springframework.boot.context.properties.ConfigurationProperties;
- import org.springframework.stereotype.Component;
- import org.springframework.util.CollectionUtils;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.OutputStreamWriter;
- import java.nio.charset.StandardCharsets;
- import java.nio.file.Files;
- import java.nio.file.Path;
- import java.nio.file.Paths;
- import java.text.DateFormat;
- import java.text.SimpleDateFormat;
- import java.util.ArrayList;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- @Data
- @Component
- @Slf4j
- @ConfigurationProperties("task")
- public class SyncTask {
- private String host;
- private Integer port;
- private String username;
- private String password;
- private String sourceDir;
- private String downloadDir;
- private String distinctFilename;
- private final DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHH");
- private SSHUtil sshUtil;
- private final AreaDao areaDao;
- public SyncTask(AreaDao areaDao) {
- this.areaDao = areaDao;
- }
- /**
- * 同步top用户信息定时任务
- *
- */
- @XxlJob("syncTopUser")
- public void syncTopUser() {
- try {
- // 创建文件夹
- Files.createDirectories(Paths.get(downloadDir));
- sshUtil = new SSHUtil(host, port, username, password);
- // 获取文件列表
- List<String> list = sshUtil.ls(sourceDir);
- if (CollectionUtils.isEmpty(list)) {
- log.error("没有文件");
- XxlJobHelper.log("没有文件");
- XxlJobHelper.handleFail("没有文件");
- return;
- }
- String filename = list.get(list.size() - 1);
- log.info("同步的文件: {}", filename);
- XxlJobHelper.log("同步的文件: {}", filename);
- singleTask(filename);
- } catch (Exception e) {
- log.error("发生异常了: {}", e.getMessage(), e);
- XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
- XxlJobHelper.handleFail(e.getMessage());
- } finally {
- try {
- sshUtil.disconnect();
- } catch (IOException e) {
- log.error("发生异常了: {}", e.getMessage(), e);
- XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
- XxlJobHelper.handleFail(e.getMessage());
- }
- }
- }
- /**
- * 单一任务
- *
- * @param filename 文件名
- */
- public void singleTask(String filename) throws JSchException, IOException, SftpException {
- download(filename);
- distinct(filename);
- gpload();
- }
- /**
- * 下载文件
- *
- * @param filename 文件名
- */
- public void download(String filename) throws JSchException, IOException, SftpException {
- log.info("下载文件: {}", filename);
- XxlJobHelper.log("下载文件: {}", filename);
- String src = sourceDir + filename;
- String dst = downloadDir + filename;
- sshUtil.get(src, dst);
- }
- /**
- * 去重
- *
- * @param filename 文件名
- */
- public void distinct(String filename) throws IOException {
- log.info("文件 {} 去重", filename);
- XxlJobHelper.log("文件 {} 去重", filename);
- String inputFilePath = downloadDir + filename;
- Path inputPath = Paths.get(inputFilePath);
- try (CSVParser parser = CSVFormat.DEFAULT.builder().build()
- .parse(new InputStreamReader(Files.newInputStream(inputPath), "gbk"));
- OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(Paths.get(distinctFilename)),
- StandardCharsets.UTF_8);
- CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT)) {
- Map<String, CSVRecord> map = new HashMap<>();
- // 去重
- for (CSVRecord t : parser) {
- if (t.getRecordNumber() == 1) {
- continue;
- }
- map.put(t.get(4), t);
- }
- log.info("{} 条数据", map.size());
- XxlJobHelper.log("{} 条数据", map.size());
- if (CollectionUtils.isEmpty(map)) {
- throw new MyRuntimeException("数据为空");
- }
- // 查询地区
- List<Area> areas = areaDao.getAll();
- Map<String, Area> cityMap = new HashMap<>();
- Map<Integer, List<Area>> areaMap = new HashMap<>();
- for (Area t : areas) {
- if (t.getTypeCode().equals(2)) {
- cityMap.put(t.getAreaName(), t);
- areaMap.putIfAbsent(t.getAreaId(), new ArrayList<>());
- for (Area tt : areas) {
- if (tt.getTypeCode().equals(3) && tt.getParentId().equals(t.getAreaId())) {
- areaMap.get(t.getAreaId()).add(tt);
- }
- }
- }
- }
- for (CSVRecord t : map.values()) {
- String orgId = t.get(0);
- String orgName = t.get(1);
- String userId = t.get(2);
- String userName = t.get(3);
- String loginName = t.get(4);
- String phone = t.get(5);
- String employeeCode = t.get(6);
- Integer provinceId = -1;
- Integer cityId = null;
- Integer areaId = null;
- for (Map.Entry<String, Area> tt : cityMap.entrySet()) {
- // 匹配地市
- if (orgName.contains(tt.getKey())) {
- Area area = tt.getValue();
- cityId = area.getAreaId();
- List<Area> areaList = areaMap.get(area.getAreaId());
- for (Area ttt : areaList) {
- // 匹配区县
- if (orgName.contains(ttt.getAreaName())) {
- areaId = ttt.getAreaId();
- break;
- }
- }
- break;
- }
- }
- printer.printRecord(loginName, orgId, orgName, userId, userName, phone, employeeCode, provinceId, cityId, areaId);
- }
- }
- log.info("文件 {} 去重完成", filename);
- XxlJobHelper.log("文件 {} 去重完成", filename);
- // 删除本地源文件
- Files.deleteIfExists(inputPath);
- log.info("删除本地源文件 {}", filename);
- XxlJobHelper.log("删除本地源文件 {}", filename);
- }
- public void gpload() throws IOException {
- String gploadCommand = "sh /data1/acl/gpload/gpload.sh";
- GploadResult gpload = GploadUtil.gpload(gploadCommand);
- if (Boolean.TRUE.equals(gpload.getTaskStatus())) {
- log.info("gpload完成: {}", gpload);
- XxlJobHelper.log("gpload完成: {}", gpload);
- // 删除文件
- Files.deleteIfExists(Paths.get(distinctFilename));
- } else {
- throw new MyRuntimeException("gpload失败: " + gpload.getMessage());
- }
- }
- }
|