Browse Source

feat: 黑名单数据同步问题发现--添加数据文件生产异常提醒

weijianghai 2 years ago
parent
commit
91c0559a45
1 changed files with 150 additions and 6 deletions
  1. 150 6
      sms_blk_interface/src/main/java/com/nokia/sms/task/SyncTask.java

+ 150 - 6
sms_blk_interface/src/main/java/com/nokia/sms/task/SyncTask.java

@@ -12,18 +12,37 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
+import org.springframework.util.StringUtils;
 
+import java.io.BufferedWriter;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.stream.Collectors;
 
 @Component
 @Slf4j
 public class SyncTask {
+    /**
+     * 记录上次时间的文件路径
+     */
+    public static final String LAST_FILE = "/data1/sms_blk/sms_blk_interface/last.txt";
+    /**
+     * 记录无文件次数的文件
+     */
+    public static final String COUNT_FILE = "/data1/sms_blk/sms_blk_interface/count.txt";
+    /**
+     * 无文件次数告警值
+     */
+    public static final int ALERT_COUNT = 10;
+    public static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
     @Value("${sms.blk.ssh.host:10.17.182.24}")
     private String host;
     @Value("${sms.blk.ssh.port:22}")
@@ -54,22 +73,38 @@ public class SyncTask {
             if (CollectionUtils.isEmpty(list)) {
                 log.info("没有文件");
                 XxlJobHelper.log("没有文件");
+                String countString = readCount();
+                int count = StringUtils.hasText(countString) ? Integer.parseInt(countString) : 0;
+                if (count > ALERT_COUNT) {
+                    log.error("超过{}分钟没有文件", ALERT_COUNT);
+                    XxlJobHelper.log("超过{}分钟没有文件", ALERT_COUNT);
+                    updateCount(0);
+                } else {
+                    updateCount(count + 1);
+                }
                 return;
             }
 
             log.info("扫描到的文件: {}", list);
             XxlJobHelper.log("扫描到的文件: {}", list);
-            for (String t : list) {
-                // 跳过临时文件
-                if (t.endsWith(".tmp")) {
-                    continue;
-                }
-
+            // 过滤临时文件
+            List<String> newList = list.stream().filter(t -> !t.endsWith(".tmp")).collect(Collectors.toList());
+            // 检查文件
+            String lastString = readLast();
+            log.info("last: {}", lastString);
+            LocalDateTime last = StringUtils.hasText(lastString)
+                    ? getLocalDateTime(lastString).plusMinutes(5)
+                    : getLocalDateTime(getTimeString(newList.get(0)));
+            int size = list.size();
+            checkFile(last, list, size, 0);
+            // 入库
+            for (String t : newList) {
                 // 删除空文件
                 if (t.endsWith("_0")) {
                     log.info("删除空文件: {}", t);
                     XxlJobHelper.log("删除空文件: {}", t);
                     sshUtil.rm(sourceDir + t);
+                    updateLast(t);
                     continue;
                 }
                 singleTask(t);
@@ -81,6 +116,114 @@ public class SyncTask {
         }
     }
 
+    /**
+     * 更新上次时间
+     */
+    public static void updateLast(String filename) {
+        writeFile(LAST_FILE, getTimeString(filename));
+    }
+
+    /**
+     * 读取上次时间
+     */
+    public static String readLast() {
+        return readFile(LAST_FILE);
+    }
+
+    /**
+     * 更新无文件次数
+     */
+    public static void updateCount(int count) {
+        writeFile(COUNT_FILE, String.valueOf(count));
+    }
+
+    /**
+     * 获取无文件次数
+     */
+    public static String readCount() {
+        return readFile(COUNT_FILE);
+    }
+
+    /**
+     * 写文件
+     * @param filename 文件名
+     * @param content 内容
+     */
+    public static void writeFile(String filename, String content) {
+        try (BufferedWriter bufferedWriter = new BufferedWriter(new FileWriter(filename))) {
+            bufferedWriter.write(content);
+        } catch (IOException e) {
+            log.error("写{}失败", filename);
+            XxlJobHelper.log("写{}失败", filename);
+        }
+    }
+
+    /**
+     * 读文件内容
+     * @param filename 文件名
+     */
+    public static String readFile(String filename) {
+        List<String> list = null;
+        try {
+            list = Files.readAllLines(Paths.get(filename), StandardCharsets.UTF_8);
+        } catch (IOException e) {
+            log.error("读{}失败", filename);
+            XxlJobHelper.log("读{}失败", filename);
+        }
+        if (CollectionUtils.isEmpty(list)) {
+            return null;
+        }
+
+        return list.get(0);
+    }
+
+    /**
+     * 检查文件
+     * @param t1 上次时间
+     * @param list 文件名列表
+     * @param size 文件名列表大小
+     * @param index 索引
+     */
+    public static void checkFile(LocalDateTime t1, List<String> list, int size, int index) {
+        if (index >= size) {
+            return;
+        }
+
+        LocalDateTime t2 = getLocalDateTime(getTimeString(list.get(index)));
+        if (t1.isEqual(t2)) {
+            checkFile(t1.plusMinutes(5), list, size, index + 1);
+        } else if (t1.isAfter(t2)) {
+            log.error("{} 文件重复上传", getTimeString(t2));
+            XxlJobHelper.log("{} 文件重复上传", getTimeString(t2));
+            checkFile(t1, list, size, index + 1);
+        } else {
+            log.error("{} 文件未上传", getTimeString(t1));
+            XxlJobHelper.log("{} 文件未上传", getTimeString(t1));
+            checkFile(t1.plusMinutes(5), list, size, index);
+        }
+    }
+
+    /**
+     * 文件名截取时间字符串
+     */
+    public static String getTimeString(String filename) {
+        return filename.split("_")[2];
+    }
+
+    /**
+     * 时间字符串转LocalDateTime
+     */
+    public static LocalDateTime getLocalDateTime(String time) {
+        return LocalDateTime.parse(time, DATE_TIME_FORMATTER);
+    }
+
+    /**
+     * 格式化LocalDateTime
+     */
+    public static String getTimeString(LocalDateTime time) {
+        return time.format(DATE_TIME_FORMATTER);
+    }
+
     /**
      * 单一任务
      *
@@ -107,6 +250,7 @@ public class SyncTask {
         log.info("删除远程文件: {}", filename);
         XxlJobHelper.log("删除远程文件: {}", filename);
         sshUtil.rm(src);
+        updateLast(filename);
     }
 
     /**