Browse Source

feat: init

weijianghai 2 years ago
commit
da27fef95a
31 changed files with 1912 additions and 0 deletions
  1. 581 0
      .gitignore
  2. 105 0
      pom.xml
  3. 9 0
      readme.md
  4. 15 0
      scripts/product/gpload/gpload.sh
  5. 24 0
      scripts/product/gpload/gpload.yml
  6. 6 0
      scripts/product/rollback.sh
  7. 3 0
      scripts/product/run.sh
  8. 5 0
      scripts/product/stop.sh
  9. 7 0
      scripts/product/update.sh
  10. 14 0
      scripts/test/gpload/gpload.sh
  11. 24 0
      scripts/test/gpload/gpload.yml
  12. 6 0
      scripts/test/rollback.sh
  13. 3 0
      scripts/test/run.sh
  14. 5 0
      scripts/test/stop.sh
  15. 7 0
      scripts/test/update.sh
  16. 13 0
      src/main/java/com/nokia/AclTousuTaskApplication.java
  17. 22 0
      src/main/java/com/nokia/common/exception/MyRuntimeException.java
  18. 105 0
      src/main/java/com/nokia/common/gpload/GploadUtil.java
  19. 12 0
      src/main/java/com/nokia/common/gpload/entity/GploadResult.java
  20. 383 0
      src/main/java/com/nokia/common/ssh/SSHUtil.java
  21. 15 0
      src/main/java/com/nokia/common/ssh/entity/SSHServer.java
  22. 35 0
      src/main/java/com/nokia/common/ssh/entity/UserInfoImpl.java
  23. 8 0
      src/main/java/com/nokia/common/ssh/exception/SSHUtilException.java
  24. 7 0
      src/main/java/com/nokia/common/ssh/exception/ScpAckErrorException.java
  25. 76 0
      src/main/java/com/nokia/config/XxlJobConfig.java
  26. 279 0
      src/main/java/com/nokia/task/SyncTask.java
  27. 34 0
      src/main/resources/application-product.properties
  28. 32 0
      src/main/resources/application-test.properties
  29. 2 0
      src/main/resources/application.properties
  30. 54 0
      src/main/resources/logback-spring.xml
  31. 21 0
      src/test/java/com/nokia/acl_tousu_task/AclTousuTaskApplicationTests.java

+ 581 - 0
.gitignore

