house_fang_jian.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599
  1. """不动产房间数据处理
  2. """
  3. import re
  4. from datetime import datetime
  5. from dateutil.relativedelta import relativedelta
  6. from loguru import logger
  7. import pandas as pd
  8. import psycopg
  9. import subprocess
  10. import paramiko
  11. # 配置日志记录器,将日志写入文件a.log
  12. logger.add(sink='a.log')
  13. ssh_hostname = '172.16.107.4' # 定义远程主机地址
  14. ssh_port = 22 # 定义SSH服务的端口号
  15. ssh_username = 'app' # 定义登录远程主机的用户名
  16. ssh_password = '(l4w0ST_' # 定义登录远程主机的密码
  17. # 服务器文件夹路径
  18. remote_dir_path = '/data/history/house/room/'
  19. # 数据库连接信息
  20. db_host = "172.16.107.5" # 数据库主机地址
  21. db_port = 5432 # 数据库端口号
  22. db_username = "finance" # 数据库用户名
  23. db_password = "Finance@unicom23" # 数据库密码
  24. dbname = "financialdb" # 数据库名称
  25. conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'"
  26. # 获取当前日期,并计算上个月的第一天
  27. today = datetime.today()
  28. start_date = today - relativedelta(months=1, day=1)
  29. year_month = start_date.strftime('%Y%m')
  30. # 数据文件路径
  31. input_path = 'data.xlsx'
  32. # 输出文件路径
  33. output_path = 'output.csv'
  34. def data_process():
  35. org_map = {} # 用于存储组织机构信息
  36. third_org_list_map = {} # 用于存储三级组织机构列表
  37. area_map = {} # 用于存储区域信息
  38. districts_list_map = {} # 用于存储区县信息
  39. # 连接到 PostgreSQL 数据库
  40. with psycopg.connect(
  41. conninfo=conn_info,
  42. row_factory=psycopg.rows.dict_row # 设置返回结果为字典格式
  43. ) as conn:
  44. with conn.cursor() as curs:
  45. # 查询所有二级组织机构(grade=1)
  46. sql = """
  47. select * from common.organization where grade = 1
  48. """
  49. logger.info(f"sql: {sql}")
  50. curs.execute(sql)
  51. second_orgs = curs.fetchall()
  52. # 初始化 third_org_list_map,以每个二级组织的 ID 为键,值为空列表
  53. for x in second_orgs:
  54. third_org_list_map[x['id']] = []
  55. # 查询所有组织机构信息
  56. sql = """
  57. select * from common.organization
  58. """
  59. logger.info(f"sql: {sql}")
  60. curs.execute(sql)
  61. orgs = curs.fetchall()
  62. # 构建 org_map 和 third_org_list_map
  63. for x in orgs:
  64. if x['parent_id'] in third_org_list_map:
  65. third_org_list_map[x['parent_id']].append(x)
  66. org_map[x['id']] = x
  67. # 查询所有一级行政区划(area_grade=1),并按 area_id 排序
  68. sql = """
  69. select * from common.area where area_grade = 1 order by area_id
  70. """
  71. logger.info(f"sql: {sql}")
  72. curs.execute(sql)
  73. cities = curs.fetchall()
  74. # 初始化 districts_list_map,以每个城市的 area_id 为键,值为空列表
  75. for x in cities:
  76. districts_list_map[x['area_id']] = []
  77. # 查询所有区域信息
  78. sql = """
  79. select * from common.area
  80. """
  81. logger.info(f"sql: {sql}")
  82. curs.execute(sql)
  83. areas = curs.fetchall()
  84. # 构建 area_map 和 districts_list_map
  85. for x in areas:
  86. if x['parent_id'] in districts_list_map:
  87. districts_list_map[x['parent_id']].append(x)
  88. area_map[x['area_id']] = x
  89. # 读取 Excel 文件中的数据
  90. df = pd.read_excel(io=input_path)
  91. # 获取当前 DataFrame 的列名列表
  92. columns = df.columns.tolist()
  93. # 定义所需的字段列表
  94. required_columns = ['资产所属单位(一级)', '资产所属单位(二级)', '资产所属单位(三级)', '建筑名称', '标准地址',
  95. '所在楼层', '楼层建筑总面积(㎡)', '楼层使用总面积(㎡)', '房间名称', '房间状态', '租赁类别',
  96. '房间类别(一级)', '房间类别(二级)', '工位总数', '是否临街', '建筑面积(㎡)', '建筑面积-自用(㎡)',
  97. '建筑面积-闲置(㎡)', '建筑面积-出租(㎡)', '建筑面积-不可使用(㎡)', '使用面积(㎡)',
  98. '使用面积-自用(㎡)', '使用面积-闲置(㎡)', '使用面积-出租(㎡)', '使用面积-不可使用(㎡)',
  99. '闲置开始时间', '不可使用原因', '层高(m)', '承重(kg/㎡)']
  100. # 检查是否有缺失的字段
  101. missing_columns = [col for col in required_columns if col not in columns]
  102. # 检查是否有多余的字段
  103. ex_columns = [col for col in columns if col not in required_columns]
  104. # 如果存在缺失字段,则抛出运行时错误并提示缺少哪些字段
  105. if missing_columns or ex_columns:
  106. raise RuntimeError(f"缺少以下字段: {missing_columns};存在以下多余字段:{ex_columns}")
  107. # 清理 DataFrame 中的空白字符(排除特定列)
  108. columns_to_clean = list(filter(lambda x: x not in ('房间名称'), df.columns))
  109. df[columns_to_clean] = df[columns_to_clean].map(
  110. lambda x: re.sub(r'\s+', '', x) if type(x) is str else x
  111. )
  112. # 定义函数:根据资产所属单位获取二级组织机构编码
  113. def get_area_no(x):
  114. second_unit = x['资产所属单位(二级)']
  115. third_unit = x['资产所属单位(三级)']
  116. if '河北' == second_unit:
  117. return '-12'
  118. if '长途通信传输局' == second_unit:
  119. return '-11'
  120. if '保定' in second_unit and ('雄县' in third_unit or '容城' in third_unit or '安新' in third_unit):
  121. return '782'
  122. for second_org in second_orgs:
  123. area_name = second_org['name']
  124. area_no = second_org['id']
  125. if area_name in second_unit:
  126. return area_no
  127. raise RuntimeError(f'二级组织机构编码匹配失败: {second_unit}')
  128. # 应用 get_area_no 函数,生成二级组织机构编码列
  129. df['area_no'] = df.apply(get_area_no, axis=1)
  130. # 定义函数:根据二级组织机构编码获取对应的名称
  131. def get_area_name(x):
  132. area_no = x['area_no']
  133. second_org = org_map[area_no]
  134. area_name = second_org['name']
  135. return area_name
  136. # 应用 get_area_name 函数,生成二级组织机构名称列
  137. df['area_name'] = df.apply(get_area_name, axis=1)
  138. # 定义函数:根据资产所属单位获取三级组织机构编码
  139. def get_city_no(x):
  140. third_unit = x['资产所属单位(三级)']
  141. area_name = x['area_name']
  142. area_no = x['area_no']
  143. if area_name == '石家庄':
  144. if '矿区' in third_unit:
  145. return 'D0130185'
  146. if '井陉' in third_unit:
  147. return 'D0130121'
  148. if area_name == '秦皇岛':
  149. if '北戴河新区' in third_unit:
  150. return 'D0130185'
  151. if '北戴河' in third_unit:
  152. return 'D0130304'
  153. if area_name == '唐山':
  154. if '滦县' in third_unit:
  155. return 'D0130223'
  156. if '高新技术开发区' in third_unit:
  157. return 'D0130205'
  158. if area_name == '邢台':
  159. if '内丘' in third_unit:
  160. return 'D0130523'
  161. if '任泽' in third_unit:
  162. return 'D0130526'
  163. if area_name == '邯郸':
  164. if '峰峰' in third_unit:
  165. return 'D0130406'
  166. if area_name == '省机动局':
  167. if '沧州' in third_unit:
  168. return 'HECS180'
  169. if '唐山' in third_unit:
  170. return 'HECS181'
  171. if '秦皇岛' in third_unit:
  172. return 'HECS182'
  173. if '廊坊' in third_unit:
  174. return 'HECS183'
  175. if '张家口' in third_unit:
  176. return 'HECS184'
  177. if '邢台' in third_unit:
  178. return 'HECS185'
  179. if '邯郸' in third_unit:
  180. return 'HECS186'
  181. if '保定' in third_unit:
  182. return 'HECS187'
  183. if '石家庄' in third_unit:
  184. return 'HECS188'
  185. if '承德' in third_unit:
  186. return 'HECS189'
  187. if '衡水' in third_unit:
  188. return 'HECS720'
  189. if '雄安' in third_unit:
  190. return 'HECS728'
  191. return 'HECS018'
  192. if '雄安' == area_name:
  193. third_unit = third_unit.replace('雄安新区', '')
  194. third_org_list = third_org_list_map[area_no]
  195. for third_org in third_org_list:
  196. city_name = third_org['name']
  197. if city_name in third_unit:
  198. return third_org['id']
  199. if '沧州' == area_name:
  200. return 'D0130911'
  201. if '唐山' == area_name:
  202. return 'D0130202'
  203. if '秦皇岛' == area_name:
  204. return 'D0130302'
  205. if '廊坊' == area_name:
  206. return 'D0131000'
  207. if '张家口' == area_name:
  208. return 'D0130701'
  209. if '邢台' == area_name:
  210. return 'D0130502'
  211. if '邯郸' == area_name:
  212. return 'D0130402'
  213. if '保定' == area_name:
  214. return 'D0130601'
  215. if '石家庄' == area_name:
  216. return 'D0130186'
  217. if '承德' == area_name:
  218. return 'D0130801'
  219. if '衡水' == area_name:
  220. return 'D0133001'
  221. if '雄安' == area_name:
  222. return 'D0130830'
  223. return 'HE001'
  224. # 应用 get_city_no 函数,生成三级组织机构编码列
  225. df['city_no'] = df.apply(get_city_no, axis=1)
  226. # 定义函数:根据三级组织机构编码获取对应的名称
  227. def get_city_name(x):
  228. city_no = x['city_no']
  229. third_org = org_map[city_no]
  230. city_name = third_org['name']
  231. return city_name
  232. # 应用 get_city_name 函数,生成三级组织机构名称列
  233. df['city_name'] = df.apply(get_city_name, axis=1)
  234. # 定义函数:根据地址和资产所属单位获取城市 ID
  235. def get_city_id(x):
  236. address = x['标准地址']
  237. second_unit = x['资产所属单位(二级)']
  238. third_unit = x['资产所属单位(三级)']
  239. if '雄安' in address or ('保定' in address and ('雄县' in address or '容城' in address or '安新' in address)):
  240. return '133100'
  241. for city in cities:
  242. area_name = city['short_name']
  243. area_id = city['area_id']
  244. if area_name in second_unit or area_name in third_unit or area_name in address:
  245. return area_id
  246. return ''
  247. # 应用 get_city_id 函数,生成城市 ID 列
  248. df['city_id'] = df.apply(get_city_id, axis=1)
  249. # 定义函数:根据城市 ID 获取城市名称
  250. def get_city(x):
  251. city_id = x['city_id']
  252. area = area_map.get(city_id)
  253. if pd.notna(area):
  254. city = area['area_name']
  255. return city
  256. return ''
  257. # 应用 get_city 函数,生成城市名称列
  258. df['city'] = df.apply(get_city, axis=1)
  259. # 定义函数:根据地址和城市信息获取区县 ID
  260. def get_district_id(x):
  261. address = x['标准地址']
  262. city = x['city']
  263. city_id = x['city_id']
  264. if pd.isna(city) or pd.isna(address):
  265. return ''
  266. if city == '石家庄':
  267. if '矿区' in address:
  268. return '130107'
  269. if '井陉' in address:
  270. return '130121'
  271. if city == '唐山':
  272. if '滦县' in address:
  273. return '130284'
  274. if city == '邢台':
  275. if '内邱' in address:
  276. return '130523'
  277. if '任县' in address:
  278. return '130505'
  279. if city == '雄安':
  280. address = address.replace('雄安新区', '')
  281. districts = districts_list_map.get(city_id)
  282. if not districts:
  283. return ''
  284. for district in districts:
  285. district_name = district['short_name']
  286. if district_name in address:
  287. return district['area_id']
  288. return ''
  289. # 应用 get_district_id 函数,生成区县 ID 列
  290. df['district_id'] = df.apply(get_district_id, axis=1)
  291. # 定义函数:根据区县 ID 获取区县名称
  292. def get_district(x):
  293. district_id = x['district_id']
  294. area = area_map.get(district_id)
  295. if pd.notna(area):
  296. district = area['area_name']
  297. return district
  298. return ''
  299. # 应用 get_district 函数,生成区县名称列
  300. df['district'] = df.apply(get_district, axis=1)
  301. def get_int(x):
  302. try:
  303. return int(x)
  304. except Exception:
  305. logger.warning(f"转换失败: {x}")
  306. return ""
  307. df['工位总数'] = df['工位总数'].apply(get_int)
  308. # 在 DataFrame 中插入年月列
  309. df.insert(0, 'year_month', year_month)
  310. df.rename(
  311. columns={'资产所属单位(一级)': 'first_unit', '资产所属单位(二级)': 'second_unit', '资产所属单位(三级)': 'third_unit',
  312. '建筑名称': 'building_name', '标准地址': 'address', '所在楼层': 'floor',
  313. '楼层建筑总面积(㎡)': 'floor_building_area',
  314. '楼层使用总面积(㎡)': 'floor_usable_area', '房间名称': 'room_name', '房间状态': 'room_status',
  315. '租赁类别': 'rent_type', '房间类别(一级)': 'first_room_type', '房间类别(二级)': 'second_room_type',
  316. '工位总数': 'seat_num', '是否临街': 'frontage', '建筑面积(㎡)': 'building_area',
  317. '建筑面积-自用(㎡)': 'building_area_self_use', '建筑面积-闲置(㎡)': 'building_area_idle',
  318. '建筑面积-出租(㎡)': 'building_area_rent', '建筑面积-不可使用(㎡)': 'building_area_unusable',
  319. '使用面积(㎡)': 'usable_area', '使用面积-自用(㎡)': 'usable_area_self_use',
  320. '使用面积-闲置(㎡)': 'usable_area_idle', '使用面积-出租(㎡)': 'usable_area_rent',
  321. '使用面积-不可使用(㎡)': 'usable_area_unusable', '闲置开始时间': 'idle_start_date',
  322. '不可使用原因': 'unusable_reason', '层高(m)': 'floor_height', '承重(kg/㎡)': 'load_bearing'},
  323. inplace=True)
  324. df = df[['year_month', 'first_unit', 'second_unit', 'third_unit', 'building_name', 'address', 'floor',
  325. 'floor_building_area', 'floor_usable_area', 'room_name', 'room_status', 'rent_type',
  326. 'first_room_type', 'second_room_type', 'seat_num', 'frontage', 'building_area',
  327. 'building_area_self_use', 'building_area_idle', 'building_area_rent', 'building_area_unusable',
  328. 'usable_area', 'usable_area_self_use', 'usable_area_idle', 'usable_area_rent', 'usable_area_unusable',
  329. 'idle_start_date', 'unusable_reason', 'floor_height', 'load_bearing', 'area_no', 'area_name',
  330. 'city_no', 'city_name', 'city_id', 'city', 'district_id', 'district']]
  331. # 打印 DataFrame 的信息
  332. df.info()
  333. # 将处理后的数据保存为 CSV 文件
  334. df.to_csv(path_or_buf=output_path, index=False, encoding='utf-8-sig', lineterminator='\n')
  335. def data_import():
  336. # 定义 PowerShell 脚本的路径
  337. script_path = r"../../copy.ps1"
  338. # 目标表和文件信息
  339. table = "house.room_month" # 数据库目标表名
  340. # 表字段列名,用于指定导入数据的列顺序
  341. columns = "year_month,first_unit,second_unit,third_unit,building_name,address,floor,floor_building_area,floor_usable_area,room_name,room_status,rent_type,first_room_type,second_room_type,seat_num,frontage,building_area,building_area_self_use,building_area_idle,building_area_rent,building_area_unusable,usable_area,usable_area_self_use,usable_area_idle,usable_area_rent,usable_area_unusable,idle_start_date,unusable_reason,floor_height,load_bearing,area_no,area_name,city_no,city_name,city_id,city,district_id,district"
  342. # 构造执行 PowerShell 脚本的命令
  343. command = f"powershell -NoProfile -NonInteractive -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}"
  344. # 打印生成的命令,方便调试和日志记录
  345. logger.info("command: {}", command)
  346. # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出
  347. completed_process = subprocess.run(
  348. command, # 执行的命令
  349. check=False, # 如果命令执行失败,不抛出异常
  350. text=True, # 将输出作为字符串处理
  351. capture_output=True, # 捕获标准输出和标准错误
  352. )
  353. # 打印命令执行的结果,包括返回码、标准输出和标准错误
  354. logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout,
  355. completed_process.stderr)
  356. # 定义正则表达式,用于匹配标准输出中的 COPY 结果
  357. p = re.compile(r"^(COPY) (\d+)$")
  358. count = None # 初始化计数变量
  359. matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果
  360. if matcher:
  361. count = int(matcher.group(2)) # 提取导入的数据行数
  362. # 如果没有成功提取到导入数据的行数,抛出运行时异常
  363. if count is None:
  364. raise RuntimeError("导入数据失败")
  365. def upload_file():
  366. remote_path = f'{remote_dir_path}{year_month}.xlsx' # 定义远程主机的目标文件路径
  367. # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文
  368. with paramiko.SSHClient() as ssh:
  369. # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败
  370. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  371. # 连接到远程主机,传入主机地址、端口、用户名和密码
  372. ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password)
  373. # 执行远程命令,创建远程目录(如果不存在)
  374. ssh.exec_command(f'mkdir -p {remote_dir_path}')
  375. # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文
  376. with ssh.open_sftp() as sftp:
  377. # 记录日志,提示即将上传的本地文件和远程目标路径
  378. logger.info("upload {} to {}", input_path, remote_path)
  379. # 使用SFTP的put方法将本地文件上传到远程主机
  380. sftp.put(input_path, remote_path)
  381. # 记录日志,提示文件已成功上传
  382. logger.info("uploaded {}", input_path)
  383. def data_update():
  384. with psycopg.connect(
  385. conninfo=conn_info,
  386. ) as conn:
  387. with conn.cursor() as curs:
  388. # 更新人均办公面积
  389. sql = f"""
  390. with
  391. t100 as (
  392. select
  393. id as area_no,
  394. name as area_name,
  395. order_num as area_order
  396. from
  397. common.organization
  398. where
  399. id in ('-11', '-12')
  400. ),
  401. t101 as (
  402. select
  403. area_no,
  404. sum(building_area_self_use) as building_area_self_use_sum
  405. from
  406. house.room_month
  407. where
  408. second_room_type = '办公用房'
  409. and year_month = {year_month}
  410. and area_no in ('-11', '-12')
  411. group by
  412. area_no
  413. ),
  414. t102 as (
  415. select
  416. *
  417. from
  418. house.staff_second_unit
  419. where
  420. year_month = (
  421. select
  422. max(year_month)
  423. from
  424. house.staff_second_unit)
  425. and area_no in ('-11', '-12')
  426. ),
  427. t103 as (
  428. select
  429. t100.area_no,
  430. t100.area_name,
  431. '' as city_no,
  432. '' as city_name,
  433. t101.building_area_self_use_sum,
  434. t102.total,
  435. t100.area_order,
  436. 0 as city_order
  437. from
  438. t100
  439. left join t101 on
  440. t100.area_no = t101.area_no
  441. left join t102 on
  442. t100.area_no = t102.area_no
  443. ),
  444. t200 as (
  445. select
  446. b.id as area_no,
  447. b.name as area_name,
  448. a.id as city_no,
  449. a.name as city_name,
  450. b.order_num as area_order,
  451. a.order_num as city_order
  452. from
  453. common.organization a
  454. left join common.organization b on
  455. a.parent_id = b.id
  456. where
  457. a.unhide = 1
  458. and a.grade = 2
  459. and a.parent_id not in ('-11', '-12')
  460. order by
  461. b.id,
  462. a.id
  463. ),
  464. t201 as (
  465. select
  466. area_no,
  467. city_no,
  468. sum(building_area_self_use) as building_area_self_use_sum
  469. from
  470. house.room_month
  471. where
  472. second_room_type = '办公用房'
  473. and area_no not in ('-11', '-12')
  474. and year_month = {year_month}
  475. group by
  476. area_no,
  477. city_no
  478. ),
  479. t202 as (
  480. select
  481. *
  482. from
  483. house.staff_third_unit
  484. where
  485. year_month = (
  486. select
  487. max(year_month)
  488. from
  489. house.staff_third_unit)
  490. and area_no not in ('-11', '-12')
  491. ),
  492. t203 as (
  493. select
  494. t200.area_no,
  495. t200.area_name,
  496. t200.city_no,
  497. t200.city_name,
  498. t201.building_area_self_use_sum,
  499. t202.total,
  500. t200.area_order,
  501. t200.city_order
  502. from
  503. t200
  504. left join t201 on
  505. t200.area_no = t201.area_no
  506. and t200.city_no = t201.city_no
  507. left join t202 on
  508. t200.area_no = t202.area_no
  509. and t200.city_no = t202.city_no
  510. ),
  511. t301 as (
  512. select
  513. *
  514. from
  515. t103
  516. union all
  517. select
  518. *
  519. from
  520. t203
  521. )
  522. insert
  523. into
  524. house.building_office_area_stat
  525. (
  526. year_month,
  527. area_no,
  528. area_name,
  529. city_no,
  530. city_name,
  531. building_area_self_use_sum,
  532. total,
  533. area_avg,
  534. area_order,
  535. city_order
  536. )
  537. select
  538. {year_month} as year_month,
  539. area_no,
  540. area_name,
  541. city_no,
  542. city_name,
  543. coalesce(building_area_self_use_sum, 0) as building_area_self_use_sum,
  544. coalesce(total, 0) as total,
  545. case
  546. when total = 0 then null
  547. else round(coalesce(building_area_self_use_sum, 0) / total, 2)
  548. end as area_avg,
  549. area_order,
  550. city_order
  551. from
  552. t301
  553. order by
  554. area_order,
  555. city_order
  556. """
  557. logger.info(f"sql: {sql}")
  558. curs.execute(sql)
  559. logger.info(f"update {curs.rowcount}")
  560. data_process()
  561. data_import()
  562. upload_file()
  563. data_update()