|
@@ -1,14 +1,14 @@
|
|
|
package com.nokia.siteinfo.task;
|
|
|
|
|
|
-import com.nokia.common.gpload.GploadUtil;
|
|
|
-import com.nokia.common.gpload.entity.GploadResult;
|
|
|
+import com.nokia.common.exception.MyRuntimeException;
|
|
|
+import com.nokia.common.psql.PsqlUtil;
|
|
|
+import com.nokia.siteinfo.config.TaskConfig;
|
|
|
import com.xxl.job.core.context.XxlJobHelper;
|
|
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
import lombok.Data;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.commons.csv.CSVFormat;
|
|
|
import org.apache.commons.csv.CSVPrinter;
|
|
|
-import org.springframework.boot.context.properties.ConfigurationProperties;
|
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
@@ -27,23 +27,21 @@ import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.TimeoutException;
|
|
|
|
|
|
-@ConfigurationProperties("task")
|
|
|
@Data
|
|
|
@Component
|
|
|
@Slf4j
|
|
|
public class UpdateTask {
|
|
|
- private String gploadShellPath;
|
|
|
- /**
|
|
|
- * 超时分钟
|
|
|
- */
|
|
|
- private Long timeout;
|
|
|
+ private final TaskConfig taskConfig;
|
|
|
+ private final JdbcTemplate jdbcTemplate1;
|
|
|
+ private final JdbcTemplate jdbcTemplate2;
|
|
|
+ private final JdbcTemplate jdbcTemplate3;
|
|
|
|
|
|
- private final JdbcTemplate inputJdbcTemplate;
|
|
|
- private final JdbcTemplate outputJdbcTemplate;
|
|
|
-
|
|
|
- public UpdateTask(JdbcTemplate inputJdbcTemplate, JdbcTemplate outputJdbcTemplate) {
|
|
|
- this.inputJdbcTemplate = inputJdbcTemplate;
|
|
|
- this.outputJdbcTemplate = outputJdbcTemplate;
|
|
|
+ public UpdateTask(TaskConfig taskConfig, JdbcTemplate jdbcTemplate1, JdbcTemplate jdbcTemplate2,
|
|
|
+ JdbcTemplate jdbcTemplate3) {
|
|
|
+ this.taskConfig = taskConfig;
|
|
|
+ this.jdbcTemplate1 = jdbcTemplate1;
|
|
|
+ this.jdbcTemplate2 = jdbcTemplate2;
|
|
|
+ this.jdbcTemplate3 = jdbcTemplate3;
|
|
|
}
|
|
|
|
|
|
// 0 0 1 ? * 2
|
|
@@ -54,14 +52,14 @@ public class UpdateTask {
|
|
|
try {
|
|
|
singleTask();
|
|
|
} catch (Exception e) {
|
|
|
- throw new RuntimeException(e);
|
|
|
+ throw new MyRuntimeException(e);
|
|
|
}
|
|
|
- }).get(timeout, TimeUnit.MINUTES);
|
|
|
+ }).get(29, TimeUnit.MINUTES);
|
|
|
} catch (InterruptedException e) {
|
|
|
log.error("线程中断: {}", e.getMessage(), e);
|
|
|
XxlJobHelper.log("线程中断: {}", e.getMessage(), e);
|
|
|
Thread.currentThread().interrupt();
|
|
|
- throw new RuntimeException(e);
|
|
|
+ throw new MyRuntimeException(e);
|
|
|
} catch (TimeoutException e) {
|
|
|
log.error("执行超时: {}", e.getMessage(), e);
|
|
|
XxlJobHelper.log("执行超时: {}", e.getMessage(), e);
|
|
@@ -79,21 +77,19 @@ public class UpdateTask {
|
|
|
Map<String, Map<String, Object>> five = new HashMap<>();
|
|
|
update4g(four);
|
|
|
update5g(five);
|
|
|
- writeCsvs(four, five, siteLevelMap);
|
|
|
- backup("customer_service.cfg_p_netconf_std");
|
|
|
- gpload("customer_service.cfg_p_netconf_std");
|
|
|
- backup("customer_service.cfg_cell_info");
|
|
|
- gpload("customer_service.cfg_cell_info");
|
|
|
+ Map<Object, Map<String, Object>> map = siteInfoToCfgPNetconfStd(four, five, siteLevelMap);
|
|
|
+ updateCfgPNetconfStd(map);
|
|
|
+ updateCfgCellInfo(map);
|
|
|
}
|
|
|
|
|
|
private Map<Object, Object> getSiteLevelMap() {
|
|
|
Map<Object, Object> siteLevelMap = new HashMap<>();
|
|
|
String sql = "select * from cfm.cfg_0_4g_sitelevel";
|
|
|
- List<Map<String, Object>> l1 = outputJdbcTemplate.queryForList(sql);
|
|
|
+ List<Map<String, Object>> l1 = jdbcTemplate2.queryForList(sql);
|
|
|
log.info("cfg_0_4g_sitelevel: {}", l1.size());
|
|
|
l1.forEach(t -> siteLevelMap.put(t.get("site_id"), t.get("area3")));
|
|
|
sql = "select * from cfm.cfg_0_5g_sitelevel";
|
|
|
- List<Map<String, Object>> l2 = outputJdbcTemplate.queryForList(sql);
|
|
|
+ List<Map<String, Object>> l2 = jdbcTemplate2.queryForList(sql);
|
|
|
log.info("cfg_0_5g_sitelevel: {}", l2.size());
|
|
|
l2.forEach(t -> siteLevelMap.put(t.get("site_id"), t.get("area3")));
|
|
|
log.info("siteLevelMap: {}", siteLevelMap.size());
|
|
@@ -103,19 +99,38 @@ public class UpdateTask {
|
|
|
private void update4g(Map<String, Map<String, Object>> map) throws IOException {
|
|
|
// 查询o2p.cfg_0_4g_siteinfo
|
|
|
String sql = "select * from o2p.cfg_0_4g_siteinfo";
|
|
|
- List<Map<String, Object>> list = inputJdbcTemplate.queryForList(sql);
|
|
|
+ List<Map<String, Object>> list = jdbcTemplate1.queryForList(sql);
|
|
|
// 去重
|
|
|
list.forEach(t -> map.put((String) t.get("cgi"), t));
|
|
|
log.info("cfg_0_4g_siteinfo: {} -> {}", list.size(), map.size());
|
|
|
XxlJobHelper.log("cfg_0_4g_siteinfo: {} -> {}", list.size(), map.size());
|
|
|
String table = "cfm.cfg_0_4g_siteinfo";
|
|
|
- writeCsv(map, table);
|
|
|
+ String csv = table + ".csv";
|
|
|
+ log.info("writeCsv: {}", table);
|
|
|
+ XxlJobHelper.log("writeCsv: {}", table);
|
|
|
+ try (OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(Paths.get(csv)),
|
|
|
+ StandardCharsets.UTF_8);
|
|
|
+ CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT)) {
|
|
|
+ // 添加bom头避免excel乱码
|
|
|
+ osw.write('\ufeff');
|
|
|
+ // 表头
|
|
|
+ printer.printRecord("sdate","city_code","city_name","district_code","district_name","network_name","phystation_address","bbu_name","enbid","cell_name","cell_id","cgi","lon","lat","direction","height","m_downtilt","e_downtilt","station_type","isdigitalindoor","down_freq","vender","own_schoolyard","toweraddress_code","property","scene","is_scenesite","marketing_network","terminalamount_5g","sector_incoming","is_busy","is_alive","is_alive_update_time","construction","is_same_address","same_address_sites","is_same_address_ct");
|
|
|
+ for (Map<String, Object> t : map.values()) {
|
|
|
+ printer.printRecord(t.values());
|
|
|
+ }
|
|
|
+ }
|
|
|
backup(table);
|
|
|
// o2p.cfg_0_4g_siteinfo -> cfm.cfg_0_4g_siteinfo
|
|
|
- gpload(table);
|
|
|
+ String columns = "(sdate,city_code,city_name,district_code,district_name,network_name,phystation_address,bbu_name,enbid,cell_name,cell_id,cgi,lon,lat,direction,height,m_downtilt,e_downtilt,station_type,isdigitalindoor,down_freq,vender,own_schoolyard,toweraddress_code,property,scene,is_scenesite,marketing_network,terminalamount_5g,sector_incoming,is_busy,is_alive,is_alive_update_time,construction,is_same_address,same_address_sites,is_same_address_ct)";
|
|
|
+ PsqlUtil.copyCsv(taskConfig.getImportScript(), taskConfig.getDbHost2(), taskConfig.getDbPort2(),
|
|
|
+ taskConfig.getDbUsername2(), taskConfig.getDbPassword2(), taskConfig.getDbName2(), table, csv,
|
|
|
+ columns, taskConfig.getTimeout(), taskConfig.getMinInsertCount());
|
|
|
+ PsqlUtil.copyCsv(taskConfig.getImportScript(), taskConfig.getDbHost3(), taskConfig.getDbPort3(),
|
|
|
+ taskConfig.getDbUsername3(), taskConfig.getDbPassword3(), taskConfig.getDbName3(), table, csv,
|
|
|
+ columns, taskConfig.getTimeout(), taskConfig.getMinInsertCount());
|
|
|
// 查询cfm.cfg_0_4g_siteinfo_gx
|
|
|
sql = "select * from cfm.cfg_0_4g_siteinfo_gx";
|
|
|
- list = outputJdbcTemplate.queryForList(sql);
|
|
|
+ list = jdbcTemplate2.queryForList(sql);
|
|
|
// 添加新元素
|
|
|
list.forEach(t -> map.putIfAbsent((String) t.get("cgi"), t));
|
|
|
log.info("cfg_0_4g_siteinfo_gx: {} -> {}", list.size(), map.size());
|
|
@@ -125,19 +140,38 @@ public class UpdateTask {
|
|
|
private void update5g(Map<String, Map<String, Object>> map) throws IOException {
|
|
|
// 查询o2p.cfg_0_5g_siteinfo
|
|
|
String sql = "select * from o2p.cfg_0_5g_siteinfo";
|
|
|
- List<Map<String, Object>> list = inputJdbcTemplate.queryForList(sql);
|
|
|
+ List<Map<String, Object>> list = jdbcTemplate1.queryForList(sql);
|
|
|
// 去重
|
|
|
list.forEach(t -> map.put((String) t.get("cgisai"), t));
|
|
|
log.info("cfg_0_5g_siteinfo: {} -> {}", list.size(), map.size());
|
|
|
XxlJobHelper.log("cfg_0_5g_siteinfo: {} -> {}", list.size(), map.size());
|
|
|
String table = "cfm.cfg_0_5g_siteinfo";
|
|
|
- writeCsv(map, table);
|
|
|
+ String csv = table + ".csv";
|
|
|
+ log.info("writeCsv: {}", table);
|
|
|
+ XxlJobHelper.log("writeCsv: {}", table);
|
|
|
+ try (OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(Paths.get(csv)),
|
|
|
+ StandardCharsets.UTF_8);
|
|
|
+ CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT)) {
|
|
|
+ // 添加bom头避免excel乱码
|
|
|
+ osw.write('\ufeff');
|
|
|
+ // 表头
|
|
|
+ printer.printRecord("sdate","city_code","city_name","district_code","district_name","network_name","phystation_address","station_name","gnbid","cell_name","cell_id","cgisai","lon","lat","direction","height","m_downtilt","e_downtilt","station_type","isdigitalindoor","down_freq","vender","own_schoolyard","toweraddress_code","property","scene","is_scenesite","marketing_network","terminalamount_5g","sector_incoming","is_busy","is_alive","is_alive_update_time","construction","is_same_address","same_address_sites","is_same_address_ct");
|
|
|
+ for (Map<String, Object> t : map.values()) {
|
|
|
+ printer.printRecord(t.values());
|
|
|
+ }
|
|
|
+ }
|
|
|
backup(table);
|
|
|
// o2p.cfg_0_5g_siteinfo -> cfm.cfg_0_5g_siteinfo
|
|
|
- gpload(table);
|
|
|
+ String columns = "(sdate,city_code,city_name,district_code,district_name,network_name,phystation_address,station_name,gnbid,cell_name,cell_id,cgisai,lon,lat,direction,height,m_downtilt,e_downtilt,station_type,isdigitalindoor,down_freq,vender,own_schoolyard,toweraddress_code,property,scene,is_scenesite,marketing_network,terminalamount_5g,sector_incoming,is_busy,is_alive,is_alive_update_time,construction,is_same_address,same_address_sites,is_same_address_ct)";
|
|
|
+ PsqlUtil.copyCsv(taskConfig.getImportScript(), taskConfig.getDbHost2(), taskConfig.getDbPort2(),
|
|
|
+ taskConfig.getDbUsername2(), taskConfig.getDbPassword2(), taskConfig.getDbName2(), table, csv,
|
|
|
+ columns, taskConfig.getTimeout(), taskConfig.getMinInsertCount());
|
|
|
+ PsqlUtil.copyCsv(taskConfig.getImportScript(), taskConfig.getDbHost3(), taskConfig.getDbPort3(),
|
|
|
+ taskConfig.getDbUsername3(), taskConfig.getDbPassword3(), taskConfig.getDbName3(), table, csv,
|
|
|
+ columns, taskConfig.getTimeout(), taskConfig.getMinInsertCount());
|
|
|
// 查询cfm.cfg_0_5g_siteinfo_gx
|
|
|
sql = "select * from cfm.cfg_0_5g_siteinfo_gx";
|
|
|
- list = outputJdbcTemplate.queryForList(sql);
|
|
|
+ list = jdbcTemplate2.queryForList(sql);
|
|
|
// 添加新元素
|
|
|
list.forEach(t -> map.putIfAbsent((String) t.get("cgisai"), t));
|
|
|
log.info("cfg_0_5g_siteinfo_gx: {} -> {}", list.size(), map.size());
|
|
@@ -145,26 +179,67 @@ public class UpdateTask {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 将cfg_0_4g_siteinfo、cfg_0_5g_siteinfo转成cfg_p_netconf_std、cfg_cell_info写入csv文件
|
|
|
+ * 将cfg_0_4g_siteinfo、cfg_0_5g_siteinfo转成cfg_p_netconf_std
|
|
|
+ */
|
|
|
+ private Map<Object, Map<String, Object>> siteInfoToCfgPNetconfStd(Map<String, Map<String, Object>> four,
|
|
|
+ Map<String, Map<String, Object>> five,
|
|
|
+ Map<Object, Object> siteLevelMap) {
|
|
|
+ Map<Object, Map<String, Object>> map = new HashMap<>();
|
|
|
+ fourGCfgPNetconfStd(four, map, siteLevelMap);
|
|
|
+ fiveGCfgPNetconfStd(five, map, siteLevelMap);
|
|
|
+ log.info("cfg_p_netconf_std: {}", map.size());
|
|
|
+ XxlJobHelper.log("cfg_p_netconf_std: {}", map.size());
|
|
|
+ return map;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 导入customer_service.cfg_p_netconf_std
|
|
|
+ */
|
|
|
+ private void updateCfgPNetconfStd(Map<Object, Map<String, Object>> map) throws IOException {
|
|
|
+ String table = "customer_service.cfg_p_netconf_std";
|
|
|
+ String csv = table + ".csv";
|
|
|
+ try (OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(Paths.get(csv)),
|
|
|
+ StandardCharsets.UTF_8);
|
|
|
+ CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT)) {
|
|
|
+ // 添加bom头避免excel乱码
|
|
|
+ osw.write('\ufeff');
|
|
|
+ // 表头
|
|
|
+ printer.printRecord("id","city_id","city_name","area_id","core_ne_name","radio_ne_name","site_name","site_id","cell_no","cell_name","cell_id","ecgi","gnb_cu_id","gnb_du_id","gsm_cell_id","network_type","type_5g","tac","cover_type","nodeb_type","rac","lac","grid","areatype","sub_areatype","area3","longitude","latitude","vendor","height","downtilt","azimuth","fr","scramber","province_id","bsbusip","bsbusip_1","isanchor","operator","isshare","slice_type");
|
|
|
+ printCfgPNetconfStd(printer, map);
|
|
|
+ }
|
|
|
+ backup(table);
|
|
|
+ String columns = "(id,city_id,city_name,area_id,core_ne_name,radio_ne_name,site_name,site_id,cell_no,cell_name,cell_id,ecgi,gnb_cu_id,gnb_du_id,gsm_cell_id,network_type,type_5g,tac,cover_type,nodeb_type,rac,lac,grid,areatype,sub_areatype,area3,longitude,latitude,vendor,height,downtilt,azimuth,fr,scramber,province_id,bsbusip,bsbusip_1,isanchor,operator,isshare,slice_type)";
|
|
|
+ PsqlUtil.copyCsv(taskConfig.getImportScript(), taskConfig.getDbHost2(), taskConfig.getDbPort2(),
|
|
|
+ taskConfig.getDbUsername2(), taskConfig.getDbPassword2(), taskConfig.getDbName2(), table, csv,
|
|
|
+ columns, taskConfig.getTimeout(), taskConfig.getMinInsertCount());
|
|
|
+ PsqlUtil.copyCsv(taskConfig.getImportScript(), taskConfig.getDbHost3(), taskConfig.getDbPort3(),
|
|
|
+ taskConfig.getDbUsername3(), taskConfig.getDbPassword3(), taskConfig.getDbName3(), table, csv,
|
|
|
+ columns, taskConfig.getTimeout(), taskConfig.getMinInsertCount());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 导入customer_service.cfg_cell_info
|
|
|
*/
|
|
|
- private void writeCsvs(Map<String, Map<String, Object>> four, Map<String, Map<String, Object>> five,
|
|
|
- Map<Object, Object> siteLevelMap) throws IOException {
|
|
|
- try (OutputStreamWriter osw1 = new OutputStreamWriter(
|
|
|
- Files.newOutputStream(Paths.get("customer_service.cfg_p_netconf_std.csv")),
|
|
|
+ private void updateCfgCellInfo(Map<Object, Map<String, Object>> map) throws IOException {
|
|
|
+ String table = "customer_service.cfg_cell_info";
|
|
|
+ String csv = table + ".csv";
|
|
|
+ try (OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(Paths.get(csv)),
|
|
|
StandardCharsets.UTF_8);
|
|
|
- CSVPrinter printer1 = new CSVPrinter(osw1, CSVFormat.DEFAULT);
|
|
|
- OutputStreamWriter osw2 = new OutputStreamWriter(
|
|
|
- Files.newOutputStream(Paths.get("customer_service.cfg_cell_info.csv")),
|
|
|
- StandardCharsets.UTF_8);
|
|
|
- CSVPrinter printer2 = new CSVPrinter(osw2, CSVFormat.DEFAULT);) {
|
|
|
- Map<Object, Map<String, Object>> map = new HashMap<>();
|
|
|
- fourGCfgPNetconfStd(four, map, siteLevelMap);
|
|
|
- fiveGCfgPNetconfStd(five, map, siteLevelMap);
|
|
|
- log.info("cfg_p_netconf_std: {}", map.size());
|
|
|
- XxlJobHelper.log("cfg_p_netconf_std: {}", map.size());
|
|
|
- printCfgPNetconfStd(printer1, map);
|
|
|
- printCfgCellInfo(printer2, map);
|
|
|
+ CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT)) {
|
|
|
+ // 添加bom头避免excel乱码
|
|
|
+ osw.write('\ufeff');
|
|
|
+ // 表头
|
|
|
+ printer.printRecord("cell_id","site_name","city_name","longitude","latitude","tac");
|
|
|
+ printCfgCellInfo(printer, map);
|
|
|
}
|
|
|
+ backup(table);
|
|
|
+ String columns = "(cell_id,site_name,city_name,longitude,latitude,tac)";
|
|
|
+ PsqlUtil.copyCsv(taskConfig.getImportScript(), taskConfig.getDbHost2(), taskConfig.getDbPort2(),
|
|
|
+ taskConfig.getDbUsername2(), taskConfig.getDbPassword2(), taskConfig.getDbName2(), table, csv,
|
|
|
+ columns, taskConfig.getTimeout(), taskConfig.getMinInsertCount());
|
|
|
+ PsqlUtil.copyCsv(taskConfig.getImportScript(), taskConfig.getDbHost3(), taskConfig.getDbPort3(),
|
|
|
+ taskConfig.getDbUsername3(), taskConfig.getDbPassword3(), taskConfig.getDbName3(), table, csv,
|
|
|
+ columns, taskConfig.getTimeout(), taskConfig.getMinInsertCount());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -286,7 +361,9 @@ public class UpdateTask {
|
|
|
XxlJobHelper.log("writeCsv: {}", table);
|
|
|
try (OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(Paths.get(table + ".csv")),
|
|
|
StandardCharsets.UTF_8);
|
|
|
- CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT);) {
|
|
|
+ CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT)) {
|
|
|
+ // 添加bom头避免excel乱码
|
|
|
+ osw.write('\ufeff');
|
|
|
for (Map<String, Object> t : map.values()) {
|
|
|
printer.printRecord(t.values());
|
|
|
}
|
|
@@ -305,7 +382,8 @@ public class UpdateTask {
|
|
|
String nowString = dateFormat(now);
|
|
|
// 备份
|
|
|
String sql = "create table " + table + "_bak_" + nowString + " as select * from " + table;
|
|
|
- outputJdbcTemplate.execute(sql);
|
|
|
+ jdbcTemplate2.execute(sql);
|
|
|
+ jdbcTemplate3.execute(sql);
|
|
|
// 删除之前的备份
|
|
|
sql = "DO $$\n" +
|
|
|
"DECLARE t varchar(255);\n" +
|
|
@@ -317,10 +395,12 @@ public class UpdateTask {
|
|
|
"END LOOP;\n" +
|
|
|
"END;\n" +
|
|
|
"$$";
|
|
|
- outputJdbcTemplate.execute(sql);
|
|
|
+ jdbcTemplate2.execute(sql);
|
|
|
+ jdbcTemplate3.execute(sql);
|
|
|
// 清空表
|
|
|
sql = "truncate table " + table;
|
|
|
- outputJdbcTemplate.execute(sql);
|
|
|
+ jdbcTemplate2.execute(sql);
|
|
|
+ jdbcTemplate3.execute(sql);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -333,19 +413,6 @@ public class UpdateTask {
|
|
|
return localDate.format(DateTimeFormatter.ofPattern("MMdd"));
|
|
|
}
|
|
|
|
|
|
- private void gpload(String table) throws IOException {
|
|
|
- String gploadCommand = "sh " + gploadShellPath + " " + table;
|
|
|
- GploadResult gpload = GploadUtil.gpload(gploadCommand);
|
|
|
- if (Boolean.TRUE.equals(gpload.getTaskStatus())) {
|
|
|
- log.info("gpload {} 完成: {}", table, gpload);
|
|
|
- XxlJobHelper.log("gpload {} 完成: {}", table, gpload);
|
|
|
- // 删除文件
|
|
|
- Files.deleteIfExists(Paths.get(table + ".csv"));
|
|
|
- } else {
|
|
|
- throw new RuntimeException("gpload " + table + " 失败: " + gpload.getMessage());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* 将cfg_p_netconf_std写入csv文件
|
|
|
*/
|