"""Real-estate room data processing.

Original file name: house_fang_jian.py
"""
import os
import re
import subprocess
from datetime import datetime

import pandas as pd
import paramiko
import psycopg
from dateutil.relativedelta import relativedelta
from loguru import logger
  11. # 配置日志记录器,将日志写入文件a.log
  12. logger.add(sink='a.log')
  13. ssh_hostname = '172.16.107.4' # 定义远程主机地址
  14. ssh_port = 22 # 定义SSH服务的端口号
  15. ssh_username = 'app' # 定义登录远程主机的用户名
  16. ssh_password = 'Hebei_123.' # 定义登录远程主机的密码
  17. # 服务器文件夹路径
  18. remote_dir_path = '/data/history/house/room/'
  19. # 数据库连接信息
  20. db_host = "172.16.107.5" # 数据库主机地址
  21. db_port = 5432 # 数据库端口号
  22. db_username = "finance" # 数据库用户名
  23. db_password = "Finance@unicom23" # 数据库密码
  24. dbname = "financialdb" # 数据库名称
  25. conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'"
  26. # 获取当前日期,并计算上个月的第一天
  27. today = datetime.today()
  28. start_date = today - relativedelta(months=1, day=1)
  29. year_month = start_date.strftime('%Y%m')
  30. # 数据文件路径
  31. input_path = 'data.xlsx'
  32. # 输出文件路径
  33. output_path = 'output.csv'
  34. def data_process():
  35. org_map = {} # 用于存储组织机构信息
  36. third_org_list_map = {} # 用于存储三级组织机构列表
  37. area_map = {} # 用于存储区域信息
  38. districts_list_map = {} # 用于存储区县信息
  39. # 连接到 PostgreSQL 数据库
  40. with psycopg.connect(
  41. conninfo=conn_info,
  42. row_factory=psycopg.rows.dict_row # 设置返回结果为字典格式
  43. ) as conn:
  44. with conn.cursor() as curs:
  45. # 查询所有二级组织机构(grade=1)
  46. sql = """
  47. select * from common.organization where grade = 1
  48. """
  49. logger.info(f"sql: {sql}")
  50. curs.execute(sql)
  51. second_orgs = curs.fetchall()
  52. # 初始化 third_org_list_map,以每个二级组织的 ID 为键,值为空列表
  53. for x in second_orgs:
  54. third_org_list_map[x['id']] = []
  55. # 查询所有组织机构信息
  56. sql = """
  57. select * from common.organization
  58. """
  59. logger.info(f"sql: {sql}")
  60. curs.execute(sql)
  61. orgs = curs.fetchall()
  62. # 构建 org_map 和 third_org_list_map
  63. for x in orgs:
  64. if x['parent_id'] in third_org_list_map:
  65. third_org_list_map[x['parent_id']].append(x)
  66. org_map[x['id']] = x
  67. # 查询所有一级行政区划(area_grade=1),并按 area_id 排序
  68. sql = """
  69. select * from common.area where area_grade = 1 order by area_id
  70. """
  71. logger.info(f"sql: {sql}")
  72. curs.execute(sql)
  73. cities = curs.fetchall()
  74. # 初始化 districts_list_map,以每个城市的 area_id 为键,值为空列表
  75. for x in cities:
  76. districts_list_map[x['area_id']] = []
  77. # 查询所有区域信息
  78. sql = """
  79. select * from common.area
  80. """
  81. logger.info(f"sql: {sql}")
  82. curs.execute(sql)
  83. areas = curs.fetchall()
  84. # 构建 area_map 和 districts_list_map
  85. for x in areas:
  86. if x['parent_id'] in districts_list_map:
  87. districts_list_map[x['parent_id']].append(x)
  88. area_map[x['area_id']] = x
  89. # 读取 Excel 文件中的数据
  90. df = pd.read_excel(io=input_path)
  91. # 清理 DataFrame 中的空白字符(排除特定列)
  92. columns_to_clean = list(filter(lambda x: x not in ('房间名称'), df.columns))
  93. df[columns_to_clean] = df[columns_to_clean].map(
  94. lambda x: re.sub(r'\s+', '', x) if type(x) is str else x
  95. )
  96. # 定义函数:根据资产所属单位获取二级组织机构编码
  97. def get_area_no(x):
  98. second_unit = x['资产所属单位(二级)']
  99. third_unit = x['资产所属单位(三级)']
  100. if '长途通信传输局' == second_unit:
  101. return '-11'
  102. if '保定' in second_unit and ('雄县' in third_unit or '容城' in third_unit or '安新' in third_unit):
  103. return '782'
  104. for second_org in second_orgs:
  105. area_name = second_org['name']
  106. area_no = second_org['id']
  107. if area_name in second_unit:
  108. return area_no
  109. return '-12'
  110. # 应用 get_area_no 函数,生成二级组织机构编码列
  111. df['二级组织机构编码'] = df.apply(get_area_no, axis=1)
  112. # 定义函数:根据二级组织机构编码获取对应的名称
  113. def get_area_name(x):
  114. area_no = x['二级组织机构编码']
  115. second_org = org_map[area_no]
  116. area_name = second_org['name']
  117. return area_name
  118. # 应用 get_area_name 函数,生成二级组织机构名称列
  119. df['二级组织机构名称'] = df.apply(get_area_name, axis=1)
  120. # 定义函数:根据资产所属单位获取三级组织机构编码
  121. def get_city_no(x):
  122. third_unit = x['资产所属单位(三级)']
  123. area_name = x['二级组织机构名称']
  124. area_no = x['二级组织机构编码']
  125. # 根据特定规则匹配城市编码
  126. if area_name == '石家庄':
  127. if '矿区' in third_unit:
  128. return 'D0130185'
  129. if '井陉' in third_unit:
  130. return 'D0130121'
  131. if area_name == '秦皇岛':
  132. if '北戴河新区' in third_unit:
  133. return 'D0130185'
  134. if '北戴河' in third_unit:
  135. return 'D0130304'
  136. # 其他规则省略...
  137. return 'HE001'
  138. # 应用 get_city_no 函数,生成三级组织机构编码列
  139. df['三级组织机构编码'] = df.apply(get_city_no, axis=1)
  140. # 定义函数:根据三级组织机构编码获取对应的名称
  141. def get_city_name(x):
  142. city_no = x['三级组织机构编码']
  143. third_org = org_map[city_no]
  144. city_name = third_org['name']
  145. return city_name
  146. # 应用 get_city_name 函数,生成三级组织机构名称列
  147. df['三级组织机构名称'] = df.apply(get_city_name, axis=1)
  148. # 定义函数:根据地址和资产所属单位获取城市 ID
  149. def get_city_id(x):
  150. address = x['标准地址']
  151. second_unit = x['资产所属单位(二级)']
  152. third_unit = x['资产所属单位(三级)']
  153. if '雄安' in address or ('保定' in address and ('雄县' in address or '容城' in address or '安新' in address)):
  154. return '133100'
  155. for city in cities:
  156. area_name = city['short_name']
  157. area_id = city['area_id']
  158. if area_name in second_unit or area_name in third_unit or area_name in address:
  159. return area_id
  160. return ''
  161. # 应用 get_city_id 函数,生成城市 ID 列
  162. df['city_id'] = df.apply(get_city_id, axis=1)
  163. # 定义函数:根据城市 ID 获取城市名称
  164. def get_city(x):
  165. city_id = x['city_id']
  166. area = area_map.get(city_id)
  167. if pd.notna(area):
  168. city = area['area_name']
  169. return city
  170. return ''
  171. # 应用 get_city 函数,生成城市名称列
  172. df['city'] = df.apply(get_city, axis=1)
  173. # 定义函数:根据地址和城市信息获取区县 ID
  174. def get_district_id(x):
  175. address = x['标准地址']
  176. city = x['city']
  177. city_id = x['city_id']
  178. if pd.isna(city) or pd.isna(address):
  179. return ''
  180. if city == '石家庄':
  181. if '矿区' in address:
  182. return '130107'
  183. if '井陉' in address:
  184. return '130121'
  185. # 其他规则省略...
  186. return ''
  187. # 应用 get_district_id 函数,生成区县 ID 列
  188. df['district_id'] = df.apply(get_district_id, axis=1)
  189. # 定义函数:根据区县 ID 获取区县名称
  190. def get_district(x):
  191. district_id = x['district_id']
  192. area = area_map.get(district_id)
  193. if pd.notna(area):
  194. district = area['area_name']
  195. return district
  196. return ''
  197. # 应用 get_district 函数,生成区县名称列
  198. df['district'] = df.apply(get_district, axis=1)
  199. def get_int(x):
  200. try:
  201. return int(x)
  202. except Exception:
  203. return ""
  204. df['工位总数'] = df['工位总数'].apply(get_int)
  205. # 在 DataFrame 中插入年月列
  206. df.insert(0, '年月', year_month)
  207. # 打印 DataFrame 的信息
  208. print(df.info())
  209. # 将处理后的数据保存为 CSV 文件
  210. df.to_csv(
  211. path_or_buf=output_path,
  212. index=False,
  213. header=[
  214. 'year_month', 'first_unit', 'second_unit', 'third_unit', 'building_name', 'address', 'floor',
  215. 'floor_building_area', 'floor_usable_area', 'room_name', 'room_status', 'rent_type',
  216. 'first_room_type', 'second_room_type', 'seat_num', 'frontage', 'building_area',
  217. 'building_area_self_use', 'building_area_idle', 'building_area_rent', 'building_area_unusable',
  218. 'usable_area', 'usable_area_self_use', 'usable_area_idle', 'usable_area_rent', 'usable_area_unusable',
  219. 'idle_start_date', 'unusable_reason', 'floor_height', 'load_bearing', 'area_no', 'area_name',
  220. 'city_no', 'city_name', 'city_id', 'city', 'district_id', 'district'
  221. ],
  222. encoding='utf-8-sig'
  223. )
  224. def data_import():
  225. # 定义 PowerShell 脚本的路径
  226. script_path = r"../../copy.ps1"
  227. # 目标表和文件信息
  228. table = "house.room_month" # 数据库目标表名
  229. # 表字段列名,用于指定导入数据的列顺序
  230. columns = "year_month,first_unit,second_unit,third_unit,building_name,address,floor,floor_building_area,floor_usable_area,room_name,room_status,rent_type,first_room_type,second_room_type,seat_num,frontage,building_area,building_area_self_use,building_area_idle,building_area_rent,building_area_unusable,usable_area,usable_area_self_use,usable_area_idle,usable_area_rent,usable_area_unusable,idle_start_date,unusable_reason,floor_height,load_bearing,area_no,area_name,city_no,city_name,city_id,city,district_id,district"
  231. # 构造执行 PowerShell 脚本的命令
  232. command = f"powershell -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}"
  233. # 打印生成的命令,方便调试和日志记录
  234. logger.info("command: {}", command)
  235. # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出
  236. completed_process = subprocess.run(
  237. command, # 执行的命令
  238. check=False, # 如果命令执行失败,不抛出异常
  239. text=True, # 将输出作为字符串处理
  240. capture_output=True, # 捕获标准输出和标准错误
  241. )
  242. # 打印命令执行的结果,包括返回码、标准输出和标准错误
  243. logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout,
  244. completed_process.stderr)
  245. # 定义正则表达式,用于匹配标准输出中的 COPY 结果
  246. p = re.compile(r"^(COPY) (\d+)$")
  247. count = None # 初始化计数变量
  248. matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果
  249. if matcher:
  250. count = int(matcher.group(2)) # 提取导入的数据行数
  251. # 如果没有成功提取到导入数据的行数,抛出运行时异常
  252. if count is None:
  253. raise RuntimeError("导入数据失败")
  254. def upload_file():
  255. remote_path = f'{remote_dir_path}{year_month}.xlsx' # 定义远程主机的目标文件路径
  256. # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文
  257. with paramiko.SSHClient() as ssh:
  258. # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败
  259. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  260. # 连接到远程主机,传入主机地址、端口、用户名和密码
  261. ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password)
  262. # 执行远程命令,创建远程目录(如果不存在)
  263. ssh.exec_command(f'mkdir -p {remote_dir_path}')
  264. # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文
  265. with ssh.open_sftp() as sftp:
  266. # 记录日志,提示即将上传的本地文件和远程目标路径
  267. logger.info("upload {} to {}", input_path, remote_path)
  268. # 使用SFTP的put方法将本地文件上传到远程主机
  269. sftp.put(input_path, remote_path)
  270. # 记录日志,提示文件已成功上传
  271. logger.info("uploaded {}", input_path)
