123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145 |
- package com.nokia.task;
- import com.jcraft.jsch.JSchException;
- import com.jcraft.jsch.SftpException;
- import com.nokia.common.gpload.GploadUtil;
- import com.nokia.common.gpload.entity.GploadResult;
- import com.nokia.common.ssh.SSHUtil;
- import com.nokia.common.ssh.exception.SSHUtilException;
- import com.xxl.job.core.handler.annotation.XxlJob;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.csv.CSVFormat;
- import org.apache.commons.csv.CSVParser;
- import org.apache.commons.csv.CSVPrinter;
- import org.apache.commons.csv.CSVRecord;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Component;
- import org.springframework.util.CollectionUtils;
- import java.io.File;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.nio.charset.StandardCharsets;
- import java.nio.file.Files;
- import java.nio.file.Paths;
- import java.util.HashMap;
- import java.util.List;
- import java.util.Map;
- @Slf4j
- /*@Task*/
- @Component
- public class LtePmTask {
- @Value("${lte.pm.download.host:10.17.180.55}")
- private String host;
- @Value("${lte.pm.download.port:22}")
- private Integer port;
- @Value("${lte.pm.download.username:nokia}")
- private String username;
- @Value("${lte.pm.download.password:Nokia*123}")
- private String password;
- @Value("${lte.pm.download.sourceDir:/data/out2/pm_4g_hour}")
- private String sourceDir;
- @Value("${lte.pm.download.targetDir:download/}")
- private String downloadTargetDir;
- @Value("${lte.pm.filePrefix:pm_4g_hour_}")
- private String filePrefix;
- @Value("${lte.pm.distinct.targetDir:distinct/}")
- private String distinctTargetDir;
- /* @XxlJob("execHandler")
- public void Test(){
- System.out.println("java------------------");
- }
- */
- @XxlJob("execHandlerPm4G")
- public void cronTask() throws JSchException, SSHUtilException, IOException, SftpException {
- System.out.println("已触发pm4G");
- log.info("1");
- // 创建文件夹
- Files.createDirectories(Paths.get(downloadTargetDir));
- log.info("2");
- Files.createDirectories(Paths.get(distinctTargetDir));
- log.info("3");
- SSHUtil sshUtil = new SSHUtil(host, port, username, password);
- log.info("4");
- // 获取文件列表
- List<String> list = sshUtil.ls(sourceDir);
- log.info("5");
- if (CollectionUtils.isEmpty(list)) {
- log.info("6");
- return;
- }
- log.debug("扫描到的文件: {}", list);
- for (String s : list) {
- String[] s1 = s.split("_");
- String[] split = s1[3].split("\\.");
- if (split[1].equals("csv")){
- singleTask(split[0]);
- }
- }
- }
- /* @AllowedTaskType({TaskType.CRON})
- public void cronTask(String hourDelay) throws JSchException, SSHUtilException, IOException {
- long hours = Long.parseLong(hourDelay);
- String hourString = dateFormat.format(new Date(System.currentTimeMillis() - 3600L * 1000L * hours));
- singleTask(hourString);
- }*/
- /*@AllowedTaskType({TaskType.TIMING, TaskType.IMMEDIATE})*/
- public void singleTask(String hourString) throws JSchException, SSHUtilException, IOException, SftpException {
- download(hourString);
- distinct(hourString);
- gpload(hourString);
- }
- public void download(String hourString) throws JSchException, SSHUtilException, IOException, SftpException {
- log.info("hourString: {}", hourString);
- SSHUtil sshUtil = new SSHUtil(host, port, username, password);
- String downloadFileName = filePrefix + hourString + ".csv";
- String sourceFilePath = sourceDir + "/" + downloadFileName;
- File targetFile = new File(downloadTargetDir + downloadFileName);
- if (!targetFile.exists()) {
- targetFile.getParentFile().mkdirs();
- }
- String targetPath = targetFile.getAbsolutePath();
- boolean b = sshUtil.scpFrom(sourceFilePath, targetPath);
- if (b) {
- log.debug("文件 {} 下载成功...", targetPath);
- // sshUtil.delete(sourceFilePath);
- }
- }
- public void distinct(String hourString) throws IOException {
- String fileName = filePrefix + hourString + ".csv";
- String inputFilePath = downloadTargetDir + fileName;
- String outputFilePath = distinctTargetDir + fileName;
- CSVFormat format = CSVFormat.DEFAULT.builder().build();
- CSVParser records = format.parse(new InputStreamReader(Files.newInputStream(Paths.get(inputFilePath)), StandardCharsets.UTF_8));
- Map<String, CSVRecord> recordMap = new HashMap<>();
- int count = 0;
- for (CSVRecord record : records) {
- recordMap.put(record.get(1), record);
- count++;
- }
- CSVPrinter printer = format.print(new File(outputFilePath), StandardCharsets.UTF_8);
- for (CSVRecord record : recordMap.values()) {
- printer.printRecord(record);
- }
- printer.flush();
- printer.close();
- log.debug("去重完成,原文件{}条数据,去重后{}条数据", count, recordMap.size());
- }
- public void gpload(String hourString) {
- String gploadCommand = "sh /data1/pm/pm_4g/pm_lte_gpload.sh " + hourString;
- GploadResult gpload = GploadUtil.gpload(gploadCommand);
- if (Boolean.TRUE.equals(gpload.getTaskStatus())) {
- log.debug("gpload完成: {}", gpload);
- } else {
- log.error("gpload 失败: {}", gpload.getMessage());
- }
- }
- }
|