# car_wei_zhang.py
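"""Vehicle traffic-violation (车辆违章) data processing.

Normalises the monthly violation workbook, bulk-loads the resulting CSV into
PostgreSQL, archives the source workbook on a remote host over SFTP, and then
populates the derived violation tables.
"""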
import os
import re
import subprocess
from datetime import datetime

import pandas as pd
import paramiko
import psycopg
from dateutil.relativedelta import relativedelta
from loguru import logger
  11. # 添加日志记录,将日志输出到文件 a.log
  12. logger.add(sink='a.log')
  13. ssh_hostname = '172.16.107.4' # 定义远程主机地址
  14. ssh_port = 22 # 定义SSH服务的端口号
  15. ssh_username = 'app' # 定义登录远程主机的用户名
  16. ssh_password = '(l4w0ST_' # 定义登录远程主机的密码
  17. # 服务器文件夹路径
  18. remote_dir_path = '/data/history/car/wei-zhang/'
  19. # 数据库连接信息
  20. db_host = "172.16.107.5" # 数据库主机地址
  21. db_port = 5432 # 数据库端口号
  22. db_username = "finance" # 数据库用户名
  23. db_password = "Finance@unicom23" # 数据库密码
  24. dbname = "financialdb" # 数据库名称
  25. conn_info = f"host='{db_host}' port={db_port} user='{db_username}' password='{db_password}' dbname='{dbname}'"
  26. # 获取当前日期,并计算上个月的第一天
  27. today = datetime.today()
  28. start_date = today - relativedelta(months=1, day=1)
  29. year_month = start_date.strftime('%Y%m')
  30. # 数据文件路径
  31. input_path = 'data.xlsx'
  32. # 输出文件路径
  33. output_path = 'output.csv'
  34. def data_process():
  35. # 正则表达式匹配车牌省份简称(如京、津、晋等)
  36. has_che_pai_province_pattern = re.compile(
  37. "[" + re.escape("京津晋冀蒙辽吉黑沪苏浙皖闽赣鲁豫鄂湘粤桂琼渝川贵云藏陕甘青宁国防") + "]")
  38. # 正则表达式匹配非车牌字符,排除车牌可能包含的字符(如字母、数字、特殊标志等)
  39. not_che_pai_pattern = re.compile(
  40. "[^京津晋冀蒙辽吉黑沪苏浙皖闽赣鲁豫鄂湘粤桂琼渝川贵云藏陕甘青宁新港澳学挂领试超练警国防A-Z\\d]")
  41. # 正则表达式匹配完整的车牌号格式
  42. che_pai_pattern = re.compile(
  43. r"([京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼使领A-Z][A-Z]"
  44. r"(([DF]((?![IO])[A-Z0-9](?![IO]))\d{4})|(\d{5}[DF]))|"
  45. r"[京津沪渝冀豫云辽黑湘皖鲁新苏浙赣鄂桂甘晋蒙陕吉闽贵粤青藏川宁琼使领A-Z][A-Z][A-Z0-9]{4}[A-Z0-9挂学警港澳])"
  46. )
  47. # 一级单位字典
  48. first_unit_map = {
  49. "保定市分公司": "保定",
  50. "沧州市分公司": "沧州",
  51. "承德市分公司": "承德",
  52. "邯郸市分公司": "邯郸",
  53. "河北省分公司线路维护中心": "机动局",
  54. "河北省机动通信局": "机动局",
  55. "衡水市分公司": "衡水",
  56. "华北基地建设部": "华北基地建设部",
  57. "廊坊市分公司": "廊坊",
  58. "秦皇岛市分公司": "秦皇岛",
  59. "省公司本部": "省公司本部",
  60. "石家庄市分公司": "石家庄",
  61. "唐山市分公司": "唐山",
  62. "邢台市分公司": "邢台",
  63. "雄安基地建设部": "雄安基地建设部",
  64. "雄安新区分公司": "雄安",
  65. "张家口市分公司": "张家口"
  66. }
  67. # 定义二级行政区划映射表(地级市及其下属区县)
  68. er_ji_map = {
  69. "石家庄": ["鹿泉", "藁城", "栾城", "井陉矿区", "井陉", "无极", "正定", "元氏", "新乐", "晋州", "平山", "灵寿",
  70. "赞皇", "赵县", "行唐", "高邑", "辛集", "深泽"],
  71. "唐山": ["唐山高开区", "迁西", "海港", "开平", "丰南", "滦县", "乐亭", "丰润", "玉田", "古冶", "曹妃甸", "遵化",
  72. "滦南", "迁安"],
  73. "秦皇岛": ["北戴河新区", "北戴河", "山海关", "昌黎", "卢龙", "青龙", "抚宁"],
  74. "邯郸": ["曲周", "魏县", "馆陶", "磁县", "大名", "鸡泽", "成安", "涉县", "永年", "武安", "峰峰", "广平", "临漳",
  75. "邱县", "肥乡"],
  76. "邢台": ["新河", "南宫", "隆尧", "内邱", "平乡", "宁晋", "广宗", "清河", "临西", "任县", "巨鹿", "沙河", "威县",
  77. "临城", "柏乡", "南和"],
  78. "保定": ["涞水", "蠡县", "顺平", "博野", "安国", "涞源", "唐县", "定州", "高阳", "曲阳", "阜平", "清苑",
  79. "高碑店",
  80. "满城", "涿州", "易县", "望都", "徐水", "定兴", "白沟"],
  81. "张家口": ["张北", "崇礼", "康保", "赤城", "阳原", "万全", "下花园", "尚义", "怀安", "怀来", "蔚县", "涿鹿",
  82. "沽源",
  83. "宣化"],
  84. "承德": ["承德县", "兴隆", "宽城", "平泉", "营子", "隆化", "滦平", "围场", "丰宁", "双滦"],
  85. "廊坊": ["文安", "霸州", "大城", "廊坊开发区", "三河", "香河", "永清", "胜芳", "燕郊", "固安", "大厂"],
  86. "沧州": ["东光", "吴桥", "黄骅", "盐山", "孟村", "泊头", "献县", "南皮", "渤海新区", "海兴", "沧县", "河间",
  87. "青县",
  88. "任丘", "肃宁"],
  89. "衡水": ["景县", "阜城", "枣强", "深州", "饶阳", "故城", "武强", "武邑", "冀州", "安平"],
  90. "雄安": ["容城", "雄县", "安新"]
  91. }
  92. # 初始化组织结构映射表
  93. org_map = {}
  94. third_org_map = {}
  95. third_org_list_map = {}
  96. area_map = {}
  97. district_list_map = {}
  98. # 连接PostgreSQL数据库
  99. with psycopg.connect(
  100. conninfo=conn_info,
  101. row_factory=psycopg.rows.dict_row
  102. ) as conn:
  103. with conn.cursor() as curs:
  104. # 查询一级组织数据,并按order_num排序
  105. sql = """
  106. select * from common.organization where grade = 1 order by order_num
  107. """
  108. logger.info(f"sql: {sql}")
  109. curs.execute(sql)
  110. second_orgs = curs.fetchall()
  111. # 遍历一级组织数据,构建org_map和third_org_list_map
  112. for x in second_orgs:
  113. org_map[x['id']] = x
  114. third_org_list_map[x['id']] = []
  115. # 查询二级组织数据,并按parent_id和order_num排序
  116. sql = """
  117. select * from common.organization where grade = 2 order by parent_id, order_num
  118. """
  119. logger.info(f"sql: {sql}")
  120. curs.execute(sql)
  121. third_orgs = curs.fetchall()
  122. # 遍历二级组织数据,构建org_map、third_org_list_map和third_org_map
  123. for x in third_orgs:
  124. org_map[x['id']] = x
  125. third_org_list_map[x['parent_id']].append(x)
  126. third_org_map[x['id']] = x
  127. # 查询一级行政区划数据,并按area_id排序
  128. sql = """
  129. select * from common.area where area_grade = 1 order by area_id
  130. """
  131. logger.info(f"sql: {sql}")
  132. curs.execute(sql)
  133. cities = curs.fetchall()
  134. # 遍历一级行政区划数据,构建area_map
  135. for city in cities:
  136. area_map[city['area_id']] = city
  137. # 查询二级行政区划数据,并按parent_id和area_id排序
  138. sql = """
  139. select * from common.area where area_grade = 2 order by parent_id, area_id
  140. """
  141. logger.info(f"sql: {sql}")
  142. curs.execute(sql)
  143. districts = curs.fetchall()
  144. # 遍历二级行政区划数据,构建area_map和district_list_map
  145. for district in districts:
  146. area_map[district['area_id']] = district
  147. # 构建城市与区县的映射关系
  148. for city in cities:
  149. district_list_map[city['area_id']] = []
  150. for district in districts:
  151. if city['area_id'] == district['parent_id']:
  152. district_list_map[city['area_id']].append(district)
  153. # 读取 Excel 文件中的数据
  154. df = pd.read_excel(io=input_path)
  155. # 获取需要清理的列名列表,排除 "违章时间" 和 "处理时间" 列
  156. columns_to_clean = list(filter(lambda x: x not in ('违章时间', '处理时间'), df.columns))
  157. # 对需要清理的列进行字符串清理,移除多余的空白字符
  158. df[columns_to_clean] = df[columns_to_clean].map(lambda x: re.sub(r'\s+', '', x) if type(x) is str else x)
  159. df['账期'] = year_month
  160. # 保存原始单位和车牌号信息到新的列中
  161. df['原始一级单位'] = df['一级单位']
  162. df['原始二级单位'] = df['二级单位']
  163. df['原始三级单位'] = df['三级单位']
  164. df['原始车牌号'] = df['车牌号']
  165. # 定义函数,用于提取并标准化车牌号
  166. def get_che_pai(che_pai):
  167. # 如果车牌号为空或无效,则返回空字符串
  168. if pd.isna(che_pai) or not che_pai or not che_pai.strip():
  169. return ""
  170. # 将车牌号转换为大写
  171. upper_case = che_pai.upper()
  172. # 移除车牌号中不符合规则的字符
  173. s = not_che_pai_pattern.sub("", upper_case)
  174. # 使用正则表达式匹配合法的车牌号
  175. m = che_pai_pattern.search(s)
  176. if m:
  177. return m.group(0)
  178. # 如果车牌号包含省份简称但未匹配成功,记录警告日志
  179. if has_che_pai_province_pattern.search(che_pai):
  180. logger.warning(f"车牌匹配失败: {che_pai} -> {s}")
  181. return s
  182. # 如果完全无法匹配,记录警告日志并返回原车牌号
  183. logger.warning(f"车牌匹配失败: {che_pai} -> {upper_case}")
  184. return upper_case
  185. # 应用 get_che_pai 函数处理车牌号列
  186. df['车牌号'] = df['车牌号'].apply(get_che_pai)
  187. # 定义函数,用于标记车牌号是否匹配失败
  188. def che_pai_fail(che_pai):
  189. # 如果车牌号为空或无效,则标记为失败
  190. if pd.isna(che_pai) or not che_pai or not che_pai.strip():
  191. return "1"
  192. # 移除车牌号中不符合规则的字符
  193. s = not_che_pai_pattern.sub("", che_pai.upper())
  194. # 使用正则表达式匹配合法的车牌号
  195. m = che_pai_pattern.search(s)
  196. if m:
  197. return "0" # 匹配成功
  198. return "1" # 匹配失败
  199. # 应用 che_pai_fail 函数生成车牌匹配失败标记列
  200. df['车牌匹配失败'] = df['车牌号'].apply(che_pai_fail)
  201. # 获取一级单位
  202. def get_first_unit(x):
  203. raw_che_pai = x['原始车牌号']
  204. raw_first_unit = str(x['原始一级单位']) if pd.notna(x['原始一级单位']) else ""
  205. if not raw_first_unit or not raw_first_unit.strip():
  206. raise RuntimeError(f"一级单位为空:{raw_che_pai}")
  207. if raw_first_unit in first_unit_map:
  208. return first_unit_map.get(raw_first_unit)
  209. raise RuntimeError(f"一级单位匹配失败:{raw_che_pai} {raw_first_unit}")
  210. # 应用 get_first_unit 函数生成一级单位列
  211. df['一级单位'] = df.apply(get_first_unit, axis=1)
  212. # 获取二级单位
  213. def get_second_unit(x):
  214. first_unit = str(x['一级单位']) if pd.notna(x['一级单位']) else ""
  215. raw_second_unit = str(x['原始二级单位']) if pd.notna(x['原始二级单位']) else ""
  216. if first_unit in ["华北基地建设部", "雄安基地建设部", "省公司本部"]:
  217. return first_unit
  218. if not raw_second_unit or not raw_second_unit.strip():
  219. return f"{first_unit}本部"
  220. if first_unit == "机动局":
  221. for yj in er_ji_map.keys():
  222. if yj in raw_second_unit:
  223. return f"机动局{yj}"
  224. return "机动局本部"
  225. if first_unit == "石家庄":
  226. if "开发区" in raw_second_unit:
  227. return "石家庄开发区"
  228. if first_unit == "廊坊":
  229. if "开发区" in raw_second_unit:
  230. return "廊坊开发区"
  231. if first_unit == "邢台":
  232. if "内丘" in raw_second_unit:
  233. return "内邱"
  234. if "任泽" in raw_second_unit:
  235. return "任县"
  236. if first_unit == "唐山":
  237. if "高开区" in raw_second_unit:
  238. return "唐山高开区"
  239. if "滦州" in raw_second_unit:
  240. return "滦县"
  241. ejs = er_ji_map.get(first_unit, [])
  242. if first_unit == "雄安":
  243. raw_second_unit = raw_second_unit.replace("雄安新区", "")
  244. for ej in ejs:
  245. if ej in raw_second_unit:
  246. return ej
  247. return f"{first_unit}本部"
  248. # 应用 get_second_unit 函数生成二级单位列
  249. df['二级单位'] = df.apply(get_second_unit, axis=1)
  250. # 获取三级单位
  251. def get_third_unit(x):
  252. second_unit = str(x['二级单位']) if pd.notna(x['二级单位']) else ""
  253. raw_third_unit = str(x['原始三级单位']) if pd.notna(x['原始三级单位']) else ""
  254. return raw_third_unit if raw_third_unit and raw_third_unit.strip() else second_unit
  255. # 应用 get_third_unit 函数生成三级单位列
  256. df['三级单位'] = df.apply(get_third_unit, axis=1)
  257. # 定义一个函数,用于根据单位名称获取二级组织机构编码
  258. def get_area_no(first_unit):
  259. if first_unit == "机动局":
  260. return "-11"
  261. if first_unit in ["省公司本部", "雄安基地建设部", "华北基地建设部"]:
  262. return "-12"
  263. for second_org in second_orgs:
  264. if second_org.get('name') in first_unit:
  265. return second_org.get('id')
  266. return ''
  267. df['二级组织机构编码'] = df['一级单位'].apply(get_area_no)
  268. # 用于根据组织机构编码获取组织机构名称
  269. def get_org_name(x):
  270. org_no = str(x) if pd.notna(x) else ''
  271. # 如果编码为空或无效,则返回空字符串
  272. if not org_no or not org_no.strip():
  273. return ''
  274. # 在org_map中查找对应编码的组织机构信息,并返回其名称
  275. po = org_map.get(org_no)
  276. if po is not None:
  277. return po.get('name')
  278. return ''
  279. # 将get_org_name函数应用到'二级组织机构编码'列,生成'二级组织机构名称'列
  280. df['二级组织机构名称'] = df['二级组织机构编码'].apply(get_org_name)
  281. # 定义一个函数,用于根据行数据获取三级组织机构编码
  282. def get_city_no(x):
  283. # 获取相关字段值,如果为空则设置为""
  284. area_no = str(x['二级组织机构编码']) if pd.notna(x['二级组织机构编码']) else ""
  285. area_name = str(x['二级组织机构名称']) if pd.notna(x['二级组织机构名称']) else ""
  286. first_unit = str(x['一级单位']) if pd.notna(x['一级单位']) else ""
  287. second_unit = str(x['原始二级单位']) if pd.notna(x['原始二级单位']) else ""
  288. if not area_no or not area_no.strip() or not area_name or not area_name.strip():
  289. return ''
  290. if '华北基地建设部' == first_unit:
  291. return 'HE018'
  292. if '雄安基地建设部' == first_unit:
  293. return 'HE019'
  294. if second_unit and second_unit.strip():
  295. if area_name == "石家庄":
  296. if "井陉矿区" in second_unit:
  297. return "D0130185"
  298. if "井陉" in second_unit:
  299. return "D0130121"
  300. if area_name == "秦皇岛":
  301. if "北戴河新区" in second_unit:
  302. return "D0130325"
  303. if "北戴河" in second_unit:
  304. return "D0130304"
  305. if area_name == "邯郸":
  306. if "峰峰" in second_unit:
  307. return "D0130406"
  308. if area_name == "邢台":
  309. if "内丘" in second_unit:
  310. return "D0130523"
  311. if "任泽" in second_unit:
  312. return "D0130526"
  313. if area_name == "省机动局":
  314. if "沧州" in second_unit:
  315. return "HECS180"
  316. if "唐山" in second_unit:
  317. return "HECS181"
  318. if "秦皇岛" in second_unit:
  319. return "HECS182"
  320. if "廊坊" in second_unit:
  321. return "HECS183"
  322. if "张家口" in second_unit:
  323. return "HECS184"
  324. if "邢台" in second_unit:
  325. return "HECS185"
  326. if "邯郸" in second_unit:
  327. return "HECS186"
  328. if "保定" in second_unit:
  329. return "HECS187"
  330. if "石家庄" in second_unit:
  331. return "HECS188"
  332. if "承德" in second_unit:
  333. return "HECS189"
  334. if "衡水" in second_unit:
  335. return "HECS720"
  336. if "雄安" in second_unit:
  337. return "HECS728"
  338. return "HECS018"
  339. if area_name == "雄安":
  340. second_unit = second_unit.replace("雄安新区", "")
  341. l3 = third_org_list_map.get(area_no, [])
  342. for organization_po in l3:
  343. if organization_po.get('name') in second_unit:
  344. return organization_po.get('id')
  345. if area_name == '省本部':
  346. return 'HE001'
  347. if area_name == "省机动局":
  348. return "HECS018"
  349. if area_name == "沧州":
  350. return "D0130911"
  351. if area_name == "唐山":
  352. return "D0130202"
  353. if area_name == "秦皇岛":
  354. return "D0130302"
  355. if area_name == "廊坊":
  356. return "D0131000"
  357. if area_name == "张家口":
  358. return "D0130701"
  359. if area_name == "邢台":
  360. return "D0130502"
  361. if area_name == "邯郸":
  362. return "D0130402"
  363. if area_name == "保定":
  364. return "D0130601"
  365. if area_name == "石家庄":
  366. return "D0130186"
  367. if area_name == "承德":
  368. return "D0130801"
  369. if area_name == "衡水":
  370. return "D0133001"
  371. if area_name == "雄安":
  372. return "D0130830"
  373. return ''
  374. # 将get_city_no函数应用到DataFrame的每一行,生成'三级组织机构编码'列
  375. df['三级组织机构编码'] = df.apply(get_city_no, axis=1)
  376. # 将get_org_name函数应用到'三级组织机构编码'列,生成'三级组织机构名称'列
  377. df['三级组织机构名称'] = df['三级组织机构编码'].apply(get_org_name)
  378. # 定义一个函数,用于根据行数据获取二级组织机构编码2
  379. def get_area_no2(x):
  380. # 获取相关字段值,如果为空则设置为""
  381. area_name = str(x['二级组织机构名称']) if pd.notna(x['二级组织机构名称']) else ""
  382. city_name = str(x['三级组织机构名称']) if pd.notna(x['三级组织机构名称']) else ""
  383. if not area_name or not area_name.strip() or '省本部' == area_name:
  384. return ''
  385. # 根据二级组织机构名称和三级组织机构名称的内容,返回对应的编码
  386. if area_name == "省机动局" and city_name and city_name.strip():
  387. if "沧州" in city_name:
  388. return "180"
  389. if "唐山" in city_name:
  390. return "181"
  391. if "秦皇岛" in city_name:
  392. return "182"
  393. if "廊坊" in city_name:
  394. return "183"
  395. if "张家口" in city_name:
  396. return "184"
  397. if "邢台" in city_name:
  398. return "185"
  399. if "邯郸" in city_name:
  400. return "186"
  401. if "保定" in city_name:
  402. return "187"
  403. if "石家庄" in city_name:
  404. return "188"
  405. if "承德" in city_name:
  406. return "189"
  407. if "衡水" in city_name:
  408. return "720"
  409. if "雄安" in city_name:
  410. return "782"
  411. return ''
  412. if "沧州" in area_name:
  413. return "180"
  414. if "唐山" in area_name:
  415. return "181"
  416. if "秦皇岛" in area_name:
  417. return "182"
  418. if "廊坊" in area_name:
  419. return "183"
  420. if "张家口" in area_name:
  421. return "184"
  422. if "邢台" in area_name:
  423. return "185"
  424. if "邯郸" in area_name:
  425. return "186"
  426. if "保定" in area_name:
  427. return "187"
  428. if "石家庄" in area_name:
  429. return "188"
  430. if "承德" in area_name:
  431. return "189"
  432. if "衡水" in area_name:
  433. return "720"
  434. if "雄安" in area_name:
  435. return "782"
  436. return ""
  437. # 将get_area_no2函数应用到DataFrame的每一行,生成'二级组织机构编码2'列
  438. df['二级组织机构编码2'] = df.apply(get_area_no2, axis=1)
  439. # 将get_org_name函数应用到'二级组织机构编码2'列,生成'二级组织机构名称2'列
  440. df['二级组织机构名称2'] = df['二级组织机构编码2'].apply(get_org_name)
  441. # 获取城市ID
  442. def get_city_id(x):
  443. raw_first_unit = str(x['原始一级单位']) if pd.notna(x['原始一级单位']) else ""
  444. raw_second_unit = str(x['原始二级单位']) if pd.notna(x['原始二级单位']) else ""
  445. raw_third_unit = str(x['原始三级单位']) if pd.notna(x['原始三级单位']) else ""
  446. unit = f"{raw_first_unit}_{raw_second_unit}_{raw_third_unit}"
  447. if not unit or not unit.strip():
  448. return ''
  449. # 遍历cities列表,匹配单位名称并返回对应的城市ID
  450. for city in cities:
  451. if city.get('short_name') in unit:
  452. return city.get('area_id', '')
  453. return ''
  454. df['city_id'] = df.apply(get_city_id, axis=1)
  455. # 定义一个函数,用于根据ID获取区域名称
  456. def get_area_name(x):
  457. id = str(x) if pd.notna(x) else ""
  458. if not id or not id.strip():
  459. return ""
  460. # 在area_map中查找对应ID的区域信息,并返回其名称
  461. area_po = area_map.get(id)
  462. if area_po is not None:
  463. return area_po.get("area_name", "")
  464. return ""
  465. # 将get_area_name函数应用到'city_id'列,生成'city'列
  466. df['city'] = df['city_id'].apply(get_area_name)
  467. # 定义一个函数,用于根据行数据获取区县ID
  468. def get_district_id(x):
  469. # 获取相关字段值,如果为空则设置为""
  470. city_id = str(x['city_id']) if pd.notna(x['city_id']) else ""
  471. city = str(x['city']) if pd.notna(x['city']) else ""
  472. raw_first_unit = str(x['原始一级单位']) if pd.notna(x['原始一级单位']) else ""
  473. raw_second_unit = str(x['原始二级单位']) if pd.notna(x['原始二级单位']) else ""
  474. raw_third_unit = str(x['原始三级单位']) if pd.notna(x['原始三级单位']) else ""
  475. unit = f"{raw_first_unit}_{raw_second_unit}_{raw_third_unit}"
  476. # 如果城市ID、城市名称或单位为空,则返回""
  477. if not city_id or not city_id.strip() or not city or not city.strip() or not unit or not unit.strip():
  478. return ""
  479. # 根据城市名称和单位内容,返回对应的区县ID
  480. if city == "石家庄":
  481. if "井陉矿区" in unit:
  482. return "130107"
  483. if "井陉" in unit:
  484. return "130121"
  485. if city == "雄安":
  486. unit = unit.replace("雄安新区", "")
  487. districts = district_list_map.get(city_id, [])
  488. for district in districts:
  489. if district.get('short_name') in unit:
  490. return district.get('area_id')
  491. return ""
  492. df['district_id'] = df.apply(get_district_id, axis=1)
  493. # 将get_area_name函数应用到'district_id'列,生成'district'列
  494. df['district'] = df['district_id'].apply(get_area_name)
  495. # 提取账期年份和月份信息
  496. df['year_no'] = df['账期'].apply(lambda x: None if pd.isna(x) else str(x)[:4])
  497. df['month_no'] = df['账期'].apply(lambda x: None if pd.isna(x) else str(x)[-2:])
  498. # 格式化违章时间和处理时间为年月格式
  499. df['违章年月'] = df['违章时间'].apply(lambda x: None if pd.isna(x) else pd.to_datetime(x).strftime('%Y%m'))
  500. df['处理年月'] = df['处理时间'].apply(lambda x: None if pd.isna(x) else pd.to_datetime(x).strftime('%Y%m'))
  501. # 打印DataFrame的信息
  502. print(df.info())
  503. # 将处理后的数据保存到CSV文件中
  504. df.to_csv(path_or_buf=output_path,
  505. header=['year_month', 'che_pai_hao', 'first_unit', 'second_unit', 'third_unit', 'che_jia_hao',
  506. 'wei_zhang_shi_jian', 'wei_zhang_di_dian', 'wei_zhang_xiang_qing', 'kou_fen', 'fa_kuan',
  507. 'chu_li_zhuang_tai', 'chu_li_shi_jian', 'wei_zhang_wei_chu_li_shi_chang', 'raw_yi_ji',
  508. 'raw_er_ji',
  509. 'raw_san_ji', 'raw_che_pai_hao', 'che_pai_fail', 'area_no', 'area_name', 'city_no', 'city_name',
  510. 'area_no2', 'area_name2', 'city_id', 'city', 'district_id', 'district', 'year_no', 'month_no',
  511. 'wei_zhang_nian_yue', 'chu_li_nian_yue'],
  512. index=False,
  513. encoding='utf-8-sig')
  514. def data_import():
  515. # 定义 PowerShell 脚本的路径
  516. script_path = r"../../copy.ps1"
  517. # 目标表和文件信息
  518. table = "car.car_wei_zhang" # 数据库目标表名
  519. # 表字段列名,用于指定导入数据的列顺序
  520. columns = "year_month,che_pai_hao,first_unit,second_unit,third_unit,che_jia_hao,wei_zhang_shi_jian,wei_zhang_di_dian,wei_zhang_xiang_qing,kou_fen,fa_kuan,chu_li_zhuang_tai,chu_li_shi_jian,wei_zhang_wei_chu_li_shi_chang,raw_yi_ji,raw_er_ji,raw_san_ji,raw_che_pai_hao,che_pai_fail,area_no,area_name,city_no,city_name,area_no2,area_name2,city_id,city,district_id,district,year_no,month_no,wei_zhang_nian_yue,chu_li_nian_yue"
  521. # 构造执行 PowerShell 脚本的命令
  522. command = f"powershell -File {script_path} -db_host {db_host} -db_port {db_port} -db_username {db_username} -db_password {db_password} -dbname {dbname} -table {table} -filename {output_path} -columns {columns}"
  523. # 打印生成的命令,方便调试和日志记录
  524. logger.info("command: {}", command)
  525. # 使用 subprocess 模块运行 PowerShell 命令,并捕获输出
  526. completed_process = subprocess.run(
  527. command, # 执行的命令
  528. check=False, # 如果命令执行失败,不抛出异常
  529. text=True, # 将输出作为字符串处理
  530. capture_output=True, # 捕获标准输出和标准错误
  531. )
  532. # 打印命令执行的结果,包括返回码、标准输出和标准错误
  533. logger.info("导入结果:\n{}\n{}\n{}", completed_process.returncode, completed_process.stdout,
  534. completed_process.stderr)
  535. # 定义正则表达式,用于匹配标准输出中的 COPY 结果
  536. p = re.compile(r"^(COPY) (\d+)$")
  537. count = None # 初始化计数变量
  538. matcher = p.match(completed_process.stdout) # 匹配标准输出中的 COPY 结果
  539. if matcher:
  540. count = int(matcher.group(2)) # 提取导入的数据行数
  541. # 如果没有成功提取到导入数据的行数,抛出运行时异常
  542. if count is None:
  543. raise RuntimeError("导入数据失败")
  544. def upload_file():
  545. remote_path = f'{remote_dir_path}{year_month}.xlsx' # 定义远程主机的目标文件路径
  546. # 使用paramiko.SSHClient创建一个SSH客户端对象,并通过with语句管理其上下文
  547. with paramiko.SSHClient() as ssh:
  548. # 设置自动添加主机密钥策略,避免因未知主机密钥导致连接失败
  549. ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
  550. # 连接到远程主机,传入主机地址、端口、用户名和密码
  551. ssh.connect(ssh_hostname, port=ssh_port, username=ssh_username, password=ssh_password)
  552. # 执行远程命令,创建远程目录(如果不存在)
  553. ssh.exec_command(f'mkdir -p {remote_dir_path}')
  554. # 打开SFTP会话,用于文件传输,并通过with语句管理其上下文
  555. with ssh.open_sftp() as sftp:
  556. # 记录日志,提示即将上传的本地文件和远程目标路径
  557. logger.info("upload {} to {}", input_path, remote_path)
  558. # 使用SFTP的put方法将本地文件上传到远程主机
  559. sftp.put(input_path, remote_path)
  560. # 记录日志,提示文件已成功上传
  561. logger.info("uploaded {}", input_path)
  562. def data_update():
  563. with psycopg.connect(
  564. conninfo=conn_info,
  565. ) as conn:
  566. with conn.cursor() as curs:
  567. # 插入违章长期未处理
  568. sql = f"""
  569. insert
  570. into
  571. car.car_wei_zhang_chang_qi
  572. (
  573. year_month,
  574. che_pai_hao,
  575. raw_yi_ji,
  576. raw_er_ji,
  577. raw_san_ji,
  578. wei_zhang_shi_jian,
  579. wei_zhang_di_dian,
  580. wei_zhang_xiang_qing,
  581. kou_fen,
  582. fa_kuan,
  583. wei_zhang_wei_chu_li_shi_chang,
  584. chu_li_zhuang_tai,
  585. first_unit,
  586. second_unit,
  587. third_unit,
  588. area_no,
  589. area_name,
  590. city_no,
  591. city_name,
  592. area_name2,
  593. area_no2,
  594. city_id,
  595. city,
  596. district_id,
  597. district,
  598. raw_che_pai_hao,
  599. che_pai_fail,
  600. wei_zhang_nian_yue,
  601. year_no,
  602. month_no,
  603. source
  604. )
  605. select
  606. year_month,
  607. che_pai_hao,
  608. raw_yi_ji,
  609. raw_er_ji,
  610. raw_san_ji,
  611. wei_zhang_shi_jian,
  612. wei_zhang_di_dian,
  613. wei_zhang_xiang_qing,
  614. kou_fen,
  615. fa_kuan,
  616. wei_zhang_wei_chu_li_shi_chang,
  617. chu_li_zhuang_tai,
  618. first_unit,
  619. second_unit,
  620. third_unit,
  621. area_no,
  622. area_name,
  623. city_no,
  624. city_name,
  625. area_name2,
  626. area_no2,
  627. city_id,
  628. city,
  629. district_id,
  630. district,
  631. raw_che_pai_hao,
  632. che_pai_fail,
  633. wei_zhang_nian_yue,
  634. year_no,
  635. month_no,
  636. source
  637. from
  638. car.car_wei_zhang
  639. where
  640. chu_li_zhuang_tai = '未处理'
  641. and wei_zhang_wei_chu_li_shi_chang > 150
  642. and year_month = {year_month}
  643. """
  644. logger.info(f"sql: {sql}")
  645. curs.execute(sql)
  646. logger.info(f"update {curs.rowcount}")
  647. # 插入违章
  648. sql = f"""
  649. insert
  650. into
  651. car_theme.wz_f_violation_details
  652. (
  653. statistical_month,
  654. card_num,
  655. city,
  656. dpt_sec,
  657. grid,
  658. violation_time,
  659. violation_location,
  660. violation_details,
  661. deduction_points,
  662. fine,
  663. processing_time,
  664. unprocessed_duration_of_violation,
  665. offline_actual_processing_status
  666. )
  667. select
  668. year_month,
  669. che_pai_hao,
  670. first_unit,
  671. second_unit,
  672. third_unit,
  673. wei_zhang_shi_jian,
  674. wei_zhang_di_dian,
  675. wei_zhang_xiang_qing,
  676. kou_fen,
  677. fa_kuan,
  678. chu_li_shi_jian,
  679. wei_zhang_wei_chu_li_shi_chang,
  680. chu_li_zhuang_tai
  681. from
  682. car.car_wei_zhang
  683. where
  684. year_month = {year_month}
  685. """
  686. logger.info(f"sql: {sql}")
  687. curs.execute(sql)
  688. logger.info(f"update {curs.rowcount}")
  689. data_process()
  690. data_import()
  691. upload_file()
  692. data_update()