SyncTask.java 7.8 KB


  1. package com.nokia.task;
  2. import com.jcraft.jsch.JSchException;
  3. import com.jcraft.jsch.SftpException;
  4. import com.nokia.common.exception.MyRuntimeException;
  5. import com.nokia.common.gpload.GploadUtil;
  6. import com.nokia.common.gpload.entity.GploadResult;
  7. import com.nokia.common.ssh.SSHUtil;
  8. import com.nokia.dao.AreaDao;
  9. import com.nokia.pojo.Area;
  10. import com.xxl.job.core.context.XxlJobHelper;
  11. import com.xxl.job.core.handler.annotation.XxlJob;
  12. import lombok.Data;
  13. import lombok.extern.slf4j.Slf4j;
  14. import org.apache.commons.csv.CSVFormat;
  15. import org.apache.commons.csv.CSVParser;
  16. import org.apache.commons.csv.CSVPrinter;
  17. import org.apache.commons.csv.CSVRecord;
  18. import org.springframework.boot.context.properties.ConfigurationProperties;
  19. import org.springframework.stereotype.Component;
  20. import org.springframework.util.CollectionUtils;
  21. import java.io.IOException;
  22. import java.io.InputStreamReader;
  23. import java.io.OutputStreamWriter;
  24. import java.nio.charset.StandardCharsets;
  25. import java.nio.file.Files;
  26. import java.nio.file.Path;
  27. import java.nio.file.Paths;
  28. import java.text.DateFormat;
  29. import java.text.SimpleDateFormat;
  30. import java.util.ArrayList;
  31. import java.util.HashMap;
  32. import java.util.List;
  33. import java.util.Map;
  34. @Data
  35. @Component
  36. @Slf4j
  37. @ConfigurationProperties("task")
  38. public class SyncTask {
  39. private String host;
  40. private Integer port;
  41. private String username;
  42. private String password;
  43. private String sourceDir;
  44. private String downloadDir;
  45. private String distinctFilename;
  46. private final DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHH");
  47. private SSHUtil sshUtil;
  48. private final AreaDao areaDao;
  49. public SyncTask(AreaDao areaDao) {
  50. this.areaDao = areaDao;
  51. }
  52. /**
  53. * 同步top用户信息定时任务
  54. *
  55. */
  56. @XxlJob("syncTopUser")
  57. public void syncTopUser() {
  58. try {
  59. // 创建文件夹
  60. Files.createDirectories(Paths.get(downloadDir));
  61. sshUtil = new SSHUtil(host, port, username, password);
  62. // 获取文件列表
  63. List<String> list = sshUtil.ls(sourceDir);
  64. if (CollectionUtils.isEmpty(list)) {
  65. log.error("没有文件");
  66. XxlJobHelper.log("没有文件");
  67. XxlJobHelper.handleFail("没有文件");
  68. return;
  69. }
  70. String filename = list.get(list.size() - 1);
  71. log.info("同步的文件: {}", filename);
  72. XxlJobHelper.log("同步的文件: {}", filename);
  73. singleTask(filename);
  74. } catch (Exception e) {
  75. log.error("发生异常了: {}", e.getMessage(), e);
  76. XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
  77. XxlJobHelper.handleFail(e.getMessage());
  78. } finally {
  79. try {
  80. sshUtil.disconnect();
  81. } catch (IOException e) {
  82. log.error("发生异常了: {}", e.getMessage(), e);
  83. XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
  84. XxlJobHelper.handleFail(e.getMessage());
  85. }
  86. }
  87. }
  88. /**
  89. * 单一任务
  90. *
  91. * @param filename 文件名
  92. */
  93. public void singleTask(String filename) throws JSchException, IOException, SftpException {
  94. download(filename);
  95. distinct(filename);
  96. gpload();
  97. }
  98. /**
  99. * 下载文件
  100. *
  101. * @param filename 文件名
  102. */
  103. public void download(String filename) throws JSchException, IOException, SftpException {
  104. log.info("下载文件: {}", filename);
  105. XxlJobHelper.log("下载文件: {}", filename);
  106. String src = sourceDir + filename;
  107. String dst = downloadDir + filename;
  108. sshUtil.get(src, dst);
  109. }
  110. /**
  111. * 去重
  112. *
  113. * @param filename 文件名
  114. */
  115. public void distinct(String filename) throws IOException {
  116. log.info("文件 {} 去重", filename);
  117. XxlJobHelper.log("文件 {} 去重", filename);
  118. String inputFilePath = downloadDir + filename;
  119. Path inputPath = Paths.get(inputFilePath);
  120. try (CSVParser parser = CSVFormat.DEFAULT.builder().build()
  121. .parse(new InputStreamReader(Files.newInputStream(inputPath), "gbk"));
  122. OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(Paths.get(distinctFilename)),
  123. StandardCharsets.UTF_8);
  124. CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT)) {
  125. Map<String, CSVRecord> map = new HashMap<>();
  126. // 去重
  127. for (CSVRecord t : parser) {
  128. if (t.getRecordNumber() == 1) {
  129. continue;
  130. }
  131. map.put(t.get(4), t);
  132. }
  133. log.info("{} 条数据", map.size());
  134. XxlJobHelper.log("{} 条数据", map.size());
  135. if (CollectionUtils.isEmpty(map)) {
  136. throw new MyRuntimeException("数据为空");
  137. }
  138. // 查询地区
  139. List<Area> areas = areaDao.getAll();
  140. Map<String, Area> cityMap = new HashMap<>();
  141. Map<Integer, List<Area>> areaMap = new HashMap<>();
  142. for (Area t : areas) {
  143. if (t.getTypeCode().equals(2)) {
  144. cityMap.put(t.getAreaName(), t);
  145. areaMap.putIfAbsent(t.getAreaId(), new ArrayList<>());
  146. for (Area tt : areas) {
  147. if (tt.getTypeCode().equals(3) && tt.getParentId().equals(t.getAreaId())) {
  148. areaMap.get(t.getAreaId()).add(tt);
  149. }
  150. }
  151. }
  152. }
  153. for (CSVRecord t : map.values()) {
  154. String orgId = t.get(0);
  155. String orgName = t.get(1);
  156. String userId = t.get(2);
  157. String userName = t.get(3);
  158. String loginName = t.get(4);
  159. String phone = t.get(5);
  160. String employeeCode = t.get(6);
  161. Integer provinceId = -1;
  162. Integer cityId = null;
  163. Integer areaId = null;
  164. for (Map.Entry<String, Area> tt : cityMap.entrySet()) {
  165. // 匹配地市
  166. if (orgName.contains(tt.getKey())) {
  167. Area area = tt.getValue();
  168. cityId = area.getAreaId();
  169. List<Area> areaList = areaMap.get(area.getAreaId());
  170. for (Area ttt : areaList) {
  171. // 匹配区县
  172. if (orgName.contains(ttt.getAreaName())) {
  173. areaId = ttt.getAreaId();
  174. break;
  175. }
  176. }
  177. break;
  178. }
  179. }
  180. printer.printRecord(loginName, orgId, orgName, userId, userName, phone, employeeCode, provinceId, cityId, areaId);
  181. }
  182. }
  183. log.info("文件 {} 去重完成", filename);
  184. XxlJobHelper.log("文件 {} 去重完成", filename);
  185. // 删除本地源文件
  186. Files.deleteIfExists(inputPath);
  187. log.info("删除本地源文件 {}", filename);
  188. XxlJobHelper.log("删除本地源文件 {}", filename);
  189. }
  190. public void gpload() throws IOException {
  191. String gploadCommand = "sh /data1/acl/gpload/gpload.sh";
  192. GploadResult gpload = GploadUtil.gpload(gploadCommand);
  193. if (Boolean.TRUE.equals(gpload.getTaskStatus())) {
  194. log.info("gpload完成: {}", gpload);
  195. XxlJobHelper.log("gpload完成: {}", gpload);
  196. // 删除文件
  197. Files.deleteIfExists(Paths.get(distinctFilename));
  198. } else {
  199. throw new MyRuntimeException("gpload失败: " + gpload.getMessage());
  200. }
  201. }
  202. }