def data_update():
    """Insert the office-area statistics for ``year_month`` into
    ``house.building_office_area_stat``.

    Runs a single INSERT ... SELECT built from these CTEs:

    * t100-t103: the organisations with id '-11' and '-12', their summed
      self-use building area of rooms typed '办公用房' for ``year_month``
      (from ``house.room_month``), and ``total`` from the most recent
      ``year_month`` in ``house.staff_second_unit``.
    * t200-t203: every grade-2 organisation with ``unhide = 1`` whose parent
      is not '-11'/'-12', its summed self-use building area for
      ``year_month``, and ``total`` from the most recent ``year_month`` in
      ``house.staff_third_unit``.
    * t301: the union of both result sets.

    ``area_avg`` is the summed area divided by ``total`` rounded to 2
    decimals, or NULL when ``total`` is 0.
    NOTE(review): ``total`` is presumably a headcount -- confirm against the
    staff tables.
    """
    with psycopg.connect(
        conninfo=conn_info,
    ) as conn:
        with conn.cursor() as curs:
            # Update the per-capita office area statistics.
            # year_month is interpolated without quotes, i.e. as a numeric literal.
            sql = f"""
            with
            t100 as (
                select
                    id as area_no,
                    name as area_name,
                    order_num as area_order
                from
                    common.organization
                where
                    id in ('-11', '-12')
            ),
            t101 as (
                select
                    area_no,
                    sum(building_area_self_use) as building_area_self_use_sum
                from
                    house.room_month
                where
                    second_room_type = '办公用房'
                    and year_month = {year_month}
                    and area_no in ('-11', '-12')
                group by
                    area_no
            ),
            t102 as (
                select
                    *
                from
                    house.staff_second_unit
                where
                    year_month = (
                        select
                            max(year_month)
                        from
                            house.staff_second_unit)
                    and area_no in ('-11', '-12')
            ),
            t103 as (
                select
                    t100.area_no,
                    t100.area_name,
                    '' as city_no,
                    '' as city_name,
                    t101.building_area_self_use_sum,
                    t102.total,
                    t100.area_order,
                    0 as city_order
                from
                    t100
                left join t101 on
                    t100.area_no = t101.area_no
                left join t102 on
                    t100.area_no = t102.area_no
            ),
            t200 as (
                select
                    b.id as area_no,
                    b.name as area_name,
                    a.id as city_no,
                    a.name as city_name,
                    b.order_num as area_order,
                    a.order_num as city_order
                from
                    common.organization a
                left join common.organization b on
                    a.parent_id = b.id
                where
                    a.unhide = 1
                    and a.grade = 2
                    and a.parent_id not in ('-11', '-12')
                order by
                    b.id,
                    a.id
            ),
            t201 as (
                select
                    area_no,
                    city_no,
                    sum(building_area_self_use) as building_area_self_use_sum
                from
                    house.room_month
                where
                    second_room_type = '办公用房'
                    and area_no not in ('-11', '-12')
                    and year_month = {year_month}
                group by
                    area_no,
                    city_no
            ),
            t202 as (
                select
                    *
                from
                    house.staff_third_unit
                where
                    year_month = (
                        select
                            max(year_month)
                        from
                            house.staff_third_unit)
                    and area_no not in ('-11', '-12')
            ),
            t203 as (
                select
                    t200.area_no,
                    t200.area_name,
                    t200.city_no,
                    t200.city_name,
                    t201.building_area_self_use_sum,
                    t202.total,
                    t200.area_order,
                    t200.city_order
                from
                    t200
                left join t201 on
                    t200.area_no = t201.area_no
                    and t200.city_no = t201.city_no
                left join t202 on
                    t200.area_no = t202.area_no
                    and t200.city_no = t202.city_no
            ),
            t301 as (
                select
                    *
                from
                    t103
                union all
                select
                    *
                from
                    t203
            )
            insert
                into
                house.building_office_area_stat
            (
                year_month,
                area_no,
                area_name,
                city_no,
                city_name,
                building_area_self_use_sum,
                total,
                area_avg,
                area_order,
                city_order
            )
            select
                {year_month} as year_month,
                area_no,
                area_name,
                city_no,
                city_name,
                coalesce(building_area_self_use_sum, 0) as building_area_self_use_sum,
                coalesce(total, 0) as total,
                case
                    when total = 0 then null
                    else round(coalesce(building_area_self_use_sum, 0) / total, 2)
                end as area_avg,
                area_order,
                city_order
            from
                t301
            order by
                area_order,
                city_order
            """
            logger.info(f"sql: {sql}")
            curs.execute(sql)
            # rowcount is the number of rows affected by the INSERT.
            logger.info(f"update {curs.rowcount}")
  449. data_process()
  450. data_import()
  451. upload_file()
  452. data_update()