浏览代码

feat: 接入新网关,记录短信报告

weijianghai 10 月之前
父节点
当前提交
dca6e6e655

+ 17 - 1
src/main/java/com/nokia/sms/action/ReportAction.java

@@ -1,5 +1,9 @@
 package com.nokia.sms.action;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.stream.Collectors;
 
+import com.nokia.sms.pojo.SmsReport;
 import com.nokia.sms.sgip.message.ReportMessage;
 import com.nokia.sms.sgip.message.SgipMessage;
 import com.nokia.sms.sgip.mo.server.Action;
@@ -13,7 +17,19 @@ public class ReportAction implements Action {
 		if (message instanceof ReportMessage) {
 			// 收到状态报告
 			ReportMessage rMessage = (ReportMessage) message;
-			log.info("收到Report." + rMessage.toString());
+			log.info("收到Report." + rMessage);
+			String flowId = Arrays.stream(rMessage.getSubmitSN())
+					.mapToObj(Integer::toString).collect(Collectors.joining("-"));
+			SmsReport smsReport = new SmsReport();
+			smsReport.setPhone(rMessage.getUserNumber().replaceFirst("86", ""));
+			smsReport.setFlowId(flowId);
+			smsReport.setReportTime(new Date());
+			smsReport.setReportType(rMessage.getReportType());
+			smsReport.setState(rMessage.getState());
+			smsReport.setErrorCode(rMessage.getErrorCode());
+			if (!SmsReport.offer(smsReport)) {
+				log.error("短信报告入队失败: {}", smsReport);
+			}
 		}
 
 	}

+ 15 - 0
src/main/java/com/nokia/sms/config/MOServerConfig.java

@@ -0,0 +1,15 @@
+package com.nokia.sms.config;
+
+import com.nokia.sms.sgip.mo.server.MOServer;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+import java.util.concurrent.CompletableFuture;
+
+@Configuration
+public class MOServerConfig {
+    @PostConstruct
+    public void init() {
+        CompletableFuture.runAsync(MOServer::getInstance);
+    }
+}

+ 13 - 0
src/main/java/com/nokia/sms/config/MTClientConfig.java

@@ -0,0 +1,13 @@
+package com.nokia.sms.config;
+
+import com.nokia.sms.sgip.mt.client.MTClient;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class MTClientConfig {
+    @Bean
+    public MTClient mtClient() {
+        return MTClient.getInstance();
+    }
+}

+ 21 - 2
src/main/java/com/nokia/sms/dao/SmsDao.java

@@ -1,14 +1,18 @@
 package com.nokia.sms.dao;
 
+import com.nokia.common.mybatis.JsonTypeHandler;
+import com.nokia.sms.pojo.Sms;
+import com.nokia.sms.pojo.SmsReport;
 import org.apache.ibatis.annotations.Insert;
 import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
 import org.apache.ibatis.annotations.Result;
 import org.apache.ibatis.annotations.Results;
 import org.apache.ibatis.annotations.Select;
+import org.apache.ibatis.annotations.Update;
 import org.apache.ibatis.type.JdbcType;
 
-import com.nokia.common.mybatis.JsonTypeHandler;
-import com.nokia.sms.pojo.Sms;
+import java.util.List;
 
 @Mapper
 public interface SmsDao {
@@ -34,4 +38,19 @@ public interface SmsDao {
         })
         @Select("select * from sqmdb_rpt.sms_record where id = #{id}")
         Sms selectById(String id);
+
+        @Update(
+                "<script>"
+                + " insert into sqmdb_rpt.sms_report"
+                + " (id, phone, flow_id, send_time)"
+                + " values"
+                + " <foreach collection=\"list\" item=\"item\" index=\"index\" separator=\",\">"
+                + " (#{item.id}, #{item.phone}, #{item.flowId}, #{item.sendTime})"
+                + " </foreach>"
+                + " </script>"
+        )
+        int insertBatchSmsReport(@Param("list") List<SmsReport> list);
+
+        @Update("update sqmdb_rpt.sms_report set report_time = #{po.reportTime}, report_type = #{po.reportType}, state = #{po.state}, error_code = #{po.errorCode} where phone = #{po.phone} and flow_id = #{po.flowId}")
+        int updateSmsReport(@Param("po") SmsReport po);
 }

