"""不动产租入合同数据处理 """ import re # 导入正则表达式模块,用于字符串处理 import decimal # 导入decimal模块,用于高精度的数值计算 import subprocess from datetime import datetime # 导入datetime模块,用于日期和时间操作 from dateutil.relativedelta import relativedelta # 导入relativedelta模块,用于日期之间的相对差异计算 from loguru import logger # 导入loguru模块,用于日志记录 import pandas as pd # 导入pandas模块,用于数据处理和分析 import psycopg # 导入psycopg模块,用于连接PostgreSQL数据库 import paramiko # 配置日志记录器,将日志写入文件a.log logger.add(sink='a.log') ssh_hostname = '172.16.107.4' # 定义远程主机地址 ssh_port = 22 # 定义SSH服务的端口号 ssh_username = 'app' # 定义登录远程主机的用户名 ssh_password = '(l4w0ST_' # 定义登录远程主机的密码 # 服务器文件夹路径 remote_dir_path = '/data/history/house/zu-ru-he-tong/' # 数据库连接信息 db_host = "172.16.107.5" # 数据库主机地址 db_port = 5432 # 数据库端口号 db_username = "finance" # 数据库用户名 db_password = "Finance@unicom23" # 数据库密码 dbname = "financialdb" # 数据库名称 conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'" # 获取当前日期,并计算上个月的第一天 today = datetime.today() start_date = today - relativedelta(months=1, day=1) year_month = start_date.strftime('%Y%m') # 数据文件路径 input_path = 'data.xlsx' # 输出文件路径 output_path = 'output.csv' def data_process(): org_map = {} # 存储所有组织机构的ID与详细信息的映射 third_org_list_map = {} # 存储二级组织机构与其下属三级组织机构的映射 area_map = {} # 存储所有区域的ID与详细信息的映射 districts_list_map = {} # 存储一级区域与其下属子区域的映射 # 连接到PostgreSQL数据库,并使用字典格式返回查询结果 with psycopg.connect( conninfo=conn_info, row_factory=psycopg.rows.dict_row # 使用字典格式返回查询结果 ) as conn: with conn.cursor() as curs: # 查询grade为1的组织机构(二级组织机构) sql = """ select * from common.organization where grade = 1 """ logger.info(f"sql: {sql}") # 记录SQL语句到日志 curs.execute(sql) second_orgs = curs.fetchall() for x in second_orgs: third_org_list_map[x['id']] = [] # 初始化每个二级组织机构的三级组织机构列表 # 查询所有组织机构 sql = """ select * from common.organization """ logger.info(f"sql: {sql}") # 记录SQL语句到日志 curs.execute(sql) orgs = curs.fetchall() for x in orgs: if x['parent_id'] in third_org_list_map: third_org_list_map[x['parent_id']].append(x) # 将三级组织机构添加到对应二级组织机构的列表中 org_map[x['id']] = x # 将组织机构ID与详细信息存入org_map # 查询area_grade为1的区域(一级区域) sql = """ select * from common.area where area_grade = 1 order by area_id """ logger.info(f"sql: {sql}") # 记录SQL语句到日志 curs.execute(sql) cities = curs.fetchall() for x in cities: districts_list_map[x['area_id']] = [] # 初始化每个一级区域的子区域列表 # 查询所有区域 sql = """ select * from common.area """ logger.info(f"sql: {sql}") # 记录SQL语句到日志 curs.execute(sql) areas = curs.fetchall() for x in areas: if x['parent_id'] in districts_list_map: districts_list_map[x['parent_id']].append(x) # 将子区域添加到对应一级区域的列表中 area_map[x['area_id']] = x # 将区域ID与详细信息存入area_map # 读取Excel文件中的数据,并跳过第一行 df = pd.read_excel(io=input_path, skiprows=1) # 删除指定列中的空白字符 columns_to_clean = list(filter(lambda x: x not in ('签订时间'), df.columns)) # 排除“签订时间”列 df[columns_to_clean] = df[columns_to_clean].map(lambda x: re.sub(r'\s+', '', x) if type(x) is str else x) def get_area_no(x): """根据使用单位隶属的地市级公司名称获取二级组织机构编码""" second_unit = x['使用单位隶属的地市级公司'] if '河北' == second_unit: return '-12' if '长途通信传输局' == second_unit: return '-11' for second_org in second_orgs: area_name = second_org['name'] area_no = second_org['id'] if area_name in second_unit: return area_no raise RuntimeError(f'二级组织机构编码匹配失败: {second_unit}') df['二级组织机构编码'] = df.apply(get_area_no, axis=1) def get_area_name(x): """根据二级组织机构编码获取二级组织机构名称""" area_no = x['二级组织机构编码'] second_org = org_map[area_no] area_name = second_org['name'] return area_name df['二级组织机构名称'] = df.apply(get_area_name, axis=1) def get_city_no(x): """根据使用单位隶属的区县级公司名称获取三级组织机构编码""" third_unit = x['使用单位隶属的区县级公司'] area_name = x['二级组织机构名称'] area_no = x['二级组织机构编码'] if area_name == '石家庄': if '矿区' in third_unit: return 'D0130185' if '井陉' in third_unit: return 'D0130121' if area_name == '秦皇岛': if '北戴河新区' in third_unit: return 'D0130185' if '北戴河' in third_unit: return 'D0130304' if area_name == '唐山': if '滦县' in third_unit: return 'D0130223' if '高新技术开发区' in third_unit: return 'D0130205' if area_name == '邢台': if '内丘' in third_unit: return 'D0130523' if '任泽' in third_unit: return 'D0130526' if area_name == '邯郸': if '峰峰' in third_unit: return 'D0130406' if area_name == '省机动局': if '沧州' in third_unit: return 'HECS180' if '唐山' in third_unit: return 'HECS181' if '秦皇岛' in third_unit: return 'HECS182' if '廊坊' in third_unit: return 'HECS183' if '张家口' in third_unit: return 'HECS184' if '邢台' in third_unit: return 'HECS185' if '邯郸' in third_unit: return 'HECS186' if '保定' in third_unit: return 'HECS187' if '石家庄' in third_unit: return 'HECS188' if '承德' in third_unit: return 'HECS189' if '衡水' in third_unit: return 'HECS720' if '雄安' in third_unit: return 'HECS728' return 'HECS018' if '雄安' == area_name: third_unit = third_unit.replace('雄安新区', '') third_org_list = third_org_list_map[area_no] for third_org in third_org_list: city_name = third_org['name'] if city_name in third_unit: return third_org['id'] if '沧州' == area_name: return 'D0130911' if '唐山' == area_name: return 'D0130202' if '秦皇岛' == area_name: return 'D0130302' if '廊坊' == area_name: return 'D0131000' if '张家口' == area_name: return 'D0130701' if '邢台' == area_name: return 'D0130502' if '邯郸' == area_name: return 'D0130402' if '保定' == area_name: return 'D0130601' if '石家庄' == area_name: return 'D0130186' if '承德' == area_name: return 'D0130801' if '衡水' == area_name: return 'D0133001' if '雄安' == area_name: return 'D0130830' return 'HE001' df['三级组织机构编码'] = df.apply(get_city_no, axis=1) def get_city_name(x): """根据三级组织机构编码获取三级组织机构名称""" city_no = x['三级组织机构编码'] third_org = org_map[city_no] city_name = third_org['name'] return city_name df['三级组织机构名称'] = df.apply(get_city_name, axis=1) def get_rent_months(x): """根据租入开始时间和终止时间计算租期月数""" rent_start_date = x['租入开始时间(合同生效时间)'] rent_end_date = x['租入终止时间(合同终止时间)'] if pd.isna(rent_start_date) or pd.isna(rent_end_date): return '' rent_start_date = pd.to_datetime(rent_start_date) rent_end_date = pd.to_datetime(rent_end_date) delta = relativedelta(rent_end_date, rent_start_date) rent_months = delta.years * 12 + delta.months + (1 if delta.days > 0 else 0) return rent_months df['租期月数'] = df.apply(get_rent_months, axis=1) def get_gross_amount_month(x): """根据合同总金额和租期月数计算月含税合同额""" gross_amount = x['合同总金额(含税)(元)'] rent_months = x['租期月数'] if pd.notna(gross_amount) and pd.notna(rent_months) and rent_months and rent_months > 0: return (decimal.Decimal(gross_amount) / decimal.Decimal(rent_months)).quantize(decimal.Decimal('0.00')) return None df['月含税合同额'] = df.apply(get_gross_amount_month, axis=1) def get_unit_price(x): """根据租入建筑面积和月含税合同额计算每平米单价""" building_area = x['租入建筑面积(平米)'] gross_amount_month = x['月含税合同额'] if pd.notna(building_area) and pd.notna(gross_amount_month) and building_area > 0 and gross_amount_month > 0: return (decimal.Decimal(gross_amount_month) / decimal.Decimal(building_area)).quantize( decimal.Decimal('0.00')) return None df['每平米单价'] = df.apply(get_unit_price, axis=1) def get_rent_years(x): """根据租期月数计算租期年数""" rent_months = x['租期月数'] if pd.isna(rent_months) or not rent_months: return None return (decimal.Decimal(rent_months) / decimal.Decimal('12')).quantize(decimal.Decimal('0.00')) df['rent_years'] = df.apply(get_rent_years, axis=1) def get_unit_price2(x): """根据合同总金额、租入建筑面积和租期年数计算另一种每平米单价""" gross_amount = x['合同总金额(含税)(元)'] building_area = x['租入建筑面积(平米)'] rent_years = x['rent_years'] if pd.notna(building_area) and pd.notna(gross_amount) and pd.notna( rent_years) and building_area > 0 and gross_amount > 0 and rent_years > 0: return (decimal.Decimal(gross_amount) / decimal.Decimal(building_area) / decimal.Decimal( rent_years) / decimal.Decimal(12)).quantize(decimal.Decimal('0.00')) return None df['unit_price2'] = df.apply(get_unit_price2, axis=1) def remove_extra_dots(s): if pd.isna(s) or not s: return None match = re.search(r'\.', s) if match: first_dot_index = match.start() return s[:first_dot_index + 1] + s[first_dot_index + 1:].replace('.', '') else: return s df['地址经度坐标'] = df['地址经度坐标'].map(remove_extra_dots) df['地址纬度坐标'] = df['地址纬度坐标'].map(remove_extra_dots) df.insert(0, '年月', year_month) # 在数据框的第一列插入年月字段 # 打印数据框的基本信息 print(df.info()) # 将处理后的数据保存到CSV文件中 df.to_csv(path_or_buf=output_path, index=False, header=['year_month', 'serial_no', 'data_num', 'house_name', 'owner_type', 'rent_type', 'first_address', 'second_address', 'third_address', 'fourth_address', 'city_region', 'area_sector', 'lng', 'lat', 'building_area', 'usable_area', 'investor', 'unit_level', 'first_unit', 'second_unit', 'third_unit', 'field', 'use_type', 'use_description', 'building_area_self_use', 'building_area_sublet', 'first_rent_date', 'contract_no', 'contract_name', 'contract_type', 'sign_date', 'lessee', 'lessor', 'gross_amount', 'vat', 'rent_start_date', 'rent_end_date', 'undertaking_department', 'undertaker', 'phone', 'amount_accrued', 'amount_reimbursement', 'contract_nature', 'contract_status', 'area_no', 'area_name', 'city_no', 'city_name', 'rent_months', 'gross_amount_month', 'unit_price', 'rent_years', 'unit_price2'], encoding='utf-8-sig') def data_import(): # 定义 PowerShell 脚本的路径 script_path = r"../../copy.ps1" # 目标表和文件信息 table = "house.rent_in_month" # 数据库目标表名 # 表字段列名,用于指定导入数据的列顺序 columns = "year_month,serial_no,data_num,house_name,owner_type,rent_type,first_address,second_address,third_address,fourth_address,city_region,area_sector,lng,lat,building_area,usable_area,investor,unit_level,first_unit,second_unit,third_unit,field,use_type,use_description,building_area_self_use,building_area_sublet,first_rent_date,contract_no,contract_name,contract_type,sign_date,lessee,lessor,gross_amount,vat,rent_start_date,rent_end_date,undertaking_department,undertaker,phone,amount_accrued,amount_reimbursement,contract_nature,contract_status,area_no,area_name,city_no,city_name,rent_months,gross_amount_month,unit_price,rent_years,unit_price2" # 构造执行 PowerShell 脚本的命令 command = f"powershell -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}" # 打印生成的命令,方便调试和日志记录 logger.info("command: {}", command) # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出 completed_process = subprocess.run( command, # 执行的命令 check=False, # 如果命令执行失败,不抛出异常 text=True, # 将输出作为字符串处理 capture_output=True, # 捕获标准输出和标准错误 ) # 打印命令执行的结果,包括返回码、标准输出和标准错误 logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout, completed_process.stderr) # 定义正则表达式,用于匹配标准输出中的 COPY 结果 p = re.compile(r"^(COPY) (\d+)$") count = None # 初始化计数变量 matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果 if matcher: count = int(matcher.group(2)) # 提取导入的数据行数 # 如果没有成功提取到导入数据的行数,抛出运行时异常 if count is None: raise RuntimeError("导入数据失败") def upload_file(): remote_path = f'{remote_dir_path}{year_month}.xlsx' # 定义远程主机的目标文件路径 # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文 with paramiko.SSHClient() as ssh: # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接到远程主机,传入主机地址、端口、用户名和密码 ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password) # 执行远程命令,创建远程目录(如果不存在) ssh.exec_command(f'mkdir -p {remote_dir_path}') # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文 with ssh.open_sftp() as sftp: # 记录日志,提示即将上传的本地文件和远程目标路径 logger.info("upload {} to {}", input_path, remote_path) # 使用SFTP的put方法将本地文件上传到远程主机 sftp.put(input_path, remote_path) # 记录日志,提示文件已成功上传 logger.info("uploaded {}", input_path) data_process() data_import() upload_file()