package com.nokia.task;

import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.SftpException;
import com.nokia.common.gpload.GploadUtil;
import com.nokia.common.gpload.entity.GploadResult;
import com.nokia.common.ssh.SSHUtil;
import com.nokia.common.ssh.exception.SSHUtilException;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.CSVRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/**
 * XXL-Job handler that processes hourly LTE PM CSV files in three steps per file:
 * <ol>
 *   <li>{@link #download(String)} copies the file from the remote host via {@link SSHUtil};</li>
 *   <li>{@link #distinct(String)} de-duplicates its rows on the second CSV column;</li>
 *   <li>{@link #gpload(String)} runs the {@code pm_lte_gpload.sh} script through {@link GploadUtil}.</li>
 * </ol>
 * The file for a given hour is named {@code filePrefix + hourString + ".csv"}.
 */
@Slf4j
/*@Task*/
@Component
public class LtePmTask {

    /**
     * Characters accepted in an hour token taken from a remote file name. The token ends up in a
     * shell command line (see {@link #gpload(String)}), so anything else is rejected.
     */
    private static final Pattern SAFE_HOUR_TOKEN = Pattern.compile("[0-9A-Za-z-]+");

    @Value("${lte.pm.download.host:10.17.180.55}")
    private String host;

    @Value("${lte.pm.download.port:22}")
    private Integer port;

    @Value("${lte.pm.download.username:nokia}")
    private String username;

    // NOTE(review): a real-looking credential is baked in as the default value; it should be
    // supplied through external configuration only. Left unchanged here because removing the
    // default would break deployments that rely on it.
    @Value("${lte.pm.download.password:Nokia*123}")
    private String password;

    @Value("${lte.pm.download.sourceDir:/data/out2/pm_4g_hour}")
    private String sourceDir;

    @Value("${lte.pm.download.targetDir:download/}")
    private String downloadTargetDir;

    @Value("${lte.pm.filePrefix:pm_4g_hour_}")
    private String filePrefix;

    @Value("${lte.pm.distinct.targetDir:distinct/}")
    private String distinctTargetDir;

    /*
    @XxlJob("execHandler")
    public void Test(){
        System.out.println("java------------------");
    }
    */

    /**
     * Job entry point: lists the remote source directory and runs {@link #singleTask(String)}
     * for every entry whose name yields an hour token (see {@link #extractHourString(String)}).
     * Entries with unexpected names are skipped. The first failing file aborts the run, because
     * its exception propagates to the scheduler.
     *
     * @throws JSchException    propagated from the SSH layer
     * @throws SSHUtilException propagated from {@link SSHUtil}
     * @throws IOException      if a local directory cannot be created or a file step fails
     * @throws SftpException    propagated from the SSH layer
     */
    @XxlJob("execHandlerPm4G")
    public void cronTask() throws JSchException, SSHUtilException, IOException, SftpException {
        log.info("已触发pm4G");
        log.info("1");
        // Create the local working directories.
        Files.createDirectories(Paths.get(downloadTargetDir));
        log.info("2");
        Files.createDirectories(Paths.get(distinctTargetDir));
        log.info("3");
        // NOTE(review): SSHUtil's lifecycle is not visible from this file; confirm whether the
        // instance needs to be closed/disconnected after use.
        SSHUtil sshUtil = new SSHUtil(host, port, username, password);
        log.info("4");
        // List the files in the remote source directory.
        List<String> list = sshUtil.ls(sourceDir);
        log.info("5");
        if (CollectionUtils.isEmpty(list)) {
            log.info("6");
            return;
        }
        log.debug("扫描到的文件: {}", list);
        for (String fileName : list) {
            String hourString = extractHourString(fileName);
            if (hourString == null) {
                log.debug("Skipping remote entry with unexpected name: {}", fileName);
                continue;
            }
            singleTask(hourString);
        }
    }

    /**
     * Extracts the hour token from a remote file name.
     *
     * <p>The name is split on {@code '_'}; the fourth part is split on {@code '.'}; its first
     * piece is the hour token and its second piece must be {@code csv}. Names that do not have
     * this shape, or whose token contains characters outside {@link #SAFE_HOUR_TOKEN}, are
     * rejected instead of raising {@link ArrayIndexOutOfBoundsException}.
     *
     * @param fileName a name returned by the remote directory listing, may be {@code null}
     * @return the hour token, or {@code null} if the name should be skipped
     */
    private static String extractHourString(String fileName) {
        if (fileName == null) {
            return null;
        }
        String[] nameParts = fileName.split("_");
        if (nameParts.length < 4) {
            return null;
        }
        String[] tokenAndExtension = nameParts[3].split("\\.");
        if (tokenAndExtension.length < 2 || !"csv".equals(tokenAndExtension[1])) {
            return null;
        }
        String hourToken = tokenAndExtension[0];
        return SAFE_HOUR_TOKEN.matcher(hourToken).matches() ? hourToken : null;
    }

    /*
    @AllowedTaskType({TaskType.CRON})
    public void cronTask(String hourDelay) throws JSchException, SSHUtilException, IOException {
        long hours = Long.parseLong(hourDelay);
        String hourString = dateFormat.format(new Date(System.currentTimeMillis() - 3600L * 1000L * hours));
        singleTask(hourString);
    }
    */

