house_zu_ru_he_tong.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. """不动产租入合同数据处理
  2. """
  3. import re # 导入正则表达式模块,用于字符串处理
  4. import decimal # 导入decimal模块,用于高精度的数值计算
  5. import subprocess
  6. from datetime import datetime # 导入datetime模块,用于日期和时间操作
  7. from dateutil.relativedelta import relativedelta # 导入relativedelta模块,用于日期之间的相对差异计算
  8. from loguru import logger # 导入loguru模块,用于日志记录
  9. import pandas as pd # 导入pandas模块,用于数据处理和分析
  10. import psycopg # 导入psycopg模块,用于连接PostgreSQL数据库
  11. import paramiko
  12. # 配置日志记录器,将日志写入文件a.log
  13. logger.add(sink='a.log')
  14. ssh_hostname = '172.16.107.4' # 定义远程主机地址
  15. ssh_port = 22 # 定义SSH服务的端口号
  16. ssh_username = 'app' # 定义登录远程主机的用户名
  17. ssh_password = '(l4w0ST_' # 定义登录远程主机的密码
  18. # 服务器文件夹路径
  19. remote_dir_path = '/data/history/house/zu-ru-he-tong/'
  20. # 数据库连接信息
  21. db_host = "172.16.107.5" # 数据库主机地址
  22. db_port = 5432 # 数据库端口号
  23. db_username = "finance" # 数据库用户名
  24. db_password = "Finance@unicom23" # 数据库密码
  25. dbname = "financialdb" # 数据库名称
  26. conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'"
  27. # 获取当前日期,并计算上个月的第一天
  28. today = datetime.today()
  29. start_date = today - relativedelta(months=1, day=1)
  30. year_month = start_date.strftime('%Y%m')
  31. # 数据文件路径
  32. input_path = 'data.xlsx'
  33. # 输出文件路径
  34. output_path = 'output.csv'
  35. def data_process():
  36. org_map = {} # 存储所有组织机构的ID与详细信息的映射
  37. third_org_list_map = {} # 存储二级组织机构与其下属三级组织机构的映射
  38. area_map = {} # 存储所有区域的ID与详细信息的映射
  39. districts_list_map = {} # 存储一级区域与其下属子区域的映射
  40. # 连接到PostgreSQL数据库,并使用字典格式返回查询结果
  41. with psycopg.connect(
  42. conninfo=conn_info,
  43. row_factory=psycopg.rows.dict_row # 使用字典格式返回查询结果
  44. ) as conn:
  45. with conn.cursor() as curs:
  46. # 查询grade为1的组织机构(二级组织机构)
  47. sql = """
  48. select * from common.organization where grade = 1
  49. """
  50. logger.info(f"sql: {sql}") # 记录SQL语句到日志
  51. curs.execute(sql)
  52. second_orgs = curs.fetchall()
  53. for x in second_orgs:
  54. third_org_list_map[x['id']] = [] # 初始化每个二级组织机构的三级组织机构列表
  55. # 查询所有组织机构
  56. sql = """
  57. select * from common.organization
  58. """
  59. logger.info(f"sql: {sql}") # 记录SQL语句到日志
  60. curs.execute(sql)
  61. orgs = curs.fetchall()
  62. for x in orgs:
  63. if x['parent_id'] in third_org_list_map:
  64. third_org_list_map[x['parent_id']].append(x) # 将三级组织机构添加到对应二级组织机构的列表中
  65. org_map[x['id']] = x # 将组织机构ID与详细信息存入org_map
  66. # 查询area_grade为1的区域(一级区域)
  67. sql = """
  68. select * from common.area where area_grade = 1 order by area_id
  69. """
  70. logger.info(f"sql: {sql}") # 记录SQL语句到日志
  71. curs.execute(sql)
  72. cities = curs.fetchall()
  73. for x in cities:
  74. districts_list_map[x['area_id']] = [] # 初始化每个一级区域的子区域列表
  75. # 查询所有区域
  76. sql = """
  77. select * from common.area
  78. """
  79. logger.info(f"sql: {sql}") # 记录SQL语句到日志
  80. curs.execute(sql)
  81. areas = curs.fetchall()
  82. for x in areas:
  83. if x['parent_id'] in districts_list_map:
  84. districts_list_map[x['parent_id']].append(x) # 将子区域添加到对应一级区域的列表中
  85. area_map[x['area_id']] = x # 将区域ID与详细信息存入area_map
  86. # 读取Excel文件中的数据,并跳过第一行
  87. df = pd.read_excel(io=input_path, skiprows=1)
  88. # 获取当前 DataFrame 的列名列表
  89. columns = df.columns.tolist()
  90. # 定义所需的字段列表
  91. required_columns = ['序号', '数据编码', '租入房屋名称', '房屋所有权人属性', '租入形式', '标准地址(一级)',
  92. '标准地址(二级)', '标准地址(三级)', '街/路/村门牌号', '城市区域', '地段', '地址经度坐标',
  93. '地址纬度坐标', '租入建筑面积(平米)', '租入使用面积(平米)', '投资主体', '使用单位层级',
  94. '使用单位隶属的省级公司', '使用单位隶属的地市级公司', '使用单位隶属的区县级公司', '使用专业线',
  95. '租入实际用途', '租入其他用途说明', '自用建筑面积(平米)', '转租建筑面积(平米)', '首次租用时间',
  96. '合同编号', '合同名称', '合同类型', '签订时间', '我方签约主体', '出租方名称',
  97. '合同总金额(含税)(元)', '增值税金额(元)', '租入开始时间(合同生效时间)',
  98. '租入终止时间(合同终止时间)', '承办部门', '承办人', '联系电话', '累计计提金额(元)',
  99. '费用报账总金额(元)', '合同性质', '合同状态']
  100. # 检查是否有缺失的字段
  101. missing_columns = [col for col in required_columns if col not in columns]
  102. # 检查是否有多余的字段
  103. ex_columns = [col for col in columns if col not in required_columns]
  104. # 如果存在缺失字段,则抛出运行时错误并提示缺少哪些字段
  105. if missing_columns or ex_columns:
  106. raise RuntimeError(f"缺少以下字段: {missing_columns};存在以下多余字段:{ex_columns}")
  107. # 删除指定列中的空白字符
  108. columns_to_clean = list(filter(lambda x: x not in ('签订时间'), df.columns)) # 排除“签订时间”列
  109. df[columns_to_clean] = df[columns_to_clean].map(lambda x: re.sub(r'\s+', '', x) if type(x) is str else x)
  110. def get_area_no(x):
  111. """根据使用单位隶属的地市级公司名称获取二级组织机构编码"""
  112. second_unit = x['使用单位隶属的地市级公司']
  113. if '河北' == second_unit:
  114. return '-12'
  115. if '长途通信传输局' == second_unit:
  116. return '-11'
  117. for second_org in second_orgs:
  118. area_name = second_org['name']
  119. area_no = second_org['id']
  120. if area_name in second_unit:
  121. return area_no
  122. raise RuntimeError(f'二级组织机构编码匹配失败: {second_unit}')
  123. df['area_no'] = df.apply(get_area_no, axis=1)
  124. def get_area_name(x):
  125. """根据二级组织机构编码获取二级组织机构名称"""
  126. area_no = x['area_no']
  127. second_org = org_map[area_no]
  128. area_name = second_org['name']
  129. return area_name
  130. df['area_name'] = df.apply(get_area_name, axis=1)
  131. def get_city_no(x):
  132. """根据使用单位隶属的区县级公司名称获取三级组织机构编码"""
  133. third_unit = x['使用单位隶属的区县级公司']
  134. area_name = x['area_name']
  135. area_no = x['area_no']
  136. if area_name == '石家庄':
  137. if '矿区' in third_unit:
  138. return 'D0130185'
  139. if '井陉' in third_unit:
  140. return 'D0130121'
  141. if area_name == '秦皇岛':
  142. if '北戴河新区' in third_unit:
  143. return 'D0130185'
  144. if '北戴河' in third_unit:
  145. return 'D0130304'
  146. if area_name == '唐山':
  147. if '滦县' in third_unit:
  148. return 'D0130223'
  149. if '高新技术开发区' in third_unit:
  150. return 'D0130205'
  151. if area_name == '邢台':
  152. if '内丘' in third_unit:
  153. return 'D0130523'
  154. if '任泽' in third_unit:
  155. return 'D0130526'
  156. if area_name == '邯郸':
  157. if '峰峰' in third_unit:
  158. return 'D0130406'
  159. if area_name == '省机动局':
  160. if '沧州' in third_unit:
  161. return 'HECS180'
  162. if '唐山' in third_unit:
  163. return 'HECS181'
  164. if '秦皇岛' in third_unit:
  165. return 'HECS182'
  166. if '廊坊' in third_unit:
  167. return 'HECS183'
  168. if '张家口' in third_unit:
  169. return 'HECS184'
  170. if '邢台' in third_unit:
  171. return 'HECS185'
  172. if '邯郸' in third_unit:
  173. return 'HECS186'
  174. if '保定' in third_unit:
  175. return 'HECS187'
  176. if '石家庄' in third_unit:
  177. return 'HECS188'
  178. if '承德' in third_unit:
  179. return 'HECS189'
  180. if '衡水' in third_unit:
  181. return 'HECS720'
  182. if '雄安' in third_unit:
  183. return 'HECS728'
  184. return 'HECS018'
  185. if '雄安' == area_name:
  186. third_unit = third_unit.replace('雄安新区', '')
  187. third_org_list = third_org_list_map[area_no]
  188. for third_org in third_org_list:
  189. city_name = third_org['name']
  190. if city_name in third_unit:
  191. return third_org['id']
  192. if '沧州' == area_name:
  193. return 'D0130911'
  194. if '唐山' == area_name:
  195. return 'D0130202'
  196. if '秦皇岛' == area_name:
  197. return 'D0130302'
  198. if '廊坊' == area_name:
  199. return 'D0131000'
  200. if '张家口' == area_name:
  201. return 'D0130701'
  202. if '邢台' == area_name:
  203. return 'D0130502'
  204. if '邯郸' == area_name:
  205. return 'D0130402'
  206. if '保定' == area_name:
  207. return 'D0130601'
  208. if '石家庄' == area_name:
  209. return 'D0130186'
  210. if '承德' == area_name:
  211. return 'D0130801'
  212. if '衡水' == area_name:
  213. return 'D0133001'
  214. if '雄安' == area_name:
  215. return 'D0130830'
  216. return 'HE001'
  217. df['city_no'] = df.apply(get_city_no, axis=1)
  218. def get_city_name(x):
  219. """根据三级组织机构编码获取三级组织机构名称"""
  220. city_no = x['city_no']
  221. third_org = org_map[city_no]
  222. city_name = third_org['name']
  223. return city_name
  224. df['city_name'] = df.apply(get_city_name, axis=1)
  225. def get_rent_months(x):
  226. """根据租入开始时间和终止时间计算租期月数"""
  227. rent_start_date = x['租入开始时间(合同生效时间)']
  228. rent_end_date = x['租入终止时间(合同终止时间)']
  229. if pd.isna(rent_start_date) or pd.isna(rent_end_date):
  230. return ''
  231. rent_start_date = pd.to_datetime(rent_start_date)
  232. rent_end_date = pd.to_datetime(rent_end_date)
  233. delta = relativedelta(rent_end_date, rent_start_date)
  234. rent_months = delta.years * 12 + delta.months + (1 if delta.days > 0 else 0)
  235. return rent_months
  236. df['rent_months'] = df.apply(get_rent_months, axis=1)
  237. def get_gross_amount_month(x):
  238. """根据合同总金额和租期月数计算月含税合同额"""
  239. gross_amount = x['合同总金额(含税)(元)']
  240. rent_months = x['rent_months']
  241. if pd.notna(gross_amount) and pd.notna(rent_months) and rent_months and rent_months > 0:
  242. return (decimal.Decimal(gross_amount) / decimal.Decimal(rent_months)).quantize(decimal.Decimal('0.00'))
  243. return None
  244. df['gross_amount_month'] = df.apply(get_gross_amount_month, axis=1)
  245. def get_unit_price(x):
  246. """根据租入建筑面积和月含税合同额计算每平米单价"""
  247. building_area = x['租入建筑面积(平米)']
  248. gross_amount_month = x['gross_amount_month']
  249. if pd.notna(building_area) and pd.notna(gross_amount_month) and building_area > 0 and gross_amount_month > 0:
  250. return (decimal.Decimal(gross_amount_month) / decimal.Decimal(building_area)).quantize(
  251. decimal.Decimal('0.00'))
  252. return None
  253. df['unit_price'] = df.apply(get_unit_price, axis=1)
  254. def get_rent_years(x):
  255. """根据租期月数计算租期年数"""
  256. rent_months = x['rent_months']
  257. if pd.isna(rent_months) or not rent_months:
  258. return None
  259. return (decimal.Decimal(rent_months) / decimal.Decimal('12')).quantize(decimal.Decimal('0.00'))
  260. df['rent_years'] = df.apply(get_rent_years, axis=1)
  261. def get_unit_price2(x):
  262. """根据合同总金额、租入建筑面积和租期年数计算另一种每平米单价"""
  263. gross_amount = x['合同总金额(含税)(元)']
  264. building_area = x['租入建筑面积(平米)']
  265. rent_years = x['rent_years']
  266. if pd.notna(building_area) and pd.notna(gross_amount) and pd.notna(
  267. rent_years) and building_area > 0 and gross_amount > 0 and rent_years > 0:
  268. return (decimal.Decimal(gross_amount) / decimal.Decimal(building_area) / decimal.Decimal(
  269. rent_years) / decimal.Decimal(12)).quantize(decimal.Decimal('0.00'))
  270. return None
  271. df['unit_price2'] = df.apply(get_unit_price2, axis=1)
  272. def remove_extra_dots(s):
  273. if pd.isna(s) or not s:
  274. return None
  275. match = re.search(r'\.', s)
  276. if match:
  277. first_dot_index = match.start()
  278. return s[:first_dot_index + 1] + s[first_dot_index + 1:].replace('.', '')
  279. else:
  280. return s
  281. df['地址经度坐标'] = df['地址经度坐标'].map(remove_extra_dots)
  282. df['地址纬度坐标'] = df['地址纬度坐标'].map(remove_extra_dots)
  283. df.insert(0, 'year_month', year_month) # 在数据框的第一列插入年月字段
  284. df.rename(
  285. columns={'序号': 'serial_no', '数据编码': 'data_num', '租入房屋名称': 'house_name', '房屋所有权人属性': 'owner_type',
  286. '租入形式': 'rent_type', '标准地址(一级)': 'first_address', '标准地址(二级)': 'second_address',
  287. '标准地址(三级)': 'third_address', '街/路/村门牌号': 'fourth_address', '城市区域': 'city_region',
  288. '地段': 'area_sector', '地址经度坐标': 'lng', '地址纬度坐标': 'lat', '租入建筑面积(平米)': 'building_area',
  289. '租入使用面积(平米)': 'usable_area', '投资主体': 'investor', '使用单位层级': 'unit_level',
  290. '使用单位隶属的省级公司': 'first_unit', '使用单位隶属的地市级公司': 'second_unit',
  291. '使用单位隶属的区县级公司': 'third_unit', '使用专业线': 'field', '租入实际用途': 'use_type',
  292. '租入其他用途说明': 'use_description', '自用建筑面积(平米)': 'building_area_self_use',
  293. '转租建筑面积(平米)': 'building_area_sublet', '首次租用时间': 'first_rent_date', '合同编号': 'contract_no',
  294. '合同名称': 'contract_name', '合同类型': 'contract_type', '签订时间': 'sign_date', '我方签约主体': 'lessee',
  295. '出租方名称': 'lessor', '合同总金额(含税)(元)': 'gross_amount', '增值税金额(元)': 'vat',
  296. '租入开始时间(合同生效时间)': 'rent_start_date', '租入终止时间(合同终止时间)': 'rent_end_date',
  297. '承办部门': 'undertaking_department', '承办人': 'undertaker', '联系电话': 'phone',
  298. '累计计提金额(元)': 'amount_accrued', '费用报账总金额(元)': 'amount_reimbursement',
  299. '合同性质': 'contract_nature', '合同状态': 'contract_status'}, inplace=True)
  300. df = df[['year_month', 'serial_no', 'data_num', 'house_name', 'owner_type', 'rent_type', 'first_address',
  301. 'second_address', 'third_address', 'fourth_address', 'city_region', 'area_sector', 'lng', 'lat',
  302. 'building_area', 'usable_area', 'investor', 'unit_level', 'first_unit', 'second_unit',
  303. 'third_unit', 'field', 'use_type', 'use_description', 'building_area_self_use',
  304. 'building_area_sublet', 'first_rent_date', 'contract_no', 'contract_name', 'contract_type',
  305. 'sign_date', 'lessee', 'lessor', 'gross_amount', 'vat', 'rent_start_date', 'rent_end_date',
  306. 'undertaking_department', 'undertaker', 'phone', 'amount_accrued', 'amount_reimbursement',
  307. 'contract_nature', 'contract_status', 'area_no', 'area_name', 'city_no', 'city_name',
  308. 'rent_months', 'gross_amount_month', 'unit_price', 'rent_years', 'unit_price2']]
  309. # 打印数据框的基本信息
  310. df.info()
  311. # 将处理后的数据保存到CSV文件中
  312. df.to_csv(path_or_buf=output_path, index=False, encoding='utf-8-sig', lineterminator='\n')
  313. def data_import():
  314. # 定义 PowerShell 脚本的路径
  315. script_path = r"../../copy.ps1"
  316. # 目标表和文件信息
  317. table = "house.rent_in_month" # 数据库目标表名
  318. # 表字段列名,用于指定导入数据的列顺序
  319. columns = "year_month,serial_no,data_num,house_name,owner_type,rent_type,first_address,second_address,third_address,fourth_address,city_region,area_sector,lng,lat,building_area,usable_area,investor,unit_level,first_unit,second_unit,third_unit,field,use_type,use_description,building_area_self_use,building_area_sublet,first_rent_date,contract_no,contract_name,contract_type,sign_date,lessee,lessor,gross_amount,vat,rent_start_date,rent_end_date,undertaking_department,undertaker,phone,amount_accrued,amount_reimbursement,contract_nature,contract_status,area_no,area_name,city_no,city_name,rent_months,gross_amount_month,unit_price,rent_years,unit_price2"
  320. # 构造执行 PowerShell 脚本的命令
  321. command = f"powershell -NoProfile -NonInteractive -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}"
  322. # 打印生成的命令,方便调试和日志记录
  323. logger.info("command: {}", command)
  324. # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出
  325. completed_process = subprocess.run(
  326. command, # 执行的命令
  327. check=False, # 如果命令执行失败,不抛出异常
  328. text=True, # 将输出作为字符串处理
  329. capture_output=True, # 捕获标准输出和标准错误
  330. )
  331. # 打印命令执行的结果,包括返回码、标准输出和标准错误
  332. logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout,
  333. completed_process.stderr)
  334. # 定义正则表达式,用于匹配标准输出中的 COPY 结果
  335. p = re.compile(r"^(COPY) (\d+)$")
  336. count = None # 初始化计数变量
  337. matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果
  338. if matcher:
  339. count = int(matcher.group(2)) # 提取导入的数据行数
  340. # 如果没有成功提取到导入数据的行数,抛出运行时异常
  341. if count is None:
  342. raise RuntimeError("导入数据失败")
  343. def upload_file():
  344. remote_path = f'{remote_dir_path}{year_month}.xlsx' # 定义远程主机的目标文件路径
  345. # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文
  346. with paramiko.SSHClient() as ssh:
  347. # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败
  348. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  349. # 连接到远程主机,传入主机地址、端口、用户名和密码
  350. ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password)
  351. # 执行远程命令,创建远程目录(如果不存在)
  352. ssh.exec_command(f'mkdir -p {remote_dir_path}')
  353. # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文
  354. with ssh.open_sftp() as sftp:
  355. # 记录日志,提示即将上传的本地文件和远程目标路径
  356. logger.info("upload {} to {}", input_path, remote_path)
  357. # 使用SFTP的put方法将本地文件上传到远程主机
  358. sftp.put(input_path, remote_path)
  359. # 记录日志,提示文件已成功上传
  360. logger.info("uploaded {}", input_path)
  361. data_process()
  362. data_import()
  363. upload_file()