# house_fang_jian.py (21 KB)
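"""不动产房间数据处理 — real-estate room data processing.

Monthly pipeline, executed in this order:

1. ``data_process``: enrich the room workbook with second/third-level
   organization codes and city/district ids, then write a CSV;
2. ``data_import``: bulk-load that CSV into ``house.room_month``;
3. ``upload_file``: archive the workbook on a remote host over SFTP;
4. ``data_update``: insert the month's office-area-per-staff statistics
   into ``house.building_office_area_stat``.

The reporting period is the previous calendar month (``year_month``, YYYYMM).
"""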
import os
import re
import shlex
import subprocess
from datetime import datetime

import pandas as pd
import paramiko
import psycopg
from dateutil.relativedelta import relativedelta
from loguru import logger
from psycopg.conninfo import make_conninfo
  11. # 配置日志记录器,将日志写入文件a.log
  12. logger.add(sink='a.log')
  13. ssh_hostname = '172.16.107.4' # 定义远程主机地址
  14. ssh_port = 22 # 定义SSH服务的端口号
  15. ssh_username = 'app' # 定义登录远程主机的用户名
  16. ssh_password = '(l4w0ST_' # 定义登录远程主机的密码
  17. # 服务器文件夹路径
  18. remote_dir_path = '/data/history/house/room/'
  19. # 数据库连接信息
  20. db_host = "172.16.107.5" # 数据库主机地址
  21. db_port = 5432 # 数据库端口号
  22. db_username = "finance" # 数据库用户名
  23. db_password = "Finance@unicom23" # 数据库密码
  24. dbname = "financialdb" # 数据库名称
  25. conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'"
  26. # 获取当前日期,并计算上个月的第一天
  27. today = datetime.today()
  28. start_date = today - relativedelta(months=1, day=1)
  29. year_month = start_date.strftime('%Y%m')
  30. # 数据文件路径
  31. input_path = 'data.xlsx'
  32. # 输出文件路径
  33. output_path = 'output.csv'
  34. def data_process():
  35. org_map = {} # 用于存储组织机构信息
  36. third_org_list_map = {} # 用于存储三级组织机构列表
  37. area_map = {} # 用于存储区域信息
  38. districts_list_map = {} # 用于存储区县信息
  39. # 连接到 PostgreSQL 数据库
  40. with psycopg.connect(
  41. conninfo=conn_info,
  42. row_factory=psycopg.rows.dict_row # 设置返回结果为字典格式
  43. ) as conn:
  44. with conn.cursor() as curs:
  45. # 查询所有二级组织机构(grade=1)
  46. sql = """
  47. select * from common.organization where grade = 1
  48. """
  49. logger.info(f"sql: {sql}")
  50. curs.execute(sql)
  51. second_orgs = curs.fetchall()
  52. # 初始化 third_org_list_map,以每个二级组织的 ID 为键,值为空列表
  53. for x in second_orgs:
  54. third_org_list_map[x['id']] = []
  55. # 查询所有组织机构信息
  56. sql = """
  57. select * from common.organization
  58. """
  59. logger.info(f"sql: {sql}")
  60. curs.execute(sql)
  61. orgs = curs.fetchall()
  62. # 构建 org_map 和 third_org_list_map
  63. for x in orgs:
  64. if x['parent_id'] in third_org_list_map:
  65. third_org_list_map[x['parent_id']].append(x)
  66. org_map[x['id']] = x
  67. # 查询所有一级行政区划(area_grade=1),并按 area_id 排序
  68. sql = """
  69. select * from common.area where area_grade = 1 order by area_id
  70. """
  71. logger.info(f"sql: {sql}")
  72. curs.execute(sql)
  73. cities = curs.fetchall()
  74. # 初始化 districts_list_map,以每个城市的 area_id 为键,值为空列表
  75. for x in cities:
  76. districts_list_map[x['area_id']] = []
  77. # 查询所有区域信息
  78. sql = """
  79. select * from common.area
  80. """
  81. logger.info(f"sql: {sql}")
  82. curs.execute(sql)
  83. areas = curs.fetchall()
  84. # 构建 area_map 和 districts_list_map
  85. for x in areas:
  86. if x['parent_id'] in districts_list_map:
  87. districts_list_map[x['parent_id']].append(x)
  88. area_map[x['area_id']] = x
  89. # 读取 Excel 文件中的数据
  90. df = pd.read_excel(io=input_path)
  91. # 清理 DataFrame 中的空白字符(排除特定列)
  92. columns_to_clean = list(filter(lambda x: x not in ('房间名称'), df.columns))
  93. df[columns_to_clean] = df[columns_to_clean].map(
  94. lambda x: re.sub(r'\s+', '', x) if type(x) is str else x
  95. )
  96. # 定义函数:根据资产所属单位获取二级组织机构编码
  97. def get_area_no(x):
  98. second_unit = x['资产所属单位(二级)']
  99. third_unit = x['资产所属单位(三级)']
  100. if '长途通信传输局' == second_unit:
  101. return '-11'
  102. if '保定' in second_unit and ('雄县' in third_unit or '容城' in third_unit or '安新' in third_unit):
  103. return '782'
  104. for second_org in second_orgs:
  105. area_name = second_org['name']
  106. area_no = second_org['id']
  107. if area_name in second_unit:
  108. return area_no
  109. return '-12'
  110. # 应用 get_area_no 函数,生成二级组织机构编码列
  111. df['二级组织机构编码'] = df.apply(get_area_no, axis=1)
  112. # 定义函数:根据二级组织机构编码获取对应的名称
  113. def get_area_name(x):
  114. area_no = x['二级组织机构编码']
  115. second_org = org_map[area_no]
  116. area_name = second_org['name']
  117. return area_name
  118. # 应用 get_area_name 函数,生成二级组织机构名称列
  119. df['二级组织机构名称'] = df.apply(get_area_name, axis=1)
  120. # 定义函数:根据资产所属单位获取三级组织机构编码
  121. def get_city_no(x):
  122. third_unit = x['资产所属单位(三级)']
  123. area_name = x['二级组织机构名称']
  124. area_no = x['二级组织机构编码']
  125. if area_name == '石家庄':
  126. if '矿区' in third_unit:
  127. return 'D0130185'
  128. if '井陉' in third_unit:
  129. return 'D0130121'
  130. if area_name == '秦皇岛':
  131. if '北戴河新区' in third_unit:
  132. return 'D0130185'
  133. if '北戴河' in third_unit:
  134. return 'D0130304'
  135. if area_name == '唐山':
  136. if '滦县' in third_unit:
  137. return 'D0130223'
  138. if '高新技术开发区' in third_unit:
  139. return 'D0130205'
  140. if area_name == '邢台':
  141. if '内丘' in third_unit:
  142. return 'D0130523'
  143. if '任泽' in third_unit:
  144. return 'D0130526'
  145. if area_name == '邯郸':
  146. if '峰峰' in third_unit:
  147. return 'D0130406'
  148. if area_name == '省机动局':
  149. if '沧州' in third_unit:
  150. return 'HECS180'
  151. if '唐山' in third_unit:
  152. return 'HECS181'
  153. if '秦皇岛' in third_unit:
  154. return 'HECS182'
  155. if '廊坊' in third_unit:
  156. return 'HECS183'
  157. if '张家口' in third_unit:
  158. return 'HECS184'
  159. if '邢台' in third_unit:
  160. return 'HECS185'
  161. if '邯郸' in third_unit:
  162. return 'HECS186'
  163. if '保定' in third_unit:
  164. return 'HECS187'
  165. if '石家庄' in third_unit:
  166. return 'HECS188'
  167. if '承德' in third_unit:
  168. return 'HECS189'
  169. if '衡水' in third_unit:
  170. return 'HECS720'
  171. if '雄安' in third_unit:
  172. return 'HECS728'
  173. return 'HECS018'
  174. if '雄安' == area_name:
  175. third_unit = third_unit.replace('雄安新区', '')
  176. third_org_list = third_org_list_map[area_no]
  177. for third_org in third_org_list:
  178. city_name = third_org['name']
  179. if city_name in third_unit:
  180. return third_org['id']
  181. if '沧州' == area_name:
  182. return 'D0130911'
  183. if '唐山' == area_name:
  184. return 'D0130202'
  185. if '秦皇岛' == area_name:
  186. return 'D0130302'
  187. if '廊坊' == area_name:
  188. return 'D0131000'
  189. if '张家口' == area_name:
  190. return 'D0130701'
  191. if '邢台' == area_name:
  192. return 'D0130502'
  193. if '邯郸' == area_name:
  194. return 'D0130402'
  195. if '保定' == area_name:
  196. return 'D0130601'
  197. if '石家庄' == area_name:
  198. return 'D0130186'
  199. if '承德' == area_name:
  200. return 'D0130801'
  201. if '衡水' == area_name:
  202. return 'D0133001'
  203. if '雄安' == area_name:
  204. return 'D0130830'
  205. return 'HE001'
  206. # 应用 get_city_no 函数,生成三级组织机构编码列
  207. df['三级组织机构编码'] = df.apply(get_city_no, axis=1)
  208. # 定义函数:根据三级组织机构编码获取对应的名称
  209. def get_city_name(x):
  210. city_no = x['三级组织机构编码']
  211. third_org = org_map[city_no]
  212. city_name = third_org['name']
  213. return city_name
  214. # 应用 get_city_name 函数,生成三级组织机构名称列
  215. df['三级组织机构名称'] = df.apply(get_city_name, axis=1)
  216. # 定义函数:根据地址和资产所属单位获取城市 ID
  217. def get_city_id(x):
  218. address = x['标准地址']
  219. second_unit = x['资产所属单位(二级)']
  220. third_unit = x['资产所属单位(三级)']
  221. if '雄安' in address or ('保定' in address and ('雄县' in address or '容城' in address or '安新' in address)):
  222. return '133100'
  223. for city in cities:
  224. area_name = city['short_name']
  225. area_id = city['area_id']
  226. if area_name in second_unit or area_name in third_unit or area_name in address:
  227. return area_id
  228. return ''
  229. # 应用 get_city_id 函数,生成城市 ID 列
  230. df['city_id'] = df.apply(get_city_id, axis=1)
  231. # 定义函数:根据城市 ID 获取城市名称
  232. def get_city(x):
  233. city_id = x['city_id']
  234. area = area_map.get(city_id)
  235. if pd.notna(area):
  236. city = area['area_name']
  237. return city
  238. return ''
  239. # 应用 get_city 函数,生成城市名称列
  240. df['city'] = df.apply(get_city, axis=1)
  241. # 定义函数:根据地址和城市信息获取区县 ID
  242. def get_district_id(x):
  243. address = x['标准地址']
  244. city = x['city']
  245. city_id = x['city_id']
  246. if pd.isna(city) or pd.isna(address):
  247. return ''
  248. if city == '石家庄':
  249. if '矿区' in address:
  250. return '130107'
  251. if '井陉' in address:
  252. return '130121'
  253. if city == '唐山':
  254. if '滦县' in address:
  255. return '130284'
  256. if city == '邢台':
  257. if '内邱' in address:
  258. return '130523'
  259. if '任县' in address:
  260. return '130505'
  261. if city == '雄安':
  262. address = address.replace('雄安新区', '')
  263. districts = districts_list_map.get(city_id)
  264. if not districts:
  265. return ''
  266. for district in districts:
  267. district_name = district['short_name']
  268. if district_name in address:
  269. return district['area_id']
  270. return ''
  271. # 应用 get_district_id 函数,生成区县 ID 列
  272. df['district_id'] = df.apply(get_district_id, axis=1)
  273. # 定义函数:根据区县 ID 获取区县名称
  274. def get_district(x):
  275. district_id = x['district_id']
  276. area = area_map.get(district_id)
  277. if pd.notna(area):
  278. district = area['area_name']
  279. return district
  280. return ''
  281. # 应用 get_district 函数,生成区县名称列
  282. df['district'] = df.apply(get_district, axis=1)
  283. def get_int(x):
  284. try:
  285. return int(x)
  286. except Exception:
  287. return ""
  288. df['工位总数'] = df['工位总数'].apply(get_int)
  289. # 在 DataFrame 中插入年月列
  290. df.insert(0, '年月', year_month)
  291. # 打印 DataFrame 的信息
  292. print(df.info())
  293. # 将处理后的数据保存为 CSV 文件
  294. df.to_csv(
  295. path_or_buf=output_path,
  296. index=False,
  297. header=[
  298. 'year_month', 'first_unit', 'second_unit', 'third_unit', 'building_name', 'address', 'floor',
  299. 'floor_building_area', 'floor_usable_area', 'room_name', 'room_status', 'rent_type',
  300. 'first_room_type', 'second_room_type', 'seat_num', 'frontage', 'building_area',
  301. 'building_area_self_use', 'building_area_idle', 'building_area_rent', 'building_area_unusable',
  302. 'usable_area', 'usable_area_self_use', 'usable_area_idle', 'usable_area_rent', 'usable_area_unusable',
  303. 'idle_start_date', 'unusable_reason', 'floor_height', 'load_bearing', 'area_no', 'area_name',
  304. 'city_no', 'city_name', 'city_id', 'city', 'district_id', 'district'
  305. ],
  306. encoding='utf-8-sig'
  307. )
  308. def data_import():
  309. # 定义 PowerShell 脚本的路径
  310. script_path = r"../../copy.ps1"
  311. # 目标表和文件信息
  312. table = "house.room_month" # 数据库目标表名
  313. # 表字段列名,用于指定导入数据的列顺序
  314. columns = "year_month,first_unit,second_unit,third_unit,building_name,address,floor,floor_building_area,floor_usable_area,room_name,room_status,rent_type,first_room_type,second_room_type,seat_num,frontage,building_area,building_area_self_use,building_area_idle,building_area_rent,building_area_unusable,usable_area,usable_area_self_use,usable_area_idle,usable_area_rent,usable_area_unusable,idle_start_date,unusable_reason,floor_height,load_bearing,area_no,area_name,city_no,city_name,city_id,city,district_id,district"
  315. # 构造执行 PowerShell 脚本的命令
  316. command = f"powershell -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}"
  317. # 打印生成的命令,方便调试和日志记录
  318. logger.info("command: {}", command)
  319. # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出
  320. completed_process = subprocess.run(
  321. command, # 执行的命令
  322. check=False, # 如果命令执行失败,不抛出异常
  323. text=True, # 将输出作为字符串处理
  324. capture_output=True, # 捕获标准输出和标准错误
  325. )
  326. # 打印命令执行的结果,包括返回码、标准输出和标准错误
  327. logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout,
  328. completed_process.stderr)
  329. # 定义正则表达式,用于匹配标准输出中的 COPY 结果
  330. p = re.compile(r"^(COPY) (\d+)$")
  331. count = None # 初始化计数变量
  332. matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果
  333. if matcher:
  334. count = int(matcher.group(2)) # 提取导入的数据行数
  335. # 如果没有成功提取到导入数据的行数,抛出运行时异常
  336. if count is None:
  337. raise RuntimeError("导入数据失败")
  338. def upload_file():
  339. remote_path = f'{remote_dir_path}{year_month}.xlsx' # 定义远程主机的目标文件路径
  340. # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文
  341. with paramiko.SSHClient() as ssh:
  342. # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败
  343. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  344. # 连接到远程主机,传入主机地址、端口、用户名和密码
  345. ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password)
  346. # 执行远程命令,创建远程目录(如果不存在)
  347. ssh.exec_command(f'mkdir -p {remote_dir_path}')
  348. # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文
  349. with ssh.open_sftp() as sftp:
  350. # 记录日志,提示即将上传的本地文件和远程目标路径
  351. logger.info("upload {} to {}", input_path, remote_path)
  352. # 使用SFTP的put方法将本地文件上传到远程主机
  353. sftp.put(input_path, remote_path)
  354. # 记录日志,提示文件已成功上传
  355. logger.info("uploaded {}", input_path)
  356. def data_update():
  357. with psycopg.connect(
  358. conninfo=conn_info,
  359. ) as conn:
  360. with conn.cursor() as curs:
  361. # 更新人均办公面积
  362. sql = f"""
  363. with
  364. t100 as (
  365. select
  366. id as area_no,
  367. name as area_name,
  368. order_num as area_order
  369. from
  370. common.organization
  371. where
  372. id in ('-11', '-12')
  373. ),
  374. t101 as (
  375. select
  376. area_no,
  377. sum(building_area_self_use) as building_area_self_use_sum
  378. from
  379. house.room_month
  380. where
  381. second_room_type = '办公用房'
  382. and year_month = {year_month}
  383. and area_no in ('-11', '-12')
  384. group by
  385. area_no
  386. ),
  387. t102 as (
  388. select
  389. *
  390. from
  391. house.staff_second_unit
  392. where
  393. year_month = (
  394. select
  395. max(year_month)
  396. from
  397. house.staff_second_unit)
  398. and area_no in ('-11', '-12')
  399. ),
  400. t103 as (
  401. select
  402. t100.area_no,
  403. t100.area_name,
  404. '' as city_no,
  405. '' as city_name,
  406. t101.building_area_self_use_sum,
  407. t102.total,
  408. t100.area_order,
  409. 0 as city_order
  410. from
  411. t100
  412. left join t101 on
  413. t100.area_no = t101.area_no
  414. left join t102 on
  415. t100.area_no = t102.area_no
  416. ),
  417. t200 as (
  418. select
  419. b.id as area_no,
  420. b.name as area_name,
  421. a.id as city_no,
  422. a.name as city_name,
  423. b.order_num as area_order,
  424. a.order_num as city_order
  425. from
  426. common.organization a
  427. left join common.organization b on
  428. a.parent_id = b.id
  429. where
  430. a.unhide = 1
  431. and a.grade = 2
  432. and a.parent_id not in ('-11', '-12')
  433. order by
  434. b.id,
  435. a.id
  436. ),
  437. t201 as (
  438. select
  439. area_no,
  440. city_no,
  441. sum(building_area_self_use) as building_area_self_use_sum
  442. from
  443. house.room_month
  444. where
  445. second_room_type = '办公用房'
  446. and area_no not in ('-11', '-12')
  447. and year_month = {year_month}
  448. group by
  449. area_no,
  450. city_no
  451. ),
  452. t202 as (
  453. select
  454. *
  455. from
  456. house.staff_third_unit
  457. where
  458. year_month = (
  459. select
  460. max(year_month)
  461. from
  462. house.staff_third_unit)
  463. and area_no not in ('-11', '-12')
  464. ),
  465. t203 as (
  466. select
  467. t200.area_no,
  468. t200.area_name,
  469. t200.city_no,
  470. t200.city_name,
  471. t201.building_area_self_use_sum,
  472. t202.total,
  473. t200.area_order,
  474. t200.city_order
  475. from
  476. t200
  477. left join t201 on
  478. t200.area_no = t201.area_no
  479. and t200.city_no = t201.city_no
  480. left join t202 on
  481. t200.area_no = t202.area_no
  482. and t200.city_no = t202.city_no
  483. ),
  484. t301 as (
  485. select
  486. *
  487. from
  488. t103
  489. union all
  490. select
  491. *
  492. from
  493. t203
  494. )
  495. insert
  496. into
  497. house.building_office_area_stat
  498. (
  499. year_month,
  500. area_no,
  501. area_name,
  502. city_no,
  503. city_name,
  504. building_area_self_use_sum,
  505. total,
  506. area_avg,
  507. area_order,
  508. city_order
  509. )
  510. select
  511. {year_month} as year_month,
  512. area_no,
  513. area_name,
  514. city_no,
  515. city_name,
  516. coalesce(building_area_self_use_sum, 0) as building_area_self_use_sum,
  517. coalesce(total, 0) as total,
  518. case
  519. when total = 0 then null
  520. else round(coalesce(building_area_self_use_sum, 0) / total, 2)
  521. end as area_avg,
  522. area_order,
  523. city_order
  524. from
  525. t301
  526. order by
  527. area_order,
  528. city_order
  529. """
  530. logger.info(f"sql: {sql}")
  531. curs.execute(sql)
  532. logger.info(f"update {curs.rowcount}")
  533. data_process()
  534. data_import()
  535. upload_file()
  536. data_update()