+ 29 - 0
src/main/java/com/nokia/sms/pojo/SmsReport.java

@@ -0,0 +1,29 @@
+package com.nokia.sms.pojo;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Date;
+import java.util.concurrent.LinkedBlockingQueue;
+
+@NoArgsConstructor
+@Data
+public class SmsReport {
+    static final LinkedBlockingQueue<SmsReport> QUEUE = new LinkedBlockingQueue<>();
+    private String id;
+    private String phone;
+    private String flowId;
+    private Date sendTime;
+    private Date reportTime;
+    private Integer reportType;
+    private Integer state;
+    private Integer errorCode;
+
+    public static boolean offer(SmsReport report) {
+        return QUEUE.offer(report);
+    }
+
+    public static SmsReport take() throws InterruptedException {
+        return QUEUE.take();
+    }
+}

+ 58 - 22
src/main/java/com/nokia/sms/service/SgipSmsService.java

@@ -4,6 +4,7 @@ import com.alibaba.fastjson2.JSON;
 import com.nokia.common.http.R;
 import com.nokia.sms.dao.SmsDao;
 import com.nokia.sms.pojo.Sms;
+import com.nokia.sms.pojo.SmsReport;
 import com.nokia.sms.sgip.message.SendResult;
 import com.nokia.sms.sgip.mt.client.MTClient;
 import lombok.Data;
