|
@@ -1,129 +0,0 @@
|
|
-package top.lifuquan.service;
|
|
|
|
-
|
|
|
|
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
|
-import org.springframework.stereotype.Service;
|
|
|
|
-import top.lifuquan.pojo.TaskRecord;
|
|
|
|
-import top.lifuquan.util.CsvUtil;
|
|
|
|
-import top.lifuquan.util.ProcessUtil;
|
|
|
|
-
|
|
|
|
-import javax.annotation.Resource;
|
|
|
|
-import java.nio.file.Paths;
|
|
|
|
-import java.text.ParseException;
|
|
|
|
-import java.text.SimpleDateFormat;
|
|
|
|
-import java.util.Date;
|
|
|
|
-
|
|
|
|
-@Slf4j
|
|
|
|
-@Service
|
|
|
|
-public class CronService {
|
|
|
|
-
|
|
|
|
- private final String localPath = "/data/terminal/source";
|
|
|
|
- private final String distinctPath = "/data/terminal/distinct";
|
|
|
|
-
|
|
|
|
- private final String distinctPrefix_5g = "g5_terminal_";
|
|
|
|
- private final String taskType_5g = "5g";
|
|
|
|
-
|
|
|
|
- private final String distinctPrefix_volte = "volte_terminal_";
|
|
|
|
- private final String taskType_volte = "volte";
|
|
|
|
-
|
|
|
|
- private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
|
|
|
|
-
|
|
|
|
- @Resource
|
|
|
|
- private TaskRecordService recordService;
|
|
|
|
-
|
|
|
|
- public void run5gTask(Date date) {
|
|
|
|
- log.debug("开始执行{}任务", taskType_5g);
|
|
|
|
- String dateStr = dateFormat.format(date);
|
|
|
|
- // 获取任务记录
|
|
|
|
- TaskRecord taskRecord = new TaskRecord();
|
|
|
|
- taskRecord.setTaskType(taskType_5g);
|
|
|
|
- taskRecord.setTaskDate(dateStr);
|
|
|
|
- TaskRecord record = recordService.getOne(new QueryWrapper<>(taskRecord));
|
|
|
|
- if (record != null) {
|
|
|
|
- // 在已存在的记录上继续进行
|
|
|
|
- taskRecord = record;
|
|
|
|
- log.debug("已存在任务记录" + record.toString());
|
|
|
|
- if(record.getTaskStatus().equals("任务完成")) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- // 开始执行任务
|
|
|
|
- String dateString2 = dateFormat.format(date.getTime() + 1000 * 3600 * 24);
|
|
|
|
- String fileName = "814153986171547648_5G终端及5G用户数据含5G终端开关状态_hprovince_DB服务_" + dateStr + "_1_12788_" + dateString2 + "1220.dat.gz";
|
|
|
|
- // 1. scp文件到本地
|
|
|
|
- log.info("准备下载文件 " + fileName);
|
|
|
|
- boolean b = ProcessUtil.scpFile(fileName, localPath);
|
|
|
|
- if (!b) {
|
|
|
|
- log.info("文件{}scp失败", fileName);
|
|
|
|
- taskRecord.setTaskStatus("scp失败");
|
|
|
|
- taskRecord.setLastUpdateTime(new Date());
|
|
|
|
- recordService.saveOrUpdate(taskRecord);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- taskRecord.setTaskStatus("scp完成");
|
|
|
|
- // 2. 对文件进行去重
|
|
|
|
- String localFilePath = Paths.get(localPath, fileName).toString();
|
|
|
|
- String distinctFilePath = Paths.get(distinctPath, distinctPrefix_5g + dateStr + ".csv").toString();
|
|
|
|
- log.debug("准备筛选文件 " + localFilePath + " 并将不重复的记录写入 " + distinctFilePath);
|
|
|
|
- b = CsvUtil.distinct(localFilePath, distinctFilePath, 0, "|");
|
|
|
|
- if (!b) {
|
|
|
|
- log.info("文件{}去重失败", localFilePath);
|
|
|
|
- taskRecord.setTaskStatus("去重失败");
|
|
|
|
- taskRecord.setLastUpdateTime(new Date());
|
|
|
|
- recordService.saveOrUpdate(taskRecord);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- taskRecord.setTaskStatus("去重完成");
|
|
|
|
- // 3. gpload
|
|
|
|
- taskRecord = ProcessUtil.gpLoad(dateStr, taskRecord);
|
|
|
|
- taskRecord.setLastUpdateTime(new Date());
|
|
|
|
- recordService.saveOrUpdate(taskRecord);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- public void runVolteTask(Date date) {
|
|
|
|
- log.debug("开始执行{}任务", taskType_volte);
|
|
|
|
- String dateStr = dateFormat.format(date);
|
|
|
|
- // 获取任务记录
|
|
|
|
- TaskRecord taskRecord = new TaskRecord();
|
|
|
|
- taskRecord.setTaskType(taskType_volte);
|
|
|
|
- taskRecord.setTaskDate(dateStr);
|
|
|
|
- TaskRecord record = recordService.getOne(new QueryWrapper<>(taskRecord));
|
|
|
|
- if (record != null) {
|
|
|
|
- taskRecord = record;
|
|
|
|
- log.debug("已存在任务记录" + record.toString());
|
|
|
|
- if(record.getTaskStatus().equals("任务完成")) {
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- // 开始执行任务
|
|
|
|
- String fileName = "752951054268059649_intf_list_volte_users_day_" + dateStr + "_0_127.dat";
|
|
|
|
- // 1. scp文件到本地
|
|
|
|
- log.info("准备下载文件 " + fileName);
|
|
|
|
- boolean b = ProcessUtil.scpFile(fileName, localPath);
|
|
|
|
- if (!b) {
|
|
|
|
- log.info("文件{}scp失败", fileName);
|
|
|
|
- taskRecord.setTaskStatus("scp失败");
|
|
|
|
- taskRecord.setLastUpdateTime(new Date());
|
|
|
|
- recordService.saveOrUpdate(taskRecord);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- taskRecord.setTaskStatus("scp完成");
|
|
|
|
- // 2. 对文件进行去重
|
|
|
|
- String localFilePath = Paths.get(localPath, fileName).toString();
|
|
|
|
- String distinctFilePath = Paths.get(distinctPath, distinctPrefix_volte + dateStr + ".csv").toString();
|
|
|
|
- log.debug("准备筛选文件 " + localFilePath + " 并将不重复的记录写入 " + distinctFilePath);
|
|
|
|
- b = CsvUtil.distinct(localFilePath, distinctFilePath, 0, "|");
|
|
|
|
- if (!b) {
|
|
|
|
- log.info("文件{}去重失败", localFilePath);
|
|
|
|
- taskRecord.setTaskStatus("去重失败");
|
|
|
|
- taskRecord.setLastUpdateTime(new Date());
|
|
|
|
- recordService.saveOrUpdate(taskRecord);
|
|
|
|
- return;
|
|
|
|
- }
|
|
|
|
- taskRecord.setTaskStatus("去重完成");
|
|
|
|
- // 3. gpload
|
|
|
|
- taskRecord = ProcessUtil.gpLoad(dateStr, taskRecord);
|
|
|
|
- taskRecord.setLastUpdateTime(new Date());
|
|
|
|
- recordService.saveOrUpdate(taskRecord);
|
|
|
|
- }
|
|
|
|
-}
|
|
|