house_land.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458
  1. """不动产土地数据处理
  2. """
  3. # 导入必要的库
  4. import re # 用于正则表达式处理
  5. from datetime import datetime # 用于日期时间操作
  6. from dateutil.relativedelta import relativedelta # 用于相对日期计算
  7. from loguru import logger # 日志记录工具
  8. import pandas as pd # 数据处理库
  9. import psycopg # PostgreSQL数据库连接库
  10. import subprocess
  11. import paramiko
  12. # 配置日志记录器,将日志写入文件a.log
  13. logger.add(sink='a.log')
  14. ssh_hostname = '172.16.107.4' # 定义远程主机地址
  15. ssh_port = 22 # 定义SSH服务的端口号
  16. ssh_username = 'app' # 定义登录远程主机的用户名
  17. ssh_password = '(l4w0ST_' # 定义登录远程主机的密码
  18. # 服务器文件夹路径
  19. remote_dir_path = '/data/history/house/land/'
  20. # 数据库连接信息
  21. db_host = "172.16.107.5" # 数据库主机地址
  22. db_port = 5432 # 数据库端口号
  23. db_username = "finance" # 数据库用户名
  24. db_password = "Finance@unicom23" # 数据库密码
  25. dbname = "financialdb" # 数据库名称
  26. conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'"
  27. # 获取当前日期,并计算上个月的第一天
  28. today = datetime.today()
  29. start_date = today - relativedelta(months=1, day=1)
  30. year_month = start_date.strftime('%Y%m')
  31. # 数据文件路径
  32. input_path = 'data.xlsx'
  33. # 输出文件路径
  34. output_path = 'output.csv'
  35. def data_process():
  36. org_map = {} # 组织机构ID到组织机构信息的映射
  37. third_org_list_map = {} # 二级组织机构ID到其三级子组织列表的映射
  38. area_map = {} # 区域ID到区域信息的映射
  39. districts_list_map = {} # 城市ID到其区县列表的映射
  40. # 连接到PostgreSQL数据库
  41. with psycopg.connect(
  42. conninfo=conn_info,
  43. row_factory=psycopg.rows.dict_row # 使用字典格式返回查询结果
  44. ) as conn:
  45. with conn.cursor() as curs:
  46. # 查询所有一级组织机构(grade=1)
  47. sql = """
  48. select * from common.organization where grade = 1
  49. """
  50. logger.info(f"sql: {sql}") # 记录SQL语句到日志
  51. curs.execute(sql)
  52. second_orgs = curs.fetchall() # 获取所有一级组织机构
  53. for x in second_orgs:
  54. third_org_list_map[x['id']] = [] # 初始化二级组织机构的三级子组织列表
  55. # 查询所有组织机构
  56. sql = """
  57. select * from common.organization
  58. """
  59. logger.info(f"sql: {sql}")
  60. curs.execute(sql)
  61. orgs = curs.fetchall()
  62. for x in orgs:
  63. if x['parent_id'] in third_org_list_map:
  64. # 如果该组织的父级是二级组织,则将其加入对应的三级子组织列表
  65. third_org_list_map[x['parent_id']].append(x)
  66. org_map[x['id']] = x # 将组织机构信息存入全局映射
  67. # 查询所有一级区域(area_grade=1)
  68. sql = """
  69. select * from common.area where area_grade = 1 order by area_id
  70. """
  71. logger.info(f"sql: {sql}")
  72. curs.execute(sql)
  73. cities = curs.fetchall() # 获取所有一级区域(城市)
  74. for x in cities:
  75. districts_list_map[x['area_id']] = [] # 初始化城市的区县列表
  76. # 查询所有区域
  77. sql = """
  78. select * from common.area
  79. """
  80. logger.info(f"sql: {sql}")
  81. curs.execute(sql)
  82. areas = curs.fetchall()
  83. for x in areas:
  84. if x['parent_id'] in districts_list_map:
  85. # 如果该区域的父级是城市,则将其加入对应城市的区县列表
  86. districts_list_map[x['parent_id']].append(x)
  87. area_map[x['area_id']] = x # 将区域信息存入全局映射
  88. # 读取Excel文件中的数据
  89. df = pd.read_excel(io=input_path)
  90. # 获取当前 DataFrame 的列名列表
  91. columns = df.columns.tolist()
  92. # 定义所需的字段列表
  93. required_columns = ['资产所属单位(一级)', '资产所属单位(二级)', '资产所属单位(三级)', '土地别名', '盘点情况',
  94. '盘点状态', '是否修改', '是否存在待核对信息', '土地编号', '土地ID', '土地性质', '使用权类型',
  95. '地类用途', '取得日期', '闲置开始时间', '局址别名', '局址ID', '标准地址', '投资主体',
  96. '管理层级', '权属状态', '使用状态', '土地总面积(㎡)', '土地自用面积(㎡)', '土地闲置面积(㎡)',
  97. '土地出租面积(㎡)', '土地不可使用面积(㎡)', '是否有土地证', '无土地证原因', '是否有保地风险',
  98. '是否独立成宗土地', '是否空地', '是否有院落', '是否有资产卡片', '资产编号', '资产标签号',
  99. '责任部门', '责任人', '经度', '纬度', '实际产权人', '特殊说明']
  100. # 检查是否有缺失的字段
  101. missing_columns = [col for col in required_columns if col not in columns]
  102. # 检查是否有多余的字段
  103. ex_columns = [col for col in columns if col not in required_columns]
  104. # 如果存在缺失字段,则抛出运行时错误并提示缺少哪些字段
  105. if missing_columns or ex_columns:
  106. raise RuntimeError(f"缺少以下字段: {missing_columns};存在以下多余字段:{ex_columns}")
  107. # 删除字符串字段中的空白字符
  108. df = df.map(lambda x: re.sub(r'\s+', '', x) if type(x) is str else x)
  109. # 去重:根据“土地ID”列去重,保留最后一条记录
  110. df.drop_duplicates(subset=['土地ID'], keep='last', inplace=True)
  111. # 定义函数:获取二级组织机构编码
  112. def get_area_no(x):
  113. second_unit = x['资产所属单位(二级)']
  114. third_unit = x['资产所属单位(三级)']
  115. if '河北' == second_unit:
  116. return '-12'
  117. if '长途通信传输局' == second_unit:
  118. return '-11'
  119. if '保定' in second_unit and ('雄县' in third_unit or '容城' in third_unit or '安新' in third_unit):
  120. return '782'
  121. for second_org in second_orgs:
  122. area_name = second_org['name']
  123. area_no = second_org['id']
  124. if area_name in second_unit:
  125. return area_no
  126. raise RuntimeError(f'二级组织机构编码匹配失败: {second_unit}')
  127. # 应用函数,生成“二级组织机构编码”列
  128. df['area_no'] = df.apply(get_area_no, axis=1)
  129. # 定义函数:获取二级组织机构名称
  130. def get_area_name(x):
  131. area_no = x['area_no']
  132. second_org = org_map[area_no]
  133. area_name = second_org['name']
  134. return area_name
  135. # 应用函数,生成“二级组织机构名称”列
  136. df['area_name'] = df.apply(get_area_name, axis=1)
  137. # 定义函数:获取三级组织机构编码
  138. def get_city_no(x):
  139. third_unit = x['资产所属单位(三级)']
  140. area_name = x['area_name']
  141. area_no = x['area_no']
  142. if area_name == '石家庄':
  143. if '矿区' in third_unit:
  144. return 'D0130185'
  145. if '井陉' in third_unit:
  146. return 'D0130121'
  147. if area_name == '秦皇岛':
  148. if '北戴河新区' in third_unit:
  149. return 'D0130185'
  150. if '北戴河' in third_unit:
  151. return 'D0130304'
  152. if area_name == '唐山':
  153. if '滦县' in third_unit:
  154. return 'D0130223'
  155. if '高新技术开发区' in third_unit:
  156. return 'D0130205'
  157. if area_name == '邢台':
  158. if '内丘' in third_unit:
  159. return 'D0130523'
  160. if '任泽' in third_unit:
  161. return 'D0130526'
  162. if area_name == '邯郸':
  163. if '峰峰' in third_unit:
  164. return 'D0130406'
  165. if area_name == '省机动局':
  166. if '沧州' in third_unit:
  167. return 'HECS180'
  168. if '唐山' in third_unit:
  169. return 'HECS181'
  170. if '秦皇岛' in third_unit:
  171. return 'HECS182'
  172. if '廊坊' in third_unit:
  173. return 'HECS183'
  174. if '张家口' in third_unit:
  175. return 'HECS184'
  176. if '邢台' in third_unit:
  177. return 'HECS185'
  178. if '邯郸' in third_unit:
  179. return 'HECS186'
  180. if '保定' in third_unit:
  181. return 'HECS187'
  182. if '石家庄' in third_unit:
  183. return 'HECS188'
  184. if '承德' in third_unit:
  185. return 'HECS189'
  186. if '衡水' in third_unit:
  187. return 'HECS720'
  188. if '雄安' in third_unit:
  189. return 'HECS728'
  190. return 'HECS018'
  191. if '雄安' == area_name:
  192. third_unit = third_unit.replace('雄安新区', '')
  193. third_org_list = third_org_list_map[area_no]
  194. for third_org in third_org_list:
  195. city_name = third_org['name']
  196. if city_name in third_unit:
  197. return third_org['id']
  198. if '沧州' == area_name:
  199. return 'D0130911'
  200. if '唐山' == area_name:
  201. return 'D0130202'
  202. if '秦皇岛' == area_name:
  203. return 'D0130302'
  204. if '廊坊' == area_name:
  205. return 'D0131000'
  206. if '张家口' == area_name:
  207. return 'D0130701'
  208. if '邢台' == area_name:
  209. return 'D0130502'
  210. if '邯郸' == area_name:
  211. return 'D0130402'
  212. if '保定' == area_name:
  213. return 'D0130601'
  214. if '石家庄' == area_name:
  215. return 'D0130186'
  216. if '承德' == area_name:
  217. return 'D0130801'
  218. if '衡水' == area_name:
  219. return 'D0133001'
  220. if '雄安' == area_name:
  221. return 'D0130830'
  222. return 'HE001'
  223. # 应用函数,生成“三级组织机构编码”列
  224. df['city_no'] = df.apply(get_city_no, axis=1)
  225. # 定义函数:获取三级组织机构名称
  226. def get_city_name(x):
  227. city_no = x['city_no']
  228. third_org = org_map[city_no]
  229. city_name = third_org['name']
  230. return city_name
  231. # 应用函数,生成“三级组织机构名称”列
  232. df['city_name'] = df.apply(get_city_name, axis=1)
  233. # 定义函数:获取城市ID
  234. def get_city_id(x):
  235. address = x['标准地址']
  236. second_unit = x['资产所属单位(二级)']
  237. third_unit = x['资产所属单位(三级)']
  238. if '雄安' in address or ('保定' in address and ('雄县' in address or '容城' in address or '安新' in address)):
  239. return '133100'
  240. for city in cities:
  241. area_name = city['short_name']
  242. area_id = city['area_id']
  243. if area_name in second_unit:
  244. return area_id
  245. if area_name in third_unit:
  246. return area_id
  247. if area_name in address:
  248. return area_id
  249. return ''
  250. # 应用函数,生成“city_id”列
  251. df['city_id'] = df.apply(get_city_id, axis=1)
  252. # 定义函数:获取城市名称
  253. def get_city(x):
  254. city_id = x['city_id']
  255. area = area_map.get(city_id)
  256. if pd.notna(area):
  257. city = area['area_name']
  258. return city
  259. return ''
  260. # 应用函数,生成“city”列
  261. df['city'] = df.apply(get_city, axis=1)
  262. # 定义函数:获取区县ID
  263. def get_district_id(x):
  264. address = x['标准地址']
  265. city = x['city']
  266. city_id = x['city_id']
  267. if pd.isna(city) or pd.isna(address):
  268. return ''
  269. if city == '石家庄':
  270. if '矿区' in address:
  271. return '130107'
  272. if '井陉' in address:
  273. return '130121'
  274. if city == '唐山':
  275. if '滦县' in address:
  276. return '130284'
  277. if city == '邢台':
  278. if '内邱' in address:
  279. return '130523'
  280. if '任县' in address:
  281. return '130505'
  282. if city == '雄安':
  283. address = address.replace('雄安新区', '')
  284. districts = districts_list_map.get(city_id)
  285. if not districts:
  286. return ''
  287. for district in districts:
  288. district_name = district['short_name']
  289. if district_name in address:
  290. return district['area_id']
  291. return ''
  292. # 应用函数,生成“district_id”列
  293. df['district_id'] = df.apply(get_district_id, axis=1)
  294. # 定义函数:获取区县名称
  295. def get_district(x):
  296. district_id = x['district_id']
  297. area = area_map.get(district_id)
  298. if pd.notna(area):
  299. district = area['area_name']
  300. return district
  301. return ''
  302. # 应用函数,生成“district”列
  303. df['district'] = df.apply(get_district, axis=1)
  304. # 在DataFrame中插入“年月”列
  305. df.insert(0, 'year_month', year_month)
  306. df.rename(
  307. columns={'资产所属单位(一级)': 'first_unit', '资产所属单位(二级)': 'second_unit', '资产所属单位(三级)': 'third_unit',
  308. '土地别名': 'land_name', '盘点情况': 'inventory_situation', '盘点状态': 'inventory_status',
  309. '是否修改': 'modify', '是否存在待核对信息': 'to_be_verified', '土地编号': 'land_no', '土地ID': 'land_id',
  310. '土地性质': 'land_ownership', '使用权类型': 'use_right_type', '地类用途': 'land_use',
  311. '取得日期': 'acquisition_date', '闲置开始时间': 'idle_start_date', '局址别名': 'site_name',
  312. '局址ID': 'site_id', '标准地址': 'address', '投资主体': 'investor', '管理层级': 'management_level',
  313. '权属状态': 'ownership_status', '使用状态': 'usage_status', '土地总面积(㎡)': 'total_land_area',
  314. '土地自用面积(㎡)': 'land_area_self_use', '土地闲置面积(㎡)': 'land_area_idle',
  315. '土地出租面积(㎡)': 'land_area_rent', '土地不可使用面积(㎡)': 'land_area_unusable',
  316. '是否有土地证': 'has_land_deed', '无土地证原因': 'no_land_deed_reason',
  317. '是否有保地风险': 'land_preservation_risk', '是否独立成宗土地': 'independent_parcel_of_land',
  318. '是否空地': 'open_space', '是否有院落': 'courtyard', '是否有资产卡片': 'has_asset_card',
  319. '资产编号': 'assets_num', '资产标签号': 'assets_tag_num', '责任部门': 'responsible_department',
  320. '责任人': 'person_in_charge', '经度': 'lng_jt', '纬度': 'lat_jt', '实际产权人': 'property_owner',
  321. '特殊说明': 'special_specification'}, inplace=True)
  322. df = df[['year_month', 'first_unit', 'second_unit', 'third_unit', 'land_name', 'inventory_situation',
  323. 'inventory_status', 'modify', 'to_be_verified', 'land_no', 'land_id', 'land_ownership',
  324. 'use_right_type', 'land_use', 'acquisition_date', 'idle_start_date', 'site_name', 'site_id', 'address',
  325. 'investor', 'management_level', 'ownership_status', 'usage_status', 'total_land_area',
  326. 'land_area_self_use', 'land_area_idle', 'land_area_rent', 'land_area_unusable', 'has_land_deed',
  327. 'no_land_deed_reason', 'land_preservation_risk', 'independent_parcel_of_land', 'open_space', 'courtyard',
  328. 'has_asset_card', 'assets_num', 'assets_tag_num', 'responsible_department', 'person_in_charge', 'lng_jt',
  329. 'lat_jt','property_owner', 'special_specification', 'area_no', 'area_name', 'city_no', 'city_name',
  330. 'city_id', 'city', 'district_id', 'district']]
  331. # 打印DataFrame的基本信息
  332. df.info()
  333. # 将处理后的数据保存为CSV文件
  334. df.to_csv(path_or_buf=output_path, index=False, encoding='utf-8-sig', lineterminator='\n')
  335. def data_import():
  336. # 定义 PowerShell 脚本的路径
  337. script_path = r"../../copy.ps1"
  338. # 目标表和文件信息
  339. table = "house.land_month" # 数据库目标表名
  340. # 表字段列名,用于指定导入数据的列顺序
  341. columns = "year_month,first_unit,second_unit,third_unit,land_name,inventory_situation,inventory_status,modify,to_be_verified,land_no,land_id,land_ownership,use_right_type,land_use,acquisition_date,idle_start_date,site_name,site_id,address,investor,management_level,ownership_status,usage_status,total_land_area,land_area_self_use,land_area_idle,land_area_rent,land_area_unusable,has_land_deed,no_land_deed_reason,land_preservation_risk,independent_parcel_of_land,open_space,courtyard,has_asset_card,assets_num,assets_tag_num,responsible_department,person_in_charge,lng_jt,lat_jt,property_owner,special_specification,area_no,area_name,city_no,city_name,city_id,city,district_id,district"
  342. # 构造执行 PowerShell 脚本的命令
  343. command = f"powershell -NoProfile -NonInteractive -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}"
  344. # 打印生成的命令,方便调试和日志记录
  345. logger.info("command: {}", command)
  346. # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出
  347. completed_process = subprocess.run(
  348. command, # 执行的命令
  349. check=False, # 如果命令执行失败,不抛出异常
  350. text=True, # 将输出作为字符串处理
  351. capture_output=True, # 捕获标准输出和标准错误
  352. )
  353. # 打印命令执行的结果,包括返回码、标准输出和标准错误
  354. logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout,
  355. completed_process.stderr)
  356. # 定义正则表达式,用于匹配标准输出中的 COPY 结果
  357. p = re.compile(r"^(COPY) (\d+)$")
  358. count = None # 初始化计数变量
  359. matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果
  360. if matcher:
  361. count = int(matcher.group(2)) # 提取导入的数据行数
  362. # 如果没有成功提取到导入数据的行数,抛出运行时异常
  363. if count is None:
  364. raise RuntimeError("导入数据失败")
  365. def upload_file():
  366. remote_path = f'{remote_dir_path}{year_month}.xlsx' # 定义远程主机的目标文件路径
  367. # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文
  368. with paramiko.SSHClient() as ssh:
  369. # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败
  370. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  371. # 连接到远程主机,传入主机地址、端口、用户名和密码
  372. ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password)
  373. # 执行远程命令,创建远程目录(如果不存在)
  374. ssh.exec_command(f'mkdir -p {remote_dir_path}')
  375. # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文
  376. with ssh.open_sftp() as sftp:
  377. # 记录日志,提示即将上传的本地文件和远程目标路径
  378. logger.info("upload {} to {}", input_path, remote_path)
  379. # 使用SFTP的put方法将本地文件上传到远程主机
  380. sftp.put(input_path, remote_path)
  381. # 记录日志,提示文件已成功上传
  382. logger.info("uploaded {}", input_path)
  383. def data_update():
  384. with psycopg.connect(
  385. conninfo=conn_info,
  386. ) as conn:
  387. with conn.cursor() as curs:
  388. # 更新局址编号
  389. sql = f"""
  390. update
  391. house.land_month a
  392. set
  393. site_num = b.site_num
  394. from
  395. house.site_month b
  396. where
  397. a.site_id = b.site_id
  398. and a.year_month = b.year_month
  399. and a.year_month = {year_month}
  400. """
  401. logger.info(f"sql: {sql}")
  402. curs.execute(sql)
  403. logger.info(f"update {curs.rowcount}")
  404. data_process()
  405. data_import()
  406. upload_file()
  407. data_update()