"""不动产建筑数据处理 """ # 导入必要的库 import re # 正则表达式库,用于字符串处理 from datetime import datetime # 日期时间库,用于处理日期和时间 from dateutil.relativedelta import relativedelta # 日期时间相对偏移库,用于计算相对日期 from decimal import Decimal # 高精度小数库,用于精确的数值计算 from loguru import logger # 日志库,用于记录日志信息 import pandas as pd # 数据分析库,用于处理数据框 import psycopg # PostgreSQL数据库连接库,用于与PostgreSQL交互 import subprocess import paramiko # 配置日志记录器,将日志写入文件a.log logger.add(sink='a.log') ssh_hostname = '172.16.107.4' # 定义远程主机地址 ssh_port = 22 # 定义SSH服务的端口号 ssh_username = 'app' # 定义登录远程主机的用户名 ssh_password = '(l4w0ST_' # 定义登录远程主机的密码 # 服务器文件夹路径 remote_dir_path = '/data/history/house/building/' # 数据库连接信息 db_host = "172.16.107.5" # 数据库主机地址 db_port = 5432 # 数据库端口号 db_username = "finance" # 数据库用户名 db_password = "Finance@unicom23" # 数据库密码 dbname = "financialdb" # 数据库名称 conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'" # 获取当前日期,并计算上个月的第一天 today = datetime.today() start_date = today - relativedelta(months=1, day=1) year_month = start_date.strftime('%Y%m') # 数据文件路径 input_path = 'data.xlsx' # 输出文件路径 output_path = 'output.csv' def data_process(): # 初始化全局变量,用于存储组织、区域等映射关系 org_map = {} # 组织ID到组织信息的映射 third_org_list_map = {} # 二级组织ID到其下属三级组织列表的映射 area_map = {} # 区域ID到区域信息的映射 districts_list_map = {} # 城市ID到其下属区县列表的映射 # 连接到PostgreSQL数据库,并获取组织和区域数据 with psycopg.connect( conninfo=conn_info, row_factory=psycopg.rows.dict_row ) as conn: with conn.cursor() as curs: # 查询所有一级组织(grade=1) sql = """ select * from common.organization where grade = 1 """ logger.info(f"sql: {sql}") curs.execute(sql) second_orgs = curs.fetchall() for x in second_orgs: third_org_list_map[x['id']] = [] # 初始化每个二级组织的三级组织列表为空 # 查询所有组织 sql = """ select * from common.organization """ logger.info(f"sql: {sql}") curs.execute(sql) orgs = curs.fetchall() for x in orgs: if x['parent_id'] in third_org_list_map: third_org_list_map[x['parent_id']].append(x) # 将三级组织添加到对应的二级组织列表中 org_map[x['id']] = x # 构建组织ID到组织信息的映射 # 查询所有省级区域(area_grade=1) sql = """ select * from common.area where area_grade = 1 order by area_id """ logger.info(f"sql: {sql}") curs.execute(sql) cities = curs.fetchall() for x in cities: districts_list_map[x['area_id']] = [] # 初始化每个城市的区县列表为空 # 查询所有区域 sql = """ select * from common.area """ logger.info(f"sql: {sql}") curs.execute(sql) areas = curs.fetchall() for x in areas: if x['parent_id'] in districts_list_map: districts_list_map[x['parent_id']].append(x) # 将区县添加到对应的城市列表中 area_map[x['area_id']] = x # 构建区域ID到区域信息的映射 # 读取Excel文件中的数据并进行预处理 df = pd.read_excel(io=input_path) # 读取Excel文件 df = df.map(lambda x: re.sub(r'\s+', '', x) if type(x) is str else x) # 去除字符串字段中的多余空格 df.drop_duplicates(subset=['建筑ID'], keep='last', inplace=True) # 去重,保留最后一条记录 # 定义函数:根据资产所属单位获取二级组织机构编码 def get_area_no(x): second_unit = x['资产所属单位(二级)'] third_unit = x['资产所属单位(三级)'] if '河北' == second_unit: return '-12' if '长途通信传输局' == second_unit: return '-11' if '保定' in second_unit and ('雄县' in third_unit or '容城' in third_unit or '安新' in third_unit): return '782' for second_org in second_orgs: area_name = second_org['name'] area_no = second_org['id'] if area_name in second_unit: return area_no raise RuntimeError(f'二级组织机构编码匹配失败: {second_unit}') df['二级组织机构编码'] = df.apply(get_area_no, axis=1) # 应用函数,生成二级组织机构编码列 # 定义函数:根据二级组织机构编码获取二级组织机构名称 def get_area_name(x): area_no = x['二级组织机构编码'] second_org = org_map[area_no] area_name = second_org['name'] return area_name df['二级组织机构名称'] = df.apply(get_area_name, axis=1) # 应用函数,生成二级组织机构名称列 # 定义函数:根据资产所属单位获取三级组织机构编码 def get_city_no(x): third_unit = x['资产所属单位(三级)'] area_name = x['二级组织机构名称'] area_no = x['二级组织机构编码'] if area_name == '石家庄': if '矿区' in third_unit: return 'D0130185' if '井陉' in third_unit: return 'D0130121' if area_name == '秦皇岛': if '北戴河新区' in third_unit: return 'D0130185' if '北戴河' in third_unit: return 'D0130304' if area_name == '唐山': if '滦县' in third_unit: return 'D0130223' if '高新技术开发区' in third_unit: return 'D0130205' if area_name == '邢台': if '内丘' in third_unit: return 'D0130523' if '任泽' in third_unit: return 'D0130526' if area_name == '邯郸': if '峰峰' in third_unit: return 'D0130406' if area_name == '省机动局': if '沧州' in third_unit: return 'HECS180' if '唐山' in third_unit: return 'HECS181' if '秦皇岛' in third_unit: return 'HECS182' if '廊坊' in third_unit: return 'HECS183' if '张家口' in third_unit: return 'HECS184' if '邢台' in third_unit: return 'HECS185' if '邯郸' in third_unit: return 'HECS186' if '保定' in third_unit: return 'HECS187' if '石家庄' in third_unit: return 'HECS188' if '承德' in third_unit: return 'HECS189' if '衡水' in third_unit: return 'HECS720' if '雄安' in third_unit: return 'HECS728' return 'HECS018' if '雄安' == area_name: third_unit = third_unit.replace('雄安新区', '') third_org_list = third_org_list_map[area_no] for third_org in third_org_list: city_name = third_org['name'] if city_name in third_unit: return third_org['id'] if '沧州' == area_name: return 'D0130911' if '唐山' == area_name: return 'D0130202' if '秦皇岛' == area_name: return 'D0130302' if '廊坊' == area_name: return 'D0131000' if '张家口' == area_name: return 'D0130701' if '邢台' == area_name: return 'D0130502' if '邯郸' == area_name: return 'D0130402' if '保定' == area_name: return 'D0130601' if '石家庄' == area_name: return 'D0130186' if '承德' == area_name: return 'D0130801' if '衡水' == area_name: return 'D0133001' if '雄安' == area_name: return 'D0130830' return 'HE001' df['三级组织机构编码'] = df.apply(get_city_no, axis=1) # 应用函数,生成三级组织机构编码列 # 定义函数:根据三级组织机构编码获取三级组织机构名称 def get_city_name(x): city_no = x['三级组织机构编码'] third_org = org_map[city_no] city_name = third_org['name'] return city_name df['三级组织机构名称'] = df.apply(get_city_name, axis=1) # 应用函数,生成三级组织机构名称列 # 定义函数:根据标准地址获取城市ID def get_city_id(x): address = x['标准地址'] second_unit = x['资产所属单位(二级)'] third_unit = x['资产所属单位(三级)'] if '雄安' in address or ('保定' in address and ('雄县' in address or '容城' in address or '安新' in address)): return '133100' for city in cities: area_name = city['short_name'] area_id = city['area_id'] if area_name in second_unit: return area_id if area_name in third_unit: return area_id if area_name in address: return area_id return '' df['city_id'] = df.apply(get_city_id, axis=1) # 应用函数,生成城市ID列 # 定义函数:根据城市ID获取城市名称 def get_city(x): city_id = x['city_id'] area = area_map.get(city_id) if pd.notna(area): city = area['area_name'] return city return '' df['city'] = df.apply(get_city, axis=1) # 应用函数,生成城市名称列 # 定义函数:根据标准地址获取区县ID def get_district_id(x): address = x['标准地址'] city = x['city'] city_id = x['city_id'] if pd.isna(city) or pd.isna(address): return '' if city == '石家庄': if '矿区' in address: return '130107' if '井陉' in address: return '130121' if city == '唐山': if '滦县' in address: return '130284' if city == '邢台': if '内邱' in address: return '130523' if '任县' in address: return '130505' if city == '雄安': address = address.replace('雄安新区', '') districts = districts_list_map.get(city_id) if not districts: return '' for district in districts: district_name = district['short_name'] if district_name in address: return district['area_id'] return '' df['district_id'] = df.apply(get_district_id, axis=1) # 应用函数,生成区县ID列 # 定义函数:根据区县ID获取区县名称 def get_district(x): district_id = x['district_id'] area = area_map.get(district_id) if pd.notna(area): district = area['area_name'] return district return '' df['district'] = df.apply(get_district, axis=1) # 应用函数,生成区县名称列 # 定义函数:将百分比字符串转换为小数 def convert_percentage_to_number(x): if pd.notna(x) and isinstance(x, str) and x.endswith('%'): return Decimal(x[:-1]) / Decimal('100') return x df['得房率'] = df['得房率'].apply(convert_percentage_to_number) # 应用函数,将得房率转换为小数 df['year_no'] = start_date.year # 年份列 df['month_no'] = start_date.month # 月份列 def get_int(x): try: return int(x) except Exception: return "" df['房龄开始年份'] = df['房龄开始年份'].apply(get_int) # 定义函数:计算房龄 def get_house_age(x): house_year_began = x['房龄开始年份'] if pd.notna(house_year_began) and house_year_began: current_year = start_date.year return current_year - house_year_began return '' df['house_age'] = df.apply(get_house_age, axis=1) # 应用函数,生成房龄列 df.insert(0, '年月', year_month) # 在数据框第一列插入年月列 # 打印数据框信息 print(df.info()) # 将结果保存为CSV文件 df.to_csv( path_or_buf=output_path, index=False, header=[ 'year_month', 'first_unit', 'second_unit', 'third_unit', 'building_name', 'building_id', 'housing_acquisition_rate', 'site_name', 'site_id', 'land_name', 'housing_source', 'acquisition_date', 'house_year_began', 'investor', 'management_level', 'building_structure', 'total_floors', 'frontage', 'courtyard', 'whole_building', 'property_ownership_certificate', 'no_property_ownership_certificate_reason', 'unrelated_assets', 'assets_num', 'assets_tag_num', 'usage_status', 'building_use', 'ownership_status', 'floor_area', 'building_area', 'building_area_self_use', 'building_area_rent', 'building_area_idle', 'building_area_unusable', 'usable_area', 'usable_area_self_use', 'usable_area_rent', 'usable_area_idle', 'usable_area_unusable', 'community_assistant_name', 'community_assistant_unit', 'lng_jt', 'lat_jt', 'address', 'property_owner', 'checked', 'area_no', 'area_name', 'city_no', 'city_name', 'city_id', 'city', 'district_id', 'district', 'year_no', 'month_no', 'house_age' ], encoding='utf-8-sig' ) def data_import(): # 定义 PowerShell 脚本的路径 script_path = r"../../copy.ps1" # 目标表和文件信息 table = "house.building_month" # 数据库目标表名 # 表字段列名,用于指定导入数据的列顺序 columns = "year_month,first_unit,second_unit,third_unit,building_name,building_id,housing_acquisition_rate,site_name,site_id,land_name,housing_source,acquisition_date,house_year_began,investor,management_level,building_structure,total_floors,frontage,courtyard,whole_building,property_ownership_certificate,no_property_ownership_certificate_reason,unrelated_assets,assets_num,assets_tag_num,usage_status,building_use,ownership_status,floor_area,building_area,building_area_self_use,building_area_rent,building_area_idle,building_area_unusable,usable_area,usable_area_self_use,usable_area_rent,usable_area_idle,usable_area_unusable,community_assistant_name,community_assistant_unit,lng_jt,lat_jt,address,property_owner,checked,area_no,area_name,city_no,city_name,city_id,city,district_id,district,year_no,month_no,house_age" # 构造执行 PowerShell 脚本的命令 command = f"powershell -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}" # 打印生成的命令,方便调试和日志记录 logger.info("command: {}", command) # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出 completed_process = subprocess.run( command, # 执行的命令 check=False, # 如果命令执行失败,不抛出异常 text=True, # 将输出作为字符串处理 capture_output=True, # 捕获标准输出和标准错误 ) # 打印命令执行的结果,包括返回码、标准输出和标准错误 logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout, completed_process.stderr) # 定义正则表达式,用于匹配标准输出中的 COPY 结果 p = re.compile(r"^(COPY) (\d+)$") count = None # 初始化计数变量 matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果 if matcher: count = int(matcher.group(2)) # 提取导入的数据行数 # 如果没有成功提取到导入数据的行数,抛出运行时异常 if count is None: raise RuntimeError("导入数据失败") def upload_file(): remote_path = f'{remote_dir_path}{year_month}.xlsx' # 定义远程主机的目标文件路径 # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文 with paramiko.SSHClient() as ssh: # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接到远程主机,传入主机地址、端口、用户名和密码 ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password) # 执行远程命令,创建远程目录(如果不存在) ssh.exec_command(f'mkdir -p {remote_dir_path}') # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文 with ssh.open_sftp() as sftp: # 记录日志,提示即将上传的本地文件和远程目标路径 logger.info("upload {} to {}", input_path, remote_path) # 使用SFTP的put方法将本地文件上传到远程主机 sftp.put(input_path, remote_path) # 记录日志,提示文件已成功上传 logger.info("uploaded {}", input_path) def data_update(): with psycopg.connect( conninfo=conn_info, ) as conn: with conn.cursor() as curs: # 更新局址信息 sql = f""" update house.building_month a set site_num = b.site_num, city_level = b.city_level, city_region = b.city_region, area_sector = b.area_sector, has_land = b.has_land from house.site_month b where a.site_id = b.site_id and a.year_month = b.year_month and a.year_month = {year_month} """ logger.info(f"sql: {sql}") curs.execute(sql) logger.info(f"update {curs.rowcount}") # 更新经纬度 sql = f""" with t101 as ( select * from house.building_month where year_month = 202312 ) update house.building_month a set lng_wgs84 = b.lng_wgs84, lat_wgs84 = b.lat_wgs84, lng_bd09 = b.lng_bd09, lat_bd09 = b.lat_bd09, building_img = b.building_img from t101 b where a.year_month = {year_month} and a.building_id = b.building_id """ logger.info(f"sql: {sql}") curs.execute(sql) logger.info(f"update {curs.rowcount}") # 更新闲置建筑面积超过1000平米策略 sql = f""" insert into house.building_idle_strategy ( year_month, building_id, first_unit, second_unit, third_unit, site_num, site_name, address, city_level, city_region, area_sector, has_land, site_id, building_name, housing_acquisition_rate, housing_source, acquisition_date, house_year_began, investor, management_level, building_structure, total_floors, assets_num, assets_tag_num, usage_status, building_use, ownership_status, floor_area, building_area, building_area_self_use, building_area_rent, building_area_idle, building_area_unusable, usable_area, usable_area_self_use, usable_area_rent, usable_area_idle, usable_area_unusable, city, district, lng_wgs84, lat_wgs84, lng_bd09, lat_bd09, building_img, area_no, area_name, city_no, city_name, year_no, month_no, house_age, land_name, frontage, courtyard, whole_building, property_ownership_certificate, no_property_ownership_certificate_reason, unrelated_assets, community_assistant_name, community_assistant_unit, lng_jt, lat_jt, property_owner, checked, city_id, district_id ) select year_month, building_id, first_unit, second_unit, third_unit, site_num, site_name, address, city_level, city_region, area_sector, has_land, site_id, building_name, housing_acquisition_rate, housing_source, acquisition_date, house_year_began, investor, management_level, building_structure, total_floors, assets_num, assets_tag_num, usage_status, building_use, ownership_status, floor_area, building_area, building_area_self_use, building_area_rent, building_area_idle, building_area_unusable, usable_area, usable_area_self_use, usable_area_rent, usable_area_idle, usable_area_unusable, city, district, lng_wgs84, lat_wgs84, lng_bd09, lat_bd09, building_img, area_no, area_name, city_no, city_name, year_no, month_no, house_age, land_name, frontage, courtyard, whole_building, property_ownership_certificate, no_property_ownership_certificate_reason, unrelated_assets, community_assistant_name, community_assistant_unit, lng_jt, lat_jt, property_owner, checked, city_id, district_id from house.building_month where building_area_idle > 1000 and year_month = {year_month} order by building_area_idle desc """ logger.info(f"sql: {sql}") curs.execute(sql) logger.info(f"update {curs.rowcount}") sql = f""" with t101 as ( select *, row_number() over ( order by building_area_idle desc) as sort from house.building_idle_strategy where year_month = {year_month} ), t201 as ( select area_no, area_name, city_no, city_name, 'kpi_301320_155_01' as kpi_code, '闲置建筑面积' as kpi_name, round(building_area_idle, 2)::varchar as kpi_value, '1' as kpi_type, building_id as jk_object_no, building_name as jk_object, sort from t101 ), t202 as ( select area_no, area_name, city_no, city_name, 'kpi_301320_155_02' as kpi_code, '房产名称' as kpi_name, building_name as kpi_value, '0' as kpi_type, building_id as jk_object_no, building_name as jk_object, sort from t101 ), t203 as ( select area_no, area_name, city_no, city_name, 'kpi_301320_155_03' as kpi_code, '房产编号' as kpi_name, building_id as kpi_value, '0' as kpi_type, building_id as jk_object_no, building_name as jk_object, sort from t101 ), t204 as ( select area_no, area_name, city_no, city_name, 'kpi_301320_155_04' as kpi_code, '房产总建筑面积' as kpi_name, round(building_area, 2)::varchar as kpi_value, '0' as kpi_type, building_id as jk_object_no, building_name as jk_object, sort from t101 ), t301 as ( select * from t201 union all select * from t202 union all select * from t203 union all select * from t204 ) insert into publish.house_building_idle_strategy ( acct_date, dept_code, dept_name, strategy_code, area_no, area_name, city_no, city_name, sale_no, sale_name, jk_object_no, jk_object, kpi_code, kpi_name, kpi_value, kpi_type, sort ) select {year_month} as acct_date, '301320' as dept_code, '河北省分公司纵横运营中心' as dept_name, '301320_155' as strategy_code, area_no, area_name, city_no, city_name, '' as sale_no, '' as sale_name, jk_object_no, jk_object, kpi_code, kpi_name, kpi_value, kpi_type, sort from t301 order by sort, kpi_code """ logger.info(f"sql: {sql}") curs.execute(sql) logger.info(f"update {curs.rowcount}") data_process() data_import() upload_file() data_update()