|
@@ -0,0 +1,118 @@
|
|
|
+package top.lifuquan.service;
|
|
|
+
|
|
|
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import top.lifuquan.pojo.TaskRecord;
|
|
|
+import top.lifuquan.util.CsvUtil;
|
|
|
+import top.lifuquan.util.ProcessUtil;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.nio.file.Paths;
|
|
|
+import java.text.ParseException;
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
+import java.util.Date;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class CronService {
|
|
|
+
|
|
|
+ private final String localPath = "/data/terminal/source";
|
|
|
+ private final String distinctPath = "/data/terminal/distinct";
|
|
|
+
|
|
|
+ private final String distinctPrefix_5g = "g5_terminal_";
|
|
|
+ private final String taskType_5g = "5g";
|
|
|
+
|
|
|
+ private final String distinctPrefix_volte = "volte_terminal_";
|
|
|
+ private final String taskType_volte = "volte";
|
|
|
+
|
|
|
+ private final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd");
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private TaskRecordService recordService;
|
|
|
+
|
|
|
+ public void run5gTask(Date date) {
|
|
|
+ log.debug("开始执行{}任务", taskType_5g);
|
|
|
+ String dateStr = dateFormat.format(date);
|
|
|
+ // 获取任务记录
|
|
|
+ TaskRecord taskRecord = new TaskRecord();
|
|
|
+ taskRecord.setTaskType(taskType_5g);
|
|
|
+ taskRecord.setTaskDate(dateStr);
|
|
|
+ TaskRecord record = recordService.getOne(new QueryWrapper<>(taskRecord));
|
|
|
+ if (record != null) {
|
|
|
+ log.debug("已存在任务记录" + record.toString());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 开始执行任务
|
|
|
+ String dateString2 = dateFormat.format(date.getTime() + 1000 * 3600 * 24);
|
|
|
+ String fileName = "814153986171547648_5G终端及5G用户数据含5G终端开关状态_hprovince_DB服务_" + dateStr + "_1_12788_" + dateString2 + "1220.dat.gz";
|
|
|
+ // 1. scp文件到本地
|
|
|
+ log.info("准备下载文件 " + fileName);
|
|
|
+ boolean b = ProcessUtil.scpFile(fileName, localPath);
|
|
|
+ if (!b) {
|
|
|
+ log.info("文件{}scp失败", fileName);
|
|
|
+ taskRecord.setTaskStatus("scp失败");
|
|
|
+ recordService.save(taskRecord);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ taskRecord.setTaskStatus("scp完成");
|
|
|
+ // 2. 对文件进行去重
|
|
|
+ String localFilePath = Paths.get(localPath, fileName).toString();
|
|
|
+ String distinctFilePath = Paths.get(distinctPath, distinctPrefix_5g + dateStr + ".csv").toString();
|
|
|
+ log.debug("准备筛选文件 " + localFilePath + " 并将不重复的记录写入 " + distinctFilePath);
|
|
|
+ b = CsvUtil.distinct(localFilePath, distinctFilePath, 0, "|");
|
|
|
+ if (!b) {
|
|
|
+ log.info("文件{}去重失败", localFilePath);
|
|
|
+ taskRecord.setTaskStatus("去重失败");
|
|
|
+ recordService.save(taskRecord);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ taskRecord.setTaskStatus("去重完成");
|
|
|
+ // 3. gpload
|
|
|
+ taskRecord = ProcessUtil.gpLoad(dateStr, taskRecord);
|
|
|
+ taskRecord.setLastUpdateTime(new Date());
|
|
|
+ recordService.save(taskRecord);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void runVolteTask(Date date) {
|
|
|
+ log.debug("开始执行{}任务", taskType_volte);
|
|
|
+ String dateStr = dateFormat.format(date);
|
|
|
+ // 获取任务记录
|
|
|
+ TaskRecord taskRecord = new TaskRecord();
|
|
|
+ taskRecord.setTaskType(taskType_volte);
|
|
|
+ taskRecord.setTaskDate(dateStr);
|
|
|
+ TaskRecord record = recordService.getOne(new QueryWrapper<>(taskRecord));
|
|
|
+ if (record != null) {
|
|
|
+ log.debug("已存在任务记录" + record.toString());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ // 开始执行任务
|
|
|
+ String fileName = "752951054268059649_intf_list_volte_users_day_" + dateStr + "_0_127.dat";
|
|
|
+ // 1. scp文件到本地
|
|
|
+ log.info("准备下载文件 " + fileName);
|
|
|
+ boolean b = ProcessUtil.scpFile(fileName, localPath);
|
|
|
+ if (!b) {
|
|
|
+ log.info("文件{}scp失败", fileName);
|
|
|
+ taskRecord.setTaskStatus("scp失败");
|
|
|
+ recordService.save(taskRecord);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ taskRecord.setTaskStatus("scp完成");
|
|
|
+ // 2. 对文件进行去重
|
|
|
+ String localFilePath = Paths.get(localPath, fileName).toString();
|
|
|
+ String distinctFilePath = Paths.get(distinctPath, distinctPrefix_volte + dateStr + ".csv").toString();
|
|
|
+ log.debug("准备筛选文件 " + localFilePath + " 并将不重复的记录写入 " + distinctFilePath);
|
|
|
+ b = CsvUtil.distinct(localFilePath, distinctFilePath, 0, "|");
|
|
|
+ if (!b) {
|
|
|
+ log.info("文件{}去重失败", localFilePath);
|
|
|
+ taskRecord.setTaskStatus("去重失败");
|
|
|
+ recordService.save(taskRecord);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ taskRecord.setTaskStatus("去重完成");
|
|
|
+ // 3. gpload
|
|
|
+ taskRecord = ProcessUtil.gpLoad(dateStr, taskRecord);
|
|
|
+ taskRecord.setLastUpdateTime(new Date());
|
|
|
+ recordService.save(taskRecord);
|
|
|
+ }
|
|
|
+}
|