|
@@ -4,6 +4,7 @@ import com.jcraft.jsch.JSchException;
|
|
|
import com.jcraft.jsch.SftpException;
|
|
|
import com.nokia.common.gpload.GploadUtil;
|
|
|
import com.nokia.common.gpload.entity.GploadResult;
|
|
|
+import com.nokia.common.psql.PsqlUtil;
|
|
|
import com.nokia.common.ssh.SSHUtil;
|
|
|
import com.xxl.job.core.context.XxlJobHelper;
|
|
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
|
@@ -13,7 +14,7 @@ import org.apache.commons.csv.CSVFormat;
|
|
|
import org.apache.commons.csv.CSVParser;
|
|
|
import org.apache.commons.csv.CSVPrinter;
|
|
|
import org.apache.commons.csv.CSVRecord;
|
|
|
-import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.boot.context.properties.ConfigurationProperties;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.CollectionUtils;
|
|
|
import org.springframework.util.StringUtils;
|
|
@@ -27,41 +28,99 @@ import java.nio.charset.StandardCharsets;
|
|
|
import java.nio.file.Files;
|
|
|
import java.nio.file.Path;
|
|
|
import java.nio.file.Paths;
|
|
|
-import java.rmi.RemoteException;
|
|
|
import java.text.DateFormat;
|
|
|
import java.text.SimpleDateFormat;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.stream.Collectors;
|
|
|
import java.util.stream.Stream;
|
|
|
|
|
|
+@ConfigurationProperties("task")
|
|
|
@Data
|
|
|
@Component
|
|
|
@Slf4j
|
|
|
public class FiveGPmTask {
|
|
|
-
|
|
|
- @Value("${fiveg.pm.download.host:10.17.180.55}")
|
|
|
- private String host;
|
|
|
- @Value("${fiveg.pm.download.port:22}")
|
|
|
- private Integer port;
|
|
|
- @Value("${fiveg.pm.download.username:nokia}")
|
|
|
- private String username;
|
|
|
- @Value("${fiveg.pm.download.password:Nokia*123}")
|
|
|
- private String password;
|
|
|
- @Value("${fiveg.pm.download.sourceDir:/data/out2/pm_5g_hour/}")
|
|
|
+ /**
|
|
|
+ * 无文件次数告警值
|
|
|
+ */
|
|
|
+ private static final int ALERT_COUNT = 12;
|
|
|
+ /**
|
|
|
+ * 无文件次数
|
|
|
+ */
|
|
|
+ private static final AtomicInteger NO_FILE_COUNT = new AtomicInteger(0);
|
|
|
+ /**
|
|
|
+ * ssh ip
|
|
|
+ */
|
|
|
+ private String sshHost;
|
|
|
+ /**
|
|
|
+ * ssh端口
|
|
|
+ */
|
|
|
+ private Integer sshPort;
|
|
|
+ /**
|
|
|
+ * ssh账号
|
|
|
+ */
|
|
|
+ private String sshUsername;
|
|
|
+ /**
|
|
|
+ * ssh密码
|
|
|
+ */
|
|
|
+ private String sshPassword;
|
|
|
+ /**
|
|
|
+ * 5gpm数据文件目录
|
|
|
+ */
|
|
|
private String sourceDir;
|
|
|
- @Value("${fiveg.pm.download.targetDir:download/}")
|
|
|
- private String downloadTargetDir;
|
|
|
- @Value("${fiveg.pm.filePrefix:pm_5g_hour_}")
|
|
|
+ /**
|
|
|
+ * 5gpm数据文件下载目录
|
|
|
+ */
|
|
|
+ private String downloadDir;
|
|
|
+ /**
|
|
|
+ * 5gpm数据文件前缀
|
|
|
+ */
|
|
|
private String filePrefix;
|
|
|
- @Value("${fiveg.pm.distinct.targetDir:distinct/}")
|
|
|
- private String distinctTargetDir;
|
|
|
+ /**
|
|
|
+ * 5gpm数据文件去重目录
|
|
|
+ */
|
|
|
+ private String distinctDir;
|
|
|
+ /**
|
|
|
+ * 导入数据脚本路径
|
|
|
+ */
|
|
|
+ private String importScript;
|
|
|
+ /**
|
|
|
+ * 导入的数据库ip
|
|
|
+ */
|
|
|
+ private String dbHost;
|
|
|
+ /**
|
|
|
+ * 数据库端口
|
|
|
+ */
|
|
|
+ private Integer dbPort;
|
|
|
+ /**
|
|
|
+ * 数据库账号
|
|
|
+ */
|
|
|
+ private String dbUsername;
|
|
|
+ /**
|
|
|
+ * 数据库密码
|
|
|
+ */
|
|
|
+ private String dbPassword;
|
|
|
+ /**
|
|
|
+ * 数据库名称
|
|
|
+ */
|
|
|
+ private String dbName;
|
|
|
+ /**
|
|
|
+ * 数据库表
|
|
|
+ */
|
|
|
+ private String dbTable;
|
|
|
/**
|
|
|
* 最小插入数据
|
|
|
*/
|
|
|
- @Value("${fiveg.pm.minInsertCount:10000}")
|
|
|
- private long minInsertCount;
|
|
|
+ private Long minInsertCount;
|
|
|
+ /**
|
|
|
+ * 超时分钟
|
|
|
+ */
|
|
|
+ private Long timeout;
|
|
|
private final DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHH");
|
|
|
private SSHUtil sshUtil;
|
|
|
|
|
@@ -74,22 +133,52 @@ public class FiveGPmTask {
|
|
|
public void pm5gJobHandler() {
|
|
|
try {
|
|
|
// 创建文件夹
|
|
|
- Files.createDirectories(Paths.get(downloadTargetDir));
|
|
|
- Files.createDirectories(Paths.get(distinctTargetDir));
|
|
|
- sshUtil = new SSHUtil(host, port, username, password);
|
|
|
+ Files.createDirectories(Paths.get(downloadDir));
|
|
|
+ Files.createDirectories(Paths.get(distinctDir));
|
|
|
+ sshUtil = new SSHUtil(sshHost, sshPort, sshUsername, sshPassword);
|
|
|
// 获取文件列表
|
|
|
List<String> list = sshUtil.ls(sourceDir);
|
|
|
if (CollectionUtils.isEmpty(list)) {
|
|
|
- log.info("没有文件");
|
|
|
- XxlJobHelper.log("没有文件");
|
|
|
+ int count = NO_FILE_COUNT.incrementAndGet();
|
|
|
+ log.info("{}次没有文件", count);
|
|
|
+ XxlJobHelper.log("{}次没有文件", count);
|
|
|
+ // 无文件次数告警
|
|
|
+ if (count > ALERT_COUNT) {
|
|
|
+ log.error("超过{}次没有文件", ALERT_COUNT);
|
|
|
+ XxlJobHelper.log("超过{}次没有文件", ALERT_COUNT);
|
|
|
+ // 重置无文件计数
|
|
|
+ NO_FILE_COUNT.set(0);
|
|
|
+ }
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ // 重置无文件计数
|
|
|
+ NO_FILE_COUNT.set(0);
|
|
|
log.info("扫描到的文件: {}", list);
|
|
|
XxlJobHelper.log("扫描到的文件: {}", list);
|
|
|
for (String t : list) {
|
|
|
- singleTask(t);
|
|
|
+ CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ singleTask(t);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("线程中断: {}", e.getMessage(), e);
|
|
|
+ XxlJobHelper.log("线程中断: {}", e.getMessage(), e);
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }).get(timeout, TimeUnit.MINUTES);
|
|
|
}
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("线程中断: {}", e.getMessage(), e);
|
|
|
+ XxlJobHelper.log("线程中断: {}", e.getMessage(), e);
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ XxlJobHelper.handleFail(e.getMessage());
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ log.error("执行超时: {}", e.getMessage(), e);
|
|
|
+ XxlJobHelper.log("执行超时: {}", e.getMessage(), e);
|
|
|
+ XxlJobHelper.handleFail(e.getMessage());
|
|
|
} catch (Exception e) {
|
|
|
log.error("发生异常了: {}", e.getMessage(), e);
|
|
|
XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
|
|
@@ -98,8 +187,8 @@ public class FiveGPmTask {
|
|
|
try {
|
|
|
sshUtil.disconnect();
|
|
|
} catch (IOException e) {
|
|
|
- log.error("发生异常了: {}", e.getMessage(), e);
|
|
|
- XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
|
|
|
+ log.error("ssh关闭异常: {}", e.getMessage(), e);
|
|
|
+ XxlJobHelper.log("ssh关闭异常: {}", e.getMessage(), e);
|
|
|
XxlJobHelper.handleFail(e.getMessage());
|
|
|
}
|
|
|
}
|
|
@@ -108,7 +197,7 @@ public class FiveGPmTask {
|
|
|
@XxlJob("pm5gRestHandler")
|
|
|
public void pm5gRestHandler() {
|
|
|
// 读取目标文件夹下的文件
|
|
|
- try (Stream<Path> stream = Files.list(Paths.get(distinctTargetDir))) {
|
|
|
+ try (Stream<Path> stream = Files.list(Paths.get(distinctDir))) {
|
|
|
List<Path> pathList = stream.collect(Collectors.toList());
|
|
|
log.info("pathList: {}", pathList);
|
|
|
XxlJobHelper.log("pathList: {}", pathList);
|
|
@@ -116,8 +205,28 @@ public class FiveGPmTask {
|
|
|
String filename = t.getFileName().toString();
|
|
|
log.info("file: {}", filename);
|
|
|
XxlJobHelper.log("file: {}", filename);
|
|
|
- gpload(filename);
|
|
|
+ CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ importData(filename);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("线程中断: {}", e.getMessage(), e);
|
|
|
+ XxlJobHelper.log("线程中断: {}", e.getMessage(), e);
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }).get(timeout, TimeUnit.MINUTES);
|
|
|
}
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("线程中断: {}", e.getMessage(), e);
|
|
|
+ XxlJobHelper.log("线程中断: {}", e.getMessage(), e);
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ XxlJobHelper.handleFail(e.getMessage());
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ log.error("执行超时: {}", e.getMessage(), e);
|
|
|
+ XxlJobHelper.log("执行超时: {}", e.getMessage(), e);
|
|
|
+ XxlJobHelper.handleFail(e.getMessage());
|
|
|
} catch (Exception e) {
|
|
|
log.error("发生异常了: {}", e.getMessage(), e);
|
|
|
XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
|
|
@@ -130,10 +239,10 @@ public class FiveGPmTask {
|
|
|
*
|
|
|
* @param filename 文件名
|
|
|
*/
|
|
|
- public void singleTask(String filename) throws JSchException, IOException, SftpException {
|
|
|
+ public void singleTask(String filename) throws JSchException, IOException, SftpException, InterruptedException {
|
|
|
download(filename);
|
|
|
distinct(filename);
|
|
|
- gpload(filename);
|
|
|
+ importData(filename);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -145,7 +254,7 @@ public class FiveGPmTask {
|
|
|
log.info("下载文件: {}", filename);
|
|
|
XxlJobHelper.log("下载文件: {}", filename);
|
|
|
String src = sourceDir + filename;
|
|
|
- String dst = downloadTargetDir + filename;
|
|
|
+ String dst = downloadDir + filename;
|
|
|
sshUtil.get(src, dst);
|
|
|
// 删除远程文件
|
|
|
log.info("删除远程文件: {}", filename);
|
|
@@ -161,8 +270,8 @@ public class FiveGPmTask {
|
|
|
public void distinct(String filename) throws IOException {
|
|
|
log.info("文件 {} 去重...", filename);
|
|
|
XxlJobHelper.log("文件 {} 去重...", filename);
|
|
|
- String inputFilePath = downloadTargetDir + filename;
|
|
|
- String outputFilePath = distinctTargetDir + filename;
|
|
|
+ String inputFilePath = downloadDir + filename;
|
|
|
+ String outputFilePath = distinctDir + filename;
|
|
|
Path inputPath = Paths.get(inputFilePath);
|
|
|
try (CSVParser parser = CSVFormat.DEFAULT.builder().build()
|
|
|
.parse(new InputStreamReader(Files.newInputStream(inputPath), StandardCharsets.UTF_8));
|
|
@@ -186,6 +295,19 @@ public class FiveGPmTask {
|
|
|
// XxlJobHelper.log("删除本地源文件 {}...", filename);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 导入数据
|
|
|
+ *
|
|
|
+ * @param filename 文件名
|
|
|
+ * @throws IOException ioexception
|
|
|
+ */
|
|
|
+ public void importData(String filename) throws IOException, InterruptedException {
|
|
|
+ String csv = distinctDir + filename;
|
|
|
+ PsqlUtil.copyCsv(importScript, dbHost, dbPort, dbUsername, dbPassword, dbName, dbTable, csv, minInsertCount);
|
|
|
+ // 删除重排文件
|
|
|
+ Files.deleteIfExists(Paths.get(distinctDir + filename));
|
|
|
+ }
|
|
|
+
|
|
|
public void gpload(String filename) throws IOException {
|
|
|
String gploadCommand = "sh /data1/pm_5g/gpload/pm_nr_gpload.sh " + filename;
|
|
|
GploadResult gpload = GploadUtil.gpload(gploadCommand);
|
|
@@ -193,10 +315,10 @@ public class FiveGPmTask {
|
|
|
log.info("gpload完成: {}", gpload);
|
|
|
XxlJobHelper.log("gpload完成: {}", gpload);
|
|
|
if (gpload.getInsertedCount() < minInsertCount) {
|
|
|
- throw new RemoteException(filename + " 数据异常,少于 " + minInsertCount);
|
|
|
+ throw new RuntimeException(filename + " 数据异常,少于 " + minInsertCount);
|
|
|
}
|
|
|
// 删除重排文件
|
|
|
- Files.deleteIfExists(Paths.get(distinctTargetDir + filename));
|
|
|
+ Files.deleteIfExists(Paths.get(distinctDir + filename));
|
|
|
} else {
|
|
|
throw new RuntimeException("gpload失败: " + gpload.getMessage());
|
|
|
}
|
|
@@ -221,6 +343,8 @@ public class FiveGPmTask {
|
|
|
String nciStr = t.get(2);
|
|
|
String enbid = "";
|
|
|
String cellId = "";
|
|
|
+ // 数据库字段限制100个字符
|
|
|
+ String userLabel = org.apache.commons.lang3.StringUtils.substring(t.get(3), 0, 100);
|
|
|
if (!StringUtils.hasText(nciStr)) {
|
|
|
continue;
|
|
|
}
|
|
@@ -242,7 +366,7 @@ public class FiveGPmTask {
|
|
|
int nci = Integer.parseInt(enbid) * 4096 + Integer.parseInt(cellId);
|
|
|
printer.printRecord(
|
|
|
t.get(1),
|
|
|
- t.get(3),
|
|
|
+ userLabel,
|
|
|
nci,
|
|
|
enbid,
|
|
|
cellId,
|