Prechádzať zdrojové kódy

feat: 添加websocket心跳检测

weijianghai 2 rokov pred
rodič
commit
84b2c6b7ed

+ 2 - 0
src/main/java/com/nokia/hb/HbApplication.java

@@ -2,7 +2,9 @@ package com.nokia.hb;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
 
+@EnableScheduling
 @SpringBootApplication
 public class HbApplication {
     public static void main(String[] args) {

+ 83 - 17
src/main/java/com/nokia/hb/service/WebSocketService.java

@@ -1,20 +1,41 @@
 package com.nokia.hb.service;
 
+import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
-import org.springframework.web.socket.CloseStatus;
-import org.springframework.web.socket.WebSocketHandler;
-import org.springframework.web.socket.WebSocketMessage;
-import org.springframework.web.socket.WebSocketSession;
+import org.springframework.web.socket.*;
+import org.springframework.web.socket.handler.AbstractWebSocketHandler;
 
+import java.io.IOException;
+import java.time.LocalDateTime;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 /**
  * 网络套接字服务
  */
+@EqualsAndHashCode(callSuper = true)
 @Slf4j
 @Service
-public class WebSocketService implements WebSocketHandler {
+@Data
+@ConfigurationProperties("socket")
+public class WebSocketService extends AbstractWebSocketHandler {
+    /**
+     * 会话映射
+     */
+    static final ConcurrentMap<String, WebSocketSession> SESSION_MAP = new ConcurrentHashMap<>();
+    /**
+     * ping时间map
+     */
+    static final ConcurrentMap<String, LocalDateTime> PING_MAP = new ConcurrentHashMap<>();
+    /**
+     * 心跳超时(s)
+     */
+    private Long heartbeatTimeout;
     private final SessionService sessionService;
 
     public WebSocketService(SessionService sessionService) {
@@ -23,41 +44,86 @@ public class WebSocketService implements WebSocketHandler {
 
     @Override
     public void afterConnectionEstablished(WebSocketSession session) throws Exception {
+        SESSION_MAP.put(session.getId(), session);
         Map<String, Object> attributes = session.getAttributes();
         String sessionId = (String) attributes.get("JSESSIONID");
         String username = (String) attributes.get("username");
-        log.info("websocket connection established: {} -> {} -> {}", session.getId(), sessionId, username);
+        log.info("afterConnectionEstablished: {} -> {} -> {}", session.getId(), sessionId, username);
     }
 
     @Override
-    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
+    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
         Map<String, Object> attributes = session.getAttributes();
         String sessionId = (String) attributes.get("JSESSIONID");
         String username = (String) attributes.get("username");
-        log.info("receive websocket message: {} -> {} -> {} -> {}", session.getId(), sessionId, username,
-                message.getPayload());
+        log.error("handleTransportError: {} -> {} -> {} -> {}", session.getId(), sessionId, username,
+                exception.getMessage(), exception);
     }
 
     @Override
-    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
+    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
         Map<String, Object> attributes = session.getAttributes();
         String sessionId = (String) attributes.get("JSESSIONID");
         String username = (String) attributes.get("username");
-        log.error("websocket transport error: {} -> {} -> {} -> {}", session.getId(), sessionId, username,
-                exception.getMessage(), exception);
+        log.info("afterConnectionClosed: {}, {} -> {} -> {}", closeStatus, session.getId(), sessionId, username);
+        sessionService.invalidate(username);
+        SESSION_MAP.remove(session.getId());
+    }
+    @Override
+    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
+        Map<String, Object> attributes = session.getAttributes();
+        String sessionId = (String) attributes.get("JSESSIONID");
+        String username = (String) attributes.get("username");
+        log.info("handleTextMessage: {} -> {} -> {} -> {}", session.getId(), sessionId, username,
+                message.getPayload());
     }
 
     @Override
-    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
+    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
         Map<String, Object> attributes = session.getAttributes();
         String sessionId = (String) attributes.get("JSESSIONID");
         String username = (String) attributes.get("username");
-        log.info("websocket connection closed: {}, {} -> {} -> {}", closeStatus, session.getId(), sessionId, username);
-        sessionService.invalidate(username);
+        log.info("handleBinaryMessage: {} -> {} -> {} -> {}", session.getId(), sessionId, username,
+                message.getPayload());
     }
 
     @Override
-    public boolean supportsPartialMessages() {
-        return false;
+    protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
+        Map<String, Object> attributes = session.getAttributes();
+        String sessionId = (String) attributes.get("JSESSIONID");
+        String username = (String) attributes.get("username");
+        log.info("handlePongMessage: {} -> {} -> {} -> {}", session.getId(), sessionId, username,
+                message.getPayload());
+        PING_MAP.put(session.getId(), LocalDateTime.now());
+    }
+
+    @Scheduled(fixedDelay = 5000)
+    public void ping() {
+        for (Map.Entry<String, WebSocketSession> entry : SESSION_MAP.entrySet()) {
+            String k = entry.getKey();
+            WebSocketSession v = entry.getValue();
+            Map<String, Object> attributes = v.getAttributes();
+            String sessionId = (String) attributes.get("JSESSIONID");
+            String username = (String) attributes.get("username");
+            PING_MAP.putIfAbsent(k, LocalDateTime.now());
+            // 心跳超时断开连接
+            if (LocalDateTime.now().minusSeconds(heartbeatTimeout).compareTo(PING_MAP.get(k)) > 0) {
+                log.warn("超过{}s未收到pong: {} -> {} -> {}", heartbeatTimeout, v.getId(), sessionId, username);
+                try {
+                    v.close();
+                    PING_MAP.remove(k);
+                } catch (IOException e) {
+                    log.error(e.getMessage(), e);
+                }
+                continue;
+            }
+            // 发送ping
+            try {
+                log.info("ping {} -> {} -> {}", v.getId(), sessionId, username);
+                v.sendMessage(new PingMessage());
+            } catch (IOException e) {
+                log.error(e.getMessage(), e);
+            }
+        }
     }
 }

+ 2 - 0
src/main/resources/application.yml

@@ -30,3 +30,5 @@ spring:
     password: abc123!
 session:
   timeout: 300
+socket:
+  heartbeat-timeout: 10

+ 7 - 0
src/main/resources/templates/template.html

@@ -174,10 +174,17 @@
 
     socket.onclose = (event) => {
         console.log(event)
+        logout()
     }
 
     socket.onerror = (event) => {
         console.log(event)
+        logout()
+    }
+
+    const logout = () => {
+        alert('websocket连接异常,请重新登录')
+        setTimeout(() => window.location.href = '/login', 1000)
     }
         var dataA;
         var layer;