LtePmTask.java 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. package com.nokia.task;
  2. import com.jcraft.jsch.JSchException;
  3. import com.jcraft.jsch.SftpException;
  4. import com.nokia.common.gpload.GploadUtil;
  5. import com.nokia.common.gpload.entity.GploadResult;
  6. import com.nokia.common.ssh.SSHUtil;
  7. import com.nokia.common.ssh.exception.SSHUtilException;
  8. import com.xxl.job.core.handler.annotation.XxlJob;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.apache.commons.csv.CSVFormat;
  11. import org.apache.commons.csv.CSVParser;
  12. import org.apache.commons.csv.CSVPrinter;
  13. import org.apache.commons.csv.CSVRecord;
  14. import org.springframework.beans.factory.annotation.Value;
  15. import org.springframework.stereotype.Component;
  16. import org.springframework.util.CollectionUtils;
  17. import java.io.File;
  18. import java.io.IOException;
  19. import java.io.InputStreamReader;
  20. import java.nio.charset.StandardCharsets;
  21. import java.nio.file.Files;
  22. import java.nio.file.Paths;
  23. import java.util.HashMap;
  24. import java.util.List;
  25. import java.util.Map;
  26. @Slf4j
  27. /*@Task*/
  28. @Component
  29. public class LtePmTask {
  30. @Value("${lte.pm.download.host:10.17.180.55}")
  31. private String host;
  32. @Value("${lte.pm.download.port:22}")
  33. private Integer port;
  34. @Value("${lte.pm.download.username:nokia}")
  35. private String username;
  36. @Value("${lte.pm.download.password:Nokia*123}")
  37. private String password;
  38. @Value("${lte.pm.download.sourceDir:/data/out2/pm_4g_hour}")
  39. private String sourceDir;
  40. @Value("${lte.pm.download.targetDir:download/}")
  41. private String downloadTargetDir;
  42. @Value("${lte.pm.filePrefix:pm_4g_hour_}")
  43. private String filePrefix;
  44. @Value("${lte.pm.distinct.targetDir:distinct/}")
  45. private String distinctTargetDir;
  46. /* @XxlJob("execHandler")
  47. public void Test(){
  48. System.out.println("java------------------");
  49. }
  50. */
  51. @XxlJob("execHandlerPm4G")
  52. public void cronTask() throws JSchException, SSHUtilException, IOException, SftpException {
  53. System.out.println("已触发pm4G");
  54. log.info("1");
  55. // 创建文件夹
  56. Files.createDirectories(Paths.get(downloadTargetDir));
  57. log.info("2");
  58. Files.createDirectories(Paths.get(distinctTargetDir));
  59. log.info("3");
  60. SSHUtil sshUtil = new SSHUtil(host, port, username, password);
  61. log.info("4");
  62. // 获取文件列表
  63. List<String> list = sshUtil.ls(sourceDir);
  64. log.info("5");
  65. if (CollectionUtils.isEmpty(list)) {
  66. log.info("6");
  67. return;
  68. }
  69. log.debug("扫描到的文件: {}", list);
  70. for (String s : list) {
  71. String[] s1 = s.split("_");
  72. String[] split = s1[3].split("\\.");
  73. if (split[1].equals("csv")){
  74. singleTask(split[0]);
  75. }
  76. }
  77. }
  78. /* @AllowedTaskType({TaskType.CRON})
  79. public void cronTask(String hourDelay) throws JSchException, SSHUtilException, IOException {
  80. long hours = Long.parseLong(hourDelay);
  81. String hourString = dateFormat.format(new Date(System.currentTimeMillis() - 3600L * 1000L * hours));
  82. singleTask(hourString);
  83. }*/
  84. /*@AllowedTaskType({TaskType.TIMING, TaskType.IMMEDIATE})*/
  85. public void singleTask(String hourString) throws JSchException, SSHUtilException, IOException, SftpException {
  86. download(hourString);
  87. distinct(hourString);
  88. gpload(hourString);
  89. }
  90. public void download(String hourString) throws JSchException, SSHUtilException, IOException, SftpException {
  91. log.info("hourString: {}", hourString);
  92. SSHUtil sshUtil = new SSHUtil(host, port, username, password);
  93. String downloadFileName = filePrefix + hourString + ".csv";
  94. String sourceFilePath = sourceDir + "/" + downloadFileName;
  95. File targetFile = new File(downloadTargetDir + downloadFileName);
  96. if (!targetFile.exists()) {
  97. targetFile.getParentFile().mkdirs();
  98. }
  99. String targetPath = targetFile.getAbsolutePath();
  100. boolean b = sshUtil.scpFrom(sourceFilePath, targetPath);
  101. if (b) {
  102. log.debug("文件 {} 下载成功...", targetPath);
  103. // sshUtil.delete(sourceFilePath);
  104. }
  105. }
  106. public void distinct(String hourString) throws IOException {
  107. String fileName = filePrefix + hourString + ".csv";
  108. String inputFilePath = downloadTargetDir + fileName;
  109. String outputFilePath = distinctTargetDir + fileName;
  110. CSVFormat format = CSVFormat.DEFAULT.builder().build();
  111. CSVParser records = format.parse(new InputStreamReader(Files.newInputStream(Paths.get(inputFilePath)), StandardCharsets.UTF_8));
  112. Map<String, CSVRecord> recordMap = new HashMap<>();
  113. int count = 0;
  114. for (CSVRecord record : records) {
  115. recordMap.put(record.get(1), record);
  116. count++;
  117. }
  118. CSVPrinter printer = format.print(new File(outputFilePath), StandardCharsets.UTF_8);
  119. for (CSVRecord record : recordMap.values()) {
  120. printer.printRecord(record);
  121. }
  122. printer.flush();
  123. printer.close();
  124. log.debug("去重完成,原文件{}条数据,去重后{}条数据", count, recordMap.size());
  125. }
  126. public void gpload(String hourString) {
  127. String gploadCommand = "sh /data1/pm/pm_4g/pm_lte_gpload.sh " + hourString;
  128. GploadResult gpload = GploadUtil.gpload(gploadCommand);
  129. if (Boolean.TRUE.equals(gpload.getTaskStatus())) {
  130. log.debug("gpload完成: {}", gpload);
  131. } else {
  132. log.error("gpload 失败: {}", gpload.getMessage());
  133. }
  134. }
  135. }