"""车辆租赁合同数据处理 """ import re import subprocess from datetime import datetime from dateutil.relativedelta import relativedelta from loguru import logger import pandas as pd import paramiko # 添加日志记录,将日志输出到文件 a.log logger.add(sink='a.log') ssh_hostname = '172.16.107.4' # 定义远程主机地址 ssh_port = 22 # 定义SSH服务的端口号 ssh_username = 'app' # 定义登录远程主机的用户名 ssh_password = '(l4w0ST_' # 定义登录远程主机的密码 # 服务器文件夹路径 remote_dir_path = '/data/history/car/zu-lin/' # 数据库连接信息 db_host = "172.16.107.5" # 数据库主机地址 db_port = 5432 # 数据库端口号 db_username = "finance" # 数据库用户名 db_password = "Finance@unicom23" # 数据库密码 dbname = "financialdb" # 数据库名称 # 数据文件路径 input_path = 'data.xlsx' # 输出文件路径 output_path = 'output.csv' def data_process(): # 正则表达式匹配车牌省份简称(如京、津、晋等) has_che_pai_province_pattern = re.compile( "[" + re.escape("京津晋冀蒙辽吉黑沪苏浙皖闽赣鲁豫鄂湘粤桂琼渝川贵云藏陕甘青宁国防") + "]") # 正则表达式匹配非车牌字符,排除车牌可能包含的字符(如字母、数字、特殊标志等) not_che_pai_pattern = re.compile( "[^京津晋冀蒙辽吉黑沪苏浙皖闽赣鲁豫鄂湘粤桂琼渝川贵云藏陕甘青宁新港澳学挂领试超练警国防A-Z\\d]") # 正则表达式匹配完整的车牌号格式 che_pai_pattern = re.compile( r"([京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼使领A-Z][A-Z]" r"(([DF]((?![IO])[A-Z0-9](?![IO]))\d{4})|(\d{5}[DF]))|" r"[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼使领A-Z][A-Z][A-Z0-9]{4}[A-Z0-9挂学警港澳])" ) # 定义二级行政区划映射表(地级市及其下属区县) er_ji_map = { "石家庄": ["鹿泉", "藁城", "栾城", "井陉矿区", "井陉", "无极", "正定", "元氏", "新乐", "晋州", "平山", "灵寿", "赞皇", "赵县", "行唐", "高邑", "辛集", "深泽"], "唐山": ["唐山高开区", "迁西", "海港", "开平", "丰南", "滦县", "乐亭", "丰润", "玉田", "古冶", "曹妃甸", "遵化", "滦南", "迁安"], "秦皇岛": ["北戴河新区", "北戴河", "山海关", "昌黎", "卢龙", "青龙", "抚宁"], "邯郸": ["曲周", "魏县", "馆陶", "磁县", "大名", "鸡泽", "成安", "涉县", "永年", "武安", "峰峰", "广平", "临漳", "邱县", "肥乡"], "邢台": ["新河", "南宫", "隆尧", "内邱", "平乡", "宁晋", "广宗", "清河", "临西", "任县", "巨鹿", "沙河", "威县", "临城", "柏乡", "南和"], "保定": ["涞水", "蠡县", "顺平", "博野", "安国", "涞源", "唐县", "定州", "高阳", "曲阳", "阜平", "清苑", "高碑店", "满城", "涿州", "易县", "望都", "徐水", "定兴", "白沟"], "张家口": ["张北", "崇礼", "康保", "赤城", "阳原", "万全", "下花园", "尚义", "怀安", "怀来", "蔚县", "涿鹿", "沽源", "宣化"], "承德": ["承德县", "兴隆", "宽城", "平泉", "营子", "隆化", "滦平", "围场", "丰宁", "双滦"], "廊坊": ["文安", "霸州", "大城", "廊坊开发区", "三河", "香河", "永清", "胜芳", "燕郊", "固安", "大厂"], "沧州": ["东光", "吴桥", "黄骅", "盐山", "孟村", "泊头", "献县", "南皮", "渤海新区", "海兴", "沧县", "河间", "青县", "任丘", "肃宁"], "衡水": ["景县", "阜城", "枣强", "深州", "饶阳", "故城", "武强", "武邑", "冀州", "安平"], "雄安": ["容城", "雄县", "安新"] } # 读取 Excel 文件中的数据 df = pd.read_excel(io=input_path) # 对需要清理的列进行字符串清理,移除多余的空白字符 df = df.map(lambda x: re.sub(r'\s+', '', x) if type(x) is str else x) df.columns = ["year_month", "raw_che_pai_hao_he_tong", "che_xing", "che_liang_suo_shu_dan_wei", "he_tong_ming_cheng", "he_tong_bian_hao", "jia_shui_he_ji_jin_e", "bu_han_shui_jin_e", "shui_e", "zu_qi", "raw_che_pai_hao_ti_huan", "ti_huan_nian_yue", "bei_zhu"] # 定义函数,用于提取并标准化车牌号 def get_che_pai(che_pai): # 如果车牌号为空或无效,则返回空字符串 if pd.isna(che_pai) or not che_pai or not che_pai.strip(): return "" # 将车牌号转换为大写 upper_case = che_pai.upper() # 移除车牌号中不符合规则的字符 s = not_che_pai_pattern.sub("", upper_case) # 使用正则表达式匹配合法的车牌号 m = che_pai_pattern.search(s) if m: return m.group(0) # 如果车牌号包含省份简称但未匹配成功,记录警告日志 if has_che_pai_province_pattern.search(che_pai): logger.warning(f"车牌匹配失败: {che_pai} -> {s}") return s # 如果完全无法匹配,记录警告日志并返回原车牌号 logger.warning(f"车牌匹配失败: {che_pai} -> {upper_case}") return upper_case # 定义函数,用于标记车牌号是否匹配失败 def che_pai_fail(che_pai): # 如果车牌号为空或无效,则标记为失败 if pd.isna(che_pai) or not che_pai or not che_pai.strip(): return "1" # 移除车牌号中不符合规则的字符 s = not_che_pai_pattern.sub("", che_pai.upper()) # 使用正则表达式匹配合法的车牌号 m = che_pai_pattern.search(s) if m: return "0" # 匹配成功 return "1" # 匹配失败 # 定义函数,用于提取一级单位 def get_first_unit(unit): # 如果单位为空或无效,则返回空字符串 if pd.isna(unit) or not unit or not unit.strip(): return "" # 根据单位名称中的关键词返回对应的一级单位 if "机动通信局" in unit or "机动局" in unit or "传输局" in unit or "线路维护中心" in unit: return "机动局" if "雄安基地建设部" in unit: return "雄安基地建设部" if "华北基地建设部" in unit: return "华北基地建设部" # 遍历 er_ji_map 的键,寻找匹配的一级单位 for yj in er_ji_map.keys(): if yj in unit: return yj return "省公司本部" # 默认返回省公司本部 # 结果列表 result_list = [] # 遍历每一行 for _, row in df.iterrows(): # 提取 year_month 并转换为整数 year_month = row['year_month'] year_month_integer = int(year_month) # 解析日期 local_date = datetime.strptime(str(year_month_integer), "%Y%m") # 获取原始车牌号并处理 raw_che_pai_hao_he_tong = row['raw_che_pai_hao_he_tong'] che_pai_hao_he_tong = get_che_pai(raw_che_pai_hao_he_tong) che_pai_he_tong_fail = che_pai_fail(raw_che_pai_hao_he_tong) # 获取替换车牌号并处理 raw_che_pai_hao_ti_huan = row['raw_che_pai_hao_ti_huan'] che_pai_hao_ti_huan = get_che_pai(raw_che_pai_hao_ti_huan) che_pai_ti_huan_fail = che_pai_fail(raw_che_pai_hao_ti_huan) # 获取车辆所属单位并提取第一个单位 che_liang_suo_shu_dan_wei = row['che_liang_suo_shu_dan_wei'] first_unit = get_first_unit(che_liang_suo_shu_dan_wei) # 获取租期并转换为整数 zu_qi = row['zu_qi'] zu_qi_int = int(zu_qi) # 获取替换年月并处理 ti_huan_nian_yue = row['ti_huan_nian_yue'] ti_huan_nian_yue_integer = int(ti_huan_nian_yue) if pd.notna(ti_huan_nian_yue) else None # 确定当前车牌号 che_pai_hao = ( che_pai_hao_ti_huan if ti_huan_nian_yue_integer and year_month_integer >= ti_huan_nian_yue_integer else che_pai_hao_he_tong ) # 日志记录(Python 中使用 print 模拟) if pd.notna(che_pai_hao_ti_huan) and che_pai_hao_ti_huan: logger.info("{} -> {} -> {} -> {}", che_pai_hao_he_tong, year_month_integer, ti_huan_nian_yue_integer, ti_huan_nian_yue_integer and year_month_integer >= ti_huan_nian_yue_integer) # 构造结果字典 result_dict = { **row.to_dict(), "che_pai_hao_he_tong": che_pai_hao_he_tong, "che_pai_he_tong_fail": che_pai_he_tong_fail, "che_pai_hao_ti_huan": che_pai_hao_ti_huan, "che_pai_ti_huan_fail": che_pai_ti_huan_fail, "first_unit": first_unit, "che_pai_hao": che_pai_hao, "year_no": local_date.strftime("%Y"), "month_no": local_date.strftime("%m"), } # 添加到结果列表 result_list.append(result_dict) next_local_date = local_date # 处理后续月份 for _ in range(1, zu_qi_int): next_local_date = next_local_date + relativedelta(months=1, day=1) next_month_integer = int(next_local_date.strftime("%Y%m")) # 确定当前车牌号 next_che_pai_hao = ( che_pai_hao_ti_huan if ti_huan_nian_yue_integer is not None and next_month_integer >= ti_huan_nian_yue_integer else che_pai_hao_he_tong ) # 构造后续月份的结果字典 next_result_dict = { **result_dict, "year_month": str(next_month_integer), "year_no": next_local_date.strftime("%Y"), "month_no": next_local_date.strftime("%m"), "che_pai_hao": next_che_pai_hao, } # 添加到结果列表 result_list.append(next_result_dict) # 转换为 DataFrame result_df = pd.DataFrame(result_list) result_df['ti_huan_nian_yue'] = result_df['ti_huan_nian_yue'].astype('Int64') # 打印DataFrame的信息 print(result_df.info()) # 将处理后的数据保存到CSV文件中 result_df.to_csv(path_or_buf=output_path, index=False, encoding='utf-8-sig') def data_import(): # 定义 PowerShell 脚本的路径 script_path = r"../../copy.ps1" # 目标表和文件信息 table = "car.car_zu_lin" # 数据库目标表名 # 表字段列名,用于指定导入数据的列顺序 columns = "year_month,raw_che_pai_hao_he_tong,che_xing,che_liang_suo_shu_dan_wei,he_tong_ming_cheng,he_tong_bian_hao,jia_shui_he_ji_jin_e,bu_han_shui_jin_e,shui_e,zu_qi,raw_che_pai_hao_ti_huan,ti_huan_nian_yue,bei_zhu,che_pai_hao_he_tong,che_pai_he_tong_fail,che_pai_hao_ti_huan,che_pai_ti_huan_fail,first_unit,che_pai_hao,year_no,month_no" # 构造执行 PowerShell 脚本的命令 command = f"powershell -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}" # 打印生成的命令,方便调试和日志记录 logger.info("command: {}", command) # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出 completed_process = subprocess.run( command, # 执行的命令 check=False, # 如果命令执行失败,不抛出异常 text=True, # 将输出作为字符串处理 capture_output=True, # 捕获标准输出和标准错误 ) # 打印命令执行的结果,包括返回码、标准输出和标准错误 logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout, completed_process.stderr) # 定义正则表达式,用于匹配标准输出中的 COPY 结果 p = re.compile(r"^(COPY) (\d+)$") count = None # 初始化计数变量 matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果 if matcher: count = int(matcher.group(2)) # 提取导入的数据行数 # 如果没有成功提取到导入数据的行数,抛出运行时异常 if count is None: raise RuntimeError("导入数据失败") def upload_file(): # 获取当前日期,并计算上个月的第一天 today = datetime.today() start_date = today - relativedelta(months=1, day=1) remote_path = f'{remote_dir_path}{start_date.strftime('%Y%m')}.xlsx' # 定义远程主机的目标文件路径 # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文 with paramiko.SSHClient() as ssh: # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) # 连接到远程主机,传入主机地址、端口、用户名和密码 ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password) # 执行远程命令,创建远程目录(如果不存在) ssh.exec_command(f'mkdir -p {remote_dir_path}') # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文 with ssh.open_sftp() as sftp: # 记录日志,提示即将上传的本地文件和远程目标路径 logger.info("upload {} to {}", input_path, remote_path) # 使用SFTP的put方法将本地文件上传到远程主机 sftp.put(input_path, remote_path) # 记录日志,提示文件已成功上传 logger.info("uploaded {}", input_path) data_process() data_import() upload_file()