house_site.py 15 KB


  1. """不动产局址数据处理
  2. """
  3. import re
  4. from datetime import datetime
  5. from dateutil.relativedelta import relativedelta
  6. from loguru import logger
  7. import pandas as pd
  8. import psycopg
  9. import subprocess
  10. import paramiko
  11. # 配置日志记录器,将日志写入文件a.log
  12. logger.add(sink='a.log')
  13. ssh_hostname = '172.16.107.4' # 定义远程主机地址
  14. ssh_port = 22 # 定义SSH服务的端口号
  15. ssh_username = 'app' # 定义登录远程主机的用户名
  16. ssh_password = '(l4w0ST_' # 定义登录远程主机的密码
  17. # 服务器文件夹路径
  18. remote_dir_path = '/data/history/house/site/'
  19. # 数据库连接信息
  20. db_host = "172.16.107.5" # 数据库主机地址
  21. db_port = 5432 # 数据库端口号
  22. db_username = "finance" # 数据库用户名
  23. db_password = "Finance@unicom23" # 数据库密码
  24. dbname = "financialdb" # 数据库名称
  25. conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'"
  26. # 获取当前日期,并计算上个月的第一天
  27. today = datetime.today()
  28. start_date = today - relativedelta(months=1, day=1)
  29. year_month = start_date.strftime('%Y%m')
  30. # 数据文件路径
  31. input_path = 'data.xlsx'
  32. # 输出文件路径
  33. output_path = 'output.csv'
  34. def data_process():
  35. # 定义全局字典变量,用于存储组织和区域的映射关系
  36. org_map = {}
  37. third_org_list_map = {}
  38. area_map = {}
  39. districts_list_map = {}
  40. # 使用 psycopg 连接 PostgreSQL 数据库
  41. with psycopg.connect(
  42. conninfo=conn_info,
  43. row_factory=psycopg.rows.dict_row # 使用字典格式返回查询结果
  44. ) as conn:
  45. with conn.cursor() as curs:
  46. # 查询 grade=1 的二级组织信息
  47. sql = """
  48. select * from common.organization where grade = 1
  49. """
  50. logger.info(f"sql: {sql}")
  51. curs.execute(sql)
  52. second_orgs = curs.fetchall()
  53. # 初始化 third_org_list_map,key 为二级组织的 id,value 为空列表
  54. for x in second_orgs:
  55. third_org_list_map[x['id']] = []
  56. # 查询所有组织信息
  57. sql = """
  58. select * from common.organization
  59. """
  60. logger.info(f"sql: {sql}")
  61. curs.execute(sql)
  62. orgs = curs.fetchall()
  63. # 将组织信息填充到 third_org_list_map 和 org_map 中
  64. for x in orgs:
  65. if x['parent_id'] in third_org_list_map:
  66. third_org_list_map[x['parent_id']].append(x)
  67. org_map[x['id']] = x
  68. # 查询 area_grade=1 的一级区域信息
  69. sql = """
  70. select * from common.area where area_grade = 1 order by area_id
  71. """
  72. logger.info(f"sql: {sql}")
  73. curs.execute(sql)
  74. cities = curs.fetchall()
  75. # 初始化 districts_list_map,key 为一级区域的 area_id,value 为空列表
  76. for x in cities:
  77. districts_list_map[x['area_id']] = []
  78. # 查询所有区域信息
  79. sql = """
  80. select * from common.area
  81. """
  82. logger.info(f"sql: {sql}")
  83. curs.execute(sql)
  84. areas = curs.fetchall()
  85. # 将区域信息填充到 districts_list_map 和 area_map 中
  86. for x in areas:
  87. if x['parent_id'] in districts_list_map:
  88. districts_list_map[x['parent_id']].append(x)
  89. area_map[x['area_id']] = x
  90. # 读取 Excel 文件数据
  91. df = pd.read_excel(io=input_path)
  92. # 删除字符串类型的列中的空白字符
  93. df = df.map(lambda x: re.sub(r'\s+', '', x) if type(x) is str else x)
  94. # 删除重复行,基于 '局址ID' 列,保留最后一行
  95. df.drop_duplicates(subset=['局址ID'], keep='last', inplace=True)
  96. def get_area_no(x):
  97. """
  98. 获取二级组织机构编码
  99. """
  100. second_unit = x['资产所属单位(二级)']
  101. third_unit = x['资产所属单位(三级)']
  102. if '河北' == second_unit:
  103. return '-12'
  104. if '长途通信传输局' == second_unit:
  105. return '-11'
  106. if '保定' in second_unit and ('雄县' in third_unit or '容城' in third_unit or '安新' in third_unit):
  107. return '782'
  108. for second_org in second_orgs:
  109. area_name = second_org['name']
  110. area_no = second_org['id']
  111. if area_name in second_unit:
  112. return area_no
  113. raise RuntimeError(f'二级组织机构编码匹配失败: {second_unit}')
  114. df['二级组织机构编码'] = df.apply(get_area_no, axis=1)
  115. def get_area_name(x):
  116. """
  117. 根据二级组织机构编码获取二级组织机构名称
  118. """
  119. area_no = x['二级组织机构编码']
  120. second_org = org_map[area_no]
  121. area_name = second_org['name']
  122. return area_name
  123. df['二级组织机构名称'] = df.apply(get_area_name, axis=1)
  124. def get_city_no(x):
  125. """
  126. 获取三级组织机构编码
  127. """
  128. third_unit = x['资产所属单位(三级)']
  129. area_name = x['二级组织机构名称']
  130. area_no = x['二级组织机构编码']
  131. if area_name == '石家庄':
  132. if '矿区' in third_unit:
  133. return 'D0130185'
  134. if '井陉' in third_unit:
  135. return 'D0130121'
  136. if area_name == '秦皇岛':
  137. if '北戴河新区' in third_unit:
  138. return 'D0130185'
  139. if '北戴河' in third_unit:
  140. return 'D0130304'
  141. if area_name == '唐山':
  142. if '滦县' in third_unit:
  143. return 'D0130223'
  144. if '高新技术开发区' in third_unit:
  145. return 'D0130205'
  146. if area_name == '邢台':
  147. if '内丘' in third_unit:
  148. return 'D0130523'
  149. if '任泽' in third_unit:
  150. return 'D0130526'
  151. if area_name == '邯郸':
  152. if '峰峰' in third_unit:
  153. return 'D0130406'
  154. if area_name == '省机动局':
  155. if '沧州' in third_unit:
  156. return 'HECS180'
  157. if '唐山' in third_unit:
  158. return 'HECS181'
  159. if '秦皇岛' in third_unit:
  160. return 'HECS182'
  161. if '廊坊' in third_unit:
  162. return 'HECS183'
  163. if '张家口' in third_unit:
  164. return 'HECS184'
  165. if '邢台' in third_unit:
  166. return 'HECS185'
  167. if '邯郸' in third_unit:
  168. return 'HECS186'
  169. if '保定' in third_unit:
  170. return 'HECS187'
  171. if '石家庄' in third_unit:
  172. return 'HECS188'
  173. if '承德' in third_unit:
  174. return 'HECS189'
  175. if '衡水' in third_unit:
  176. return 'HECS720'
  177. if '雄安' in third_unit:
  178. return 'HECS728'
  179. return 'HECS018'
  180. if '雄安' == area_name:
  181. third_unit = third_unit.replace('雄安新区', '')
  182. third_org_list = third_org_list_map[area_no]
  183. for third_org in third_org_list:
  184. city_name = third_org['name']
  185. if city_name in third_unit:
  186. return third_org['id']
  187. if '沧州' == area_name:
  188. return 'D0130911'
  189. if '唐山' == area_name:
  190. return 'D0130202'
  191. if '秦皇岛' == area_name:
  192. return 'D0130302'
  193. if '廊坊' == area_name:
  194. return 'D0131000'
  195. if '张家口' == area_name:
  196. return 'D0130701'
  197. if '邢台' == area_name:
  198. return 'D0130502'
  199. if '邯郸' == area_name:
  200. return 'D0130402'
  201. if '保定' == area_name:
  202. return 'D0130601'
  203. if '石家庄' == area_name:
  204. return 'D0130186'
  205. if '承德' == area_name:
  206. return 'D0130801'
  207. if '衡水' == area_name:
  208. return 'D0133001'
  209. if '雄安' == area_name:
  210. return 'D0130830'
  211. return 'HE001'
  212. df['三级组织机构编码'] = df.apply(get_city_no, axis=1)
  213. def get_city_name(x):
  214. """
  215. 根据三级组织机构编码获取三级组织机构名称
  216. """
  217. city_no = x['三级组织机构编码']
  218. third_org = org_map[city_no]
  219. city_name = third_org['name']
  220. return city_name
  221. df['三级组织机构名称'] = df.apply(get_city_name, axis=1)
  222. def get_city_id(x):
  223. """
  224. 根据标准地址、资产所属单位(二级、三级)获取城市 ID
  225. """
  226. address = x['标准地址']
  227. second_unit = x['资产所属单位(二级)']
  228. third_unit = x['资产所属单位(三级)']
  229. if '雄安' in address or ('保定' in address and ('雄县' in address or '容城' in address or '安新' in address)):
  230. return '133100'
  231. for city in cities:
  232. area_name = city['short_name']
  233. area_id = city['area_id']
  234. if area_name in second_unit:
  235. return area_id
  236. if area_name in third_unit:
  237. return area_id
  238. if area_name in address:
  239. return area_id
  240. return ''
  241. df['city_id'] = df.apply(get_city_id, axis=1)
  242. def get_city(x):
  243. """
  244. 根据城市 ID 获取城市名称
  245. """
  246. city_id = x['city_id']
  247. area = area_map.get(city_id)
  248. if pd.notna(area):
  249. city = area['area_name']
  250. return city
  251. return ''
  252. df['city'] = df.apply(get_city, axis=1)
  253. def get_district_id(x):
  254. """
  255. 根据标准地址、城市名称和城市 ID 获取区县 ID
  256. """
  257. address = x['标准地址']
  258. city = x['city']
  259. city_id = x['city_id']
  260. if pd.isna(city) or pd.isna(address):
  261. return ''
  262. if city == '石家庄':
  263. if '矿区' in address:
  264. return '130107'
  265. if '井陉' in address:
  266. return '130121'
  267. if city == '唐山':
  268. if '滦县' in address:
  269. return '130284'
  270. if city == '邢台':
  271. if '内邱' in address:
  272. return '130523'
  273. if '任县' in address:
  274. return '130505'
  275. if city == '雄安':
  276. address = address.replace('雄安新区', '')
  277. districts = districts_list_map.get(city_id)
  278. if not districts:
  279. return ''
  280. for district in districts:
  281. district_name = district['short_name']
  282. if district_name in address:
  283. return district['area_id']
  284. return ''
  285. df['district_id'] = df.apply(get_district_id, axis=1)
  286. def get_district(x):
  287. """
  288. 根据区县 ID 获取区县名称
  289. """
  290. district_id = x['district_id']
  291. area = area_map.get(district_id)
  292. if pd.notna(area):
  293. district = area['area_name']
  294. return district
  295. return ''
  296. df['district'] = df.apply(get_district, axis=1)
  297. # 在 DataFrame 中插入 '年月' 列
  298. df.insert(0, '年月', year_month)
  299. # 打印 DataFrame 的基本信息
  300. print(df.info())
  301. # 将处理后的数据保存为 CSV 文件
  302. df.to_csv(path_or_buf=output_path,
  303. index=False,
  304. header=['year_month', 'site_id', 'first_unit', 'second_unit', 'third_unit', 'site_num', 'site_name',
  305. 'address', 'city_level', 'city_region', 'area_sector', 'has_land', 'area_no', 'area_name', 'city_no',
  306. 'city_name', 'city_id', 'city', 'district_id', 'district'],
  307. encoding='utf-8-sig')
  308. def data_import():
  309. # 定义 PowerShell 脚本的路径
  310. script_path = r"../../copy.ps1"
  311. # 目标表和文件信息
  312. table = "house.site_month" # 数据库目标表名
  313. # 表字段列名,用于指定导入数据的列顺序
  314. columns = "year_month,site_id,first_unit,second_unit,third_unit,site_num,site_name,address,city_level,city_region,area_sector,has_land,area_no,area_name,city_no,city_name,city_id,city,district_id,district"
  315. # 构造执行 PowerShell 脚本的命令
  316. command = f"powershell -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}"
  317. # 打印生成的命令,方便调试和日志记录
  318. logger.info("command: {}", command)
  319. # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出
  320. completed_process = subprocess.run(
  321. command, # 执行的命令
  322. check=False, # 如果命令执行失败,不抛出异常
  323. text=True, # 将输出作为字符串处理
  324. capture_output=True, # 捕获标准输出和标准错误
  325. )
  326. # 打印命令执行的结果,包括返回码、标准输出和标准错误
  327. logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout,
  328. completed_process.stderr)
  329. # 定义正则表达式,用于匹配标准输出中的 COPY 结果
  330. p = re.compile(r"^(COPY) (\d+)$")
  331. count = None # 初始化计数变量
  332. matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果
  333. if matcher:
  334. count = int(matcher.group(2)) # 提取导入的数据行数
  335. # 如果没有成功提取到导入数据的行数,抛出运行时异常
  336. if count is None:
  337. raise RuntimeError("导入数据失败")
  338. def upload_file():
  339. remote_path = f'{remote_dir_path}{year_month}.xlsx' # 定义远程主机的目标文件路径
  340. # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文
  341. with paramiko.SSHClient() as ssh:
  342. # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败
  343. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  344. # 连接到远程主机,传入主机地址、端口、用户名和密码
  345. ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password)
  346. # 执行远程命令,创建远程目录(如果不存在)
  347. ssh.exec_command(f'mkdir -p {remote_dir_path}')
  348. # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文
  349. with ssh.open_sftp() as sftp:
  350. # 记录日志,提示即将上传的本地文件和远程目标路径
  351. logger.info("upload {} to {}", input_path, remote_path)
  352. # 使用SFTP的put方法将本地文件上传到远程主机
  353. sftp.put(input_path, remote_path)
  354. # 记录日志,提示文件已成功上传
  355. logger.info("uploaded {}", input_path)
  356. data_process()
  357. data_import()
  358. upload_file()