Quellcode durchsuchen

fix: 修复gpload错误

weijianghai vor 2 Jahren
Ursprung
Commit
748951dcea

+ 1 - 1
build-dir/gpload/pm_nr_gpload.sh

@@ -4,7 +4,7 @@ source /usr/local/greenplum-db-clients/greenplum_loaders_path.sh
 filename=$1
 
 # 修改gpload配置文件
-sed -i 's/\/data1\/pm_5g\/distinct\/pm_5g_hour_[0-9]\{10\}.csv/\/data1\/pm_5g\/distinct\/'${filename}'/' /data1/pm_5g/gpload/pm_nr_gpload.yml
+sed -i 's/\/data1\/pm_5g\/distinct\/\S\+/\/data1\/pm_5g\/distinct\/'"${filename}"'/' /data1/pm_5g/gpload/pm_nr_gpload.yml
 
 # 由于gpload需要输入密码,这里需要使用expect执行
 password=Richr00t#

+ 0 - 1
build-dir/gpload/pm_nr_gpload.yml

@@ -7,7 +7,6 @@ GPLOAD:
   PRELOAD:
     - TRUNCATE: false
     - REUSE_TABLES: true
-    - STAGING_TABLE: extrnal_table_pm_5g_gpload
     - FAST_MATCH: true
   INPUT:
     - SOURCE:

+ 38 - 44
src/main/java/com/nokia/common/gpload/GploadUtil.java

@@ -9,69 +9,50 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 @Slf4j
 public class GploadUtil {
-
+    public static final Pattern PATTERN = Pattern.compile("= (\\d+)");
     private static Process process = null;
     private static BufferedReader reader = null;
 
     public static GploadResult gpload(String gploadCommand) {
         GploadResult result = new GploadResult();
+        result.setTaskStatus(true);
         try {
-            Process process = Runtime.getRuntime().exec(gploadCommand);
+            process = Runtime.getRuntime().exec(gploadCommand);
             reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
             List<String> lines = new ArrayList<>();
             String line;
             while ((line = reader.readLine()) != null) {
                 lines.add(line);
             }
-            for (int i = 0; i < lines.size(); i++) {
-                if (lines.get(i).contains("|INFO|rows Inserted")) {
-                    // 找到第一行 对应的是插入数据的数量
-                    // s4可以判断gpload是否成功
-                    String s4 = lines.get(i + 3);
-                    if (s4.endsWith("succeeded")) {
-                        // 插入数量
-                        String s1 = lines.get(i);
-                        s1 = s1.substring(s1.indexOf('=') + 2);
-                        // 更新数量
-                        String s2 = lines.get(i + 1);
-                        s2 = s2.substring(s2.indexOf('=') + 2);
-                        // 错误数量
-                        String s3 = lines.get(i + 2);
-                        s3 = s3.substring(s3.indexOf('=') + 2);
-                        result.setInsertedCount(Integer.parseInt(s1));
-                        result.setUpdatedCount(Integer.parseInt(s2));
-                        result.setErrorCount(Integer.parseInt(s3));
-                    } else {
-                        StringBuilder sb = new StringBuilder();
-                        for (int j = i - 1; j > 0; j--) {
-                            sb.insert(0, lines.get(i));
-                            if (lines.get(i).contains("|ERROR|ERROR:")) {
-                                sb.insert(0, lines.get(i));
-                                break;
-                            }
-                        }
-                        result.setMessage(sb.toString());
-                    }
+            log.info("gpload result: " + lines);
+            XxlJobHelper.log("gpload result: " + lines);
+            for (String s : lines) {
+                if (s.contains("|rows Inserted")) {
+                    result.setInsertedCount(getNum(s));
+                }
+                if (s.contains("|rows Updated")) {
+                    result.setUpdatedCount(getNum(s));
+                }
+                if (s.contains("|data formatting errors")) {
+                    result.setErrorCount(getNum(s));
+                }
+                if (s.contains("ERROR") || s.contains("gpload failed")) {
+                    result.setTaskStatus(false);
+                }
+                if (s.contains("ERROR")) {
+                    result.setMessage(s);
                 }
             }
-            int exitCode = process.waitFor();
-            result.setTaskStatus(exitCode == 0);
-            if (Thread.interrupted()) {
-                log.error("gpload线程阻塞");
-                XxlJobHelper.log("gpload线程阻塞");
-                throw new InterruptedException();
-            }
-        } catch (IOException | InterruptedException e) {
-            log.debug("gpload发生异常: {}", e.getMessage(), e);
+        } catch (IOException e) {
+            log.error("gpload发生异常: {}", e.getMessage(), e);
             XxlJobHelper.log("gpload发生异常: {}", e.getMessage(), e);
             result.setTaskStatus(false);
             result.setMessage(e.getMessage());
-            if (e instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-            }
         } finally {
             log.debug("gpload的结果为: {}", result);
             XxlJobHelper.log("gpload的结果为: {}", result);
@@ -85,7 +66,8 @@ public class GploadUtil {
             try {
                 reader.close();
             } catch (IOException e) {
-                e.printStackTrace();
+                log.error("close reader error: {}", e.getMessage(), e);
+                XxlJobHelper.log("close reader error: {}", e.getMessage(), e);
             }
             reader = null;
         }
@@ -94,4 +76,16 @@ public class GploadUtil {
             process = null;
         }
     }
+
+    /**
+     * 获取gpload统计条数
+     */
+    private static int getNum(String s) {
+        Matcher matcher = PATTERN.matcher(s);
+        if (matcher.find()) {
+            return Integer.parseInt(matcher.group(1));
+        }
+
+        return 0;
+    }
 }

+ 1 - 2
src/main/java/com/nokia/pm_interface_5g/task/FiveGPmTask.java

@@ -165,8 +165,7 @@ public class FiveGPmTask {
             // 删除重排文件
             Files.deleteIfExists(Paths.get(distinctTargetDir + filename));
         } else {
-            log.error("gpload 失败: {}", gpload.getMessage());
-            XxlJobHelper.log("gpload 失败: {}", gpload.getMessage());
+            throw new RuntimeException(gpload.getMessage());
         }
     }