123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- """不动产异常数据稽核数据处理
- """
- import os # 导入os模块,用于处理文件和目录操作
- import re # 导入re模块,用于正则表达式操作
- from loguru import logger # 导入loguru模块,用于日志记录
- import pandas as pd # 导入pandas模块,用于数据处理和分析
- import psycopg # 导入psycopg模块,用于连接PostgreSQL数据库
- from datetime import datetime # 导入datetime模块,用于日期和时间操作
- from dateutil.relativedelta import relativedelta # 导入relativedelta模块,用于日期的相对计算
- import xlwings as xw # 导入xlwings模块,用于操作Excel文件
- import subprocess
- import paramiko
- # 配置日志记录器,将日志输出到文件a.log
- logger.add(sink='a.log')
- ssh_hostname = '172.16.107.4' # 定义远程主机地址
- ssh_port = 22 # 定义SSH服务的端口号
- ssh_username = 'app' # 定义登录远程主机的用户名
- ssh_password = '(l4w0ST_' # 定义登录远程主机的密码
- # 服务器文件夹路径
- remote_dir_path = '/data/history/house/abnormal-data/'
- # 数据库连接信息
- db_host = "172.16.107.5" # 数据库主机地址
- db_port = 5432 # 数据库端口号
- db_username = "finance" # 数据库用户名
- db_password = "Finance@unicom23" # 数据库密码
- dbname = "financialdb" # 数据库名称
- conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'"
- # 创建一个不可见的Excel应用程序实例
- app = xw.App(visible=False)
- # 获取当前日期和时间
- today = datetime.today()
- # 计算上个月的第一天作为起始日期
- start_date = today - relativedelta(months=1, day=1)
- # 将起始日期格式化为YYYYMM的形式
- year_month = start_date.strftime('%Y%m')
- # 定义数据输入和输出目录
- input_dir = 'data/'
- output_dir = 'output/'
- # 输出文件路径
- output_path = 'output.csv'
- remote_dir_path = f"{remote_dir_path}{year_month}/"
- def xls_to_xlsx():
- # 如果输出目录不存在,则创建该目录
- if not os.path.exists(output_dir):
- os.makedirs(output_dir)
- # 遍历数据目录中的所有文件
- for filename in os.listdir(input_dir):
- # 只处理以.xls结尾的文件
- if filename.endswith(".xls"):
- file_path = os.path.join(input_dir, filename) # 构造文件路径
- workbook = app.books.open(file_path) # 打开Excel文件
- pattern = r'\D+(\d+)' # 定义正则表达式,用于匹配非数字部分
- # 使用正则表达式替换文件名,并将扩展名改为.xlsx
- new_file_name = re.sub(pattern, year_month + '_', filename).replace(".xls", ".xlsx")
- # 保存修改后的文件到输出目录
- workbook.save(os.path.join(output_dir, new_file_name))
- logger.info(f"{filename} -> {new_file_name}") # 记录日志
- workbook.close() # 关闭工作簿
- def data_process():
- # 初始化组织映射和三级组织列表映射
- org_map = {}
- third_org_list_map = {}
- # 连接PostgreSQL数据库
- with psycopg.connect(
- conninfo=conn_info,
- row_factory=psycopg.rows.dict_row # 使用字典格式返回查询结果
- ) as conn:
- with conn.cursor() as curs:
- # 查询一级组织信息
- sql = """
- select * from common.organization where grade = 1
- """
- logger.info(f"sql: {sql}") # 记录SQL语句
- curs.execute(sql)
- second_orgs = curs.fetchall()
- # 初始化三级组织列表映射
- for x in second_orgs:
- third_org_list_map[x['id']] = []
- # 查询所有组织信息
- sql = """
- select * from common.organization
- """
- logger.info(f"sql: {sql}") # 记录SQL语句
- curs.execute(sql)
- orgs = curs.fetchall()
- # 构建组织映射和三级组织列表映射
- for x in orgs:
- if x['parent_id'] in third_org_list_map:
- third_org_list_map[x['parent_id']].append(x)
- org_map[x['id']] = x
- files = os.listdir(output_dir)
- file_list = list(filter(lambda x: x.endswith('.xlsx'), files)) # 筛选出以.xlsx结尾的文件
- logger.info('file_list: {}', file_list) # 记录文件列表
- # 初始化数据列表
- data_list = []
- # 遍历文件列表,读取Excel文件内容并进行处理
- for t in file_list:
- logger.info(f'{t}') # 记录当前文件名
- ny = t.replace('.xlsx', '').split('_')[0] # 提取年月信息
- ds = t.replace('.xlsx', '').split('_')[1] # 提取地市信息
- tmp = pd.read_excel(output_dir + t, skiprows=8, header=None) # 读取Excel文件内容
- if '省本部' in ds or '省公司' in ds:
- tmp = pd.read_excel(output_dir + t, skiprows=8, header=None, nrows=1) # 特殊处理省本部文件
- tmp['年月'] = ny # 添加年月列
- tmp['地市'] = ds # 添加地市列
- tmp['source'] = t # 添加来源文件名列
- data_list.append(tmp) # 将数据添加到数据列表中
- # 合并所有数据到一个DataFrame中
- df = pd.concat(data_list)
- # 去除字符串中的多余空格
- df = df.map(lambda x: re.sub(r'\s+', '', x) if type(x) is str else x)
- # 定义DataFrame列名
- df.columns = ['third_unit', 'wei_guan_lian_tu_di_zheng', 'wei_guan_lian_fang_chan_zheng', 'wei_guan_lian_ju_zhi',
- 'jian_zhu_tu_di_ju_zhi_bu_dui_ying', 'tu_di_ju_zhi_bian_hao_bu_cun_zai',
- 'jian_zhu_ju_zhi_bian_hao_bu_cun_zai', 'jian_zhu_tu_di_biao_qian_hao_bu_cun_zai',
- 'dai_guan_lian_tu_di', 'dai_guan_lian_jian_zhu', 'ju_zhi_gte_three_tu_di', 'ju_zhi_gt_ten_tu_di',
- 'ju_zhi_gte_five_jian_zhu', 'ju_zhi_gte_ten_jian_zhu', 'tu_di_gte_seven_jian_zhu',
- 'tu_di_gte_ten_jian_zhu', 'tu_di_zheng_xia_ci_kong_bai', 'fang_chan_zheng_xia_ci_kong_bai',
- 'ju_zhi_di_duan_kong_bai', 'xian_zhi_jian_zhu_mian_ji', 'xian_zhi_tu_di_mian_ji',
- 'wu_jian_zhu_fei_xian_zhi_tu_di_mian_ji', 'tu_di_shi_yong_quan_qi_ta', 'year_month', 'second_unit',
- 'source']
- # 定义函数,根据二级单位和三级单位获取区域编号
- def get_area_no(x):
- second_unit = x['second_unit']
- third_unit = x['third_unit']
- if '长途局' in second_unit or '长途通信传输局' in second_unit or '机动局' in second_unit or '传输局' in second_unit:
- return '-11'
- if '保定' in second_unit and ('雄县' in third_unit or '容城' in third_unit or '安新' in third_unit):
- return '782'
- for second_org in second_orgs:
- area_name = second_org['name']
- area_no = second_org['id']
- if area_name in second_unit:
- return area_no
- return '-12'
- # 应用get_area_no函数,生成area_no列
- df['area_no'] = df.apply(get_area_no, axis=1)
- # 定义函数,根据区域编号获取区域名称
- def get_area_name(x):
- area_no = x['area_no']
- second_org = org_map[area_no]
- area_name = second_org['name']
- return area_name
- # 应用get_area_name函数,生成area_name列
- df['area_name'] = df.apply(get_area_name, axis=1)
- # 定义函数,根据三级单位、区域名称和区域编号获取城市编号
- def get_city_no(x):
- third_unit = x['third_unit']
- area_name = x['area_name']
- area_no = x['area_no']
- if area_name == '石家庄':
- if '矿区' in third_unit:
- return 'D0130185'
- if '井陉' in third_unit:
- return 'D0130121'
- if area_name == '秦皇岛':
- if '北戴河新区' in third_unit:
- return 'D0130185'
- if '北戴河' in third_unit:
- return 'D0130304'
- if area_name == '唐山':
- if '滦县' in third_unit:
- return 'D0130223'
- if '高新技术开发区' in third_unit:
- return 'D0130205'
- if area_name == '邢台':
- if '内丘' in third_unit:
- return 'D0130523'
- if '任泽' in third_unit:
- return 'D0130526'
- if area_name == '邯郸':
- if '峰峰' in third_unit:
- return 'D0130406'
- if area_name == '省机动局':
- if '沧州' in third_unit:
- return 'HECS180'
- if '唐山' in third_unit:
- return 'HECS181'
- if '秦皇岛' in third_unit:
- return 'HECS182'
- if '廊坊' in third_unit:
- return 'HECS183'
- if '张家口' in third_unit:
- return 'HECS184'
- if '邢台' in third_unit:
- return 'HECS185'
- if '邯郸' in third_unit:
- return 'HECS186'
- if '保定' in third_unit:
- return 'HECS187'
- if '石家庄' in third_unit:
- return 'HECS188'
- if '承德' in third_unit:
- return 'HECS189'
- if '衡水' in third_unit:
- return 'HECS720'
- if '雄安' in third_unit:
- return 'HECS728'
- return 'HECS018'
- if '雄安' == area_name:
- third_unit = third_unit.replace('雄安新区', '')
- third_org_list = third_org_list_map[area_no]
- for third_org in third_org_list:
- city_name = third_org['name']
- if city_name in third_unit:
- return third_org['id']
- if '沧州' == area_name:
- return 'D0130911'
- if '唐山' == area_name:
- return 'D0130202'
- if '秦皇岛' == area_name:
- return 'D0130302'
- if '廊坊' == area_name:
- return 'D0131000'
- if '张家口' == area_name:
- return 'D0130701'
- if '邢台' == area_name:
- return 'D0130502'
- if '邯郸' == area_name:
- return 'D0130402'
- if '保定' == area_name:
- return 'D0130601'
- if '石家庄' == area_name:
- return 'D0130186'
- if '承德' == area_name:
- return 'D0130801'
- if '衡水' == area_name:
- return 'D0133001'
- if '雄安' == area_name:
- return 'D0130830'
- return 'HE001'
- # 应用get_city_no函数,生成city_no列
- df['city_no'] = df.apply(get_city_no, axis=1)
- # 定义函数,根据城市编号获取城市名称
- def get_city_name(x):
- city_no = x['city_no']
- third_org = org_map[city_no]
- city_name = third_org['name']
- return city_name
- # 应用get_city_name函数,生成city_name列
- df['city_name'] = df.apply(get_city_name, axis=1)
- # 输出DataFrame的基本信息
- print(df.info())
- # 将处理后的数据保存为CSV文件
- df.to_csv(path_or_buf=output_path,
- index=False,
- encoding='utf-8-sig')
- def data_import():
- # 定义 PowerShell 脚本的路径
- script_path = r"../../copy.ps1"
- # 目标表和文件信息
- table = "house.abnormal_data" # 数据库目标表名
- # 表字段列名,用于指定导入数据的列顺序
- columns = "third_unit,wei_guan_lian_tu_di_zheng,wei_guan_lian_fang_chan_zheng,wei_guan_lian_ju_zhi,jian_zhu_tu_di_ju_zhi_bu_dui_ying,tu_di_ju_zhi_bian_hao_bu_cun_zai,jian_zhu_ju_zhi_bian_hao_bu_cun_zai,jian_zhu_tu_di_biao_qian_hao_bu_cun_zai,dai_guan_lian_tu_di,dai_guan_lian_jian_zhu,ju_zhi_gte_three_tu_di,ju_zhi_gt_ten_tu_di,ju_zhi_gte_five_jian_zhu,ju_zhi_gte_ten_jian_zhu,tu_di_gte_seven_jian_zhu,tu_di_gte_ten_jian_zhu,tu_di_zheng_xia_ci_kong_bai,fang_chan_zheng_xia_ci_kong_bai,ju_zhi_di_duan_kong_bai,xian_zhi_jian_zhu_mian_ji,xian_zhi_tu_di_mian_ji,wu_jian_zhu_fei_xian_zhi_tu_di_mian_ji,tu_di_shi_yong_quan_qi_ta,year_month,second_unit,source,area_no,area_name,city_no,city_name"
- # 构造执行 PowerShell 脚本的命令
- command = f"powershell -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}"
- # 打印生成的命令,方便调试和日志记录
- logger.info("command: {}", command)
- # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出
- completed_process = subprocess.run(
- command, # 执行的命令
- check=False, # 如果命令执行失败,不抛出异常
- text=True, # 将输出作为字符串处理
- capture_output=True, # 捕获标准输出和标准错误
- )
- # 打印命令执行的结果,包括返回码、标准输出和标准错误
- logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout,
- completed_process.stderr)
- # 定义正则表达式,用于匹配标准输出中的 COPY 结果
- p = re.compile(r"^(COPY) (\d+)$")
- count = None # 初始化计数变量
- matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果
- if matcher:
- count = int(matcher.group(2)) # 提取导入的数据行数
- # 如果没有成功提取到导入数据的行数,抛出运行时异常
- if count is None:
- raise RuntimeError("导入数据失败")
- def upload_file():
- # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文
- with paramiko.SSHClient() as ssh:
- # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- # 连接到远程主机,传入主机地址、端口、用户名和密码
- ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password)
- # 执行远程命令,创建远程目录(如果不存在)
- ssh.exec_command(f'mkdir -p {remote_dir_path}')
- # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文
- with ssh.open_sftp() as sftp:
- for filename in os.listdir(input_dir):
- local_path = os.path.join(input_dir, filename)
- remote_path = f'{remote_dir_path}{filename}'
- if os.path.isfile(local_path):
- # 记录日志,提示即将上传的本地文件和远程目标路径
- logger.info("upload {} to {}", local_path, remote_path)
- # 使用SFTP的put方法将本地文件上传到远程主机
- sftp.put(local_path, remote_path)
- # 记录日志,提示文件已成功上传
- logger.info("uploaded {}", local_path)
- xls_to_xlsx()
- data_process()
- data_import()
- upload_file()
|