car_wei_zhang.py 33 KB


  1. """车辆违章数据处理
  2. """
  3. import re
  4. import subprocess
  5. from datetime import datetime
  6. from dateutil.relativedelta import relativedelta
  7. from loguru import logger
  8. import pandas as pd
  9. import psycopg
  10. import paramiko
  11. # 添加日志记录,将日志输出到文件 a.log
  12. logger.add(sink='a.log')
  13. ssh_hostname = '172.16.107.4' # 定义远程主机地址
  14. ssh_port = 22 # 定义SSH服务的端口号
  15. ssh_username = 'app' # 定义登录远程主机的用户名
  16. ssh_password = '(l4w0ST_' # 定义登录远程主机的密码
  17. # 服务器文件夹路径
  18. remote_dir_path = '/data/history/car/wei-zhang/'
  19. # 数据库连接信息
  20. db_host = "172.16.107.5" # 数据库主机地址
  21. db_port = 5432 # 数据库端口号
  22. db_username = "finance" # 数据库用户名
  23. db_password = "Finance@unicom23" # 数据库密码
  24. dbname = "financialdb" # 数据库名称
  25. conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'"
  26. # 获取当前日期,并计算上个月的第一天
  27. today = datetime.today()
  28. start_date = today - relativedelta(months=1, day=1)
  29. year_month = start_date.strftime('%Y%m')
  30. # 数据文件路径
  31. input_path = 'data.xlsx'
  32. # 输出文件路径
  33. output_path = 'output.csv'
  34. def data_process():
  35. # 正则表达式匹配车牌省份简称(如京、津、晋等)
  36. has_che_pai_province_pattern = re.compile(
  37. "[" + re.escape("京津晋冀蒙辽吉黑沪苏浙皖闽赣鲁豫鄂湘粤桂琼渝川贵云藏陕甘青宁国防") + "]")
  38. # 正则表达式匹配非车牌字符,排除车牌可能包含的字符(如字母、数字、特殊标志等)
  39. not_che_pai_pattern = re.compile(
  40. "[^京津晋冀蒙辽吉黑沪苏浙皖闽赣鲁豫鄂湘粤桂琼渝川贵云藏陕甘青宁新港澳学挂领试超练警国防A-Z\\d]")
  41. # 正则表达式匹配完整的车牌号格式
  42. che_pai_pattern = re.compile(
  43. r"([京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼使领A-Z][A-Z]"
  44. r"(([DF]((?![IO])[A-Z0-9](?![IO]))\d{4})|(\d{5}[DF]))|"
  45. r"[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼使领A-Z][A-Z][A-Z0-9]{4}[A-Z0-9挂学警港澳])"
  46. )
  47. # 一级单位字典
  48. first_unit_map = {
  49. "保定市分公司": "保定",
  50. "沧州市分公司": "沧州",
  51. "承德市分公司": "承德",
  52. "邯郸市分公司": "邯郸",
  53. "河北省分公司线路维护中心": "机动局",
  54. "河北省机动通信局": "机动局",
  55. "衡水市分公司": "衡水",
  56. "华北基地建设部": "华北基地建设部",
  57. "廊坊市分公司": "廊坊",
  58. "秦皇岛市分公司": "秦皇岛",
  59. "省公司本部": "省公司本部",
  60. "石家庄市分公司": "石家庄",
  61. "唐山市分公司": "唐山",
  62. "邢台市分公司": "邢台",
  63. "雄安基地建设部": "雄安基地建设部",
  64. "雄安新区分公司": "雄安",
  65. "张家口市分公司": "张家口"
  66. }
  67. # 定义二级行政区划映射表(地级市及其下属区县)
  68. er_ji_map = {
  69. "石家庄": ["鹿泉", "藁城", "栾城", "井陉矿区", "井陉", "无极", "正定", "元氏", "新乐", "晋州", "平山", "灵寿",
  70. "赞皇", "赵县", "行唐", "高邑", "辛集", "深泽"],
  71. "唐山": ["唐山高开区", "迁西", "海港", "开平", "丰南", "滦县", "乐亭", "丰润", "玉田", "古冶", "曹妃甸", "遵化",
  72. "滦南", "迁安"],
  73. "秦皇岛": ["北戴河新区", "北戴河", "山海关", "昌黎", "卢龙", "青龙", "抚宁"],
  74. "邯郸": ["曲周", "魏县", "馆陶", "磁县", "大名", "鸡泽", "成安", "涉县", "永年", "武安", "峰峰", "广平", "临漳",
  75. "邱县", "肥乡"],
  76. "邢台": ["新河", "南宫", "隆尧", "内邱", "平乡", "宁晋", "广宗", "清河", "临西", "任县", "巨鹿", "沙河", "威县",
  77. "临城", "柏乡", "南和"],
  78. "保定": ["涞水", "蠡县", "顺平", "博野", "安国", "涞源", "唐县", "定州", "高阳", "曲阳", "阜平", "清苑",
  79. "高碑店",
  80. "满城", "涿州", "易县", "望都", "徐水", "定兴", "白沟"],
  81. "张家口": ["张北", "崇礼", "康保", "赤城", "阳原", "万全", "下花园", "尚义", "怀安", "怀来", "蔚县", "涿鹿",
  82. "沽源",
  83. "宣化"],
  84. "承德": ["承德县", "兴隆", "宽城", "平泉", "营子", "隆化", "滦平", "围场", "丰宁", "双滦"],
  85. "廊坊": ["文安", "霸州", "大城", "廊坊开发区", "三河", "香河", "永清", "胜芳", "燕郊", "固安", "大厂"],
  86. "沧州": ["东光", "吴桥", "黄骅", "盐山", "孟村", "泊头", "献县", "南皮", "渤海新区", "海兴", "沧县", "河间",
  87. "青县",
  88. "任丘", "肃宁"],
  89. "衡水": ["景县", "阜城", "枣强", "深州", "饶阳", "故城", "武强", "武邑", "冀州", "安平"],
  90. "雄安": ["容城", "雄县", "安新"]
  91. }
  92. # 初始化组织结构映射表
  93. org_map = {}
  94. third_org_map = {}
  95. third_org_list_map = {}
  96. area_map = {}
  97. district_list_map = {}
  98. # 连接PostgreSQL数据库
  99. with psycopg.connect(
  100. conninfo=conn_info,
  101. row_factory=psycopg.rows.dict_row
  102. ) as conn:
  103. with conn.cursor() as curs:
  104. # 查询一级组织数据,并按order_num排序
  105. sql = """
  106. select * from common.organization where grade = 1 order by order_num
  107. """
  108. logger.info(f"sql: {sql}")
  109. curs.execute(sql)
  110. second_orgs = curs.fetchall()
  111. # 遍历一级组织数据,构建org_map和third_org_list_map
  112. for x in second_orgs:
  113. org_map[x['id']] = x
  114. third_org_list_map[x['id']] = []
  115. # 查询二级组织数据,并按parent_id和order_num排序
  116. sql = """
  117. select * from common.organization where grade = 2 order by parent_id, order_num
  118. """
  119. logger.info(f"sql: {sql}")
  120. curs.execute(sql)
  121. third_orgs = curs.fetchall()
  122. # 遍历二级组织数据,构建org_map、third_org_list_map和third_org_map
  123. for x in third_orgs:
  124. org_map[x['id']] = x
  125. third_org_list_map[x['parent_id']].append(x)
  126. third_org_map[x['id']] = x
  127. # 查询一级行政区划数据,并按area_id排序
  128. sql = """
  129. select * from common.area where area_grade = 1 order by area_id
  130. """
  131. logger.info(f"sql: {sql}")
  132. curs.execute(sql)
  133. cities = curs.fetchall()
  134. # 遍历一级行政区划数据,构建area_map
  135. for city in cities:
  136. area_map[city['area_id']] = city
  137. # 查询二级行政区划数据,并按parent_id和area_id排序
  138. sql = """
  139. select * from common.area where area_grade = 2 order by parent_id, area_id
  140. """
  141. logger.info(f"sql: {sql}")
  142. curs.execute(sql)
  143. districts = curs.fetchall()
  144. # 遍历二级行政区划数据,构建area_map和district_list_map
  145. for district in districts:
  146. area_map[district['area_id']] = district
  147. # 构建城市与区县的映射关系
  148. for city in cities:
  149. district_list_map[city['area_id']] = []
  150. for district in districts:
  151. if city['area_id'] == district['parent_id']:
  152. district_list_map[city['area_id']].append(district)
  153. # 读取 Excel 文件中的数据
  154. df = pd.read_excel(io=input_path)
  155. # 获取当前 DataFrame 的列名列表
  156. columns = df.columns.tolist()
  157. # 定义所需的字段列表
  158. required_columns = ['序号', '车牌号', '一级单位', '二级单位', '三级单位', '车架号', '违章时间', '违章地点',
  159. '违章详情', '扣分', '罚款', '三方处理状态', '处理时间', '违章未处理时长(天)']
  160. # 检查是否有缺失的字段
  161. missing_columns = [col for col in required_columns if col not in columns]
  162. # 检查是否有多余的字段
  163. ex_columns = [col for col in columns if col not in required_columns]
  164. # 如果存在缺失字段,则抛出运行时错误并提示缺少哪些字段
  165. if missing_columns or ex_columns:
  166. raise RuntimeError(f"缺少以下字段: {missing_columns};存在以下多余字段:{ex_columns}")
  167. # 获取需要清理的列名列表,排除 "违章时间" 和 "处理时间" 列
  168. columns_to_clean = list(filter(lambda x: x not in ('违章时间', '处理时间'), df.columns))
  169. # 对需要清理的列进行字符串清理,移除多余的空白字符
  170. df[columns_to_clean] = df[columns_to_clean].map(lambda x: re.sub(r'\s+', '', x) if type(x) is str else x)
  171. df['year_month'] = year_month
  172. # 保存原始单位和车牌号信息到新的列中
  173. df['raw_yi_ji'] = df['一级单位']
  174. df['raw_er_ji'] = df['二级单位']
  175. df['raw_san_ji'] = df['三级单位']
  176. df['raw_che_pai_hao'] = df['车牌号']
  177. # 定义函数,用于提取并标准化车牌号
  178. def get_che_pai(che_pai):
  179. # 如果车牌号为空或无效,则返回空字符串
  180. if pd.isna(che_pai) or not che_pai or not che_pai.strip():
  181. return ""
  182. # 将车牌号转换为大写
  183. upper_case = che_pai.upper()
  184. # 移除车牌号中不符合规则的字符
  185. s = not_che_pai_pattern.sub("", upper_case)
  186. # 使用正则表达式匹配合法的车牌号
  187. m = che_pai_pattern.search(s)
  188. if m:
  189. return m.group(0)
  190. # 如果车牌号包含省份简称但未匹配成功,记录警告日志
  191. if has_che_pai_province_pattern.search(che_pai):
  192. logger.warning(f"车牌匹配失败: {che_pai} -> {s}")
  193. return s
  194. # 如果完全无法匹配,记录警告日志并返回原车牌号
  195. logger.warning(f"车牌匹配失败: {che_pai} -> {upper_case}")
  196. return upper_case
  197. # 应用 get_che_pai 函数处理车牌号列
  198. df['车牌号'] = df['车牌号'].apply(get_che_pai)
  199. # 定义函数,用于标记车牌号是否匹配失败
  200. def che_pai_fail(che_pai):
  201. # 如果车牌号为空或无效,则标记为失败
  202. if pd.isna(che_pai) or not che_pai or not che_pai.strip():
  203. return "1"
  204. # 移除车牌号中不符合规则的字符
  205. s = not_che_pai_pattern.sub("", che_pai.upper())
  206. # 使用正则表达式匹配合法的车牌号
  207. m = che_pai_pattern.search(s)
  208. if m:
  209. return "0" # 匹配成功
  210. return "1" # 匹配失败
  211. # 应用 che_pai_fail 函数生成车牌匹配失败标记列
  212. df['che_pai_fail'] = df['车牌号'].apply(che_pai_fail)
  213. # 获取一级单位
  214. def get_first_unit(x):
  215. raw_che_pai = x['raw_che_pai_hao']
  216. raw_first_unit = str(x['raw_yi_ji']) if pd.notna(x['raw_yi_ji']) else ""
  217. if not raw_first_unit or not raw_first_unit.strip():
  218. raise RuntimeError(f"一级单位为空:{raw_che_pai}")
  219. if raw_first_unit in first_unit_map:
  220. return first_unit_map.get(raw_first_unit)
  221. raise RuntimeError(f"一级单位匹配失败:{raw_che_pai} {raw_first_unit}")
  222. # 应用 get_first_unit 函数生成一级单位列
  223. df['一级单位'] = df.apply(get_first_unit, axis=1)
  224. # 获取二级单位
  225. def get_second_unit(x):
  226. first_unit = str(x['一级单位']) if pd.notna(x['一级单位']) else ""
  227. raw_second_unit = str(x['raw_er_ji']) if pd.notna(x['raw_er_ji']) else ""
  228. if first_unit in ["华北基地建设部", "雄安基地建设部", "省公司本部"]:
  229. return first_unit
  230. if not raw_second_unit or not raw_second_unit.strip():
  231. return f"{first_unit}本部"
  232. if first_unit == "机动局":
  233. for yj in er_ji_map.keys():
  234. if yj in raw_second_unit:
  235. return f"机动局{yj}"
  236. return "机动局本部"
  237. if first_unit == "石家庄":
  238. if "开发区" in raw_second_unit:
  239. return "石家庄开发区"
  240. if first_unit == "廊坊":
  241. if "开发区" in raw_second_unit:
  242. return "廊坊开发区"
  243. if first_unit == "邢台":
  244. if "内丘" in raw_second_unit:
  245. return "内邱"
  246. if "任泽" in raw_second_unit:
  247. return "任县"
  248. if first_unit == "唐山":
  249. if "高开区" in raw_second_unit:
  250. return "唐山高开区"
  251. if "滦州" in raw_second_unit:
  252. return "滦县"
  253. ejs = er_ji_map.get(first_unit, [])
  254. if first_unit == "雄安":
  255. raw_second_unit = raw_second_unit.replace("雄安新区", "")
  256. for ej in ejs:
  257. if ej in raw_second_unit:
  258. return ej
  259. return f"{first_unit}本部"
  260. # 应用 get_second_unit 函数生成二级单位列
  261. df['二级单位'] = df.apply(get_second_unit, axis=1)
  262. # 获取三级单位
  263. def get_third_unit(x):
  264. second_unit = str(x['二级单位']) if pd.notna(x['二级单位']) else ""
  265. raw_third_unit = str(x['raw_san_ji']) if pd.notna(x['raw_san_ji']) else ""
  266. return raw_third_unit if raw_third_unit and raw_third_unit.strip() else second_unit
  267. # 应用 get_third_unit 函数生成三级单位列
  268. df['三级单位'] = df.apply(get_third_unit, axis=1)
  269. # 定义一个函数,用于根据单位名称获取二级组织机构编码
  270. def get_area_no(first_unit):
  271. if first_unit == "机动局":
  272. return "-11"
  273. if first_unit in ["省公司本部", "雄安基地建设部", "华北基地建设部"]:
  274. return "-12"
  275. for second_org in second_orgs:
  276. if second_org.get('name') in first_unit:
  277. return second_org.get('id')
  278. return ''
  279. df['area_no'] = df['一级单位'].apply(get_area_no)
  280. # 用于根据组织机构编码获取组织机构名称
  281. def get_org_name(x):
  282. org_no = str(x) if pd.notna(x) else ''
  283. # 如果编码为空或无效,则返回空字符串
  284. if not org_no or not org_no.strip():
  285. return ''
  286. # 在org_map中查找对应编码的组织机构信息,并返回其名称
  287. po = org_map.get(org_no)
  288. if po is not None:
  289. return po.get('name')
  290. return ''
  291. # 将get_org_name函数应用到'area_no'列,生成'area_name'列
  292. df['area_name'] = df['area_no'].apply(get_org_name)
  293. # 定义一个函数,用于根据行数据获取三级组织机构编码
  294. def get_city_no(x):
  295. # 获取相关字段值,如果为空则设置为""
  296. area_no = str(x['area_no']) if pd.notna(x['area_no']) else ""
  297. area_name = str(x['area_name']) if pd.notna(x['area_name']) else ""
  298. first_unit = str(x['一级单位']) if pd.notna(x['一级单位']) else ""
  299. second_unit = str(x['raw_er_ji']) if pd.notna(x['raw_er_ji']) else ""
  300. if not area_no or not area_no.strip() or not area_name or not area_name.strip():
  301. return ''
  302. if '华北基地建设部' == first_unit:
  303. return 'HE018'
  304. if '雄安基地建设部' == first_unit:
  305. return 'HE019'
  306. if second_unit and second_unit.strip():
  307. if area_name == "石家庄":
  308. if "井陉矿区" in second_unit:
  309. return "D0130185"
  310. if "井陉" in second_unit:
  311. return "D0130121"
  312. if area_name == "秦皇岛":
  313. if "北戴河新区" in second_unit:
  314. return "D0130325"
  315. if "北戴河" in second_unit:
  316. return "D0130304"
  317. if area_name == "邯郸":
  318. if "峰峰" in second_unit:
  319. return "D0130406"
  320. if area_name == "邢台":
  321. if "内丘" in second_unit:
  322. return "D0130523"
  323. if "任泽" in second_unit:
  324. return "D0130526"
  325. if area_name == "省机动局":
  326. if "沧州" in second_unit:
  327. return "HECS180"
  328. if "唐山" in second_unit:
  329. return "HECS181"
  330. if "秦皇岛" in second_unit:
  331. return "HECS182"
  332. if "廊坊" in second_unit:
  333. return "HECS183"
  334. if "张家口" in second_unit:
  335. return "HECS184"
  336. if "邢台" in second_unit:
  337. return "HECS185"
  338. if "邯郸" in second_unit:
  339. return "HECS186"
  340. if "保定" in second_unit:
  341. return "HECS187"
  342. if "石家庄" in second_unit:
  343. return "HECS188"
  344. if "承德" in second_unit:
  345. return "HECS189"
  346. if "衡水" in second_unit:
  347. return "HECS720"
  348. if "雄安" in second_unit:
  349. return "HECS728"
  350. return "HECS018"
  351. if area_name == "雄安":
  352. second_unit = second_unit.replace("雄安新区", "")
  353. l3 = third_org_list_map.get(area_no, [])
  354. for organization_po in l3:
  355. if organization_po.get('name') in second_unit:
  356. return organization_po.get('id')
  357. if area_name == '省本部':
  358. return 'HE001'
  359. if area_name == "省机动局":
  360. return "HECS018"
  361. if area_name == "沧州":
  362. return "D0130911"
  363. if area_name == "唐山":
  364. return "D0130202"
  365. if area_name == "秦皇岛":
  366. return "D0130302"
  367. if area_name == "廊坊":
  368. return "D0131000"
  369. if area_name == "张家口":
  370. return "D0130701"
  371. if area_name == "邢台":
  372. return "D0130502"
  373. if area_name == "邯郸":
  374. return "D0130402"
  375. if area_name == "保定":
  376. return "D0130601"
  377. if area_name == "石家庄":
  378. return "D0130186"
  379. if area_name == "承德":
  380. return "D0130801"
  381. if area_name == "衡水":
  382. return "D0133001"
  383. if area_name == "雄安":
  384. return "D0130830"
  385. return ''
  386. # 将get_city_no函数应用到DataFrame的每一行,生成'city_no'列
  387. df['city_no'] = df.apply(get_city_no, axis=1)
  388. # 将get_org_name函数应用到'city_no'列,生成'city_name'列
  389. df['city_name'] = df['city_no'].apply(get_org_name)
  390. # 定义一个函数,用于根据行数据获取二级组织机构编码2
  391. def get_area_no2(x):
  392. # 获取相关字段值,如果为空则设置为""
  393. area_name = str(x['area_name']) if pd.notna(x['area_name']) else ""
  394. city_name = str(x['city_name']) if pd.notna(x['city_name']) else ""
  395. if not area_name or not area_name.strip() or '省本部' == area_name:
  396. return ''
  397. # 根据二级组织机构名称和三级组织机构名称的内容,返回对应的编码
  398. if area_name == "省机动局" and city_name and city_name.strip():
  399. if "沧州" in city_name:
  400. return "180"
  401. if "唐山" in city_name:
  402. return "181"
  403. if "秦皇岛" in city_name:
  404. return "182"
  405. if "廊坊" in city_name:
  406. return "183"
  407. if "张家口" in city_name:
  408. return "184"
  409. if "邢台" in city_name:
  410. return "185"
  411. if "邯郸" in city_name:
  412. return "186"
  413. if "保定" in city_name:
  414. return "187"
  415. if "石家庄" in city_name:
  416. return "188"
  417. if "承德" in city_name:
  418. return "189"
  419. if "衡水" in city_name:
  420. return "720"
  421. if "雄安" in city_name:
  422. return "782"
  423. return ''
  424. if "沧州" in area_name:
  425. return "180"
  426. if "唐山" in area_name:
  427. return "181"
  428. if "秦皇岛" in area_name:
  429. return "182"
  430. if "廊坊" in area_name:
  431. return "183"
  432. if "张家口" in area_name:
  433. return "184"
  434. if "邢台" in area_name:
  435. return "185"
  436. if "邯郸" in area_name:
  437. return "186"
  438. if "保定" in area_name:
  439. return "187"
  440. if "石家庄" in area_name:
  441. return "188"
  442. if "承德" in area_name:
  443. return "189"
  444. if "衡水" in area_name:
  445. return "720"
  446. if "雄安" in area_name:
  447. return "782"
  448. return ""
  449. # 将get_area_no2函数应用到DataFrame的每一行,生成'area_no2'列
  450. df['area_no2'] = df.apply(get_area_no2, axis=1)
  451. # 将get_org_name函数应用到'area_no2'列,生成'area_name2'列
  452. df['area_name2'] = df['area_no2'].apply(get_org_name)
  453. # 获取城市ID
  454. def get_city_id(x):
  455. raw_first_unit = str(x['raw_yi_ji']) if pd.notna(x['raw_yi_ji']) else ""
  456. raw_second_unit = str(x['raw_er_ji']) if pd.notna(x['raw_er_ji']) else ""
  457. raw_third_unit = str(x['raw_san_ji']) if pd.notna(x['raw_san_ji']) else ""
  458. unit = f"{raw_first_unit}_{raw_second_unit}_{raw_third_unit}"
  459. if not unit or not unit.strip():
  460. return ''
  461. # 遍历cities列表,匹配单位名称并返回对应的城市ID
  462. for city in cities:
  463. if city.get('short_name') in unit:
  464. return city.get('area_id', '')
  465. return ''
  466. df['city_id'] = df.apply(get_city_id, axis=1)
  467. # 定义一个函数,用于根据ID获取区域名称
  468. def get_area_name(x):
  469. id = str(x) if pd.notna(x) else ""
  470. if not id or not id.strip():
  471. return ""
  472. # 在area_map中查找对应ID的区域信息,并返回其名称
  473. area_po = area_map.get(id)
  474. if area_po is not None:
  475. return area_po.get("area_name", "")
  476. return ""
  477. # 将get_area_name函数应用到'city_id'列,生成'city'列
  478. df['city'] = df['city_id'].apply(get_area_name)
  479. # 定义一个函数,用于根据行数据获取区县ID
  480. def get_district_id(x):
  481. # 获取相关字段值,如果为空则设置为""
  482. city_id = str(x['city_id']) if pd.notna(x['city_id']) else ""
  483. city = str(x['city']) if pd.notna(x['city']) else ""
  484. raw_first_unit = str(x['raw_yi_ji']) if pd.notna(x['raw_yi_ji']) else ""
  485. raw_second_unit = str(x['raw_er_ji']) if pd.notna(x['raw_er_ji']) else ""
  486. raw_third_unit = str(x['raw_san_ji']) if pd.notna(x['raw_san_ji']) else ""
  487. unit = f"{raw_first_unit}_{raw_second_unit}_{raw_third_unit}"
  488. # 如果城市ID、城市名称或单位为空,则返回""
  489. if not city_id or not city_id.strip() or not city or not city.strip() or not unit or not unit.strip():
  490. return ""
  491. # 根据城市名称和单位内容,返回对应的区县ID
  492. if city == "石家庄":
  493. if "井陉矿区" in unit:
  494. return "130107"
  495. if "井陉" in unit:
  496. return "130121"
  497. if city == "雄安":
  498. unit = unit.replace("雄安新区", "")
  499. districts = district_list_map.get(city_id, [])
  500. for district in districts:
  501. if district.get('short_name') in unit:
  502. return district.get('area_id')
  503. return ""
  504. df['district_id'] = df.apply(get_district_id, axis=1)
  505. # 将get_area_name函数应用到'district_id'列,生成'district'列
  506. df['district'] = df['district_id'].apply(get_area_name)
  507. # 提取账期年份和月份信息
  508. df['year_no'] = df['year_month'].apply(lambda x: None if pd.isna(x) else str(x)[:4])
  509. df['month_no'] = df['year_month'].apply(lambda x: None if pd.isna(x) else str(x)[-2:])
  510. # 格式化违章时间和处理时间为年月格式
  511. df['wei_zhang_nian_yue'] = df['违章时间'].apply(lambda x: None if pd.isna(x) else pd.to_datetime(x).strftime('%Y%m'))
  512. df['chu_li_nian_yue'] = df['处理时间'].apply(lambda x: None if pd.isna(x) else pd.to_datetime(x).strftime('%Y%m'))
  513. df.rename(
  514. columns={'车牌号': 'che_pai_hao', '一级单位': 'first_unit', '二级单位': 'second_unit', '三级单位': 'third_unit',
  515. '车架号': 'che_jia_hao', '违章时间': 'wei_zhang_shi_jian', '违章地点': 'wei_zhang_di_dian',
  516. '违章详情': 'wei_zhang_xiang_qing', '扣分': 'kou_fen', '罚款': 'fa_kuan',
  517. '三方处理状态': 'chu_li_zhuang_tai', '处理时间': 'chu_li_shi_jian',
  518. '违章未处理时长(天)': 'wei_zhang_wei_chu_li_shi_chang'}, inplace=True)
  519. df = df[['year_month', 'che_pai_hao', 'first_unit', 'second_unit', 'third_unit', 'che_jia_hao',
  520. 'wei_zhang_shi_jian', 'wei_zhang_di_dian', 'wei_zhang_xiang_qing', 'kou_fen', 'fa_kuan',
  521. 'chu_li_zhuang_tai', 'chu_li_shi_jian', 'wei_zhang_wei_chu_li_shi_chang', 'raw_yi_ji', 'raw_er_ji',
  522. 'raw_san_ji', 'raw_che_pai_hao', 'che_pai_fail', 'area_no', 'area_name', 'city_no', 'city_name',
  523. 'area_no2', 'area_name2', 'city_id', 'city', 'district_id', 'district', 'year_no', 'month_no',
  524. 'wei_zhang_nian_yue', 'chu_li_nian_yue']]
  525. # 打印DataFrame的信息
  526. df.info()
  527. # 将处理后的数据保存到CSV文件中
  528. df.to_csv(path_or_buf=output_path, index=False, encoding='utf-8-sig', lineterminator='\n')
  529. def data_import():
  530. # 定义 PowerShell 脚本的路径
  531. script_path = r"../../copy.ps1"
  532. # 目标表和文件信息
  533. table = "car.car_wei_zhang" # 数据库目标表名
  534. # 表字段列名,用于指定导入数据的列顺序
  535. columns = "year_month,che_pai_hao,first_unit,second_unit,third_unit,che_jia_hao,wei_zhang_shi_jian,wei_zhang_di_dian,wei_zhang_xiang_qing,kou_fen,fa_kuan,chu_li_zhuang_tai,chu_li_shi_jian,wei_zhang_wei_chu_li_shi_chang,raw_yi_ji,raw_er_ji,raw_san_ji,raw_che_pai_hao,che_pai_fail,area_no,area_name,city_no,city_name,area_no2,area_name2,city_id,city,district_id,district,year_no,month_no,wei_zhang_nian_yue,chu_li_nian_yue"
  536. # 构造执行 PowerShell 脚本的命令
  537. command = f"powershell -NoProfile -NonInteractive -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}"
  538. # 打印生成的命令,方便调试和日志记录
  539. logger.info("command: {}", command)
  540. # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出
  541. completed_process = subprocess.run(
  542. command, # 执行的命令
  543. check=False, # 如果命令执行失败,不抛出异常
  544. text=True, # 将输出作为字符串处理
  545. capture_output=True, # 捕获标准输出和标准错误
  546. )
  547. # 打印命令执行的结果,包括返回码、标准输出和标准错误
  548. logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout,
  549. completed_process.stderr)
  550. # 定义正则表达式,用于匹配标准输出中的 COPY 结果
  551. p = re.compile(r"^(COPY) (\d+)$")
  552. count = None # 初始化计数变量
  553. matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果
  554. if matcher:
  555. count = int(matcher.group(2)) # 提取导入的数据行数
  556. # 如果没有成功提取到导入数据的行数,抛出运行时异常
  557. if count is None:
  558. raise RuntimeError("导入数据失败")
  559. def upload_file():
  560. remote_path = f'{remote_dir_path}{year_month}.xlsx' # 定义远程主机的目标文件路径
  561. # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文
  562. with paramiko.SSHClient() as ssh:
  563. # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败
  564. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  565. # 连接到远程主机,传入主机地址、端口、用户名和密码
  566. ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password)
  567. # 执行远程命令,创建远程目录(如果不存在)
  568. ssh.exec_command(f'mkdir -p {remote_dir_path}')
  569. # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文
  570. with ssh.open_sftp() as sftp:
  571. # 记录日志,提示即将上传的本地文件和远程目标路径
  572. logger.info("upload {} to {}", input_path, remote_path)
  573. # 使用SFTP的put方法将本地文件上传到远程主机
  574. sftp.put(input_path, remote_path)
  575. # 记录日志,提示文件已成功上传
  576. logger.info("uploaded {}", input_path)
  577. def data_update():
  578. with psycopg.connect(
  579. conninfo=conn_info,
  580. ) as conn:
  581. with conn.cursor() as curs:
  582. # 插入违章长期未处理
  583. sql = f"""
  584. insert
  585. into
  586. car.car_wei_zhang_chang_qi
  587. (
  588. year_month,
  589. che_pai_hao,
  590. raw_yi_ji,
  591. raw_er_ji,
  592. raw_san_ji,
  593. wei_zhang_shi_jian,
  594. wei_zhang_di_dian,
  595. wei_zhang_xiang_qing,
  596. kou_fen,
  597. fa_kuan,
  598. wei_zhang_wei_chu_li_shi_chang,
  599. chu_li_zhuang_tai,
  600. first_unit,
  601. second_unit,
  602. third_unit,
  603. area_no,
  604. area_name,
  605. city_no,
  606. city_name,
  607. area_name2,
  608. area_no2,
  609. city_id,
  610. city,
  611. district_id,
  612. district,
  613. raw_che_pai_hao,
  614. che_pai_fail,
  615. wei_zhang_nian_yue,
  616. year_no,
  617. month_no,
  618. source
  619. )
  620. select
  621. year_month,
  622. che_pai_hao,
  623. raw_yi_ji,
  624. raw_er_ji,
  625. raw_san_ji,
  626. wei_zhang_shi_jian,
  627. wei_zhang_di_dian,
  628. wei_zhang_xiang_qing,
  629. kou_fen,
  630. fa_kuan,
  631. wei_zhang_wei_chu_li_shi_chang,
  632. chu_li_zhuang_tai,
  633. first_unit,
  634. second_unit,
  635. third_unit,
  636. area_no,
  637. area_name,
  638. city_no,
  639. city_name,
  640. area_name2,
  641. area_no2,
  642. city_id,
  643. city,
  644. district_id,
  645. district,
  646. raw_che_pai_hao,
  647. che_pai_fail,
  648. wei_zhang_nian_yue,
  649. year_no,
  650. month_no,
  651. source
  652. from
  653. car.car_wei_zhang
  654. where
  655. chu_li_zhuang_tai = '未处理'
  656. and wei_zhang_wei_chu_li_shi_chang > 150
  657. and year_month = {year_month}
  658. """
  659. logger.info(f"sql: {sql}")
  660. curs.execute(sql)
  661. logger.info(f"update {curs.rowcount}")
  662. # 插入违章
  663. sql = f"""
  664. insert
  665. into
  666. car_theme.wz_f_violation_details
  667. (
  668. statistical_month,
  669. card_num,
  670. city,
  671. dpt_sec,
  672. grid,
  673. violation_time,
  674. violation_location,
  675. violation_details,
  676. deduction_points,
  677. fine,
  678. processing_time,
  679. unprocessed_duration_of_violation,
  680. offline_actual_processing_status
  681. )
  682. select
  683. year_month,
  684. che_pai_hao,
  685. first_unit,
  686. second_unit,
  687. third_unit,
  688. wei_zhang_shi_jian,
  689. wei_zhang_di_dian,
  690. wei_zhang_xiang_qing,
  691. kou_fen,
  692. fa_kuan,
  693. chu_li_shi_jian,
  694. wei_zhang_wei_chu_li_shi_chang,
  695. chu_li_zhuang_tai
  696. from
  697. car.car_wei_zhang
  698. where
  699. year_month = {year_month}
  700. """
  701. logger.info(f"sql: {sql}")
  702. curs.execute(sql)
  703. logger.info(f"update {curs.rowcount}")
  704. data_process()
  705. data_import()
  706. upload_file()
  707. data_update()