|
@@ -1,12 +1,13 @@
|
|
|
package com.nokia.task;
|
|
|
|
|
|
-import com.alibaba.fastjson2.JSON;
|
|
|
import com.jcraft.jsch.JSchException;
|
|
|
import com.jcraft.jsch.SftpException;
|
|
|
import com.nokia.common.exception.MyRuntimeException;
|
|
|
import com.nokia.common.gpload.GploadUtil;
|
|
|
import com.nokia.common.gpload.entity.GploadResult;
|
|
|
+import com.nokia.common.psql.PsqlUtil;
|
|
|
import com.nokia.common.ssh.SSHUtil;
|
|
|
+import com.nokia.config.TaskConfig;
|
|
|
import com.xxl.job.core.context.XxlJobHelper;
|
|
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
|
|
import lombok.Data;
|
|
@@ -15,9 +16,7 @@ import org.apache.commons.csv.CSVFormat;
|
|
|
import org.apache.commons.csv.CSVParser;
|
|
|
import org.apache.commons.csv.CSVPrinter;
|
|
|
import org.apache.commons.csv.CSVRecord;
|
|
|
-import org.springframework.boot.context.properties.ConfigurationProperties;
|
|
|
import org.springframework.jdbc.core.JdbcTemplate;
|
|
|
-import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.util.CollectionUtils;
|
|
|
|
|
@@ -32,46 +31,24 @@ import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.CompletableFuture;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
@Data
|
|
|
@Component
|
|
|
@Slf4j
|
|
|
-@ConfigurationProperties("task")
|
|
|
public class SyncTask {
|
|
|
-
|
|
|
-
|
|
|
- * sftp ip
|
|
|
- */
|
|
|
- private String host;
|
|
|
-
|
|
|
- * sftp端口
|
|
|
- */
|
|
|
- private Integer port;
|
|
|
-
|
|
|
- * sftp账号
|
|
|
- */
|
|
|
- private String username;
|
|
|
|
|
|
- * sftp密码
|
|
|
+ * 上次扫描到的文件
|
|
|
*/
|
|
|
- private String password;
|
|
|
-
|
|
|
- * top用户数据文件夹
|
|
|
- */
|
|
|
- private String sourceDir;
|
|
|
-
|
|
|
- * 下载文件夹
|
|
|
- */
|
|
|
- private String downloadDir;
|
|
|
-
|
|
|
- * 去重文件名
|
|
|
- */
|
|
|
- private String distinctFilename;
|
|
|
+ private String lastFilename;
|
|
|
private SSHUtil sshUtil;
|
|
|
private final JdbcTemplate jdbcTemplate;
|
|
|
+ private final TaskConfig taskConfig;
|
|
|
|
|
|
- public SyncTask(JdbcTemplate jdbcTemplate) {
|
|
|
+ public SyncTask(JdbcTemplate jdbcTemplate, TaskConfig taskConfig) {
|
|
|
this.jdbcTemplate = jdbcTemplate;
|
|
|
+ this.taskConfig = taskConfig;
|
|
|
}
|
|
|
|
|
|
|
|
@@ -81,26 +58,29 @@ public class SyncTask {
|
|
|
@XxlJob("syncTopUser")
|
|
|
public void syncTopUser() {
|
|
|
try {
|
|
|
-
|
|
|
- Files.createDirectories(Paths.get(downloadDir));
|
|
|
- sshUtil = new SSHUtil(host, port, username, password);
|
|
|
-
|
|
|
- List<String> list = sshUtil.ls(sourceDir);
|
|
|
- if (CollectionUtils.isEmpty(list)) {
|
|
|
- log.error("没有文件");
|
|
|
- XxlJobHelper.log("没有文件");
|
|
|
- XxlJobHelper.handleFail("没有文件");
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- String filename = list.get(list.size() - 1);
|
|
|
- log.info("同步的文件: {}", filename);
|
|
|
- XxlJobHelper.log("同步的文件: {}", filename);
|
|
|
- singleTask(filename);
|
|
|
+ CompletableFuture.runAsync(() -> {
|
|
|
+ try {
|
|
|
+ singleTask();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("线程中断: {}", e.getMessage(), e);
|
|
|
+ XxlJobHelper.log("线程中断: {}", e.getMessage(), e);
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ throw new MyRuntimeException(e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new MyRuntimeException(e);
|
|
|
+ }
|
|
|
+ }).get(5, TimeUnit.MINUTES);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("线程中断: {}", e.getMessage(), e);
|
|
|
+ XxlJobHelper.log("线程中断: {}", e.getMessage(), e);
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
+ XxlJobHelper.handleFail(e.getMessage());
|
|
|
+ rollback();
|
|
|
} catch (Exception e) {
|
|
|
log.error("发生异常了: {}", e.getMessage(), e);
|
|
|
XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
|
|
|
XxlJobHelper.handleFail(e.getMessage());
|
|
|
+ rollback();
|
|
|
} finally {
|
|
|
try {
|
|
|
sshUtil.disconnect();
|
|
@@ -115,12 +95,31 @@ public class SyncTask {
|
|
|
|
|
|
* 单一任务
|
|
|
*
|
|
|
- * @param filename 文件名
|
|
|
*/
|
|
|
- public void singleTask(String filename) throws JSchException, IOException, SftpException {
|
|
|
+ public void singleTask() throws JSchException, IOException, SftpException, InterruptedException {
|
|
|
+
|
|
|
+ Files.createDirectories(Paths.get(taskConfig.getDownloadDir()));
|
|
|
+ sshUtil = new SSHUtil(taskConfig.getSshHost(), taskConfig.getSshPort(), taskConfig.getSshUsername(),
|
|
|
+ taskConfig.getSshPassword());
|
|
|
+
|
|
|
+ List<String> list = sshUtil.ls(taskConfig.getSourceDir());
|
|
|
+ if (CollectionUtils.isEmpty(list)) {
|
|
|
+ log.error("没有文件");
|
|
|
+ XxlJobHelper.log("没有文件");
|
|
|
+ XxlJobHelper.handleFail("没有文件");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ String filename = list.get(list.size() - 1);
|
|
|
+ log.info("同步的文件: {}", filename);
|
|
|
+ XxlJobHelper.log("同步的文件: {}", filename);
|
|
|
+ if (filename.equals(lastFilename)) {
|
|
|
+ throw new MyRuntimeException("没有新文件");
|
|
|
+ }
|
|
|
download(filename);
|
|
|
distinct(filename);
|
|
|
- gpload();
|
|
|
+ importData();
|
|
|
+ lastFilename = filename;
|
|
|
}
|
|
|
|
|
|
|
|
@@ -131,8 +130,8 @@ public class SyncTask {
|
|
|
public void download(String filename) throws JSchException, IOException, SftpException {
|
|
|
log.info("下载文件: {}", filename);
|
|
|
XxlJobHelper.log("下载文件: {}", filename);
|
|
|
- String src = sourceDir + filename;
|
|
|
- String dst = downloadDir + filename;
|
|
|
+ String src = taskConfig.getSourceDir() + filename;
|
|
|
+ String dst = taskConfig.getDownloadDir() + filename;
|
|
|
sshUtil.get(src, dst);
|
|
|
}
|
|
|
|
|
@@ -144,15 +143,13 @@ public class SyncTask {
|
|
|
public void distinct(String filename) throws IOException {
|
|
|
log.info("文件 {} 去重", filename);
|
|
|
XxlJobHelper.log("文件 {} 去重", filename);
|
|
|
- String inputFilePath = downloadDir + filename;
|
|
|
+ String inputFilePath = taskConfig.getDownloadDir() + filename;
|
|
|
Path inputPath = Paths.get(inputFilePath);
|
|
|
- List<Object> modifiedUsers = new ArrayList<>();
|
|
|
try (CSVParser parser = CSVFormat.DEFAULT.builder().build()
|
|
|
.parse(new InputStreamReader(Files.newInputStream(inputPath), "gbk"));
|
|
|
- OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(Paths.get(distinctFilename)),
|
|
|
- StandardCharsets.UTF_8);
|
|
|
+ OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(
|
|
|
+ Paths.get(taskConfig.getDistinctFilename())), StandardCharsets.UTF_8);
|
|
|
CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT)) {
|
|
|
-
|
|
|
Map<String, CSVRecord> csvRecordMap = new HashMap<>();
|
|
|
|
|
|
for (CSVRecord t : parser) {
|
|
@@ -166,104 +163,121 @@ public class SyncTask {
|
|
|
if (CollectionUtils.isEmpty(csvRecordMap)) {
|
|
|
throw new MyRuntimeException("数据为空");
|
|
|
}
|
|
|
-
|
|
|
- String sql = "select user_id, login_name, city_id from sqmdb_rpt.acl_user"
|
|
|
- + " where deleted = 0 and test_user = 0 order by user_id";
|
|
|
- List<Map<String, Object>> allUsers = jdbcTemplate.queryForList(sql);
|
|
|
- Map<Object, Map<String, Object>> userMap = new HashMap<>();
|
|
|
- allUsers.forEach(t -> userMap.put(t.get("login_name"), t));
|
|
|
-
|
|
|
- sql = "select * from sqmdb_rpt.acl_area order by type_code, area_id";
|
|
|
- List<Map<String, Object>> areas = jdbcTemplate.queryForList(sql);
|
|
|
Map<String, Map<String, Object>> cityMap = new HashMap<>();
|
|
|
Map<Object, List<Map<String, Object>>> areaMap = new HashMap<>();
|
|
|
- for (Map<String, Object> t : areas) {
|
|
|
- if (t.get("type_code").equals(2)) {
|
|
|
- cityMap.put((String) t.get("area_name"), t);
|
|
|
- areaMap.putIfAbsent(t.get("area_id"), new ArrayList<>());
|
|
|
- for (Map<String, Object> tt : areas) {
|
|
|
- if (tt.get("type_code").equals(3) && tt.get("parent_id").equals(t.get("area_id"))) {
|
|
|
- areaMap.get(t.get("area_id")).add(tt);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- for (CSVRecord t : csvRecordMap.values()) {
|
|
|
- String orgId = t.get(0);
|
|
|
- String orgName = t.get(1);
|
|
|
- String userId = t.get(2);
|
|
|
- String userName = t.get(3);
|
|
|
- String loginName = t.get(4);
|
|
|
- String phone = t.get(5);
|
|
|
- String employeeCode = t.get(6);
|
|
|
- Integer provinceId = -1;
|
|
|
- Object cityId = -1;
|
|
|
- Object areaId = null;
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
- for (Map.Entry<String, Map<String, Object>> tt : cityMap.entrySet()) {
|
|
|
-
|
|
|
- if (orgName.contains(tt.getKey())) {
|
|
|
- Map<String, Object> area = tt.getValue();
|
|
|
- cityId = area.get("area_id");
|
|
|
-
|
|
|
- List<Map<String, Object>> areaList = areaMap.get(area.get("area_id"));
|
|
|
- for (Map<String, Object> ttt : areaList) {
|
|
|
-
|
|
|
- if (orgName.contains((CharSequence) ttt.get("area_name"))) {
|
|
|
- areaId = ttt.get("area_id");
|
|
|
-
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- printer.printRecord(loginName, orgId, orgName, userId, userName, phone, employeeCode, provinceId,
|
|
|
- cityId, areaId);
|
|
|
-
|
|
|
- Map<String, Object> user = userMap.get(loginName);
|
|
|
- if (user != null && !cityId.equals(user.get("city_id"))) {
|
|
|
- modifiedUsers.add(user.get("user_id"));
|
|
|
- log.debug("用户 {} 地市变化: {} -> {}", user.get("user_id"), user.get("city_id"), cityId);
|
|
|
- }
|
|
|
- }
|
|
|
+ getAreaMap(cityMap, areaMap);
|
|
|
+ matchArea(printer, csvRecordMap, cityMap, areaMap);
|
|
|
log.info("文件 {} 去重完成", filename);
|
|
|
XxlJobHelper.log("文件 {} 去重完成", filename);
|
|
|
|
|
|
Files.deleteIfExists(inputPath);
|
|
|
log.info("删除本地源文件 {}", filename);
|
|
|
XxlJobHelper.log("删除本地源文件 {}", filename);
|
|
|
-
|
|
|
- sql = "select user_id from sqmdb_rpt.acl_user"
|
|
|
- + " where deleted = 0 and test_user = 0 and login_name not in (:loginNames)"
|
|
|
- + " order by user_id";
|
|
|
- Map<String, Object> paramMap = new HashMap<>();
|
|
|
- paramMap.put("loginNames", csvRecordMap.keySet());
|
|
|
- NamedParameterJdbcTemplate namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(jdbcTemplate);
|
|
|
- List<Integer> deletedUsers = namedParameterJdbcTemplate.queryForList(sql, paramMap, Integer.class);
|
|
|
- if (!CollectionUtils.isEmpty(deletedUsers)) {
|
|
|
- log.error("已删除用户: {}", JSON.toJSONString(deletedUsers));
|
|
|
- }
|
|
|
-
|
|
|
- sql = "select distinct au.user_id"
|
|
|
- + " from sqmdb_rpt.acl_user au"
|
|
|
- + " inner join sqmdb_rpt.acl_user_role_city aurc on au.user_id = aurc.user_id"
|
|
|
- + " where au.city_id != aurc.city_id"
|
|
|
- + " and aurc.role_id != 3"
|
|
|
- + " order by au.user_id";
|
|
|
- List<Integer> diffUsers = jdbcTemplate.queryForList(sql, Integer.class);
|
|
|
- if (!CollectionUtils.isEmpty(diffUsers)) {
|
|
|
- log.error("权限地市不一致用户: {}", JSON.toJSONString(diffUsers));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * 得到区域地图
|
|
|
+ *
|
|
|
+ * @param cityMap 城市地图
|
|
|
+ * @param areaMap 区域地图
|
|
|
+ */
|
|
|
+ private void getAreaMap(Map<String, Map<String, Object>> cityMap, Map<Object, List<Map<String, Object>>> areaMap) {
|
|
|
+
|
|
|
+ String sql = "select * from sqmdb_rpt.acl_area order by type_code, area_id";
|
|
|
+ List<Map<String, Object>> areas = jdbcTemplate.queryForList(sql);
|
|
|
+ for (Map<String, Object> t : areas) {
|
|
|
+ if (t.get("type_code").equals(2)) {
|
|
|
+ cityMap.put((String) t.get("area_name"), t);
|
|
|
+ areaMap.putIfAbsent(t.get("area_id"), new ArrayList<>());
|
|
|
+ for (Map<String, Object> tt : areas) {
|
|
|
+ if (tt.get("type_code").equals(3) && tt.get("parent_id").equals(t.get("area_id"))) {
|
|
|
+ areaMap.get(t.get("area_id")).add(tt);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- if (!CollectionUtils.isEmpty(modifiedUsers)) {
|
|
|
- log.error("地市变化用户: {}", JSON.toJSONString(modifiedUsers));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * 匹配区域
|
|
|
+ *
|
|
|
+ * @param printer 打印机
|
|
|
+ * @param csvRecordMap csv记录地图
|
|
|
+ * @param cityMap 城市地图
|
|
|
+ * @param areaMap 区域地图
|
|
|
+ */
|
|
|
+ private void matchArea(CSVPrinter printer, Map<String, CSVRecord> csvRecordMap,
|
|
|
+ Map<String, Map<String, Object>> cityMap, Map<Object, List<Map<String, Object>>> areaMap)
|
|
|
+ throws IOException {
|
|
|
+ for (CSVRecord t : csvRecordMap.values()) {
|
|
|
+ String orgId = t.get(0);
|
|
|
+ String orgName = t.get(1);
|
|
|
+ String userId = t.get(2);
|
|
|
+ String userName = t.get(3);
|
|
|
+ String loginName = t.get(4);
|
|
|
+ String phone = t.get(5);
|
|
|
+ String employeeCode = t.get(6);
|
|
|
+ Integer provinceId = -1;
|
|
|
+ Object cityId = -1;
|
|
|
+ Object areaId = null;
|
|
|
+ for (Map.Entry<String, Map<String, Object>> tt : cityMap.entrySet()) {
|
|
|
+
|
|
|
+ if (orgName.contains(tt.getKey())) {
|
|
|
+ Map<String, Object> area = tt.getValue();
|
|
|
+ cityId = area.get("area_id");
|
|
|
+ List<Map<String, Object>> areaList = areaMap.get(area.get("area_id"));
|
|
|
+ for (Map<String, Object> ttt : areaList) {
|
|
|
+
|
|
|
+ if (orgName.contains((CharSequence) ttt.get("area_name"))) {
|
|
|
+ areaId = ttt.get("area_id");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
+ printer.printRecord(loginName, orgId, orgName, userId, userName, phone, employeeCode, provinceId,
|
|
|
+ cityId, areaId);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ * 导入数据
|
|
|
+ *
|
|
|
+ */
|
|
|
+ public void importData() throws IOException, InterruptedException {
|
|
|
+ String sql = "drop table if exists sqmdb_rpt.acl_top_user_bak";
|
|
|
+ log.info(sql);
|
|
|
+ XxlJobHelper.log(sql);
|
|
|
+ jdbcTemplate.execute(sql);
|
|
|
+ sql = "create table sqmdb_rpt.acl_top_user_bak as select * from sqmdb_rpt.acl_top_user";
|
|
|
+ log.info(sql);
|
|
|
+ XxlJobHelper.log(sql);
|
|
|
+ jdbcTemplate.execute(sql);
|
|
|
+ sql = "truncate table sqmdb_rpt.acl_top_user";
|
|
|
+ log.info(sql);
|
|
|
+ XxlJobHelper.log(sql);
|
|
|
+ jdbcTemplate.execute(sql);
|
|
|
+ PsqlUtil.copyCsv(taskConfig.getImportScript(), taskConfig.getDbHost(), taskConfig.getDbPort(),
|
|
|
+ taskConfig.getDbUsername(), taskConfig.getDbPassword(), taskConfig.getDbName(), taskConfig.getDbTable(),
|
|
|
+ taskConfig.getDistinctFilename(), taskConfig.getMinInsertCount());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ * 回滚
|
|
|
+ */
|
|
|
+ public void rollback() {
|
|
|
+ String sql = "drop table if exists sqmdb_rpt.acl_top_user";
|
|
|
+ log.warn(sql);
|
|
|
+ XxlJobHelper.log(sql);
|
|
|
+ jdbcTemplate.execute(sql);
|
|
|
+ sql = "alter table sqmdb_rpt.acl_top_user_bak rename to acl_top_user";
|
|
|
+ log.warn(sql);
|
|
|
+ XxlJobHelper.log(sql);
|
|
|
+ jdbcTemplate.execute(sql);
|
|
|
+ }
|
|
|
+
|
|
|
public void gpload() throws IOException {
|
|
|
String gploadCommand = "sh gpload/gpload.sh";
|
|
|
GploadResult gpload = GploadUtil.gpload(gploadCommand);
|
|
@@ -271,7 +285,7 @@ public class SyncTask {
|
|
|
log.info("gpload完成: {}", gpload);
|
|
|
XxlJobHelper.log("gpload完成: {}", gpload);
|
|
|
|
|
|
- Files.deleteIfExists(Paths.get(distinctFilename));
|
|
|
+ Files.deleteIfExists(Paths.get(taskConfig.getDistinctFilename()));
|
|
|
} else {
|
|
|
throw new MyRuntimeException("gpload失败: " + gpload.getMessage());
|
|
|
}
|