浏览代码

feat: 添加心跳超时次数检查

weijianghai 2 年之前
父节点
当前提交
7173f5dd90

+ 4 - 0
pom.xml

@@ -173,6 +173,10 @@
 							<groupId>org.projectlombok</groupId>
 							<artifactId>lombok</artifactId>
 						</exclude>
+						<exclude>
+							<groupId>org.springframework.boot</groupId>
+							<artifactId>spring-boot-devtools</artifactId>
+						</exclude>
 					</excludes>
 				</configuration>
 			</plugin>

+ 22 - 0
src/main/java/com/nokia/hb/config/ThreadPoolConfig.java

@@ -0,0 +1,22 @@
+package com.nokia.hb.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.TaskScheduler;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+
+/**
+ * 线程池配置
+ */
+@Slf4j
+@Configuration
+public class ThreadPoolConfig {
+    @Bean
+    public TaskScheduler taskScheduler() {
+        ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
+        scheduler.setPoolSize(100);
+        scheduler.setThreadNamePrefix("taskScheduler-");
+        return scheduler;
+    }
+}

+ 30 - 12
src/main/java/com/nokia/hb/service/WebSocketService.java

@@ -14,6 +14,7 @@ import java.time.LocalDateTime;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * 网络套接字服务
@@ -32,10 +33,15 @@ public class WebSocketService extends AbstractWebSocketHandler {
      * ping时间map
      */
     static final ConcurrentMap<String, LocalDateTime> PING_MAP = new ConcurrentHashMap<>();
+    static final ConcurrentMap<String, AtomicInteger> TIMEOUT_MAP = new ConcurrentHashMap<>();
     /**
      * 心跳超时(s)
      */
     private Long heartbeatTimeout;
+    /**
+     * 最大心跳超时次数
+     */
+    private Integer maxHeartbeatTimeout;
     private final SessionService sessionService;
 
     public WebSocketService(SessionService sessionService) {
@@ -95,6 +101,7 @@ public class WebSocketService extends AbstractWebSocketHandler {
         log.info("handlePongMessage: {} -> {} -> {} -> {}", session.getId(), sessionId, username,
                 message.getPayload());
         PING_MAP.put(session.getId(), LocalDateTime.now());
+        TIMEOUT_MAP.put(session.getId(), new AtomicInteger(0));
     }
 
     @Scheduled(fixedDelay = 2000)
@@ -105,18 +112,6 @@ public class WebSocketService extends AbstractWebSocketHandler {
             Map<String, Object> attributes = v.getAttributes();
             String sessionId = (String) attributes.get("JSESSIONID");
             String username = (String) attributes.get("username");
-            PING_MAP.putIfAbsent(k, LocalDateTime.now());
-            // 心跳超时断开连接
-            if (LocalDateTime.now().minusSeconds(heartbeatTimeout).compareTo(PING_MAP.get(k)) > 0) {
-                log.warn("超过{}s未收到pong: {} -> {} -> {}", heartbeatTimeout, v.getId(), sessionId, username);
-                try {
-                    v.close();
-                    PING_MAP.remove(k);
-                } catch (IOException e) {
-                    log.error(e.getMessage(), e);
-                }
-                continue;
-            }
             // 发送ping
             try {
                 log.info("ping {} -> {} -> {}", v.getId(), sessionId, username);
@@ -124,6 +119,29 @@ public class WebSocketService extends AbstractWebSocketHandler {
             } catch (IOException e) {
                 log.error(e.getMessage(), e);
             }
+            LocalDateTime now = LocalDateTime.now();
+            LocalDateTime expireTime = now.minusSeconds(heartbeatTimeout);
+            PING_MAP.putIfAbsent(k, now);
+            TIMEOUT_MAP.putIfAbsent(k, new AtomicInteger(0));
+            LocalDateTime pingTime = PING_MAP.get(k);
+            boolean isTimeout = expireTime.compareTo(pingTime) > 0;
+            log.info("pong: {} -> {} -> {} -> {} -> {} -> {}", v.getId(), sessionId, username, expireTime, pingTime,
+                    isTimeout);
+            // 心跳超时
+            if (isTimeout) {
+                log.warn("超过{}s未收到pong: {} -> {} -> {}", heartbeatTimeout, v.getId(), sessionId, username);
+                // 超过最大超时次数断开连接
+                if (TIMEOUT_MAP.get(k).incrementAndGet() > maxHeartbeatTimeout) {
+                    log.warn("超过{}次未收到pong: {} -> {} -> {}", maxHeartbeatTimeout, v.getId(), sessionId, username);
+                    try {
+                        v.close();
+                        PING_MAP.remove(k);
+                        TIMEOUT_MAP.remove(k);
+                    } catch (IOException e) {
+                        log.error(e.getMessage(), e);
+                    }
+                }
+            }
         }
     }
 }

+ 0 - 2
src/main/resources/application-dev.yml

@@ -3,5 +3,3 @@ server:
 logging:
   level:
     com.nokia.hb: debug
-session:
-  timeout: 600

+ 1 - 0
src/main/resources/application.yml

@@ -31,3 +31,4 @@ session:
   timeout: 28800
 socket:
   heartbeat-timeout: 5
+  max-heartbeat-timeout: 3