house_fang_jian.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576
  1. """不动产房间数据处理
  2. """
import os
import re
import subprocess
from datetime import datetime

import pandas as pd
import paramiko
import psycopg
from dateutil.relativedelta import relativedelta
from loguru import logger
from psycopg.conninfo import make_conninfo
  11. # 配置日志记录器,将日志写入文件a.log
  12. logger.add(sink='a.log')
  13. ssh_hostname = '172.16.107.4' # 定义远程主机地址
  14. ssh_port = 22 # 定义SSH服务的端口号
  15. ssh_username = 'app' # 定义登录远程主机的用户名
  16. ssh_password = '(l4w0ST_' # 定义登录远程主机的密码
  17. # 服务器文件夹路径
  18. remote_dir_path = '/data/history/house/room/'
  19. # 数据库连接信息
  20. db_host = "172.16.107.5" # 数据库主机地址
  21. db_port = 5432 # 数据库端口号
  22. db_username = "finance" # 数据库用户名
  23. db_password = "Finance@unicom23" # 数据库密码
  24. dbname = "financialdb" # 数据库名称
  25. conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'"
  26. # 获取当前日期,并计算上个月的第一天
  27. today = datetime.today()
  28. start_date = today - relativedelta(months=1, day=1)
  29. year_month = start_date.strftime('%Y%m')
  30. # 数据文件路径
  31. input_path = 'data.xlsx'
  32. # 输出文件路径
  33. output_path = 'output.csv'
  34. def data_process():
  35. org_map = {} # 用于存储组织机构信息
  36. third_org_list_map = {} # 用于存储三级组织机构列表
  37. area_map = {} # 用于存储区域信息
  38. districts_list_map = {} # 用于存储区县信息
  39. # 连接到 PostgreSQL 数据库
  40. with psycopg.connect(
  41. conninfo=conn_info,
  42. row_factory=psycopg.rows.dict_row # 设置返回结果为字典格式
  43. ) as conn:
  44. with conn.cursor() as curs:
  45. # 查询所有二级组织机构(grade=1)
  46. sql = """
  47. select * from common.organization where grade = 1
  48. """
  49. logger.info(f"sql: {sql}")
  50. curs.execute(sql)
  51. second_orgs = curs.fetchall()
  52. # 初始化 third_org_list_map,以每个二级组织的 ID 为键,值为空列表
  53. for x in second_orgs:
  54. third_org_list_map[x['id']] = []
  55. # 查询所有组织机构信息
  56. sql = """
  57. select * from common.organization
  58. """
  59. logger.info(f"sql: {sql}")
  60. curs.execute(sql)
  61. orgs = curs.fetchall()
  62. # 构建 org_map 和 third_org_list_map
  63. for x in orgs:
  64. if x['parent_id'] in third_org_list_map:
  65. third_org_list_map[x['parent_id']].append(x)
  66. org_map[x['id']] = x
  67. # 查询所有一级行政区划(area_grade=1),并按 area_id 排序
  68. sql = """
  69. select * from common.area where area_grade = 1 order by area_id
  70. """
  71. logger.info(f"sql: {sql}")
  72. curs.execute(sql)
  73. cities = curs.fetchall()
  74. # 初始化 districts_list_map,以每个城市的 area_id 为键,值为空列表
  75. for x in cities:
  76. districts_list_map[x['area_id']] = []
  77. # 查询所有区域信息
  78. sql = """
  79. select * from common.area
  80. """
  81. logger.info(f"sql: {sql}")
  82. curs.execute(sql)
  83. areas = curs.fetchall()
  84. # 构建 area_map 和 districts_list_map
  85. for x in areas:
  86. if x['parent_id'] in districts_list_map:
  87. districts_list_map[x['parent_id']].append(x)
  88. area_map[x['area_id']] = x
  89. # 读取 Excel 文件中的数据
  90. df = pd.read_excel(io=input_path)
  91. # 清理 DataFrame 中的空白字符(排除特定列)
  92. columns_to_clean = list(filter(lambda x: x not in ('房间名称'), df.columns))
  93. df[columns_to_clean] = df[columns_to_clean].map(
  94. lambda x: re.sub(r'\s+', '', x) if type(x) is str else x
  95. )
  96. # 定义函数:根据资产所属单位获取二级组织机构编码
  97. def get_area_no(x):
  98. second_unit = x['资产所属单位(二级)']
  99. third_unit = x['资产所属单位(三级)']
  100. if '河北' == second_unit:
  101. return '-12'
  102. if '长途通信传输局' == second_unit:
  103. return '-11'
  104. if '保定' in second_unit and ('雄县' in third_unit or '容城' in third_unit or '安新' in third_unit):
  105. return '782'
  106. for second_org in second_orgs:
  107. area_name = second_org['name']
  108. area_no = second_org['id']
  109. if area_name in second_unit:
  110. return area_no
  111. raise RuntimeError(f'二级组织机构编码匹配失败: {second_unit}')
  112. # 应用 get_area_no 函数,生成二级组织机构编码列
  113. df['二级组织机构编码'] = df.apply(get_area_no, axis=1)
  114. # 定义函数:根据二级组织机构编码获取对应的名称
  115. def get_area_name(x):
  116. area_no = x['二级组织机构编码']
  117. second_org = org_map[area_no]
  118. area_name = second_org['name']
  119. return area_name
  120. # 应用 get_area_name 函数,生成二级组织机构名称列
  121. df['二级组织机构名称'] = df.apply(get_area_name, axis=1)
  122. # 定义函数:根据资产所属单位获取三级组织机构编码
  123. def get_city_no(x):
  124. third_unit = x['资产所属单位(三级)']
  125. area_name = x['二级组织机构名称']
  126. area_no = x['二级组织机构编码']
  127. if area_name == '石家庄':
  128. if '矿区' in third_unit:
  129. return 'D0130185'
  130. if '井陉' in third_unit:
  131. return 'D0130121'
  132. if area_name == '秦皇岛':
  133. if '北戴河新区' in third_unit:
  134. return 'D0130185'
  135. if '北戴河' in third_unit:
  136. return 'D0130304'
  137. if area_name == '唐山':
  138. if '滦县' in third_unit:
  139. return 'D0130223'
  140. if '高新技术开发区' in third_unit:
  141. return 'D0130205'
  142. if area_name == '邢台':
  143. if '内丘' in third_unit:
  144. return 'D0130523'
  145. if '任泽' in third_unit:
  146. return 'D0130526'
  147. if area_name == '邯郸':
  148. if '峰峰' in third_unit:
  149. return 'D0130406'
  150. if area_name == '省机动局':
  151. if '沧州' in third_unit:
  152. return 'HECS180'
  153. if '唐山' in third_unit:
  154. return 'HECS181'
  155. if '秦皇岛' in third_unit:
  156. return 'HECS182'
  157. if '廊坊' in third_unit:
  158. return 'HECS183'
  159. if '张家口' in third_unit:
  160. return 'HECS184'
  161. if '邢台' in third_unit:
  162. return 'HECS185'
  163. if '邯郸' in third_unit:
  164. return 'HECS186'
  165. if '保定' in third_unit:
  166. return 'HECS187'
  167. if '石家庄' in third_unit:
  168. return 'HECS188'
  169. if '承德' in third_unit:
  170. return 'HECS189'
  171. if '衡水' in third_unit:
  172. return 'HECS720'
  173. if '雄安' in third_unit:
  174. return 'HECS728'
  175. return 'HECS018'
  176. if '雄安' == area_name:
  177. third_unit = third_unit.replace('雄安新区', '')
  178. third_org_list = third_org_list_map[area_no]
  179. for third_org in third_org_list:
  180. city_name = third_org['name']
  181. if city_name in third_unit:
  182. return third_org['id']
  183. if '沧州' == area_name:
  184. return 'D0130911'
  185. if '唐山' == area_name:
  186. return 'D0130202'
  187. if '秦皇岛' == area_name:
  188. return 'D0130302'
  189. if '廊坊' == area_name:
  190. return 'D0131000'
  191. if '张家口' == area_name:
  192. return 'D0130701'
  193. if '邢台' == area_name:
  194. return 'D0130502'
  195. if '邯郸' == area_name:
  196. return 'D0130402'
  197. if '保定' == area_name:
  198. return 'D0130601'
  199. if '石家庄' == area_name:
  200. return 'D0130186'
  201. if '承德' == area_name:
  202. return 'D0130801'
  203. if '衡水' == area_name:
  204. return 'D0133001'
  205. if '雄安' == area_name:
  206. return 'D0130830'
  207. return 'HE001'
  208. # 应用 get_city_no 函数,生成三级组织机构编码列
  209. df['三级组织机构编码'] = df.apply(get_city_no, axis=1)
  210. # 定义函数:根据三级组织机构编码获取对应的名称
  211. def get_city_name(x):
  212. city_no = x['三级组织机构编码']
  213. third_org = org_map[city_no]
  214. city_name = third_org['name']
  215. return city_name
  216. # 应用 get_city_name 函数,生成三级组织机构名称列
  217. df['三级组织机构名称'] = df.apply(get_city_name, axis=1)
  218. # 定义函数:根据地址和资产所属单位获取城市 ID
  219. def get_city_id(x):
  220. address = x['标准地址']
  221. second_unit = x['资产所属单位(二级)']
  222. third_unit = x['资产所属单位(三级)']
  223. if '雄安' in address or ('保定' in address and ('雄县' in address or '容城' in address or '安新' in address)):
  224. return '133100'
  225. for city in cities:
  226. area_name = city['short_name']
  227. area_id = city['area_id']
  228. if area_name in second_unit or area_name in third_unit or area_name in address:
  229. return area_id
  230. return ''
  231. # 应用 get_city_id 函数,生成城市 ID 列
  232. df['city_id'] = df.apply(get_city_id, axis=1)
  233. # 定义函数:根据城市 ID 获取城市名称
  234. def get_city(x):
  235. city_id = x['city_id']
  236. area = area_map.get(city_id)
  237. if pd.notna(area):
  238. city = area['area_name']
  239. return city
  240. return ''
  241. # 应用 get_city 函数,生成城市名称列
  242. df['city'] = df.apply(get_city, axis=1)
  243. # 定义函数:根据地址和城市信息获取区县 ID
  244. def get_district_id(x):
  245. address = x['标准地址']
  246. city = x['city']
  247. city_id = x['city_id']
  248. if pd.isna(city) or pd.isna(address):
  249. return ''
  250. if city == '石家庄':
  251. if '矿区' in address:
  252. return '130107'
  253. if '井陉' in address:
  254. return '130121'
  255. if city == '唐山':
  256. if '滦县' in address:
  257. return '130284'
  258. if city == '邢台':
  259. if '内邱' in address:
  260. return '130523'
  261. if '任县' in address:
  262. return '130505'
  263. if city == '雄安':
  264. address = address.replace('雄安新区', '')
  265. districts = districts_list_map.get(city_id)
  266. if not districts:
  267. return ''
  268. for district in districts:
  269. district_name = district['short_name']
  270. if district_name in address:
  271. return district['area_id']
  272. return ''
  273. # 应用 get_district_id 函数,生成区县 ID 列
  274. df['district_id'] = df.apply(get_district_id, axis=1)
  275. # 定义函数:根据区县 ID 获取区县名称
  276. def get_district(x):
  277. district_id = x['district_id']
  278. area = area_map.get(district_id)
  279. if pd.notna(area):
  280. district = area['area_name']
  281. return district
  282. return ''
  283. # 应用 get_district 函数,生成区县名称列
  284. df['district'] = df.apply(get_district, axis=1)
  285. def get_int(x):
  286. try:
  287. return int(x)
  288. except Exception:
  289. return ""
  290. df['工位总数'] = df['工位总数'].apply(get_int)
  291. # 在 DataFrame 中插入年月列
  292. df.insert(0, '年月', year_month)
  293. # 打印 DataFrame 的信息
  294. print(df.info())
  295. # 将处理后的数据保存为 CSV 文件
  296. df.to_csv(
  297. path_or_buf=output_path,
  298. index=False,
  299. header=[
  300. 'year_month', 'first_unit', 'second_unit', 'third_unit', 'building_name', 'address', 'floor',
  301. 'floor_building_area', 'floor_usable_area', 'room_name', 'room_status', 'rent_type',
  302. 'first_room_type', 'second_room_type', 'seat_num', 'frontage', 'building_area',
  303. 'building_area_self_use', 'building_area_idle', 'building_area_rent', 'building_area_unusable',
  304. 'usable_area', 'usable_area_self_use', 'usable_area_idle', 'usable_area_rent', 'usable_area_unusable',
  305. 'idle_start_date', 'unusable_reason', 'floor_height', 'load_bearing', 'area_no', 'area_name',
  306. 'city_no', 'city_name', 'city_id', 'city', 'district_id', 'district'
  307. ],
  308. encoding='utf-8-sig'
  309. )
  310. def data_import():
  311. # 定义 PowerShell 脚本的路径
  312. script_path = r"../../copy.ps1"
  313. # 目标表和文件信息
  314. table = "house.room_month" # 数据库目标表名
  315. # 表字段列名,用于指定导入数据的列顺序
  316. columns = "year_month,first_unit,second_unit,third_unit,building_name,address,floor,floor_building_area,floor_usable_area,room_name,room_status,rent_type,first_room_type,second_room_type,seat_num,frontage,building_area,building_area_self_use,building_area_idle,building_area_rent,building_area_unusable,usable_area,usable_area_self_use,usable_area_idle,usable_area_rent,usable_area_unusable,idle_start_date,unusable_reason,floor_height,load_bearing,area_no,area_name,city_no,city_name,city_id,city,district_id,district"
  317. # 构造执行 PowerShell 脚本的命令
  318. command = f"powershell -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}"
  319. # 打印生成的命令,方便调试和日志记录
  320. logger.info("command: {}", command)
  321. # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出
  322. completed_process = subprocess.run(
  323. command, # 执行的命令
  324. check=False, # 如果命令执行失败,不抛出异常
  325. text=True, # 将输出作为字符串处理
  326. capture_output=True, # 捕获标准输出和标准错误
  327. )
  328. # 打印命令执行的结果,包括返回码、标准输出和标准错误
  329. logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout,
  330. completed_process.stderr)
  331. # 定义正则表达式,用于匹配标准输出中的 COPY 结果
  332. p = re.compile(r"^(COPY) (\d+)$")
  333. count = None # 初始化计数变量
  334. matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果
  335. if matcher:
  336. count = int(matcher.group(2)) # 提取导入的数据行数
  337. # 如果没有成功提取到导入数据的行数,抛出运行时异常
  338. if count is None:
  339. raise RuntimeError("导入数据失败")
  340. def upload_file():
  341. remote_path = f'{remote_dir_path}{year_month}.xlsx' # 定义远程主机的目标文件路径
  342. # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文
  343. with paramiko.SSHClient() as ssh:
  344. # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败
  345. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  346. # 连接到远程主机,传入主机地址、端口、用户名和密码
  347. ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password)
  348. # 执行远程命令,创建远程目录(如果不存在)
  349. ssh.exec_command(f'mkdir -p {remote_dir_path}')
  350. # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文
  351. with ssh.open_sftp() as sftp:
  352. # 记录日志,提示即将上传的本地文件和远程目标路径
  353. logger.info("upload {} to {}", input_path, remote_path)
  354. # 使用SFTP的put方法将本地文件上传到远程主机
  355. sftp.put(input_path, remote_path)
  356. # 记录日志,提示文件已成功上传
  357. logger.info("uploaded {}", input_path)
  358. def data_update():
  359. with psycopg.connect(
  360. conninfo=conn_info,
  361. ) as conn:
  362. with conn.cursor() as curs:
  363. # 更新人均办公面积
  364. sql = f"""
  365. with
  366. t100 as (
  367. select
  368. id as area_no,
  369. name as area_name,
  370. order_num as area_order
  371. from
  372. common.organization
  373. where
  374. id in ('-11', '-12')
  375. ),
  376. t101 as (
  377. select
  378. area_no,
  379. sum(building_area_self_use) as building_area_self_use_sum
  380. from
  381. house.room_month
  382. where
  383. second_room_type = '办公用房'
  384. and year_month = {year_month}
  385. and area_no in ('-11', '-12')
  386. group by
  387. area_no
  388. ),
  389. t102 as (
  390. select
  391. *
  392. from
  393. house.staff_second_unit
  394. where
  395. year_month = (
  396. select
  397. max(year_month)
  398. from
  399. house.staff_second_unit)
  400. and area_no in ('-11', '-12')
  401. ),
  402. t103 as (
  403. select
  404. t100.area_no,
  405. t100.area_name,
  406. '' as city_no,
  407. '' as city_name,
  408. t101.building_area_self_use_sum,
  409. t102.total,
  410. t100.area_order,
  411. 0 as city_order
  412. from
  413. t100
  414. left join t101 on
  415. t100.area_no = t101.area_no
  416. left join t102 on
  417. t100.area_no = t102.area_no
  418. ),
  419. t200 as (
  420. select
  421. b.id as area_no,
  422. b.name as area_name,
  423. a.id as city_no,
  424. a.name as city_name,
  425. b.order_num as area_order,
  426. a.order_num as city_order
  427. from
  428. common.organization a
  429. left join common.organization b on
  430. a.parent_id = b.id
  431. where
  432. a.unhide = 1
  433. and a.grade = 2
  434. and a.parent_id not in ('-11', '-12')
  435. order by
  436. b.id,
  437. a.id
  438. ),
  439. t201 as (
  440. select
  441. area_no,
  442. city_no,
  443. sum(building_area_self_use) as building_area_self_use_sum
  444. from
  445. house.room_month
  446. where
  447. second_room_type = '办公用房'
  448. and area_no not in ('-11', '-12')
  449. and year_month = {year_month}
  450. group by
  451. area_no,
  452. city_no
  453. ),
  454. t202 as (
  455. select
  456. *
  457. from
  458. house.staff_third_unit
  459. where
  460. year_month = (
  461. select
  462. max(year_month)
  463. from
  464. house.staff_third_unit)
  465. and area_no not in ('-11', '-12')
  466. ),
  467. t203 as (
  468. select
  469. t200.area_no,
  470. t200.area_name,
  471. t200.city_no,
  472. t200.city_name,
  473. t201.building_area_self_use_sum,
  474. t202.total,
  475. t200.area_order,
  476. t200.city_order
  477. from
  478. t200
  479. left join t201 on
  480. t200.area_no = t201.area_no
  481. and t200.city_no = t201.city_no
  482. left join t202 on
  483. t200.area_no = t202.area_no
  484. and t200.city_no = t202.city_no
  485. ),
  486. t301 as (
  487. select
  488. *
  489. from
  490. t103
  491. union all
  492. select
  493. *
  494. from
  495. t203
  496. )
  497. insert
  498. into
  499. house.building_office_area_stat
  500. (
  501. year_month,
  502. area_no,
  503. area_name,
  504. city_no,
  505. city_name,
  506. building_area_self_use_sum,
  507. total,
  508. area_avg,
  509. area_order,
  510. city_order
  511. )
  512. select
  513. {year_month} as year_month,
  514. area_no,
  515. area_name,
  516. city_no,
  517. city_name,
  518. coalesce(building_area_self_use_sum, 0) as building_area_self_use_sum,
  519. coalesce(total, 0) as total,
  520. case
  521. when total = 0 then null
  522. else round(coalesce(building_area_self_use_sum, 0) / total, 2)
  523. end as area_avg,
  524. area_order,
  525. city_order
  526. from
  527. t301
  528. order by
  529. area_order,
  530. city_order
  531. """
  532. logger.info(f"sql: {sql}")
  533. curs.execute(sql)
  534. logger.info(f"update {curs.rowcount}")
  535. data_process()
  536. data_import()
  537. upload_file()
  538. data_update()