|
@@ -0,0 +1,182 @@
|
|
|
+package com.nokia.finance.tasks.jobs.car.shujucangku;
|
|
|
+
|
|
|
+import com.nokia.finance.tasks.common.exception.MyRuntimeException;
|
|
|
+import com.nokia.finance.tasks.common.utils.psql.PsqlUtil;
|
|
|
+import com.nokia.finance.tasks.config.JobConfig;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.csv.CSVFormat;
|
|
|
+import org.apache.commons.csv.CSVParser;
|
|
|
+import org.apache.commons.csv.CSVPrinter;
|
|
|
+import org.apache.commons.csv.CSVRecord;
|
|
|
+import org.springframework.scheduling.annotation.Scheduled;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+import java.io.InputStreamReader;
|
|
|
+import java.io.OutputStreamWriter;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+import java.nio.file.Files;
|
|
|
+import java.nio.file.Path;
|
|
|
+import java.nio.file.Paths;
|
|
|
+import java.nio.file.StandardCopyOption;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.stream.Stream;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 河北价值管理平台北十车辆费用月数据入库定时任务
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class CarFeeBsJob {
|
|
|
+ private final JobConfig jobConfig;
|
|
|
+
|
|
|
+ public CarFeeBsJob(JobConfig jobConfig) {
|
|
|
+ this.jobConfig = jobConfig;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 执行任务
|
|
|
+ */
|
|
|
+ @Scheduled(cron = "0 28 6 12 * ?")
|
|
|
+ public void runJob() {
|
|
|
+ // 数据目录
|
|
|
+ Path dir = Paths.get(jobConfig.getCarFeeBsSourcePath());
|
|
|
+ try (Stream<Path> stream = Files.list(dir)) {
|
|
|
+ // 获取数据目录下的文件列表
|
|
|
+ List<Path> pathList = stream.filter(t -> !t.toString().endsWith(".MD5")).sorted().toList();
|
|
|
+ log.info("河北价值管理平台北十车辆费用月数据文件列表: {}", pathList);
|
|
|
+ if (CollectionUtils.isEmpty(pathList)) {
|
|
|
+ throw new MyRuntimeException("河北价值管理平台北十车辆费用月数据没有文件");
|
|
|
+ }
|
|
|
+ for (Path path : pathList) {
|
|
|
+ CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ singleJob(path);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new MyRuntimeException(e);
|
|
|
+ }
|
|
|
+ }).get(1, TimeUnit.MINUTES);
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("线程中断: {}", e.getMessage(), e);
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 处理单个文件
|
|
|
+ *
|
|
|
+ * @param path 文件路径
|
|
|
+ */
|
|
|
+ public void singleJob(Path path) throws Exception {
|
|
|
+ List<Map<String, String>> list = readFile(path);
|
|
|
+ List<Map<String, String>> distinctList = dataProcessing(path, list);
|
|
|
+ Path csvPath = toCsv(path, distinctList);
|
|
|
+ copyCsv(csvPath);
|
|
|
+ move(path);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 读取文件
|
|
|
+ *
|
|
|
+ * @param path 文件路径
|
|
|
+ */
|
|
|
+ public List<Map<String, String>> readFile(Path path) throws Exception {
|
|
|
+ log.info("读取: {}", path);
|
|
|
+ List<String> headers = Stream.of("month_id", "area_no", "area_name", "kpi_id", "kpi_name", "tm_value",
|
|
|
+ "lm_value", "ty_value", "ly_value").toList();
|
|
|
+ int headerSize = headers.size();
|
|
|
+ char delimiter = 1;
|
|
|
+ try (CSVParser parser = CSVFormat.DEFAULT.builder().setDelimiter(delimiter).build()
|
|
|
+ .parse(new InputStreamReader(Files.newInputStream(path), StandardCharsets.UTF_8))
|
|
|
+ ) {
|
|
|
+ List<Map<String, String>> resultList = new ArrayList<>();
|
|
|
+ for (CSVRecord csvRecord : parser) {
|
|
|
+ Map<String, String> rowMap = new LinkedHashMap<>();
|
|
|
+ for (int i = 0; i < headerSize; i++) {
|
|
|
+ String header = headers.get(i);
|
|
|
+ String value = csvRecord.get(i);
|
|
|
+ // 删除空白字符
|
|
|
+ value = StringUtils.trimAllWhitespace(value);
|
|
|
+ rowMap.put(header, value);
|
|
|
+ }
|
|
|
+ resultList.add(rowMap);
|
|
|
+ }
|
|
|
+ return resultList;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 数据加工
|
|
|
+ *
|
|
|
+ * @param path 文件路径
|
|
|
+ * @param list 数据
|
|
|
+ */
|
|
|
+ public List<Map<String, String>> dataProcessing(Path path, List<Map<String, String>> list) {
|
|
|
+ for (Map<String, String> map : list) {
|
|
|
+ map.put("source", path.getFileName().toString());
|
|
|
+ }
|
|
|
+ return list;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生成csv
|
|
|
+ *
|
|
|
+ * @param path 源文件路径
|
|
|
+ * @param list 数据
|
|
|
+ */
|
|
|
+ public Path toCsv(Path path, List<Map<String, String>> list) throws Exception {
|
|
|
+ log.info("条数:{}", list.size());
|
|
|
+ Files.createDirectories(Paths.get(jobConfig.getCarFeeBsHistoryPath()));
|
|
|
+ Path csvPath = Paths.get(jobConfig.getCarFeeBsHistoryPath() + path.getFileName() + ".csv");
|
|
|
+ try (OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(csvPath),
|
|
|
+ StandardCharsets.UTF_8);
|
|
|
+ CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT)) {
|
|
|
+ // 添加bom头避免excel乱码
|
|
|
+ osw.write('\ufeff');
|
|
|
+ Map<String, String> header = list.get(0);
|
|
|
+ // 表头
|
|
|
+ printer.printRecord(header.keySet());
|
|
|
+ for (Map<String, String> map : list) {
|
|
|
+ printer.printRecord(map.values());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return csvPath;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 导入数据库
|
|
|
+ *
|
|
|
+ * @param path 文件路径
|
|
|
+ */
|
|
|
+ public void copyCsv(Path path) {
|
|
|
+ String dbTable = "car.car_fee_bs";
|
|
|
+ String csv = path.toString();
|
|
|
+ String columns = "(month_id,area_no,area_name,kpi_id,kpi_name,tm_value,lm_value,ty_value,ly_value,source)";
|
|
|
+ Long timeout = 60000L;
|
|
|
+ PsqlUtil.copyCsv(jobConfig.getCopyScriptPath(), jobConfig.getDbHost(), jobConfig.getDbPort(),
|
|
|
+ jobConfig.getDbUsername(), jobConfig.getDbPassword(), jobConfig.getDbName(), dbTable, csv, columns,
|
|
|
+ timeout, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 移动源文件到历史文件夹
|
|
|
+ *
|
|
|
+ * @param path 源文件路径
|
|
|
+ */
|
|
|
+ public void move(Path path) throws Exception {
|
|
|
+ Files.move(Paths.get(path + ".MD5"),
|
|
|
+ Paths.get(jobConfig.getCarFeeBsHistoryPath(), path.getFileName().toString() + ".MD5"),
|
|
|
+ StandardCopyOption.REPLACE_EXISTING);
|
|
|
+ Files.move(path, Paths.get(jobConfig.getCarFeeBsHistoryPath(), path.getFileName().toString()),
|
|
|
+ StandardCopyOption.REPLACE_EXISTING);
|
|
|
+ }
|
|
|
+}
|