123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574 |
- """不动产房间数据处理
- """
- import re
- from datetime import datetime
- from dateutil.relativedelta import relativedelta
- from loguru import logger
- import pandas as pd
- import psycopg
- import subprocess
- import paramiko
- # 配置日志记录器,将日志写入文件a.log
- logger.add(sink='a.log')
- ssh_hostname = '172.16.107.4' # 定义远程主机地址
- ssh_port = 22 # 定义SSH服务的端口号
- ssh_username = 'app' # 定义登录远程主机的用户名
- ssh_password = '(l4w0ST_' # 定义登录远程主机的密码
- # 服务器文件夹路径
- remote_dir_path = '/data/history/house/room/'
- # 数据库连接信息
- db_host = "172.16.107.5" # 数据库主机地址
- db_port = 5432 # 数据库端口号
- db_username = "finance" # 数据库用户名
- db_password = "Finance@unicom23" # 数据库密码
- dbname = "financialdb" # 数据库名称
- conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'"
- # 获取当前日期,并计算上个月的第一天
- today = datetime.today()
- start_date = today - relativedelta(months=1, day=1)
- year_month = start_date.strftime('%Y%m')
- # 数据文件路径
- input_path = 'data.xlsx'
- # 输出文件路径
- output_path = 'output.csv'
- def data_process():
- org_map = {} # 用于存储组织机构信息
- third_org_list_map = {} # 用于存储三级组织机构列表
- area_map = {} # 用于存储区域信息
- districts_list_map = {} # 用于存储区县信息
- # 连接到 PostgreSQL 数据库
- with psycopg.connect(
- conninfo=conn_info,
- row_factory=psycopg.rows.dict_row # 设置返回结果为字典格式
- ) as conn:
- with conn.cursor() as curs:
- # 查询所有二级组织机构(grade=1)
- sql = """
- select * from common.organization where grade = 1
- """
- logger.info(f"sql: {sql}")
- curs.execute(sql)
- second_orgs = curs.fetchall()
- # 初始化 third_org_list_map,以每个二级组织的 ID 为键,值为空列表
- for x in second_orgs:
- third_org_list_map[x['id']] = []
- # 查询所有组织机构信息
- sql = """
- select * from common.organization
- """
- logger.info(f"sql: {sql}")
- curs.execute(sql)
- orgs = curs.fetchall()
- # 构建 org_map 和 third_org_list_map
- for x in orgs:
- if x['parent_id'] in third_org_list_map:
- third_org_list_map[x['parent_id']].append(x)
- org_map[x['id']] = x
- # 查询所有一级行政区划(area_grade=1),并按 area_id 排序
- sql = """
- select * from common.area where area_grade = 1 order by area_id
- """
- logger.info(f"sql: {sql}")
- curs.execute(sql)
- cities = curs.fetchall()
- # 初始化 districts_list_map,以每个城市的 area_id 为键,值为空列表
- for x in cities:
- districts_list_map[x['area_id']] = []
- # 查询所有区域信息
- sql = """
- select * from common.area
- """
- logger.info(f"sql: {sql}")
- curs.execute(sql)
- areas = curs.fetchall()
- # 构建 area_map 和 districts_list_map
- for x in areas:
- if x['parent_id'] in districts_list_map:
- districts_list_map[x['parent_id']].append(x)
- area_map[x['area_id']] = x
- # 读取 Excel 文件中的数据
- df = pd.read_excel(io=input_path)
- # 清理 DataFrame 中的空白字符(排除特定列)
- columns_to_clean = list(filter(lambda x: x not in ('房间名称'), df.columns))
- df[columns_to_clean] = df[columns_to_clean].map(
- lambda x: re.sub(r'\s+', '', x) if type(x) is str else x
- )
- # 定义函数:根据资产所属单位获取二级组织机构编码
- def get_area_no(x):
- second_unit = x['资产所属单位(二级)']
- third_unit = x['资产所属单位(三级)']
- if '长途通信传输局' == second_unit:
- return '-11'
- if '保定' in second_unit and ('雄县' in third_unit or '容城' in third_unit or '安新' in third_unit):
- return '782'
- for second_org in second_orgs:
- area_name = second_org['name']
- area_no = second_org['id']
- if area_name in second_unit:
- return area_no
- return '-12'
- # 应用 get_area_no 函数,生成二级组织机构编码列
- df['二级组织机构编码'] = df.apply(get_area_no, axis=1)
- # 定义函数:根据二级组织机构编码获取对应的名称
- def get_area_name(x):
- area_no = x['二级组织机构编码']
- second_org = org_map[area_no]
- area_name = second_org['name']
- return area_name
- # 应用 get_area_name 函数,生成二级组织机构名称列
- df['二级组织机构名称'] = df.apply(get_area_name, axis=1)
- # 定义函数:根据资产所属单位获取三级组织机构编码
- def get_city_no(x):
- third_unit = x['资产所属单位(三级)']
- area_name = x['二级组织机构名称']
- area_no = x['二级组织机构编码']
- if area_name == '石家庄':
- if '矿区' in third_unit:
- return 'D0130185'
- if '井陉' in third_unit:
- return 'D0130121'
- if area_name == '秦皇岛':
- if '北戴河新区' in third_unit:
- return 'D0130185'
- if '北戴河' in third_unit:
- return 'D0130304'
- if area_name == '唐山':
- if '滦县' in third_unit:
- return 'D0130223'
- if '高新技术开发区' in third_unit:
- return 'D0130205'
- if area_name == '邢台':
- if '内丘' in third_unit:
- return 'D0130523'
- if '任泽' in third_unit:
- return 'D0130526'
- if area_name == '邯郸':
- if '峰峰' in third_unit:
- return 'D0130406'
- if area_name == '省机动局':
- if '沧州' in third_unit:
- return 'HECS180'
- if '唐山' in third_unit:
- return 'HECS181'
- if '秦皇岛' in third_unit:
- return 'HECS182'
- if '廊坊' in third_unit:
- return 'HECS183'
- if '张家口' in third_unit:
- return 'HECS184'
- if '邢台' in third_unit:
- return 'HECS185'
- if '邯郸' in third_unit:
- return 'HECS186'
- if '保定' in third_unit:
- return 'HECS187'
- if '石家庄' in third_unit:
- return 'HECS188'
- if '承德' in third_unit:
- return 'HECS189'
- if '衡水' in third_unit:
- return 'HECS720'
- if '雄安' in third_unit:
- return 'HECS728'
- return 'HECS018'
- if '雄安' == area_name:
- third_unit = third_unit.replace('雄安新区', '')
- third_org_list = third_org_list_map[area_no]
- for third_org in third_org_list:
- city_name = third_org['name']
- if city_name in third_unit:
- return third_org['id']
- if '沧州' == area_name:
- return 'D0130911'
- if '唐山' == area_name:
- return 'D0130202'
- if '秦皇岛' == area_name:
- return 'D0130302'
- if '廊坊' == area_name:
- return 'D0131000'
- if '张家口' == area_name:
- return 'D0130701'
- if '邢台' == area_name:
- return 'D0130502'
- if '邯郸' == area_name:
- return 'D0130402'
- if '保定' == area_name:
- return 'D0130601'
- if '石家庄' == area_name:
- return 'D0130186'
- if '承德' == area_name:
- return 'D0130801'
- if '衡水' == area_name:
- return 'D0133001'
- if '雄安' == area_name:
- return 'D0130830'
- return 'HE001'
- # 应用 get_city_no 函数,生成三级组织机构编码列
- df['三级组织机构编码'] = df.apply(get_city_no, axis=1)
- # 定义函数:根据三级组织机构编码获取对应的名称
- def get_city_name(x):
- city_no = x['三级组织机构编码']
- third_org = org_map[city_no]
- city_name = third_org['name']
- return city_name
- # 应用 get_city_name 函数,生成三级组织机构名称列
- df['三级组织机构名称'] = df.apply(get_city_name, axis=1)
- # 定义函数:根据地址和资产所属单位获取城市 ID
- def get_city_id(x):
- address = x['标准地址']
- second_unit = x['资产所属单位(二级)']
- third_unit = x['资产所属单位(三级)']
- if '雄安' in address or ('保定' in address and ('雄县' in address or '容城' in address or '安新' in address)):
- return '133100'
- for city in cities:
- area_name = city['short_name']
- area_id = city['area_id']
- if area_name in second_unit or area_name in third_unit or area_name in address:
- return area_id
- return ''
- # 应用 get_city_id 函数,生成城市 ID 列
- df['city_id'] = df.apply(get_city_id, axis=1)
- # 定义函数:根据城市 ID 获取城市名称
- def get_city(x):
- city_id = x['city_id']
- area = area_map.get(city_id)
- if pd.notna(area):
- city = area['area_name']
- return city
- return ''
- # 应用 get_city 函数,生成城市名称列
- df['city'] = df.apply(get_city, axis=1)
- # 定义函数:根据地址和城市信息获取区县 ID
- def get_district_id(x):
- address = x['标准地址']
- city = x['city']
- city_id = x['city_id']
- if pd.isna(city) or pd.isna(address):
- return ''
- if city == '石家庄':
- if '矿区' in address:
- return '130107'
- if '井陉' in address:
- return '130121'
- if city == '唐山':
- if '滦县' in address:
- return '130284'
- if city == '邢台':
- if '内邱' in address:
- return '130523'
- if '任县' in address:
- return '130505'
- if city == '雄安':
- address = address.replace('雄安新区', '')
- districts = districts_list_map.get(city_id)
- if not districts:
- return ''
- for district in districts:
- district_name = district['short_name']
- if district_name in address:
- return district['area_id']
- return ''
- # 应用 get_district_id 函数,生成区县 ID 列
- df['district_id'] = df.apply(get_district_id, axis=1)
- # 定义函数:根据区县 ID 获取区县名称
- def get_district(x):
- district_id = x['district_id']
- area = area_map.get(district_id)
- if pd.notna(area):
- district = area['area_name']
- return district
- return ''
- # 应用 get_district 函数,生成区县名称列
- df['district'] = df.apply(get_district, axis=1)
- def get_int(x):
- try:
- return int(x)
- except Exception:
- return ""
- df['工位总数'] = df['工位总数'].apply(get_int)
- # 在 DataFrame 中插入年月列
- df.insert(0, '年月', year_month)
- # 打印 DataFrame 的信息
- print(df.info())
- # 将处理后的数据保存为 CSV 文件
- df.to_csv(
- path_or_buf=output_path,
- index=False,
- header=[
- 'year_month', 'first_unit', 'second_unit', 'third_unit', 'building_name', 'address', 'floor',
- 'floor_building_area', 'floor_usable_area', 'room_name', 'room_status', 'rent_type',
- 'first_room_type', 'second_room_type', 'seat_num', 'frontage', 'building_area',
- 'building_area_self_use', 'building_area_idle', 'building_area_rent', 'building_area_unusable',
- 'usable_area', 'usable_area_self_use', 'usable_area_idle', 'usable_area_rent', 'usable_area_unusable',
- 'idle_start_date', 'unusable_reason', 'floor_height', 'load_bearing', 'area_no', 'area_name',
- 'city_no', 'city_name', 'city_id', 'city', 'district_id', 'district'
- ],
- encoding='utf-8-sig'
- )
- def data_import():
- # 定义 PowerShell 脚本的路径
- script_path = r"../../copy.ps1"
- # 目标表和文件信息
- table = "house.room_month" # 数据库目标表名
- # 表字段列名,用于指定导入数据的列顺序
- columns = "year_month,first_unit,second_unit,third_unit,building_name,address,floor,floor_building_area,floor_usable_area,room_name,room_status,rent_type,first_room_type,second_room_type,seat_num,frontage,building_area,building_area_self_use,building_area_idle,building_area_rent,building_area_unusable,usable_area,usable_area_self_use,usable_area_idle,usable_area_rent,usable_area_unusable,idle_start_date,unusable_reason,floor_height,load_bearing,area_no,area_name,city_no,city_name,city_id,city,district_id,district"
- # 构造执行 PowerShell 脚本的命令
- command = f"powershell -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}"
- # 打印生成的命令,方便调试和日志记录
- logger.info("command: {}", command)
- # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出
- completed_process = subprocess.run(
- command, # 执行的命令
- check=False, # 如果命令执行失败,不抛出异常
- text=True, # 将输出作为字符串处理
- capture_output=True, # 捕获标准输出和标准错误
- )
- # 打印命令执行的结果,包括返回码、标准输出和标准错误
- logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout,
- completed_process.stderr)
- # 定义正则表达式,用于匹配标准输出中的 COPY 结果
- p = re.compile(r"^(COPY) (\d+)$")
- count = None # 初始化计数变量
- matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果
- if matcher:
- count = int(matcher.group(2)) # 提取导入的数据行数
- # 如果没有成功提取到导入数据的行数,抛出运行时异常
- if count is None:
- raise RuntimeError("导入数据失败")
- def upload_file():
- remote_path = f'{remote_dir_path}{year_month}.xlsx' # 定义远程主机的目标文件路径
- # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文
- with paramiko.SSHClient() as ssh:
- # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- # 连接到远程主机,传入主机地址、端口、用户名和密码
- ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password)
- # 执行远程命令,创建远程目录(如果不存在)
- ssh.exec_command(f'mkdir -p {remote_dir_path}')
- # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文
- with ssh.open_sftp() as sftp:
- # 记录日志,提示即将上传的本地文件和远程目标路径
- logger.info("upload {} to {}", input_path, remote_path)
- # 使用SFTP的put方法将本地文件上传到远程主机
- sftp.put(input_path, remote_path)
- # 记录日志,提示文件已成功上传
- logger.info("uploaded {}", input_path)
- def data_update():
- with psycopg.connect(
- conninfo=conn_info,
- ) as conn:
- with conn.cursor() as curs:
- # 更新人均办公面积
- sql = f"""
- with
- t100 as (
- select
- id as area_no,
- name as area_name,
- order_num as area_order
- from
- common.organization
- where
- id in ('-11', '-12')
- ),
- t101 as (
- select
- area_no,
- sum(building_area_self_use) as building_area_self_use_sum
- from
- house.room_month
- where
- second_room_type = '办公用房'
- and year_month = {year_month}
- and area_no in ('-11', '-12')
- group by
- area_no
- ),
- t102 as (
- select
- *
- from
- house.staff_second_unit
- where
- year_month = (
- select
- max(year_month)
- from
- house.staff_second_unit)
- and area_no in ('-11', '-12')
- ),
- t103 as (
- select
- t100.area_no,
- t100.area_name,
- '' as city_no,
- '' as city_name,
- t101.building_area_self_use_sum,
- t102.total,
- t100.area_order,
- 0 as city_order
- from
- t100
- left join t101 on
- t100.area_no = t101.area_no
- left join t102 on
- t100.area_no = t102.area_no
- ),
- t200 as (
- select
- b.id as area_no,
- b.name as area_name,
- a.id as city_no,
- a.name as city_name,
- b.order_num as area_order,
- a.order_num as city_order
- from
- common.organization a
- left join common.organization b on
- a.parent_id = b.id
- where
- a.unhide = 1
- and a.grade = 2
- and a.parent_id not in ('-11', '-12')
- order by
- b.id,
- a.id
- ),
- t201 as (
- select
- area_no,
- city_no,
- sum(building_area_self_use) as building_area_self_use_sum
- from
- house.room_month
- where
- second_room_type = '办公用房'
- and area_no not in ('-11', '-12')
- and year_month = {year_month}
- group by
- area_no,
- city_no
- ),
- t202 as (
- select
- *
- from
- house.staff_third_unit
- where
- year_month = (
- select
- max(year_month)
- from
- house.staff_third_unit)
- and area_no not in ('-11', '-12')
- ),
- t203 as (
- select
- t200.area_no,
- t200.area_name,
- t200.city_no,
- t200.city_name,
- t201.building_area_self_use_sum,
- t202.total,
- t200.area_order,
- t200.city_order
- from
- t200
- left join t201 on
- t200.area_no = t201.area_no
- and t200.city_no = t201.city_no
- left join t202 on
- t200.area_no = t202.area_no
- and t200.city_no = t202.city_no
- ),
- t301 as (
- select
- *
- from
- t103
- union all
- select
- *
- from
- t203
- )
- insert
- into
- house.building_office_area_stat
- (
- year_month,
- area_no,
- area_name,
- city_no,
- city_name,
- building_area_self_use_sum,
- total,
- area_avg,
- area_order,
- city_order
- )
- select
- {year_month} as year_month,
- area_no,
- area_name,
- city_no,
- city_name,
- coalesce(building_area_self_use_sum, 0) as building_area_self_use_sum,
- coalesce(total, 0) as total,
- case
- when total = 0 then null
- else round(coalesce(building_area_self_use_sum, 0) / total, 2)
- end as area_avg,
- area_order,
- city_order
- from
- t301
- order by
- area_order,
- city_order
- """
- logger.info(f"sql: {sql}")
- curs.execute(sql)
- logger.info(f"update {curs.rowcount}")
- data_process()
- data_import()
- upload_file()
- data_update()
|