123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665 |
- """车辆过检数据处理
- """
- import re
- import subprocess
- from datetime import datetime
- from dateutil.relativedelta import relativedelta
- from loguru import logger
- import pandas as pd
- import psycopg
- import paramiko
- # 添加日志记录,将日志输出到文件 a.log
- logger.add(sink='a.log')
- ssh_hostname = '172.16.107.4' # 定义远程主机地址
- ssh_port = 22 # 定义SSH服务的端口号
- ssh_username = 'app' # 定义登录远程主机的用户名
- ssh_password = '(l4w0ST_' # 定义登录远程主机的密码
- # 服务器文件夹路径
- remote_dir_path = '/data/history/car/guo-jian/'
- # 数据库连接信息
- db_host = "172.16.107.5" # 数据库主机地址
- db_port = 5432 # 数据库端口号
- db_username = "finance" # 数据库用户名
- db_password = "Finance@unicom23" # 数据库密码
- dbname = "financialdb" # 数据库名称
- conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'"
- # 获取当前日期,并计算上个月的第一天
- today = datetime.today()
- start_date = today - relativedelta(months=1, day=1)
- year_month = start_date.strftime('%Y%m')
- # 数据文件路径
- input_path = 'data.xlsx'
- # 输出文件路径
- output_path = 'output.csv'
- def data_process():
- # 正则表达式匹配车牌省份简称(如京、津、晋等)
- has_che_pai_province_pattern = re.compile(
- "[" + re.escape("京津晋冀蒙辽吉黑沪苏浙皖闽赣鲁豫鄂湘粤桂琼渝川贵云藏陕甘青宁国防") + "]")
- # 正则表达式匹配非车牌字符,排除车牌可能包含的字符(如字母、数字、特殊标志等)
- not_che_pai_pattern = re.compile(
- "[^京津晋冀蒙辽吉黑沪苏浙皖闽赣鲁豫鄂湘粤桂琼渝川贵云藏陕甘青宁新港澳学挂领试超练警国防A-Z\\d]")
- # 正则表达式匹配完整的车牌号格式
- che_pai_pattern = re.compile(
- r"([京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼使领A-Z][A-Z]"
- r"(([DF]((?![IO])[A-Z0-9](?![IO]))\d{4})|(\d{5}[DF]))|"
- r"[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼使领A-Z][A-Z][A-Z0-9]{4}[A-Z0-9挂学警港澳])"
- )
- # 定义二级行政区划映射表(地级市及其下属区县)
- er_ji_map = {
- "石家庄": ["鹿泉", "藁城", "栾城", "井陉矿区", "井陉", "无极", "正定", "元氏", "新乐", "晋州", "平山", "灵寿",
- "赞皇", "赵县", "行唐", "高邑", "辛集", "深泽"],
- "唐山": ["唐山高开区", "迁西", "海港", "开平", "丰南", "滦县", "乐亭", "丰润", "玉田", "古冶", "曹妃甸", "遵化",
- "滦南", "迁安"],
- "秦皇岛": ["北戴河新区", "北戴河", "山海关", "昌黎", "卢龙", "青龙", "抚宁"],
- "邯郸": ["曲周", "魏县", "馆陶", "磁县", "大名", "鸡泽", "成安", "涉县", "永年", "武安", "峰峰", "广平", "临漳",
- "邱县", "肥乡"],
- "邢台": ["新河", "南宫", "隆尧", "内邱", "平乡", "宁晋", "广宗", "清河", "临西", "任县", "巨鹿", "沙河", "威县",
- "临城", "柏乡", "南和"],
- "保定": ["涞水", "蠡县", "顺平", "博野", "安国", "涞源", "唐县", "定州", "高阳", "曲阳", "阜平", "清苑",
- "高碑店",
- "满城", "涿州", "易县", "望都", "徐水", "定兴", "白沟"],
- "张家口": ["张北", "崇礼", "康保", "赤城", "阳原", "万全", "下花园", "尚义", "怀安", "怀来", "蔚县", "涿鹿",
- "沽源",
- "宣化"],
- "承德": ["承德县", "兴隆", "宽城", "平泉", "营子", "隆化", "滦平", "围场", "丰宁", "双滦"],
- "廊坊": ["文安", "霸州", "大城", "廊坊开发区", "三河", "香河", "永清", "胜芳", "燕郊", "固安", "大厂"],
- "沧州": ["东光", "吴桥", "黄骅", "盐山", "孟村", "泊头", "献县", "南皮", "渤海新区", "海兴", "沧县", "河间",
- "青县",
- "任丘", "肃宁"],
- "衡水": ["景县", "阜城", "枣强", "深州", "饶阳", "故城", "武强", "武邑", "冀州", "安平"],
- "雄安": ["容城", "雄县", "安新"]
- }
- # 初始化组织结构映射表
- org_map = {}
- third_org_map = {}
- third_org_list_map = {}
- area_map = {}
- district_list_map = {}
- # 连接PostgreSQL数据库
- with psycopg.connect(
- conninfo=conn_info,
- row_factory=psycopg.rows.dict_row
- ) as conn:
- with conn.cursor() as curs:
- # 查询一级组织数据,并按order_num排序
- sql = """
- select * from common.organization where grade = 1 order by order_num
- """
- logger.info(f"sql: {sql}")
- curs.execute(sql)
- second_orgs = curs.fetchall()
- # 遍历一级组织数据,构建org_map和third_org_list_map
- for x in second_orgs:
- org_map[x['id']] = x
- third_org_list_map[x['id']] = []
- # 查询二级组织数据,并按parent_id和order_num排序
- sql = """
- select * from common.organization where grade = 2 order by parent_id, order_num
- """
- logger.info(f"sql: {sql}")
- curs.execute(sql)
- third_orgs = curs.fetchall()
- # 遍历二级组织数据,构建org_map、third_org_list_map和third_org_map
- for x in third_orgs:
- org_map[x['id']] = x
- third_org_list_map[x['parent_id']].append(x)
- third_org_map[x['id']] = x
- # 查询一级行政区划数据,并按area_id排序
- sql = """
- select * from common.area where area_grade = 1 order by area_id
- """
- logger.info(f"sql: {sql}")
- curs.execute(sql)
- cities = curs.fetchall()
- # 遍历一级行政区划数据,构建area_map
- for city in cities:
- area_map[city['area_id']] = city
- # 查询二级行政区划数据,并按parent_id和area_id排序
- sql = """
- select * from common.area where area_grade = 2 order by parent_id, area_id
- """
- logger.info(f"sql: {sql}")
- curs.execute(sql)
- districts = curs.fetchall()
- # 遍历二级行政区划数据,构建area_map和district_list_map
- for district in districts:
- area_map[district['area_id']] = district
- # 构建城市与区县的映射关系
- for city in cities:
- district_list_map[city['area_id']] = []
- for district in districts:
- if city['area_id'] == district['parent_id']:
- district_list_map[city['area_id']].append(district)
- # 读取 Excel 文件中的数据
- df = pd.read_excel(io=input_path)
- # 获取需要清理的列名列表,排除 "登记日期" 和 "年检时间" 列
- columns_to_clean = list(filter(lambda x: x not in ('登记日期', '年检时间'), df.columns))
- # 对需要清理的列进行字符串清理,移除多余的空白字符
- df[columns_to_clean] = df[columns_to_clean].map(lambda x: re.sub(r'\s+', '', x) if type(x) is str else x)
- df['账期'] = year_month
- # 保存原始单位和车牌号信息到新的列中
- df['原始一级单位'] = df['一级单位']
- df['原始二级单位'] = df['二级单位']
- df['原始三级单位'] = df['三级单位']
- df['原始车牌号'] = df['车牌号']
- # 定义函数,用于提取并标准化车牌号
- def get_che_pai(che_pai):
- # 如果车牌号为空或无效,则返回空字符串
- if pd.isna(che_pai) or not che_pai or not che_pai.strip():
- return ""
- # 将车牌号转换为大写
- upper_case = che_pai.upper()
- # 移除车牌号中不符合规则的字符
- s = not_che_pai_pattern.sub("", upper_case)
- # 使用正则表达式匹配合法的车牌号
- m = che_pai_pattern.search(s)
- if m:
- return m.group(0)
- # 如果车牌号包含省份简称但未匹配成功,记录警告日志
- if has_che_pai_province_pattern.search(che_pai):
- logger.warning(f"车牌匹配失败: {che_pai} -> {s}")
- return s
- # 如果完全无法匹配,记录警告日志并返回原车牌号
- logger.warning(f"车牌匹配失败: {che_pai} -> {upper_case}")
- return upper_case
- # 应用 get_che_pai 函数处理车牌号列
- df['车牌号'] = df['车牌号'].apply(get_che_pai)
- # 去重
- df.drop_duplicates(subset=['车牌号'], keep='last', inplace=True)
- # 定义函数,用于标记车牌号是否匹配失败
- def che_pai_fail(che_pai):
- # 如果车牌号为空或无效,则标记为失败
- if pd.isna(che_pai) or not che_pai or not che_pai.strip():
- return "1"
- # 移除车牌号中不符合规则的字符
- s = not_che_pai_pattern.sub("", che_pai.upper())
- # 使用正则表达式匹配合法的车牌号
- m = che_pai_pattern.search(s)
- if m:
- return "0" # 匹配成功
- return "1" # 匹配失败
- # 应用 che_pai_fail 函数生成车牌匹配失败标记列
- df['车牌匹配失败'] = df['车牌号'].apply(che_pai_fail)
- # 定义函数,用于提取一级单位
- def get_first_unit(unit):
- # 如果单位为空或无效,则返回空字符串
- if pd.isna(unit) or not unit or not unit.strip():
- return ""
- # 根据单位名称中的关键词返回对应的一级单位
- if "机动通信局" in unit or "机动局" in unit or "传输局" in unit or "线路维护中心" in unit:
- return "机动局"
- if "雄安基地建设部" in unit:
- return "雄安基地建设部"
- if "华北基地建设部" in unit:
- return "华北基地建设部"
- # 遍历 er_ji_map 的键,寻找匹配的一级单位
- for yj in er_ji_map.keys():
- if yj in unit:
- return yj
- return "省公司本部" # 默认返回省公司本部
- # 应用 get_first_unit 函数生成一级单位列
- df['一级单位'] = df['原始一级单位'].apply(get_first_unit)
- # 定义函数,用于提取二级单位
- def get_second_unit(x):
- # 获取一级单位和原始二级单位
- first_unit = str(x['一级单位']) if pd.notna(x['一级单位']) else ""
- unit = str(x['原始二级单位']) if pd.notna(x['原始二级单位']) else ""
- # 如果二级单位为空或无效,则返回一级单位
- if not unit or not unit.strip():
- return first_unit
- # 如果一级单位是省公司本部,则返回省公司本部
- if first_unit == "省公司本部":
- return first_unit
- # 如果一级单位是机动局,则根据单位名称进一步细化
- if first_unit == "机动局":
- for yj in er_ji_map.keys():
- if yj in unit:
- return f"机动局{yj}"
- return "机动局本部"
- # 根据特定城市和关键词返回对应的二级单位
- if first_unit == "石家庄":
- if "开发区" in unit:
- return "石家庄开发区"
- if first_unit == "廊坊":
- if "开发区" in unit:
- return "廊坊开发区"
- if first_unit == "邢台":
- if "内丘" in unit:
- return "内邱"
- if "任泽" in unit:
- return "任县"
- if first_unit == "唐山":
- if "高开区" in unit:
- return "唐山高开区"
- if "滦州" in unit:
- return "滦县"
- # 根据 er_ji_map 获取二级单位
- ejs = er_ji_map.get(first_unit)
- if not ejs:
- return first_unit
- if first_unit == "雄安":
- unit = unit.replace("雄安新区", "")
- for ej in ejs:
- if ej in unit:
- return ej
- return f"{first_unit}本部" # 默认返回一级单位本部
- # 应用 get_second_unit 函数生成二级单位列
- df['二级单位'] = df.apply(get_second_unit, axis=1)
- # 定义函数,用于提取三级单位
- def get_third_unit(x):
- # 获取二级单位和原始三级单位
- second_unit = str(x['二级单位']) if pd.notna(x['二级单位']) else ""
- unit = str(x['原始三级单位']) if pd.notna(x['原始三级单位']) else ""
- # 如果三级单位为空或无效,则返回二级单位
- if not unit or not unit.strip():
- return second_unit
- # 按下划线分割三级单位名称
- a = unit.split("_")
- if len(a) == 1:
- return unit
- if len(a) < 4:
- return second_unit
- return a[3] # 返回分割后的第四个部分作为三级单位
- # 应用 get_third_unit 函数生成三级单位列
- df['三级单位'] = df.apply(get_third_unit, axis=1)
- # 定义一个函数,用于根据单位名称获取二级组织机构编码
- def get_area_no(unit):
- # 如果单位为空或无效,则返回空字符串
- if pd.isna(unit) or not unit or not unit.strip():
- return ""
- # 如果单位包含特定关键词(如“机动通信局”等),返回固定编码"-11"
- if any(keyword in unit for keyword in ["机动通信局", "机动局", "传输局", "线路维护中心"]):
- return "-11"
- # 如果单位包含特定关键词(如“省公司本部”等),返回固定编码"-12"
- if any(keyword in unit for keyword in ["省公司本部", "雄安基地建设部", "华北基地建设部"]):
- return "-12"
- # 遍历second_orgs列表,匹配单位名称并返回对应的id
- for second_org in second_orgs:
- if second_org.get('name') in unit:
- return second_org.get('id')
- # 如果未匹配到任何规则,返回默认编码"-12"
- return "-12"
- # 将get_area_no函数应用到DataFrame的'原始一级单位'列,生成'二级组织机构编码'列
- df['二级组织机构编码'] = df['原始一级单位'].apply(get_area_no)
- # 定义一个函数,用于根据组织机构编码获取组织机构名称
- def get_org_name(org_no):
- # 如果编码为空或无效,则返回空字符串
- if pd.isna(org_no) or not org_no or not org_no.strip():
- return ""
- # 在org_map中查找对应编码的组织机构信息,并返回其名称
- po = org_map.get(org_no)
- if po is not None:
- return po.get('name')
- return ""
- # 将get_org_name函数应用到'二级组织机构编码'列,生成'二级组织机构名称'列
- df['二级组织机构名称'] = df['二级组织机构编码'].apply(get_org_name)
- # 定义一个函数,用于根据行数据获取三级组织机构编码
- def get_city_no(x):
- # 获取相关字段值,如果为空则设置为""
- area_no = str(x['二级组织机构编码']) if pd.notna(x['二级组织机构编码']) else ""
- area_name = str(x['二级组织机构名称']) if pd.notna(x['二级组织机构名称']) else ""
- unit = str(x['原始二级单位']) if pd.notna(x['原始二级单位']) else ""
- # 如果二级组织机构编码或名称为空,则返回""
- if not area_no or not area_name:
- return ""
- # 根据不同的二级组织机构名称和单位内容,返回对应的三级组织机构编码
- if area_name == "石家庄":
- if "井陉矿区" in unit:
- return "D0130185"
- if "井陉" in unit:
- return "D0130121"
- if area_name == "秦皇岛":
- if "北戴河新区" in unit:
- return "D0130325"
- if "北戴河" in unit:
- return "D0130304"
- if area_name == "邯郸":
- if "峰峰" in unit:
- return "D0130406"
- if area_name == "邢台":
- if "内丘" in unit:
- return "D0130523"
- if "任泽" in unit:
- return "D0130526"
- if area_name == "省机动局":
- if "沧州" in unit:
- return "HECS180"
- if "唐山" in unit:
- return "HECS181"
- if "秦皇岛" in unit:
- return "HECS182"
- if "廊坊" in unit:
- return "HECS183"
- if "张家口" in unit:
- return "HECS184"
- if "邢台" in unit:
- return "HECS185"
- if "邯郸" in unit:
- return "HECS186"
- if "保定" in unit:
- return "HECS187"
- if "石家庄" in unit:
- return "HECS188"
- if "承德" in unit:
- return "HECS189"
- if "衡水" in unit:
- return "HECS720"
- if "雄安" in unit:
- return "HECS728"
- return "HECS018"
- if area_name == "雄安":
- unit = unit.replace("雄安新区", "")
- l3 = third_org_list_map.get(area_no, [])
- for organization_po in l3:
- if organization_po.get('name') in unit:
- return organization_po.get('id')
- if area_name == "沧州":
- return "D0130911"
- if area_name == "唐山":
- return "D0130202"
- if area_name == "秦皇岛":
- return "D0130302"
- if area_name == "廊坊":
- return "D0131000"
- if area_name == "张家口":
- return "D0130701"
- if area_name == "邢台":
- return "D0130502"
- if area_name == "邯郸":
- return "D0130402"
- if area_name == "保定":
- return "D0130601"
- if area_name == "石家庄":
- return "D0130186"
- if area_name == "承德":
- return "D0130801"
- if area_name == "衡水":
- return "D0133001"
- if area_name == "雄安":
- return "D0130830"
- return "HE001"
- # 将get_city_no函数应用到DataFrame的每一行,生成'三级组织机构编码'列
- df['三级组织机构编码'] = df.apply(get_city_no, axis=1)
- # 将get_org_name函数应用到'三级组织机构编码'列,生成'三级组织机构名称'列
- df['三级组织机构名称'] = df['三级组织机构编码'].apply(get_org_name)
- # 定义一个函数,用于根据行数据获取二级组织机构编码2
- def get_area_no2(x):
- # 获取相关字段值,如果为空则设置为""
- area_name = str(x['二级组织机构名称']) if pd.notna(x['二级组织机构名称']) else ""
- city_name = str(x['三级组织机构名称']) if pd.notna(x['三级组织机构名称']) else ""
- # 如果二级组织机构名称为空,则返回""
- if not area_name or not area_name.strip():
- return ""
- # 根据二级组织机构名称和三级组织机构名称的内容,返回对应的编码
- if area_name == "省机动局" and city_name and city_name.strip():
- if "沧州" in city_name:
- return "180"
- if "唐山" in city_name:
- return "181"
- if "秦皇岛" in city_name:
- return "182"
- if "廊坊" in city_name:
- return "183"
- if "张家口" in city_name:
- return "184"
- if "邢台" in city_name:
- return "185"
- if "邯郸" in city_name:
- return "186"
- if "保定" in city_name:
- return "187"
- if "石家庄" in city_name:
- return "188"
- if "承德" in city_name:
- return "189"
- if "衡水" in city_name:
- return "720"
- if "雄安" in city_name:
- return "782"
- if "沧州" in area_name:
- return "180"
- if "唐山" in area_name:
- return "181"
- if "秦皇岛" in area_name:
- return "182"
- if "廊坊" in area_name:
- return "183"
- if "张家口" in area_name:
- return "184"
- if "邢台" in area_name:
- return "185"
- if "邯郸" in area_name:
- return "186"
- if "保定" in area_name:
- return "187"
- if "石家庄" in area_name:
- return "188"
- if "承德" in area_name:
- return "189"
- if "衡水" in area_name:
- return "720"
- if "雄安" in area_name:
- return "782"
- return ""
- # 将get_area_no2函数应用到DataFrame的每一行,生成'二级组织机构编码2'列
- df['二级组织机构编码2'] = df.apply(get_area_no2, axis=1)
- # 将get_org_name函数应用到'二级组织机构编码2'列,生成'二级组织机构名称2'列
- df['二级组织机构名称2'] = df['二级组织机构编码2'].apply(get_org_name)
- # 定义一个函数,用于根据单位名称获取城市ID
- def get_city_id(unit):
- # 如果单位为空或无效,则返回""
- if pd.isna(unit) or not unit or not unit.strip():
- return ""
- # 遍历cities列表,匹配单位名称并返回对应的城市ID
- for city in cities:
- if city.get('short_name') and city['short_name'] in unit:
- return city.get('area_id', "")
- return ""
- # 将get_city_id函数应用到'原始一级单位'列,生成'city_id'列
- df['city_id'] = df['原始一级单位'].apply(get_city_id)
- # 定义一个函数,用于根据ID获取区域名称
- def get_area_name(id):
- # 如果ID为空或无效,则返回""
- if pd.isna(id) or not id or not id.strip():
- return ""
- # 在area_map中查找对应ID的区域信息,并返回其名称
- area_po = area_map.get(id)
- if area_po is not None:
- return area_po.get("area_name", "")
- return ""
- # 将get_area_name函数应用到'city_id'列,生成'city'列
- df['city'] = df['city_id'].apply(get_area_name)
- # 定义一个函数,用于根据行数据获取区县ID
- def get_district_id(x):
- # 获取相关字段值,如果为空则设置为""
- city_id = str(x['city_id']) if pd.notna(x['city_id']) else ""
- city = str(x['city']) if pd.notna(x['city']) else ""
- unit = str(x['原始二级单位']) if pd.notna(x['原始二级单位']) else ""
- # 如果城市ID、城市名称或单位为空,则返回""
- if not city_id or not city or not unit:
- return ""
- # 根据城市名称和单位内容,返回对应的区县ID
- if city == "石家庄":
- if "井陉矿区" in unit:
- return "130107"
- if "井陉" in unit:
- return "130121"
- if city == "雄安":
- unit = unit.replace("雄安新区", "")
- districts = district_list_map.get(city_id)
- if not districts:
- return ""
- for district in districts:
- if district.get('short_name') in unit:
- return district.get('area_id')
- return ""
- # 将get_district_id函数应用到DataFrame的每一行,生成'district_id'列
- df['district_id'] = df.apply(get_district_id, axis=1)
- # 将get_area_name函数应用到'district_id'列,生成'district'列
- df['district'] = df['district_id'].apply(get_area_name)
- # 提取账期年份和月份信息
- df['year_no'] = df['账期'].apply(lambda x: None if pd.isna(x) else str(x)[:4])
- df['month_no'] = df['账期'].apply(lambda x: None if pd.isna(x) else str(x)[-2:])
- df['年检时间'] = df['年检时间'].apply(lambda x: None if pd.isna(x) else pd.to_datetime(x).strftime('%Y%m'))
- # 打印DataFrame的信息
- print(df.info())
- # 将处理后的数据保存到CSV文件中
- df.to_csv(path_or_buf=output_path,
- header=['year_month', 'che_pai_hao', 'che_xing', 'first_unit', 'second_unit', 'third_unit',
- 'deng_ji_ri_qi', 'nian_jian_shi_jian', 'shi_fou_guo_jian', 'shi_fou_ben_yue_ying_jian',
- 'ben_yue_shi_fou_nian_jian', 'raw_yi_ji', 'raw_er_ji', 'raw_san_ji', 'raw_che_pai_hao',
- 'che_pai_fail', 'area_no', 'area_name', 'city_no', 'city_name', 'area_no2', 'area_name2',
- 'city_id', 'city', 'district_id', 'district', 'year_no', 'month_no'],
- index=False,
- encoding='utf-8-sig')
- def data_import():
- # 定义 PowerShell 脚本的路径
- script_path = r"../../copy.ps1"
- # 目标表和文件信息
- table = "car.car_guo_jian" # 数据库目标表名
- # 表字段列名,用于指定导入数据的列顺序
- columns = "year_month,che_pai_hao,che_xing,first_unit,second_unit,third_unit,deng_ji_ri_qi,nian_jian_shi_jian,shi_fou_guo_jian,shi_fou_ben_yue_ying_jian,ben_yue_shi_fou_nian_jian,raw_yi_ji,raw_er_ji,raw_san_ji,raw_che_pai_hao,che_pai_fail,area_no,area_name,city_no,city_name,area_no2,area_name2,city_id,city,district_id,district,year_no,month_no"
- # 构造执行 PowerShell 脚本的命令
- command = f"powershell -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}"
- # 打印生成的命令,方便调试和日志记录
- logger.info("command: {}", command)
- # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出
- completed_process = subprocess.run(
- command, # 执行的命令
- check=False, # 如果命令执行失败,不抛出异常
- text=True, # 将输出作为字符串处理
- capture_output=True, # 捕获标准输出和标准错误
- )
- # 打印命令执行的结果,包括返回码、标准输出和标准错误
- logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout, completed_process.stderr)
- # 定义正则表达式,用于匹配标准输出中的 COPY 结果
- p = re.compile(r"^(COPY) (\d+)$")
- count = None # 初始化计数变量
- matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果
- if matcher:
- count = int(matcher.group(2)) # 提取导入的数据行数
- # 如果没有成功提取到导入数据的行数,抛出运行时异常
- if count is None:
- raise RuntimeError("导入数据失败")
- def upload_file():
- remote_path = f'{remote_dir_path}{year_month}.xlsx' # 定义远程主机的目标文件路径
- # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文
- with paramiko.SSHClient() as ssh:
- # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败
- ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- # 连接到远程主机,传入主机地址、端口、用户名和密码
- ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password)
- # 执行远程命令,创建远程目录(如果不存在)
- ssh.exec_command(f'mkdir -p {remote_dir_path}')
- # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文
- with ssh.open_sftp() as sftp:
- # 记录日志,提示即将上传的本地文件和远程目标路径
- logger.info("upload {} to {}", input_path, remote_path)
- # 使用SFTP的put方法将本地文件上传到远程主机
- sftp.put(input_path, remote_path)
- # 记录日志,提示文件已成功上传
- logger.info("uploaded {}", input_path)
- def data_update():
- with psycopg.connect(
- conninfo=conn_info,
- ) as conn:
- with conn.cursor() as curs:
- # 插入过检
- sql = f"""
- insert
- into
- car_theme.wz_f_un_annual_inspectionleased_vehicles_details
- (
- statistical_month,
- card_num,
- car_brand,
- city,
- dpt_sec,
- grid,
- enable_date,
- passed_inspection,
- should_inspection_be_conducted_this_month,
- annual_inspection_this_month,
- inspect_annually_date
- )
- select
- year_month as statistical_month,
- che_pai_hao as card_num,
- che_xing as car_brand,
- first_unit as city,
- second_unit as dpt_sec,
- third_unit as grid,
- deng_ji_ri_qi as enable_date,
- shi_fou_guo_jian as passed_inspection,
- shi_fou_ben_yue_ying_jian as should_inspection_be_conducted_this_month,
- ben_yue_shi_fou_nian_jian as annual_inspection_this_month,
- nian_jian_shi_jian as inspect_annually_date
- from
- car.car_guo_jian
- where
- year_month = {year_month}
- """
- logger.info(f"sql: {sql}")
- curs.execute(sql)
- logger.info(f"update {curs.rowcount}")
- data_process()
- data_import()
- upload_file()
- data_update()
|