@@ -0,0 +1,581 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**
+!**/src/test/**
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+
+### VS Code ###
+.vscode/
+### Java template
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+### Maven template
+pom.xml.tag
+pom.xml.releaseBackup
+pom.xml.versionsBackup
+pom.xml.next
+release.properties
+dependency-reduced-pom.xml
+buildNumber.properties
+.mvn/timing.properties
+# https://github.com/takari/maven-wrapper#usage-without-binary-jar
+.mvn/wrapper/maven-wrapper.jar
+
+### VisualStudioCode template
+.vscode/*
+!.vscode/settings.json
+!.vscode/tasks.json
+!.vscode/launch.json
+!.vscode/extensions.json
+*.code-workspace
+
+# Local History for Visual Studio Code
+.history/
+
+### JetBrains template
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
+# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+
+# User-specific stuff
+.idea/**/workspace.xml
+.idea/**/tasks.xml
+.idea/**/usage.statistics.xml
+.idea/**/dictionaries
+.idea/**/shelf
+
+# Generated files
+.idea/**/contentModel.xml
+
+# Sensitive or high-churn files
+.idea/**/dataSources/
+.idea/**/dataSources.ids
+.idea/**/dataSources.local.xml
+.idea/**/sqlDataSources.xml
+.idea/**/dynamic.xml
+.idea/**/uiDesigner.xml
+.idea/**/dbnavigator.xml
+
+# Gradle
+.idea/**/gradle.xml
+.idea/**/libraries
+
+# Gradle and Maven with auto-import
+# When using Gradle or Maven with auto-import, you should exclude module files,
+# since they will be recreated, and may cause churn.  Uncomment if using
+# auto-import.
+# .idea/artifacts
+# .idea/compiler.xml
+# .idea/jarRepositories.xml
+# .idea/modules.xml
+# .idea/*.iml
+# .idea/modules
+# *.iml
+# *.ipr
+
+# CMake
+cmake-build-*/
+
+# Mongo Explorer plugin
+.idea/**/mongoSettings.xml
+
+# File-based project format
+
+# IntelliJ
+out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Cursive Clojure plugin
+.idea/replstate.xml
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+
+# Editor-based Rest Client
+.idea/httpRequests
+
+# Android studio 3.1+ serialized cache file
+.idea/caches/build_file_checksums.ser
+
+### Eclipse template
+.metadata
+bin/
+tmp/
+*.tmp
+*.bak
+*.swp
+*~.nib
+local.properties
+.settings/
+.loadpath
+.recommenders
+
+# External tool builders
+.externalToolBuilders/
+
+# Locally stored "Eclipse launch configurations"
+*.launch
+
+# PyDev specific (Python IDE for Eclipse)
+*.pydevproject
+
+# CDT-specific (C/C++ Development Tooling)
+.cproject
+
+# CDT- autotools
+.autotools
+
+# Java annotation processor (APT)
+
+# PDT-specific (PHP Development Tools)
+.buildpath
+
+# sbteclipse plugin
+.target
+
+# Tern plugin
+.tern-project
+
+# TeXlipse plugin
+.texlipse
+
+# STS (Spring Tool Suite)
+
+# Code Recommenders
+.recommenders/
+
+# Annotation Processing
+.apt_generated/
+.apt_generated_test/
+
+# Scala IDE specific (Scala & Java development for Eclipse)
+.cache-main
+.scala_dependencies
+.worksheet
+
+# Uncomment this line if you wish to ignore the project description file.
+# Typically, this file would be tracked if it contains build/dependency configurations:
+#.project
+
+### Example user template template
+### Example user template
+
+# IntelliJ project files
+out
+gen
+### VisualStudio template
+## Ignore Visual Studio temporary files, build results, and
+## files generated by popular Visual Studio add-ons.
+##
+## Get latest from https://github.com/github/gitignore/blob/master/VisualStudio.gitignore
+
+# User-specific files
+*.rsuser
+*.suo
+*.user
+*.userosscache
+*.sln.docstates
+
+# User-specific files (MonoDevelop/Xamarin Studio)
+*.userprefs
+
+# Mono auto generated files
+mono_crash.*
+
+# Build results
+[Dd]ebug/
+[Dd]ebugPublic/
+[Rr]elease/
+[Rr]eleases/
+x64/
+x86/
+[Ww][Ii][Nn]32/
+[Aa][Rr][Mm]/
+[Aa][Rr][Mm]64/
+bld/
+[Bb]in/
+[Oo]bj/
+[Ll]og/
+[Ll]ogs/
+
+# Visual Studio 2015/2017 cache/options directory
+.vs/
+# Uncomment if you have tasks that create the project's static files in wwwroot
+#wwwroot/
+
+# Visual Studio 2017 auto generated files
+Generated\ Files/
+
+# MSTest test Results
+[Tt]est[Rr]esult*/
+[Bb]uild[Ll]og.*
+
+# NUnit
+*.VisualState.xml
+TestResult.xml
+nunit-*.xml
+
+# Build Results of an ATL Project
+[Dd]ebugPS/
+[Rr]eleasePS/
+dlldata.c
+
+# Benchmark Results
+BenchmarkDotNet.Artifacts/
+
+# .NET Core
+project.lock.json
+project.fragment.lock.json
+artifacts/
+
+# ASP.NET Scaffolding
+ScaffoldingReadMe.txt
+
+# StyleCop
+StyleCopReport.xml
+
+# Files built by Visual Studio
+*_i.c
+*_p.c
+*_h.h
+*.ilk
+*.meta
+*.obj
+*.iobj
+*.pch
+*.pdb
+*.ipdb
+*.pgc
+*.pgd
+*.rsp
+*.sbr
+*.tlb
+*.tli
+*.tlh
+*.tmp_proj
+*_wpftmp.csproj
+*.vspscc
+*.vssscc
+.builds
+*.pidb
+*.svclog
+*.scc
+
+# Chutzpah Test files
+_Chutzpah*
+
+# Visual C++ cache files
+ipch/
+*.aps
+*.ncb
+*.opendb
+*.opensdf
+*.sdf
+*.cachefile
+*.VC.db
+*.VC.VC.opendb
+
+# Visual Studio profiler
+*.psess
+*.vsp
+*.vspx
+*.sap
+
+# Visual Studio Trace Files
+*.e2e
+
+# TFS 2012 Local Workspace
+$tf/
+
+# Guidance Automation Toolkit
+*.gpState
+
+# ReSharper is a .NET coding add-in
+_ReSharper*/
+*.[Rr]e[Ss]harper
+*.DotSettings.user
+
+# TeamCity is a build add-in
+_TeamCity*
+
+# DotCover is a Code Coverage Tool
+*.dotCover
+
+# AxoCover is a Code Coverage Tool
+.axoCover/*
+!.axoCover/settings.json
+
+# Coverlet is a free, cross platform Code Coverage Tool
+coverage*.json
+coverage*.xml
+coverage*.info
+
+# Visual Studio code coverage results
+*.coverage
+*.coveragexml
+
+# NCrunch
+_NCrunch_*
+.*crunch*.local.xml
+nCrunchTemp_*
+
+# MightyMoose
+*.mm.*
+AutoTest.Net/
+
+# Web workbench (sass)
+.sass-cache/
+
+# Installshield output folder
+[Ee]xpress/
+
+# DocProject is a documentation generator add-in
+DocProject/buildhelp/
+DocProject/Help/*.HxT
+DocProject/Help/*.HxC
+DocProject/Help/*.hhc
+DocProject/Help/*.hhk
+DocProject/Help/*.hhp
+DocProject/Help/Html2
+DocProject/Help/html
+
+# Click-Once directory
+publish/
+
+# Publish Web Output
+*.[Pp]ublish.xml
+*.azurePubxml
+# Note: Comment the next line if you want to checkin your web deploy settings,
+# but database connection strings (with potential passwords) will be unencrypted
+*.pubxml
+*.publishproj
+
+# Microsoft Azure Web App publish settings. Comment the next line if you want to
+# checkin your Azure Web App publish settings, but sensitive information contained
+# in these scripts will be unencrypted
+PublishScripts/
+
+# NuGet Packages
+*.nupkg
+# NuGet Symbol Packages
+*.snupkg
+# The packages folder can be ignored because of Package Restore
+**/[Pp]ackages/*
+# except build/, which is used as an MSBuild target.
+!**/[Pp]ackages/build/
+# Uncomment if necessary however generally it will be regenerated when needed
+#!**/[Pp]ackages/repositories.config
+# NuGet v3's project.json files produces more ignorable files
+*.nuget.props
+*.nuget.targets
+
+# Microsoft Azure Build Output
+csx/
+*.build.csdef
+
+# Microsoft Azure Emulator
+ecf/
+rcf/
+
+# Windows Store app package directories and files
+AppPackages/
+BundleArtifacts/
+Package.StoreAssociation.xml
+_pkginfo.txt
+*.appx
+*.appxbundle
+*.appxupload
+
+# Visual Studio cache files
+# files ending in .cache can be ignored
+*.[Cc]ache
+# but keep track of directories ending in .cache
+!?*.[Cc]ache/
+
+# Others
+ClientBin/
+~$*
+*~
+*.dbmdl
+*.dbproj.schemaview
+*.jfm
+*.pfx
+*.publishsettings
+orleans.codegen.cs
+
+# Including strong name files can present a security risk
+# (https://github.com/github/gitignore/pull/2483#issue-259490424)
+#*.snk
+
+# Since there are multiple workflows, uncomment next line to ignore bower_components
+# (https://github.com/github/gitignore/pull/1529#issuecomment-104372622)
+#bower_components/
+
+# RIA/Silverlight projects
+Generated_Code/
+
+# Backup & report files from converting an old project file
+# to a newer Visual Studio version. Backup files are not needed,
+# because we have git ;-)
+_UpgradeReport_Files/
+Backup*/
+UpgradeLog*.XML
+UpgradeLog*.htm
+ServiceFabricBackup/
+*.rptproj.bak
+
+# SQL Server files
+*.mdf
+*.ldf
+*.ndf
+
+# Business Intelligence projects
+*.rdl.data
+*.bim.layout
+*.bim_*.settings
+*.rptproj.rsuser
+*- [Bb]ackup.rdl
+*- [Bb]ackup ([0-9]).rdl
+*- [Bb]ackup ([0-9][0-9]).rdl
+
+# Microsoft Fakes
+FakesAssemblies/
+
+# GhostDoc plugin setting file
+*.GhostDoc.xml
+
+# Node.js Tools for Visual Studio
+.ntvs_analysis.dat
+node_modules/
+
+# Visual Studio 6 build log
+*.plg
+
+# Visual Studio 6 workspace options file
+*.opt
+
+# Visual Studio 6 auto-generated workspace file (contains which files were open etc.)
+*.vbw
+
+# Visual Studio LightSwitch build output
+**/*.HTMLClient/GeneratedArtifacts
+**/*.DesktopClient/GeneratedArtifacts
+**/*.DesktopClient/ModelManifest.xml
+**/*.Server/GeneratedArtifacts
+**/*.Server/ModelManifest.xml
+_Pvt_Extensions
+
+# Paket dependency manager
+.paket/paket.exe
+paket-files/
+
+# FAKE - F# Make
+.fake/
+
+# CodeRush personal settings
+.cr/personal
+
+# Python Tools for Visual Studio (PTVS)
+__pycache__/
+*.pyc
+
+# Cake - Uncomment if you are using it
+# tools/**
+# !tools/packages.config
+
+# Tabs Studio
+*.tss
+
+# Telerik's JustMock configuration file
+*.jmconfig
+
+# BizTalk build output
+*.btp.cs
+*.btm.cs
+*.odx.cs
+*.xsd.cs
+
+# OpenCover UI analysis results
+OpenCover/
+
+# Azure Stream Analytics local run output
+ASALocalRun/
+
+# MSBuild Binary and Structured Log
+*.binlog
+
+# NVidia Nsight GPU debugger configuration file
+*.nvuser
+
+# MFractors (Xamarin productivity tool) working folder
+.mfractor/
+
+# Local History for Visual Studio
+.localhistory/
+
+# BeatPulse healthcheck temp database
+healthchecksdb
+
+# Backup folder for Package Reference Convert tool in Visual Studio 2017
+MigrationBackup/
+
+# Ionide (cross platform F# VS Code tools) working folder
+.ionide/
+
+# Fody - auto-generated XML schema
+FodyWeavers.xsd
+
+/.mvn/
+/mvnw
+/mvnw.cmd

+ 105 - 0
pom.xml

@@ -0,0 +1,105 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.6.14</version>
+        <relativePath/> <!-- lookup parent from repository -->
+    </parent>
+    <groupId>com.nokia</groupId>
+    <artifactId>acl_tousu_task</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>acl_tousu_task</name>
+    <description>acl_tousu_task</description>
+    <properties>
+        <java.version>8</java.version>
+        <maven.compiler.source>${java.version}</maven.compiler.source>
+        <maven.compiler.target>${java.version}</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>${project.build.sourceEncoding}</project.reporting.outputEncoding>
+        <skipTests>true</skipTests>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.postgresql</groupId>
+            <artifactId>postgresql</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-configuration-processor</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-csv -->
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-csv</artifactId>
+            <version>1.9.0</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>2.0.22</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/com.jcraft/jsch -->
+        <dependency>
+            <groupId>com.jcraft</groupId>
+            <artifactId>jsch</artifactId>
+            <version>0.1.55</version>
+        </dependency>
+        <dependency>
+            <groupId>com.xuxueli</groupId>
+            <artifactId>xxl-job-core</artifactId>
+            <version>2.3.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-jdbc</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>aclTousuTask</finalName>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>${project.parent.version}</version>
+                <executions>
+                    <execution>
+                        <id>repackage</id>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <excludes>
+                        <exclude>
+                            <groupId>org.projectlombok</groupId>
+                            <artifactId>lombok</artifactId>
+                        </exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 9 - 0
readme.md

@@ -0,0 +1,9 @@
+# acl投诉定时任务
+
+## 测试环境
+
+- 部署位置:192.168.50.3/data1/acl_task/
+
+## 正式环境
+
+- 部署位置:192.168.70.130/data1/acl_task/

+ 15 - 0
scripts/product/gpload/gpload.sh

@@ -0,0 +1,15 @@
+#!/bin/bash
+
+source /usr/local/greenplum-db-clients/greenplum_loaders_path.sh
+
+# 由于gpload需要输入密码,这里需要使用expect执行
+password=sqmdb_1QAZ
+expect -c "
+set timeout 300
+spawn gpload -f gpload/gpload.yml -V --gpfdist_timeout 30
+expect {
+\"connecting (yes/no)?\" { send \"yes\n\";exp_continue }
+\"Password:\" { send \"${password}\n\"; exp_continue}
+timeout { puts \"超时\" exit 2}
+}
+"

+ 24 - 0
scripts/product/gpload/gpload.yml

@@ -0,0 +1,24 @@
+VERSION: 1.0.0.1
+DATABASE: sqmmt
+USER: sqmdb
+HOST: 192.168.70.109
+PORT: 5432
+GPLOAD:
+  PRELOAD:
+    - TRUNCATE: true
+    - REUSE_TABLES: false
+  INPUT:
+    - SOURCE:
+        LOCAL_HOSTNAME:
+          - 192.168.70.130
+        PORT: 54331
+        FILE:
+          - top-user.csv
+    - FORMAT: csv
+    - DELIMITER: ','
+    - ENCODING: utf-8
+    - ERROR_LIMIT: 20000000
+    - LOG_ERRORS: true
+  OUTPUT:
+    - TABLE: sqmdb_rpt.acl_top_user
+    - MODE: insert

+ 6 - 0
scripts/product/rollback.sh

@@ -0,0 +1,6 @@
+#!/bin/bash
+
+sh stop.sh
+rm -rf aclTousuTask.jar
+mv aclTousuTask.jar.bak aclTousuTask.jar
+sh run.sh

+ 3 - 0
scripts/product/run.sh

@@ -0,0 +1,3 @@
+#!/bin/bash
+
+nohup java -jar aclTousuTask.jar >/dev/null 2>&1 &

+ 5 - 0
scripts/product/stop.sh

@@ -0,0 +1,5 @@
+#!/bin/bash
+
+for i in $(pgrep -f aclTousuTask.jar); do
+  kill -9 "$i"
+done

+ 7 - 0
scripts/product/update.sh

@@ -0,0 +1,7 @@
+#!/bin/bash
+
+rm -rf aclTousuTask.jar.bak
+sh stop.sh
+mv aclTousuTask.jar aclTousuTask.jar.bak
+mv aclTousuTask.jar.new aclTousuTask.jar
+sh run.sh

+ 14 - 0
scripts/test/gpload/gpload.sh

@@ -0,0 +1,14 @@
+#!/bin/bash
+
+source /usr/local/greenplum-db-clients/greenplum_loaders_path.sh
+# 由于gpload需要输入密码,这里需要使用expect执行
+password=sqmdb_1QAZ
+expect -c "
+set timeout 300
+spawn gpload -f gpload/gpload.yml -V --gpfdist_timeout 30
+expect {
+\"connecting (yes/no)?\" { send \"yes\n\";exp_continue }
+\"Password:\" { send \"${password}\n\"; exp_continue}
+timeout { puts \"超时\" exit 2}
+}
+"

+ 24 - 0
scripts/test/gpload/gpload.yml

@@ -0,0 +1,24 @@
+VERSION: 1.0.0.1
+DATABASE: sqmmt
+USER: sqmdb
+HOST: 192.168.50.5
+PORT: 5432
+GPLOAD:
+  PRELOAD:
+    - TRUNCATE: true
+    - REUSE_TABLES: false
+  INPUT:
+    - SOURCE:
+        LOCAL_HOSTNAME:
+          - 192.168.50.3
+        PORT: 54331
+        FILE:
+          - top-user.csv
+    - FORMAT: csv
+    - DELIMITER: ','
+    - ENCODING: utf-8
+    - ERROR_LIMIT: 20000000
+    - LOG_ERRORS: true
+  OUTPUT:
+    - TABLE: sqmdb_rpt.acl_top_user
+    - MODE: insert

+ 6 - 0
scripts/test/rollback.sh

@@ -0,0 +1,6 @@
+#!/bin/bash
+
+sh stop.sh
+rm -rf aclTousuTask.jar
+mv aclTousuTask.jar.bak aclTousuTask.jar
+sh run.sh

+ 3 - 0
scripts/test/run.sh

@@ -0,0 +1,3 @@
+#!/bin/bash
+
+nohup java -jar aclTousuTask.jar >/dev/null 2>&1 &

+ 5 - 0
scripts/test/stop.sh

@@ -0,0 +1,5 @@
+#!/bin/bash
+
+for i in $(pgrep -f aclTousuTask.jar); do
+  kill -9 "$i"
+done

+ 7 - 0
scripts/test/update.sh

@@ -0,0 +1,7 @@
+#!/bin/bash
+
+rm -rf aclTousuTask.jar.bak
+sh stop.sh
+mv aclTousuTask.jar aclTousuTask.jar.bak
+mv aclTousuTask.jar.new aclTousuTask.jar
+sh run.sh

+ 13 - 0
src/main/java/com/nokia/AclTousuTaskApplication.java

@@ -0,0 +1,13 @@
+package com.nokia;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class AclTousuTaskApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(AclTousuTaskApplication.class, args);
+    }
+
+}

+ 22 - 0
src/main/java/com/nokia/common/exception/MyRuntimeException.java

@@ -0,0 +1,22 @@
+package com.nokia.common.exception;
+
+public class MyRuntimeException extends RuntimeException{
+    public MyRuntimeException() {
+    }
+
+    public MyRuntimeException(String message) {
+        super(message);
+    }
+
+    public MyRuntimeException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MyRuntimeException(Throwable cause) {
+        super(cause);
+    }
+
+    public MyRuntimeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

+ 105 - 0
src/main/java/com/nokia/common/gpload/GploadUtil.java

@@ -0,0 +1,105 @@
+package com.nokia.common.gpload;
+
+import com.nokia.common.gpload.entity.GploadResult;
+import com.xxl.job.core.context.XxlJobHelper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.util.CollectionUtils;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+@Slf4j
+public class GploadUtil {
+    public static final Pattern PATTERN = Pattern.compile("= (\\d+)");
+
+    public static GploadResult gpload(String gploadCommand) {
+        Process process = null;
+        GploadResult result = new GploadResult();
+        result.setTaskStatus(true);
+        try {
+            process = Runtime.getRuntime().exec(gploadCommand);
+            try (BufferedReader inputReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+                 BufferedReader errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
+                String line;
+                // 读取错误流
+                List<String> error = new ArrayList<>();
+                while ((line = errorReader.readLine()) != null) {
+                    error.add(line);
+                }
+                if (!CollectionUtils.isEmpty(error)) {
+                    log.info("gpload errorStream: " + error);
+                    XxlJobHelper.log("gpload errorStream: " + error);
+                    result.setTaskStatus(false);
+                    result.setMessage(error.toString());
+                }
+
+                // 读取标准流
+                List<String> input = new ArrayList<>();
+                while ((line = inputReader.readLine()) != null) {
+                    input.add(line);
+                }
+                if (!CollectionUtils.isEmpty(input)) {
+                    log.info("gpload inputStream: " + input);
+                    XxlJobHelper.log("gpload inputStream: " + input);
+                    for (String s : input) {
+                        if (s.contains("|rows Inserted")) {
+                            result.setInsertedCount(getNum(s));
+                        }
+                        if (s.contains("|rows Updated")) {
+                            result.setUpdatedCount(getNum(s));
+                        }
+                        if (s.contains("|data formatting errors")) {
+                            result.setErrorCount(getNum(s));
+                        }
+                        if (s.contains("ERROR") || s.contains("gpload failed")) {
+                            result.setTaskStatus(false);
+                        }
+                        if (s.contains("ERROR")) {
+                            result.setMessage(result.getMessage() + "\n" + s);
+                        }
+                    }
+                }
+            }
+
+            int code = process.waitFor();
+            if (code != 0) {
+                result.setTaskStatus(false);
+            }
+        } catch (IOException e) {
+            log.error("gpload发生异常: {}", e.getMessage(), e);
+            XxlJobHelper.log("gpload发生异常: {}", e.getMessage(), e);
+            result.setTaskStatus(false);
+            result.setMessage(e.getMessage());
+        } catch (InterruptedException e) {
+            log.error("gpload发生异常: {}", e.getMessage(), e);
+            XxlJobHelper.log("gpload发生异常: {}", e.getMessage(), e);
+            result.setTaskStatus(false);
+            result.setMessage(e.getMessage());
+            Thread.currentThread().interrupt();
+        } finally {
+            log.info("gpload的结果为: {}", result);
+            XxlJobHelper.log("gpload的结果为: {}", result);
+            if (process != null) {
+                process.destroy();
+            }
+        }
+        return result;
+    }
+
+    /**
+     * 获取gpload统计条数
+     */
+    private static int getNum(String s) {
+        Matcher matcher = PATTERN.matcher(s);
+        if (matcher.find()) {
+            return Integer.parseInt(matcher.group(1));
+        }
+
+        return 0;
+    }
+}