@@ -12,11 +13,15 @@ import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Service
@@ -25,14 +30,16 @@ import java.util.concurrent.LinkedBlockingQueue;
 public class SgipSmsService {
     static final LinkedBlockingQueue<Sms> QUEUE = new LinkedBlockingQueue<>();
     private final SmsDao smsDao;
+    private final MTClient mtClient;
 
     /**
      * 最大重试次数
      */
     private Integer maxRetry;
 
-    public SgipSmsService(SmsDao smsDao) {
+    public SgipSmsService(SmsDao smsDao, MTClient mtClient) {
         this.smsDao = smsDao;
+        this.mtClient = mtClient;
     }
 
     /**
@@ -42,27 +49,39 @@ public class SgipSmsService {
      * @return {@link R}
      */
     public R offer(Sms sms) {
-        if (QUEUE.offer(sms)) {
-            return R.ok();
-        } else {
+        if (!QUEUE.offer(sms)) {
             log.error("入队失败");
             return R.error();
         }
+        return R.ok();
     }
 
     public void sendMessage(Sms sms) {
+        List<SmsReport> smsReportList = new ArrayList<>();
+        LinkedHashMap<String, String> sendResultMap = new LinkedHashMap<>();
         try {
             log.debug("准备发送短信: {}", JSON.toJSONString(sms));
-            sms.setId(UUID.randomUUID().toString());
+            String id = UUID.randomUUID().toString();
+            sms.setId(id);
             sms.setSendTime(new Date());
-            // 获取单例的MTClient
-            MTClient client = MTClient.getInstance();
-            // 发送短信
-            SendResult[] results = client.sendMessage(sms.getPhoneNumbers(), sms.getContent(), sms.getScheduleTime());
-            log.debug("短信已成功发送, 发送结果: {}", Arrays.toString(results));
-            // 销毁应用
-            client.dispose();
-            sms.setResult(Arrays.toString(results));
+            for (String phoneNumber : sms.getPhoneNumbers()) {
+                // 发送短信
+                SendResult[] results = mtClient.sendMessage(phoneNumber, sms.getContent(), sms.getScheduleTime());
+                String resultString = Arrays.toString(results);
+                log.debug("{}短信已成功发送, 发送结果: {}", phoneNumber, resultString);
+                sendResultMap.put(phoneNumber, resultString);
+                sms.setResult(JSON.toJSONString(sendResultMap));
+                for (SendResult result : results) {
+                    String flowId = Arrays.stream(result.getFlowId())
+                            .mapToObj(Integer::toString).collect(Collectors.joining("-"));
+                    SmsReport smsReport = new SmsReport();
+                    smsReport.setId(id);
+                    smsReport.setPhone(phoneNumber);
+                    smsReport.setSendTime(new Date());
+                    smsReport.setFlowId(flowId);
+                    smsReportList.add(smsReport);
+                }
+            }
 //            if (ThreadLocalRandom.current().nextBoolean()) {
 //                throw new RuntimeException("异常测试");
 //            }
@@ -70,15 +89,14 @@ public class SgipSmsService {
             log.info("短信发送成功: {}", JSON.toJSONString(sms));
         } catch (Exception e) {
             sms.setStatus("发送到短信网关失败");
-            log.error("短信发送失败, 短信: {}, 错误信息: {}", sms, e.getMessage());
-            log.error(e.getMessage(), e);
+            log.error("短信发送失败, 短信: {}, 错误信息: {}", sms, e.getMessage(), e);
             // 重试次数小于最大重试次数进行重试
             if (sms.getCount().getAndIncrement() < maxRetry) {
                 retry(sms);
             }
         } finally {
             smsDao.insert(sms);
-            log.debug("insert: {}", JSON.toJSONString(sms));
+            smsDao.insertBatchSmsReport(smsReportList);
         }
     }
 
@@ -86,20 +104,38 @@ public class SgipSmsService {
      * 消费队列发送短信
      */
     @PostConstruct
-    public void consumer() {
+    public void consumerSendQueue() {
         CompletableFuture.runAsync(() -> {
             while (true) {
-                Sms sms = null;
                 try {
-                    sms = QUEUE.take();
+                    Sms sms = QUEUE.take();
+                    sendMessage(sms);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
                     log.error(e.getMessage(), e);
+                } catch (Exception e) {
+                    log.error(e.getMessage(), e);
                 }
-                if (sms == null) {
-                    continue;
+            }
+        });
+    }
+
+    /**
+     * 消费短信报告队列更新数据库
+     */
+    @PostConstruct
+    public void consumerReportQueue() {
+        CompletableFuture.runAsync(() -> {
+            while (true) {
+                try {
+                    SmsReport smsReport = SmsReport.take();
+                    smsDao.updateSmsReport(smsReport);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    log.error(e.getMessage(), e);
+                } catch (Exception e) {
+                    log.error(e.getMessage(), e);
                 }
-                sendMessage(sms);
             }
         });
     }

+ 5 - 4
src/main/java/com/nokia/sms/sgip/codec/MessageUtil.java

@@ -1,14 +1,15 @@
 package com.nokia.sms.sgip.codec;
 
+import org.apache.mina.core.buffer.IoBuffer;
+
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
 import java.nio.charset.CharsetEncoder;
 import java.text.SimpleDateFormat;
+import java.time.LocalDate;
 import java.util.Date;
 
-import org.apache.mina.core.buffer.IoBuffer;
-
 public class MessageUtil {
     private static int sgipSeq;
     private static final SimpleDateFormat dateForamt = new SimpleDateFormat("MMddHHmmss");
@@ -34,8 +35,8 @@ public class MessageUtil {
         int[] flowId = new int[3];
         flowId[0] = nodeId;
         flowId[1] = Integer.parseInt(dateForamt.format(new Date()));
-        flowId[2] = sgipSeq;
-        if (sgipSeq == 2147483647) {
+        flowId[2] = LocalDate.now().getYear() * 100000 + sgipSeq;
+        if (sgipSeq == 99999) {
             sgipSeq = 0;
         } else {
             sgipSeq += 1;

+ 1 - 1
src/main/java/com/nokia/sms/sgip/mo/server/MOServer.java

@@ -58,7 +58,7 @@ public class MOServer {
                     new ExecutorFilter(new UnorderedThreadPoolExecutor(codeThread, maxThread)));
             int port = this.config.getInt("sgip_mo_port");
             this.acceptor.bind(new InetSocketAddress(port));
-            log.info("Bind port=" + port);
+            log.info("MOServer Bind port=" + port);
         } catch (IOException e) {
             log.error("", e);
         }

+ 1 - 0
src/main/resources/application.properties

@@ -17,3 +17,4 @@ pushmessage.appSecret=oz4OgKBaMNwi4LWfLPbhrPbbuCS8T0Rb
 pushmessage.systemId=10000078
 pushmessage.moduleId=20000156
 pushmessage.busiCode=30000111
+#mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl

+ 8 - 6
src/main/resources/sgipConfig.properties

@@ -1,7 +1,8 @@
 #=======与网关的连接配置信息========
 # 网关ip
 # sgip_mt_server_ip=133.96.90.123
-sgip_mt_server_ip=10.17.141.26
+#sgip_mt_server_ip=10.17.141.26
+sgip_mt_server_ip=10.17.183.5
 # 网关端口
 sgip_mt_server_port=8801
 # 连接最大空闲时间,建议值50
@@ -24,7 +25,8 @@ sgip_mt_node_id=14039
 # sgip_mt_login_name=14362
 sgip_mt_login_name=14039
 # sp作为客户绑定网关时的密码
-sgip_mt_login_password=ZXwg20@@
+#sgip_mt_login_password=ZXwg20@@
+sgip_mt_login_password=Rhzx@024
 # 网关分配给sp的接入号
 # sgip_mt_sp_number=1065577000
 sgip_mt_sp_number=1065572174
@@ -57,18 +59,18 @@ sgip_mt_message_type=0
 
 #========用于接受网关的消息的服务端相关配置=====
 # 本地监听的MO消息的端口
-sgip_mo_port=8802
+sgip_mo_port=8801
 # 提供给网关登录时的用户名
-sgip_mo_login_name=123
+sgip_mo_login_name=14039
 # 提供给网关登录时的密码
-sgip_mo_login_password=123
+sgip_mo_login_password=Rhzx@024
 
 # 处理网关发过来的消息的连接池初始化线程数
 sgip_mo_threadpool_code_thread=4
 # 处理网关发过来的消息的连接池最大线程数
 sgip_mo_threadpool_max_thread=16
 # 与网关的socket连接的超时时间
-sgip_mo_socket_timeout=5
+sgip_mo_socket_timeout=30
 # 处理上行消息的类
 sgip_mo_deliver_action_class=com.nokia.sms.action.DeliverAction
 # 处理下行消息的类

+ 44 - 0
src/test/com/nokia/sms/ApiTests.java

@@ -0,0 +1,44 @@
+package com.nokia.sms;
+
+import com.nokia.sms.vo.MultiNumberSms;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Import;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Stream;
+
+@Slf4j
+@SpringBootTest
+@Import(TestConfig.class)
+class ApiTests {
+    @Autowired
+    private ThreadPoolExecutor requestExecutor;
+
+    @Test
+    void test() {
+        String url = "http://192.168.70.125:12081/api/sms/sendByPhoneNumbers";
+        MultiNumberSms dto = new MultiNumberSms();
+        dto.setFromSystem("test");
+        dto.setPhoneNumbers(Stream.of("13231899751", "13020841628").toArray(String[]::new));
+        dto.setContent("这是一条比较长的测试短信,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十,一二三四五六七八九十。");
+        dto.setSmsType("test");
+        dto.setInternalId("000001");
+        CompletableFuture.allOf(Stream.iterate(1, n -> n + 1).limit(100).map(t ->
+                CompletableFuture.runAsync(() -> {
+                    try {
+                        log.info(">>>>>>>>>start {}", t);
+                        RestTemplate template = new RestTemplate();
+                        ResponseEntity<Object> r = template.postForEntity(url, dto, Object.class);
+                        log.info(">>>>>>>>>end {} -> {}", t, r.getStatusCode());
+                    } catch (Exception e) {
+                        log.error(e.getMessage(), e);
+                    }
+                }, requestExecutor)).toArray(CompletableFuture[]::new)).join();
+    }
+}

+ 26 - 0
src/test/com/nokia/sms/TestConfig.java

@@ -0,0 +1,26 @@
+package com.nokia.sms;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@TestConfiguration
+public class TestConfig {
+    public static final int corePoolSize = 100;
+    public static final int maximumPoolSize = corePoolSize;
+
+    @Bean
+    public ThreadPoolExecutor requestExecutor() {
+        ThreadPoolExecutor t = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
+                0L, TimeUnit.MILLISECONDS,
+                new LinkedBlockingQueue<>(), new CustomizableThreadFactory("request-"));
+        t.prestartAllCoreThreads();
+        return t;
+    }
+}