house_abnormal_data.py 15 KB


  1. """不动产异常数据稽核数据处理
  2. """
  3. import os # 导入os模块,用于处理文件和目录操作
  4. import re # 导入re模块,用于正则表达式操作
  5. from loguru import logger # 导入loguru模块,用于日志记录
  6. import pandas as pd # 导入pandas模块,用于数据处理和分析
  7. import psycopg # 导入psycopg模块,用于连接PostgreSQL数据库
  8. from datetime import datetime # 导入datetime模块,用于日期和时间操作
  9. from dateutil.relativedelta import relativedelta # 导入relativedelta模块,用于日期的相对计算
  10. import xlwings as xw # 导入xlwings模块,用于操作Excel文件
  11. import subprocess
  12. import paramiko
  13. # 配置日志记录器,将日志输出到文件a.log
  14. logger.add(sink='a.log')
  15. ssh_hostname = '172.16.107.4' # 定义远程主机地址
  16. ssh_port = 22 # 定义SSH服务的端口号
  17. ssh_username = 'app' # 定义登录远程主机的用户名
  18. ssh_password = '(l4w0ST_' # 定义登录远程主机的密码
  19. # 服务器文件夹路径
  20. remote_dir_path = '/data/history/house/abnormal-data/'
  21. # 数据库连接信息
  22. db_host = "172.16.107.5" # 数据库主机地址
  23. db_port = 5432 # 数据库端口号
  24. db_username = "finance" # 数据库用户名
  25. db_password = "Finance@unicom23" # 数据库密码
  26. dbname = "financialdb" # 数据库名称
  27. conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'"
  28. # 创建一个不可见的Excel应用程序实例
  29. app = xw.App(visible=False)
  30. # 获取当前日期和时间
  31. today = datetime.today()
  32. # 计算上个月的第一天作为起始日期
  33. start_date = today - relativedelta(months=1, day=1)
  34. # 将起始日期格式化为YYYYMM的形式
  35. year_month = start_date.strftime('%Y%m')
  36. # 定义数据输入和输出目录
  37. input_dir = 'data/'
  38. output_dir = 'output/'
  39. # 输出文件路径
  40. output_path = 'output.csv'
  41. remote_dir_path = f"{remote_dir_path}{year_month}/"
  42. def xls_to_xlsx():
  43. # 如果输出目录不存在,则创建该目录
  44. if not os.path.exists(output_dir):
  45. os.makedirs(output_dir)
  46. # 遍历数据目录中的所有文件
  47. for filename in os.listdir(input_dir):
  48. # 只处理以.xls结尾的文件
  49. if filename.endswith(".xls"):
  50. file_path = os.path.join(input_dir, filename) # 构造文件路径
  51. workbook = app.books.open(file_path) # 打开Excel文件
  52. pattern = r'\D+(\d+)' # 定义正则表达式,用于匹配非数字部分
  53. # 使用正则表达式替换文件名,并将扩展名改为.xlsx
  54. new_file_name = re.sub(pattern, year_month + '_', filename).replace(".xls", ".xlsx")
  55. # 保存修改后的文件到输出目录
  56. workbook.save(os.path.join(output_dir, new_file_name))
  57. logger.info(f"{filename} -> {new_file_name}") # 记录日志
  58. workbook.close() # 关闭工作簿
  59. def data_process():
  60. # 初始化组织映射和三级组织列表映射
  61. org_map = {}
  62. third_org_list_map = {}
  63. # 连接PostgreSQL数据库
  64. with psycopg.connect(
  65. conninfo=conn_info,
  66. row_factory=psycopg.rows.dict_row # 使用字典格式返回查询结果
  67. ) as conn:
  68. with conn.cursor() as curs:
  69. # 查询一级组织信息
  70. sql = """
  71. select * from common.organization where grade = 1
  72. """
  73. logger.info(f"sql: {sql}") # 记录SQL语句
  74. curs.execute(sql)
  75. second_orgs = curs.fetchall()
  76. # 初始化三级组织列表映射
  77. for x in second_orgs:
  78. third_org_list_map[x['id']] = []
  79. # 查询所有组织信息
  80. sql = """
  81. select * from common.organization
  82. """
  83. logger.info(f"sql: {sql}") # 记录SQL语句
  84. curs.execute(sql)
  85. orgs = curs.fetchall()
  86. # 构建组织映射和三级组织列表映射
  87. for x in orgs:
  88. if x['parent_id'] in third_org_list_map:
  89. third_org_list_map[x['parent_id']].append(x)
  90. org_map[x['id']] = x
  91. files = os.listdir(output_dir)
  92. file_list = list(filter(lambda x: x.endswith('.xlsx'), files)) # 筛选出以.xlsx结尾的文件
  93. logger.info('file_list: {}', file_list) # 记录文件列表
  94. # 初始化数据列表
  95. data_list = []
  96. # 遍历文件列表,读取Excel文件内容并进行处理
  97. for t in file_list:
  98. logger.info(f'{t}') # 记录当前文件名
  99. ny = t.replace('.xlsx', '').split('_')[0] # 提取年月信息
  100. ds = t.replace('.xlsx', '').split('_')[1] # 提取地市信息
  101. tmp = pd.read_excel(output_dir + t, skiprows=8, header=None) # 读取Excel文件内容
  102. if '省本部' in ds or '省公司' in ds:
  103. tmp = pd.read_excel(output_dir + t, skiprows=8, header=None, nrows=1) # 特殊处理省本部文件
  104. tmp['年月'] = ny # 添加年月列
  105. tmp['地市'] = ds # 添加地市列
  106. tmp['source'] = t # 添加来源文件名列
  107. data_list.append(tmp) # 将数据添加到数据列表中
  108. # 合并所有数据到一个DataFrame中
  109. df = pd.concat(data_list)
  110. # 去除字符串中的多余空格
  111. df = df.map(lambda x: re.sub(r'\s+', '', x) if type(x) is str else x)
  112. # 定义DataFrame列名
  113. df.columns = ['third_unit', 'wei_guan_lian_tu_di_zheng', 'wei_guan_lian_fang_chan_zheng', 'wei_guan_lian_ju_zhi',
  114. 'jian_zhu_tu_di_ju_zhi_bu_dui_ying', 'tu_di_ju_zhi_bian_hao_bu_cun_zai',
  115. 'jian_zhu_ju_zhi_bian_hao_bu_cun_zai', 'jian_zhu_tu_di_biao_qian_hao_bu_cun_zai',
  116. 'dai_guan_lian_tu_di', 'dai_guan_lian_jian_zhu', 'ju_zhi_gte_three_tu_di', 'ju_zhi_gt_ten_tu_di',
  117. 'ju_zhi_gte_five_jian_zhu', 'ju_zhi_gte_ten_jian_zhu', 'tu_di_gte_seven_jian_zhu',
  118. 'tu_di_gte_ten_jian_zhu', 'tu_di_zheng_xia_ci_kong_bai', 'fang_chan_zheng_xia_ci_kong_bai',
  119. 'ju_zhi_di_duan_kong_bai', 'xian_zhi_jian_zhu_mian_ji', 'xian_zhi_tu_di_mian_ji',
  120. 'wu_jian_zhu_fei_xian_zhi_tu_di_mian_ji', 'tu_di_shi_yong_quan_qi_ta', 'year_month', 'second_unit',
  121. 'source']
  122. # 定义函数,根据二级单位和三级单位获取区域编号
  123. def get_area_no(x):
  124. second_unit = x['second_unit']
  125. third_unit = x['third_unit']
  126. if '长途局' in second_unit or '长途通信传输局' in second_unit or '机动局' in second_unit or '传输局' in second_unit:
  127. return '-11'
  128. if '保定' in second_unit and ('雄县' in third_unit or '容城' in third_unit or '安新' in third_unit):
  129. return '782'
  130. for second_org in second_orgs:
  131. area_name = second_org['name']
  132. area_no = second_org['id']
  133. if area_name in second_unit:
  134. return area_no
  135. return '-12'
  136. # 应用get_area_no函数,生成area_no列
  137. df['area_no'] = df.apply(get_area_no, axis=1)
  138. # 定义函数,根据区域编号获取区域名称
  139. def get_area_name(x):
  140. area_no = x['area_no']
  141. second_org = org_map[area_no]
  142. area_name = second_org['name']
  143. return area_name
  144. # 应用get_area_name函数,生成area_name列
  145. df['area_name'] = df.apply(get_area_name, axis=1)
  146. # 定义函数,根据三级单位、区域名称和区域编号获取城市编号
  147. def get_city_no(x):
  148. third_unit = x['third_unit']
  149. area_name = x['area_name']
  150. area_no = x['area_no']
  151. if area_name == '石家庄':
  152. if '矿区' in third_unit:
  153. return 'D0130185'
  154. if '井陉' in third_unit:
  155. return 'D0130121'
  156. if area_name == '秦皇岛':
  157. if '北戴河新区' in third_unit:
  158. return 'D0130185'
  159. if '北戴河' in third_unit:
  160. return 'D0130304'
  161. if area_name == '唐山':
  162. if '滦县' in third_unit:
  163. return 'D0130223'
  164. if '高新技术开发区' in third_unit:
  165. return 'D0130205'
  166. if area_name == '邢台':
  167. if '内丘' in third_unit:
  168. return 'D0130523'
  169. if '任泽' in third_unit:
  170. return 'D0130526'
  171. if area_name == '邯郸':
  172. if '峰峰' in third_unit:
  173. return 'D0130406'
  174. if area_name == '省机动局':
  175. if '沧州' in third_unit:
  176. return 'HECS180'
  177. if '唐山' in third_unit:
  178. return 'HECS181'
  179. if '秦皇岛' in third_unit:
  180. return 'HECS182'
  181. if '廊坊' in third_unit:
  182. return 'HECS183'
  183. if '张家口' in third_unit:
  184. return 'HECS184'
  185. if '邢台' in third_unit:
  186. return 'HECS185'
  187. if '邯郸' in third_unit:
  188. return 'HECS186'
  189. if '保定' in third_unit:
  190. return 'HECS187'
  191. if '石家庄' in third_unit:
  192. return 'HECS188'
  193. if '承德' in third_unit:
  194. return 'HECS189'
  195. if '衡水' in third_unit:
  196. return 'HECS720'
  197. if '雄安' in third_unit:
  198. return 'HECS728'
  199. return 'HECS018'
  200. if '雄安' == area_name:
  201. third_unit = third_unit.replace('雄安新区', '')
  202. third_org_list = third_org_list_map[area_no]
  203. for third_org in third_org_list:
  204. city_name = third_org['name']
  205. if city_name in third_unit:
  206. return third_org['id']
  207. if '沧州' == area_name:
  208. return 'D0130911'
  209. if '唐山' == area_name:
  210. return 'D0130202'
  211. if '秦皇岛' == area_name:
  212. return 'D0130302'
  213. if '廊坊' == area_name:
  214. return 'D0131000'
  215. if '张家口' == area_name:
  216. return 'D0130701'
  217. if '邢台' == area_name:
  218. return 'D0130502'
  219. if '邯郸' == area_name:
  220. return 'D0130402'
  221. if '保定' == area_name:
  222. return 'D0130601'
  223. if '石家庄' == area_name:
  224. return 'D0130186'
  225. if '承德' == area_name:
  226. return 'D0130801'
  227. if '衡水' == area_name:
  228. return 'D0133001'
  229. if '雄安' == area_name:
  230. return 'D0130830'
  231. return 'HE001'
  232. # 应用get_city_no函数,生成city_no列
  233. df['city_no'] = df.apply(get_city_no, axis=1)
  234. # 定义函数,根据城市编号获取城市名称
  235. def get_city_name(x):
  236. city_no = x['city_no']
  237. third_org = org_map[city_no]
  238. city_name = third_org['name']
  239. return city_name
  240. # 应用get_city_name函数,生成city_name列
  241. df['city_name'] = df.apply(get_city_name, axis=1)
  242. # 输出DataFrame的基本信息
  243. print(df.info())
  244. # 将处理后的数据保存为CSV文件
  245. df.to_csv(path_or_buf=output_path,
  246. index=False,
  247. encoding='utf-8-sig')
  248. def data_import():
  249. # 定义 PowerShell 脚本的路径
  250. script_path = r"../../copy.ps1"
  251. # 目标表和文件信息
  252. table = "house.abnormal_data" # 数据库目标表名
  253. # 表字段列名,用于指定导入数据的列顺序
  254. columns = "third_unit,wei_guan_lian_tu_di_zheng,wei_guan_lian_fang_chan_zheng,wei_guan_lian_ju_zhi,jian_zhu_tu_di_ju_zhi_bu_dui_ying,tu_di_ju_zhi_bian_hao_bu_cun_zai,jian_zhu_ju_zhi_bian_hao_bu_cun_zai,jian_zhu_tu_di_biao_qian_hao_bu_cun_zai,dai_guan_lian_tu_di,dai_guan_lian_jian_zhu,ju_zhi_gte_three_tu_di,ju_zhi_gt_ten_tu_di,ju_zhi_gte_five_jian_zhu,ju_zhi_gte_ten_jian_zhu,tu_di_gte_seven_jian_zhu,tu_di_gte_ten_jian_zhu,tu_di_zheng_xia_ci_kong_bai,fang_chan_zheng_xia_ci_kong_bai,ju_zhi_di_duan_kong_bai,xian_zhi_jian_zhu_mian_ji,xian_zhi_tu_di_mian_ji,wu_jian_zhu_fei_xian_zhi_tu_di_mian_ji,tu_di_shi_yong_quan_qi_ta,year_month,second_unit,source,area_no,area_name,city_no,city_name"
  255. # 构造执行 PowerShell 脚本的命令
  256. command = f"powershell -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}"
  257. # 打印生成的命令,方便调试和日志记录
  258. logger.info("command: {}", command)
  259. # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出
  260. completed_process = subprocess.run(
  261. command, # 执行的命令
  262. check=False, # 如果命令执行失败,不抛出异常
  263. text=True, # 将输出作为字符串处理
  264. capture_output=True, # 捕获标准输出和标准错误
  265. )
  266. # 打印命令执行的结果,包括返回码、标准输出和标准错误
  267. logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout,
  268. completed_process.stderr)
  269. # 定义正则表达式,用于匹配标准输出中的 COPY 结果
  270. p = re.compile(r"^(COPY) (\d+)$")
  271. count = None # 初始化计数变量
  272. matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果
  273. if matcher:
  274. count = int(matcher.group(2)) # 提取导入的数据行数
  275. # 如果没有成功提取到导入数据的行数,抛出运行时异常
  276. if count is None:
  277. raise RuntimeError("导入数据失败")
  278. def upload_file():
  279. # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文
  280. with paramiko.SSHClient() as ssh:
  281. # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败
  282. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  283. # 连接到远程主机,传入主机地址、端口、用户名和密码
  284. ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password)
  285. # 执行远程命令,创建远程目录(如果不存在)
  286. ssh.exec_command(f'mkdir -p {remote_dir_path}')
  287. # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文
  288. with ssh.open_sftp() as sftp:
  289. for filename in os.listdir(input_dir):
  290. local_path = os.path.join(input_dir, filename)
  291. remote_path = f'{remote_dir_path}{filename}'
  292. if os.path.isfile(local_path):
  293. # 记录日志,提示即将上传的本地文件和远程目标路径
  294. logger.info("upload {} to {}", local_path, remote_path)
  295. # 使用SFTP的put方法将本地文件上传到远程主机
  296. sftp.put(local_path, remote_path)
  297. # 记录日志,提示文件已成功上传
  298. logger.info("uploaded {}", local_path)
  299. xls_to_xlsx()
  300. data_process()
  301. data_import()
  302. upload_file()