+ 12 - 0
src/main/java/com/nokia/common/gpload/entity/GploadResult.java

@@ -0,0 +1,12 @@
+package com.nokia.common.gpload.entity;
+
+import lombok.Data;
+
+@Data
+public class GploadResult {
+    private Boolean taskStatus;
+    private Integer insertedCount;
+    private Integer updatedCount;
+    private Integer errorCount;
+    private String message = "";
+}

+ 383 - 0
src/main/java/com/nokia/common/ssh/SSHUtil.java

@@ -0,0 +1,383 @@
+package com.nokia.common.ssh;
+
+import com.jcraft.jsch.*;
+import com.nokia.common.ssh.entity.SSHServer;
+import com.nokia.common.ssh.entity.UserInfoImpl;
+import com.nokia.common.ssh.exception.SSHUtilException;
+import com.nokia.common.ssh.exception.ScpAckErrorException;
+import com.xxl.job.core.context.XxlJobHelper;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.*;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.stream.Collectors;
+
+/**
+ * 使用jsch库实现的ssh的工具类
+ * <p>
+ * todo: scpTo和scpFrom 在本机和targetServer默认编码不一致的时候,文件名中的中文会乱码,但是不会影响到文件内容,
+ */
+
+@Slf4j
+public class SSHUtil {
+
+    @Getter
+    @Setter
+    private SSHServer targetServer = new SSHServer();
+    private Session session = null;
+    private Channel channel = null;
+    private JSch jSch = null;
+    private FileInputStream fileInputStream = null;
+    private FileOutputStream fileOutputStream = null;
+    private OutputStream outputStream = null;
+    private InputStream inputStream = null;
+    private ChannelSftp channelSftp = null;
+
+    public SSHUtil() {
+    }
+
+    public SSHUtil(String host, String user, String password) {
+        targetServer = new SSHServer(host, 22, user, password);
+    }
+
+    public SSHUtil(String host, Integer port, String user, String password) {
+        targetServer = new SSHServer(host, port, user, password);
+    }
+
+    /**
+     * 获取文件列表
+     */
+    @SuppressWarnings("rawtypes")
+    public List<String> ls(String path) throws JSchException, SftpException {
+        getConnectSession();
+        channelSftpConnect();
+        List<String> fileNameList = new ArrayList<>();
+        Vector fileList = channelSftp.ls(path);
+        for (Object o : fileList) {
+            String fileName = ((ChannelSftp.LsEntry) o).getFilename();
+            if (".".equals(fileName) || "..".equals(fileName)) {
+                continue;
+            }
+            fileNameList.add(fileName);
+        }
+
+        return fileNameList.stream().sorted().collect(Collectors.toList());
+    }
+
+    /**
+     * 下载文件
+     */
+    public void get(String src, String dst) throws JSchException, SftpException, IOException {
+        try (OutputStream out = Files.newOutputStream(Paths.get(dst))) {
+            getConnectSession();
+            channelSftpConnect();
+            channelSftp.get(src, out);
+        }
+    }
+
+    /**
+     * 删除文件
+     */
+    public void rm(String path) throws JSchException, SftpException {
+        getConnectSession();
+        channelSftpConnect();
+        channelSftp.rm(path);
+    }
+
+    /**
+     * 远程执行指令
+     */
+    public String exec(String command) throws JSchException, IOException {
+        StringBuilder stringBuilder = new StringBuilder();
+        getConnectSession();
+        channel = session.openChannel("exec");
+        // jsch的登陆是无环境登陆即非login状态登陆,因此是没有环境变量的,
+        String execCommand;
+        // 在命令前添加 bash --login -c "command"以获取环境变量
+        // source .bashrc && command 也可以解决问题, 但是可能环境加载不全
+        if (command.startsWith("bash --login -c")) {
+            execCommand = command;
+        } else {
+            execCommand = String.format("bash --login -c \"%s\"", command);
+        }
+        ((ChannelExec) channel).setCommand(execCommand);
+        channel.setInputStream(null);
+        ((ChannelExec) channel).setErrStream(System.err);
+        InputStream in = channel.getInputStream();
+        channel.connect();
+        byte[] tmp = new byte[1024];
+        while (true) {
+            while (in.available() > 0) {
+                int i = in.read(tmp, 0, 1024);
+                if (i < 0) {
+                    break;
+                }
+                stringBuilder.append(new String(tmp, 0, i));
+            }
+            if (channel.isClosed()) {
+                if (in.available() > 0) {
+                    continue;
+                }
+                break;
+            }
+        }
+        return stringBuilder.toString();
+    }
+
+    /**
+     * 使用SCP把本地文件推送到targetServer目录下
+     * <p>
+     * 注意,文件名不能包含中文
+     */
+    public boolean scpTo(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
+        getConnectSession();
+        // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
+        // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
+        // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
+        // -t 指定为to 也就是目的端模式 指定的对象就是session对应的连接对象targetServer
+        String command = "scp " + "-t " + targetPath;
+        channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+        outputStream = channel.getOutputStream();
+        inputStream = channel.getInputStream();
+        channel.connect();
+        if (checkAck(inputStream) != 0) {
+            log.error("scpTo 执行失败");
+            XxlJobHelper.log("scpTo 执行失败");
+            return false;
+        }
+        File sourceFile = new File(sourceFilePath);
+        if (sourceFile.isDirectory()) {
+            log.error("sourceFilePath 必须是文件");
+            XxlJobHelper.log("sourceFilePath 必须是文件");
+            return false;
+        }
+        long fileSize = sourceFile.length();
+        command = "C0644 " + fileSize + " " + sourceFile.getName() + "\n";
+        outputStream.write(command.getBytes());
+        outputStream.flush();
+        if (checkAck(inputStream) != 0) {
+            log.error("scpTo 执行失败");
+            XxlJobHelper.log("scpTo 执行失败");
+            return false;
+        }
+        fileInputStream = new FileInputStream(sourceFile);
+        byte[] buffer = new byte[1024];
+        while (true) {
+            int len = fileInputStream.read(buffer, 0, buffer.length);
+            if (len <= 0) {
+                break;
+            }
+            outputStream.write(buffer, 0, len);
+        }
+        buffer[0] = 0;
+        outputStream.write(buffer, 0, 1);
+        outputStream.flush();
+        return checkAck(inputStream) == 0;
+    }
+
+    /**
+     * 使用scp把targetServer目录下的文件复制到本地
+     */
+    public boolean scpFrom(String sourceFilePath, String targetPath) throws JSchException, IOException, SSHUtilException {
+        log.info(sourceFilePath);
+        XxlJobHelper.log(sourceFilePath);
+        getConnectSession();
+        // scp内置了两个参数 -t 和 -f ,这两个参数是隐藏的,不会被用户显式提供,
+        // 两个scp进程之间传输数据时,远端机器上的scp进程被本地scp进程启动起来时提供上去。
+        // 需要说明的是,这是通过本地scp进程经ssh远程过去开启远端机器的scp进程来实现的。
+        // -f 指定对端为from 也就是源端模式 指定的对象就是session对应的连接对象targetServer
+        String command = "scp -f " + sourceFilePath;
+        Channel channel = session.openChannel("exec");
+        ((ChannelExec) channel).setCommand(command);
+        outputStream = channel.getOutputStream();
+        inputStream = channel.getInputStream();
+        channel.connect();
+        byte[] buf = new byte[1024];
+        // 发送指令 '0'
+        // 源端会一直等宿端的回应, 直到等到回应才会传输下一条协议文本.
+        // 在送出最后一条协议文本后, 源端会传出一个大小为零的字符'0'来表示真正文件传输的开始.
+        // 当文件接收完成后, 宿端会给源端发送一个'0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        // 接收C0644 这条消息携带了文件的信息
+        while (true) {
+            int c = checkAck(inputStream);
+            // 遇到C时跳出循环
+            if (c == 'C') {
+                break;
+            }
+        }
+        // 接收 '0644 ' 这段字符表示文件的权限
+        inputStream.read(buf, 0, 5);
+        // 获取filesize
+        long filesize = 0L;
+        while (true) {
+            if (inputStream.read(buf, 0, 1) < 0) {
+                break;
+            }
+            if (buf[0] == ' ') {
+                break;
+            }
+            filesize = filesize * 10L + (long) (buf[0] - '0');
+        }
+        // 从 C0644命令读取文件名,命令中的文件名是不带路径的
+        String file = null;
+        for (int i = 0; ; i++) {
+            inputStream.read(buf, i, 1);
+            // 0x0a 是LF 换行符
+            if (buf[i] == (byte) 0x0a) {
+                file = new String(buf, 0, i);
+                break;
+            }
+        }
+        log.info("filesize={}, file={}", filesize, file);
+        XxlJobHelper.log("filesize={}, file={}", filesize, file);
+        // 发送 '0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        // 如果目标是目录,则需要加上文件名
+        File target = new File(targetPath);
+        if (target.isDirectory()) {
+            log.info("{} 是目录,需要添加文件名", target.getAbsolutePath());
+            XxlJobHelper.log("{} 是目录,需要添加文件名", target.getAbsolutePath());
+            target = new File(targetPath + File.separator + file);
+        }
+
+        fileOutputStream = new FileOutputStream(target);
+        int foo;
+        while (true) {
+            if (buf.length < filesize) {
+                foo = buf.length;
+            } else {
+                foo = (int) filesize;
+            }
+            foo = inputStream.read(buf, 0, foo);
+            if (foo < 0) {
+                break;
+            }
+            fileOutputStream.write(buf, 0, foo);
+            filesize -= foo;
+            if (filesize == 0L) {
+                break;
+            }
+        }
+        if (checkAck(inputStream) != 0) {
+            return false;
+        }
+        // 发送 '0'
+        buf[0] = 0;
+        outputStream.write(buf, 0, 1);
+        outputStream.flush();
+        log.info("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+        XxlJobHelper.log("scp from {}@{}:{}{} to {} 完成", targetServer.getUser(), targetServer.getHost(), targetServer.getPort(), sourceFilePath, target.getAbsolutePath());
+        return true;
+    }
+
+    public void getConnectSession() throws JSchException {
+        if (jSch == null) {
+            jSch = new JSch();
+        }
+
+        if (session == null) {
+            session = jSch.getSession(targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+            session.setPassword(targetServer.getPassword());
+            session.setUserInfo(new UserInfoImpl());
+            // 不需要输入保存ssh安全密钥的yes或no
+            Properties properties = new Properties();
+            properties.put("StrictHostKeyChecking", "no");
+            session.setConfig(properties);
+        }
+
+        if (!session.isConnected()) {
+            session.connect();
+            log.info("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+            XxlJobHelper.log("已连接到{}@{}:{}", targetServer.getUser(), targetServer.getHost(), targetServer.getPort());
+        }
+    }
+
+    public void channelSftpConnect() throws JSchException {
+        if (channelSftp == null) {
+            channelSftp = (ChannelSftp) session.openChannel("sftp");
+        }
+
+        if (!channelSftp.isConnected()) {
+            channelSftp.connect();
+        }
+    }
+
+    public void disconnect() throws IOException {
+        if (fileOutputStream != null) {
+            fileOutputStream.close();
+            fileOutputStream = null;
+        }
+        if (fileInputStream != null) {
+            fileInputStream.close();
+            fileInputStream = null;
+        }
+        if (outputStream != null) {
+            outputStream.close();
+            outputStream = null;
+        }
+        if (channel != null) {
+            channel.disconnect();
+            channel = null;
+        }
+        if (channelSftp != null) {
+            channelSftp.quit();
+        }
+        if (session != null) {
+            session.disconnect();
+            session = null;
+        }
+        jSch = null;
+        log.info("jsch disconnected");
+    }
+
+    /**
+     * 来自源端的每条消息和每个传输完毕的文件都需要宿端的确认和响应.
+     * 宿端会返回三种确认消息: 0(正常), 1(警告)或2(严重错误, 将中断连接).
+     * 消息1和2可以跟一个字符串和一个换行符, 这个字符串将显示在scp的源端. 无论这个字符串是否为空, 换行符都是不可缺少的.
+     */
+    private static int checkAck(InputStream in) throws IOException, SSHUtilException {
+        int b = in.read();
+        // b 取值为0表示成功
+        if (b == 0) {
+            return b;
+        }
+        if (b == -1) {
+            return b;
+        }
+
+        // 1表示警告 2表示严重错误,将中断连接
+        // 1和2 后面会携带一条错误信息,以\n结尾
+        if (b == 1 || b == 2) {
+            // 打印消息后面跟的字符串
+            StringBuilder sb = new StringBuilder();
+            int c;
+            do {
+                // 读取字符串直到遇到换行符
+                c = in.read();
+                sb.append((char) c);
+            } while (c != '\n');
+            log.info("checkAck发现错误消息: ack={}-msg={}", b, sb);
+            XxlJobHelper.log("checkAck发现错误消息: ack={}-msg={}", b, sb);
+            if (b == 1 && sb.toString().endsWith("No such file or directory")) {
+                throw new NoSuchFileException(sb.toString());
+            } else {
+                throw new ScpAckErrorException(sb.toString());
+            }
+        }
+        return b;
+    }
+}

+ 15 - 0
src/main/java/com/nokia/common/ssh/entity/SSHServer.java

@@ -0,0 +1,15 @@
+package com.nokia.common.ssh.entity;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class SSHServer {
+    private String host;
+    private int port = 22;
+    private String user;
+    private String password;
+}

+ 35 - 0
src/main/java/com/nokia/common/ssh/entity/UserInfoImpl.java

@@ -0,0 +1,35 @@
+package com.nokia.common.ssh.entity;
+
+import com.jcraft.jsch.UserInfo;
+
+public class UserInfoImpl implements UserInfo {
+    @Override
+    public String getPassphrase() {
+        return null;
+    }
+
+    @Override
+    public String getPassword() {
+        return null;
+    }
+
+    @Override
+    public boolean promptPassword(String s) {
+        return false;
+    }
+
+    @Override
+    public boolean promptPassphrase(String s) {
+        return false;
+    }
+
+    @Override
+    public boolean promptYesNo(String s) {
+        return false;
+    }
+
+    @Override
+    public void showMessage(String s) {
+
+    }
+}

+ 8 - 0
src/main/java/com/nokia/common/ssh/exception/SSHUtilException.java

@@ -0,0 +1,8 @@
+package com.nokia.common.ssh.exception;
+
+public class SSHUtilException extends Exception{
+
+    public SSHUtilException(String message) {
+        super(message);
+    }
+}

+ 7 - 0
src/main/java/com/nokia/common/ssh/exception/ScpAckErrorException.java

@@ -0,0 +1,7 @@
+package com.nokia.common.ssh.exception;
+
+public class ScpAckErrorException extends SSHUtilException {
+    public ScpAckErrorException(String message) {
+        super(message);
+    }
+}

+ 76 - 0
src/main/java/com/nokia/config/XxlJobConfig.java

@@ -0,0 +1,76 @@
+package com.nokia.config;
+
+import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * xxl-job config
+ *
+ * @author xuxueli 2017-04-28
+ */
+@Slf4j
+@Configuration
+public class XxlJobConfig {
+    @Value("${xxl.job.admin.addresses}")
+    private String adminAddresses;
+
+    @Value("${xxl.job.accessToken}")
+    private String accessToken;
+
+    @Value("${xxl.job.executor.appname}")
+    private String appname;
+
+    @Value("${xxl.job.executor.address}")
+    private String address;
+
+    @Value("${xxl.job.executor.ip}")
+    private String ip;
+
+    @Value("${xxl.job.executor.port}")
+    private int port;
+
+    @Value("${xxl.job.executor.logpath}")
+    private String logPath;
+
+    @Value("${xxl.job.executor.logretentiondays}")
+    private int logRetentionDays;
+
+
+    @Bean
+    public XxlJobSpringExecutor xxlJobExecutor() {
+        log.info(">>>>>>>>>>> xxl-job config init.");
+        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
+        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
+        xxlJobSpringExecutor.setAppname(appname);
+        xxlJobSpringExecutor.setAddress(address);
+        xxlJobSpringExecutor.setIp(ip);
+        xxlJobSpringExecutor.setPort(port);
+        xxlJobSpringExecutor.setAccessToken(accessToken);
+        xxlJobSpringExecutor.setLogPath(logPath);
+        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
+
+        return xxlJobSpringExecutor;
+    }
+
+    /**
+     * 针对多网卡、容器内部署等情况,可借助 "spring-cloud-commons" 提供的 "InetUtils" 组件灵活定制注册IP;
+     *
+     *      1、引入依赖:
+     *          <dependency>
+     *             <groupId>org.springframework.cloud</groupId>
+     *             <artifactId>spring-cloud-commons</artifactId>
+     *             <version>${version}</version>
+     *         </dependency>
+     *
+     *      2、配置文件,或者容器启动变量
+     *          spring.cloud.inetutils.preferred-networks: 'xxx.xxx.xxx.'
+     *
+     *      3、获取IP
+     *          String ip_ = inetUtils.findFirstNonLoopbackHostInfo().getIpAddress();
+     */
+
+
+}

+ 279 - 0
src/main/java/com/nokia/task/SyncTask.java

@@ -0,0 +1,279 @@
+package com.nokia.task;
+
+import com.alibaba.fastjson2.JSON;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.SftpException;
+import com.nokia.common.exception.MyRuntimeException;
+import com.nokia.common.gpload.GploadUtil;
+import com.nokia.common.gpload.entity.GploadResult;
+import com.nokia.common.ssh.SSHUtil;
+import com.xxl.job.core.context.XxlJobHelper;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.csv.CSVFormat;
+import org.apache.commons.csv.CSVParser;
+import org.apache.commons.csv.CSVPrinter;
+import org.apache.commons.csv.CSVRecord;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Data
+@Component
+@Slf4j
+@ConfigurationProperties("task")
+public class SyncTask {
+
+    /**
+     * sftp ip
+     */
+    private String host;
+    /**
+     * sftp端口
+     */
+    private Integer port;
+    /**
+     * sftp账号
+     */
+    private String username;
+    /**
+     * sftp密码
+     */
+    private String password;
+    /**
+     * top用户数据文件夹
+     */
+    private String sourceDir;
+    /**
+     * 下载文件夹
+     */
+    private String downloadDir;
+    /**
+     * 去重文件名
+     */
+    private String distinctFilename;
+    private SSHUtil sshUtil;
+    private final JdbcTemplate jdbcTemplate;
+
+    public SyncTask(JdbcTemplate jdbcTemplate) {
+        this.jdbcTemplate = jdbcTemplate;
+    }
+
+    /**
+     * 同步top用户信息定时任务
+     *
+     */
+    @XxlJob("syncTopUser")
+    public void syncTopUser() {
+        try {
+            // 创建文件夹
+            Files.createDirectories(Paths.get(downloadDir));
+            sshUtil = new SSHUtil(host, port, username, password);
+            // 获取文件列表
+            List<String> list = sshUtil.ls(sourceDir);
+            if (CollectionUtils.isEmpty(list)) {
+                log.error("没有文件");
+                XxlJobHelper.log("没有文件");
+                XxlJobHelper.handleFail("没有文件");
+                return;
+            }
+
+            String filename = list.get(list.size() - 1);
+            log.info("同步的文件: {}", filename);
+            XxlJobHelper.log("同步的文件: {}", filename);
+            singleTask(filename);
+        } catch (Exception e) {
+            log.error("发生异常了: {}", e.getMessage(), e);
+            XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
+            XxlJobHelper.handleFail(e.getMessage());
+        } finally {
+            try {
+                sshUtil.disconnect();
+            } catch (IOException e) {
+                log.error("发生异常了: {}", e.getMessage(), e);
+                XxlJobHelper.log("发生异常了: {}", e.getMessage(), e);
+                XxlJobHelper.handleFail(e.getMessage());
+            }
+        }
+    }
+
+    /**
+     * 单一任务
+     *
+     * @param filename 文件名
+     */
+    public void singleTask(String filename) throws JSchException, IOException, SftpException {
+        download(filename);
+        distinct(filename);
+        gpload();
+    }
+
+    /**
+     * 下载文件
+     *
+     * @param filename 文件名
+     */
+    public void download(String filename) throws JSchException, IOException, SftpException {
+        log.info("下载文件: {}", filename);
+        XxlJobHelper.log("下载文件: {}", filename);
+        String src = sourceDir + filename;
+        String dst = downloadDir + filename;
+        sshUtil.get(src, dst);
+    }
+
+    /**
+     * 去重
+     *
+     * @param filename 文件名
+     */
+    public void distinct(String filename) throws IOException {
+        log.info("文件 {} 去重", filename);
+        XxlJobHelper.log("文件 {} 去重", filename);
+        String inputFilePath = downloadDir + filename;
+        Path inputPath = Paths.get(inputFilePath);
+        List<Object> modifiedUsers = new ArrayList<>();
+        try (CSVParser parser = CSVFormat.DEFAULT.builder().build()
+                .parse(new InputStreamReader(Files.newInputStream(inputPath), "gbk"));
+             OutputStreamWriter osw = new OutputStreamWriter(Files.newOutputStream(Paths.get(distinctFilename)),
+                     StandardCharsets.UTF_8);
+             CSVPrinter printer = new CSVPrinter(osw, CSVFormat.DEFAULT)) {
+//            printer.printRecord("login_name","org_id","org_name","user_id","user_name","phone","employee_code","province_id","city_id","area_id", "province_name", "city_name", "area_name");
+            Map<String, CSVRecord> csvRecordMap = new HashMap<>();
+            // 按loginName去重
+            for (CSVRecord t : parser) {
+                if (t.getRecordNumber() == 1) {
+                    continue;
+                }
+                csvRecordMap.put(t.get(4), t);
+            }
+            log.info("{} 条数据", csvRecordMap.size());
+            XxlJobHelper.log("{} 条数据", csvRecordMap.size());
+            if (CollectionUtils.isEmpty(csvRecordMap)) {
+                throw new MyRuntimeException("数据为空");
+            }
+            // 查询所有用户,过滤已删除和测试用户
+            String sql = "select user_id, login_name, city_id from sqmdb_rpt.acl_user"
+                    + " where deleted = 0 and test_user = 0 order by user_id";
+            List<Map<String, Object>> allUsers = jdbcTemplate.queryForList(sql);
+            Map<Object, Map<String, Object>> userMap = new HashMap<>();
+            allUsers.forEach(t -> userMap.put(t.get("login_name"), t));
+            // 查询所有地区
+            sql = "select * from sqmdb_rpt.acl_area order by type_code, area_id";
+            List<Map<String, Object>> areas = jdbcTemplate.queryForList(sql);
+            Map<String, Map<String, Object>> cityMap = new HashMap<>();
+            Map<Object, List<Map<String, Object>>> areaMap = new HashMap<>();
+            for (Map<String, Object> t : areas) {
+                if (t.get("type_code").equals(2)) {
+                    cityMap.put((String) t.get("area_name"), t);
+                    areaMap.putIfAbsent(t.get("area_id"), new ArrayList<>());
+                    for (Map<String, Object> tt : areas) {
+                        if (tt.get("type_code").equals(3) && tt.get("parent_id").equals(t.get("area_id"))) {
+                            areaMap.get(t.get("area_id")).add(tt);
+                        }
+                    }
+                }
+            }
+            for (CSVRecord t : csvRecordMap.values()) {
+                String orgId = t.get(0);
+                String orgName = t.get(1);
+                String userId = t.get(2);
+                String userName = t.get(3);
+                String loginName = t.get(4);
+                String phone = t.get(5);
+                String employeeCode = t.get(6);
+                Integer provinceId = -1;
+                Object cityId = -1;
+                Object areaId = null;
+//                String provinceName = "河北省";
+//                Object cityName = null;
+//                Object areaName = null;
+                for (Map.Entry<String, Map<String, Object>> tt : cityMap.entrySet()) {
+                    // 匹配地市
+                    if (orgName.contains(tt.getKey())) {
+                        Map<String, Object> area = tt.getValue();
+                        cityId = area.get("area_id");
+//                        cityName = area.get("area_name");
+                        List<Map<String, Object>> areaList = areaMap.get(area.get("area_id"));
+                        for (Map<String, Object> ttt : areaList) {
+                            // 匹配区县
+                            if (orgName.contains((CharSequence) ttt.get("area_name"))) {
+                                areaId = ttt.get("area_id");
+//                                areaName = ttt.get("area_name");
+                                break;
+                            }
+                        }
+                        break;
+                    }
+                }
+//                printer.printRecord(loginName, orgId, orgName, userId, userName, phone, employeeCode, provinceId, cityId, areaId, provinceName, cityName, areaName);
+                printer.printRecord(loginName, orgId, orgName, userId, userName, phone, employeeCode, provinceId,
+                        cityId, areaId);
+                // 检查用户信息变化
+                Map<String, Object> user = userMap.get(loginName);
+                if (user != null && !cityId.equals(user.get("city_id"))) {
+                    modifiedUsers.add(user.get("user_id"));
+                    log.debug("用户 {} 地市变化: {} -> {}", user.get("user_id"), user.get("city_id"), cityId);
+                }
+            }
+            log.info("文件 {} 去重完成", filename);
+            XxlJobHelper.log("文件 {} 去重完成", filename);
+            // 删除本地源文件
+            Files.deleteIfExists(inputPath);
+            log.info("删除本地源文件 {}", filename);
+            XxlJobHelper.log("删除本地源文件 {}", filename);
+            // 检查删除用户
+            sql = "select user_id from sqmdb_rpt.acl_user"
+                    + " where deleted = 0 and test_user = 0 and login_name not in (:loginNames)"
+                    + " order by user_id";
+            Map<String, Object> paramMap = new HashMap<>();
+            paramMap.put("loginNames", csvRecordMap.keySet());
+            NamedParameterJdbcTemplate namedParameterJdbcTemplate = new NamedParameterJdbcTemplate(jdbcTemplate);
+            List<Integer> deletedUsers = namedParameterJdbcTemplate.queryForList(sql, paramMap, Integer.class);
+            if (!CollectionUtils.isEmpty(deletedUsers)) {
+                log.error("已删除用户: {}", JSON.toJSONString(deletedUsers));
+            }
+            // 检查权限地市
+            sql = "select distinct au.user_id"
+                    + " from sqmdb_rpt.acl_user au"
+                    + " inner join sqmdb_rpt.acl_user_role_city aurc on au.user_id = aurc.user_id"
+                    + " where au.city_id != aurc.city_id"
+                    + " and aurc.role_id != 3"
+                    + " order by au.user_id";
+            List<Integer> diffUsers = jdbcTemplate.queryForList(sql, Integer.class);
+            if (!CollectionUtils.isEmpty(diffUsers)) {
+                log.error("权限地市不一致用户: {}", JSON.toJSONString(diffUsers));
+            }
+            if (!CollectionUtils.isEmpty(modifiedUsers)) {
+                log.error("地市变化用户: {}", JSON.toJSONString(modifiedUsers));
+            }
+        }
+    }
+
+    public void gpload() throws IOException {
+        String gploadCommand = "sh gpload/gpload.sh";
+        GploadResult gpload = GploadUtil.gpload(gploadCommand);
+        if (Boolean.TRUE.equals(gpload.getTaskStatus())) {
+            log.info("gpload完成: {}", gpload);
+            XxlJobHelper.log("gpload完成: {}", gpload);
+            // 删除文件
+            Files.deleteIfExists(Paths.get(distinctFilename));
+        } else {
+            throw new MyRuntimeException("gpload失败: " + gpload.getMessage());
+        }
+    }
+}

+ 34 - 0
src/main/resources/application-product.properties

@@ -0,0 +1,34 @@
+# web服务端口号
+server.port=12129
+
+# log配置
+logging.level.root=info
+#logging.level.com.nokia=debug
+
+# 正式环境数据源GP数据库配置
+spring.datasource.driver-class-name=org.postgresql.Driver
+spring.datasource.url=jdbc:postgresql://192.168.70.109:5432/sqmmt
+spring.datasource.username=sqmdb
+spring.datasource.password=sqmdb_1QAZ
+task.host=192.168.70.130
+task.port=22
+task.username=do
+task.password=Richr00t
+task.sourceDir=/data1/esbftp/top_user/
+task.downloadDir=download/
+task.distinctFilename=top-user.csv
+### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
+xxl.job.admin.addresses=http://192.168.10.7:8087/xxl-job-admin
+### xxl-job, access token
+xxl.job.accessToken=
+### xxl-job executor appname
+xxl.job.executor.appname=acl-tousu-prod
+### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
+xxl.job.executor.address=
+### xxl-job executor server-info
+xxl.job.executor.ip=
+xxl.job.executor.port=9998
+### xxl-job executor log-path
+xxl.job.executor.logpath=./log/xxl/
+### xxl-job executor log-retention-days
+xxl.job.executor.logretentiondays=30

+ 32 - 0
src/main/resources/application-test.properties

@@ -0,0 +1,32 @@
+# web服务端口号
+server.port=12129
+# log配置
+logging.level.root=info
+logging.level.com.nokia=debug
+# 测试环境数据库配置
+spring.datasource.driver-class-name=org.postgresql.Driver
+spring.datasource.url=jdbc:postgresql://192.168.50.5:5432/sqmmt
+spring.datasource.username=sqmdb
+spring.datasource.password=sqmdb_1QAZ
+task.host=192.168.70.130
+task.port=22
+task.username=do
+task.password=Richr00t
+task.sourceDir=/data1/esbftp/top_user/
+task.downloadDir=download/
+task.distinctFilename=top-user.csv
+### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
+xxl.job.admin.addresses=http://192.168.10.7:8087/xxl-job-admin
+### xxl-job, access token
+xxl.job.accessToken=
+### xxl-job executor appname
+xxl.job.executor.appname=acl-tousu-test
+### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
+xxl.job.executor.address=
+### xxl-job executor server-info
+xxl.job.executor.ip=
+xxl.job.executor.port=9998
+### xxl-job executor log-path
+xxl.job.executor.logpath=./log/xxl/
+### xxl-job executor log-retention-days
+xxl.job.executor.logretentiondays=30

+ 2 - 0
src/main/resources/application.properties

@@ -0,0 +1,2 @@
+# 启用的配置
+spring.profiles.active=test

+ 54 - 0
src/main/resources/logback-spring.xml

@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <property name="PATH" value="./log"/>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <Pattern>%d %highlight(%-5level) %magenta([%thread]) %cyan(%logger:%line) %msg%n</Pattern>
+            <charset>UTF-8</charset>
+        </encoder>
+    </appender>
+    <appender name="TRACE_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${PATH}/trace.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <!-- rollover daily -->
+            <fileNamePattern>${PATH}/trace.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+            <!-- each file should be at most 10MB, keep 60 days worth of history, but at most 20GB -->
+            <maxFileSize>10MB</maxFileSize>
+            <maxHistory>60</maxHistory>
+            <totalSizeCap>20GB</totalSizeCap>
+        </rollingPolicy>
+        <encoder>
+            <Pattern>%d %highlight(%-5level) %magenta([%thread]) %cyan(%logger:%line) %msg%n</Pattern>
+            <charset>UTF-8</charset>
+        </encoder>
+    </appender>
+    <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${PATH}/error.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+            <!-- rollover daily -->
+            <fileNamePattern>${PATH}/error.%d{yyyy-MM-dd}.%i.log</fileNamePattern>
+            <!-- each file should be at most 10MB, keep 60 days worth of history, but at most 20GB -->
+            <maxFileSize>10MB</maxFileSize>
+            <maxHistory>60</maxHistory>
+            <totalSizeCap>20GB</totalSizeCap>
+        </rollingPolicy>
+        <encoder>
+            <Pattern>%d %highlight(%-5level) %magenta([%thread]) %cyan(%logger:%line) %msg%n</Pattern>
+            <charset>UTF-8</charset>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <level>ERROR</level>
+            <onMatch>ACCEPT</onMatch>
+            <onMismatch>DENY</onMismatch>
+        </filter>
+    </appender>
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+    </root>
+    <root level="TRACE">
+        <appender-ref ref="TRACE_FILE"/>
+    </root>
+    <root level="ERROR">
+        <appender-ref ref="ERROR_FILE"/>
+    </root>
+</configuration>

+ 21 - 0
src/test/java/com/nokia/acl_tousu_task/AclTousuTaskApplicationTests.java

@@ -0,0 +1,21 @@
+package com.nokia.acl_tousu_task;
+
+import com.nokia.task.SyncTask;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.ActiveProfiles;
+
+@ActiveProfiles("test")
+@Slf4j
+@SpringBootTest
+class AclTousuTaskApplicationTests {
+    @Autowired
+    private SyncTask syncTask;
+
+    @Test
+    void testSync() {
+        syncTask.syncTopUser();
+    }
+}