    /*@AllowedTaskType({TaskType.TIMING, TaskType.IMMEDIATE})*/
    /**
     * Runs download, de-duplication and gpload, in that order, for one hour.
     *
     * @param hourString hour token used to build the file name and passed to the load script
     * @throws JSchException    propagated from {@link #download(String)}
     * @throws SSHUtilException propagated from {@link #download(String)}
     * @throws IOException      if the download or de-duplication step fails on file access
     * @throws SftpException    propagated from {@link #download(String)}
     */
    public void singleTask(String hourString) throws JSchException, SSHUtilException, IOException, SftpException {
        download(hourString);
        distinct(hourString);
        gpload(hourString);
    }

    /**
     * Copies {@code sourceDir/filePrefix + hourString + ".csv"} from the remote host into the
     * local download directory.
     *
     * @param hourString hour token identifying the file
     * @throws JSchException    propagated from the SSH layer
     * @throws SSHUtilException propagated from {@link SSHUtil}
     * @throws IOException      if the local target directory cannot be created
     * @throws SftpException    propagated from the SSH layer
     */
    public void download(String hourString) throws JSchException, SSHUtilException, IOException, SftpException {
        log.info("hourString: {}", hourString);
        SSHUtil sshUtil = new SSHUtil(host, port, username, password);
        String downloadFileName = filePrefix + hourString + ".csv";
        String sourceFilePath = sourceDir + "/" + downloadFileName;
        File targetFile = new File(downloadTargetDir + downloadFileName);

        // Resolve to an absolute file first so the parent is never null for a bare file name,
        // and let createDirectories report a failure instead of ignoring mkdirs()'s result.
        File parentDir = targetFile.getAbsoluteFile().getParentFile();
        if (parentDir != null) {
            Files.createDirectories(parentDir.toPath());
        }

        String targetPath = targetFile.getAbsolutePath();
        boolean downloaded = sshUtil.scpFrom(sourceFilePath, targetPath);
        if (downloaded) {
            log.debug("文件 {} 下载成功...", targetPath);
            // sshUtil.delete(sourceFilePath);
        } else {
            // Previously a failed copy was silent; surface it so the follow-up steps can be diagnosed.
            log.error("Download of {} to {} reported failure", sourceFilePath, targetPath);
        }
    }

    /**
     * De-duplicates the downloaded CSV for the given hour on its second column (index 1) and
     * writes the result to the distinct directory under the same file name.
     *
     * <p>When several rows share a key the last one read is kept, written at the position where
     * the key first appeared. Rows with fewer than two columns have no key and are skipped with
     * a warning.
     *
     * @param hourString hour token identifying the file
     * @throws IOException if the input cannot be read or the output cannot be written
     */
    public void distinct(String hourString) throws IOException {
        String fileName = filePrefix + hourString + ".csv";
        String inputFilePath = downloadTargetDir + fileName;
        String outputFilePath = distinctTargetDir + fileName;
        CSVFormat format = CSVFormat.DEFAULT.builder().build();

        // LinkedHashMap keeps first-seen key order, so the output order is deterministic and a
        // leading header row (if present) stays on the first line.
        Map<String, CSVRecord> recordMap = new LinkedHashMap<>();
        int count = 0;

        // Read everything and close the input before the output file is opened.
        try (InputStreamReader reader = new InputStreamReader(
                     Files.newInputStream(Paths.get(inputFilePath)), StandardCharsets.UTF_8);
             CSVParser records = format.parse(reader)) {
            for (CSVRecord record : records) {
                count++;
                if (record.size() < 2) {
                    log.warn("Skipping record {} of {}: fewer than 2 columns",
                            record.getRecordNumber(), inputFilePath);
                    continue;
                }
                recordMap.put(record.get(1), record);
            }
        }

        // try-with-resources flushes and closes the printer even when a write fails.
        try (CSVPrinter printer = format.print(new File(outputFilePath), StandardCharsets.UTF_8)) {
            for (CSVRecord record : recordMap.values()) {
                printer.printRecord(record);
            }
        }

        log.debug("去重完成,原文件{}条数据,去重后{}条数据", count, recordMap.size());
    }

    /**
     * Runs {@code sh /data1/pm/pm_4g/pm_lte_gpload.sh <hourString>} through {@link GploadUtil}
     * and logs the outcome. A failed load is logged, not thrown.
     *
     * <p>NOTE(review): {@code hourString} is concatenated into the command line unescaped.
     * {@link #cronTask()} only passes tokens matching {@link #SAFE_HOUR_TOKEN}; other callers
     * must pass trusted values.
     *
     * @param hourString hour token passed as the script's single argument
     */
    public void gpload(String hourString) {
        String gploadCommand = "sh /data1/pm/pm_4g/pm_lte_gpload.sh " + hourString;
        GploadResult gpload = GploadUtil.gpload(gploadCommand);
        if (Boolean.TRUE.equals(gpload.getTaskStatus())) {
            log.debug("gpload完成: {}", gpload);
        } else {
            log.error("gpload 失败: {}", gpload.getMessage());
        }
    }
}