Browse Source

20230521--service已经写好了,还差接口的封装

lifuquan 2 years ago
parent
commit
ed45765ef1

+ 1 - 0
.gitignore

@@ -0,0 +1 @@
+**/target/

+ 136 - 0
doc/esb/EsbNioClient.java

@@ -0,0 +1,136 @@
+package com.nokia.esb;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Date;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+
+import com.nokia.pojo.TousuSheet;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class EsbNioClient {
+    private SocketChannel channel = null;
+    private volatile ScheduledFuture<?> scheduledFuture = null;
+    private final ThreadPoolTaskScheduler socketScheduler;
+    private final InetSocketAddress address;
+    private final String userName;
+    private final String password;
+
+    public EsbNioClient(String host, int port, String userName, String password,
+            ThreadPoolTaskScheduler socketScheduler) {
+        address = new InetSocketAddress(host, port);
+        this.userName = userName;
+        this.password = password;
+        this.socketScheduler = socketScheduler;
+    }
+
+    public void send(TousuSheet tousuSheet) {
+        if (channel == null || !channel.isConnected()) {
+            connect();
+        }
+        // 停止心跳检测
+        stopHeartbeat();
+        Future<?> future = socketScheduler.submit(() -> {
+            try {
+                channel.write(ByteBuffer.wrap(MessageUtil.getBusinessMessage(tousuSheet).getBytes("utf-8")));
+            } catch (IOException e) {
+                throw new RuntimeException(e.getMessage());
+            }
+        });
+        try {
+            future.get();
+            log.debug("发送成功...");
+        } catch (InterruptedException | ExecutionException e) {
+            log.error("发送失败...");
+            e.printStackTrace();
+            // 释放资源
+            close();
+            throw new RuntimeException(e.getMessage());
+        }
+        startHeartbeat();
+    }
+
+    public void connect() {
+        if (channel != null && channel.isConnected()) {
+            return;
+        }
+        try {
+            channel = SocketChannel.open(address);
+            while (!channel.finishConnect()) {
+                // 等待连接建立
+            }
+            channel.write(ByteBuffer.wrap(MessageUtil.getVerifyMessage(userName, password).getBytes("utf-8")));
+            log.debug("已发送鉴权请求...");
+            ByteBuffer buffer = ByteBuffer.allocate(1024);
+            channel.read(buffer);
+            String parseVerifyRespone = MessageUtil.parseVerifyRespone(buffer.array());
+            if ("0".equals(parseVerifyRespone)) {
+                // 连接成功
+                log.debug("鉴权成功,已建立连接...");
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+            // 释放资源
+            close();
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    /**
+     * 释放资源
+     */
+    private void close() {
+        // TODO 释放资源的实现
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            scheduledFuture = null;
+        }
+        if (channel != null) {
+            try {
+                channel.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e.getMessage());
+            }
+            channel = null;
+        }
+    }
+
+    /**
+     * 启动心跳
+     */
+    private void startHeartbeat() {
+        if (scheduledFuture == null) {
+            synchronized (this) {
+                if (scheduledFuture == null) {
+                    scheduledFuture = socketScheduler.scheduleAtFixedRate(() -> {
+                        try {
+                            channel.write(ByteBuffer.wrap(MessageUtil.getHeartbeatMessage().getBytes("utf-8")));
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                            // 释放资源
+                            close();
+                            throw new RuntimeException(e.getMessage());
+                        }
+                    }, new Date(System.currentTimeMillis() + 60000), 60000); // 60秒后开始,每60秒1次
+                }
+            }
+        }
+    }
+
+    private void stopHeartbeat() {
+        if (scheduledFuture == null) {
+            return;
+        }
+        scheduledFuture.cancel(false);
+        scheduledFuture = null;
+    }
+}

+ 173 - 0
doc/esb/EsbSocketClient.java

@@ -0,0 +1,173 @@
+package com.nokia.esb;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Date;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
+
+import com.nokia.pojo.TousuSheet;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class EsbSocketClient {
+    private Socket socket = null;
+    private InputStream inputStream = null;
+    private OutputStream outputStream = null;
+
+    private final ThreadPoolTaskScheduler socketScheduler;
+    private final InetSocketAddress address;
+    private final String userName;
+    private final String password;
+    private final int timeout = 3000;
+    private long heartbeatDelay = 60000L;
+
+    /**
+     * 指定远程服务器地址端口和鉴权信息
+     * 
+     * @param host
+     * @param port
+     * @param userName
+     * @param password
+     */
+    public EsbSocketClient(String host, int port, String userName, String password,
+            ThreadPoolTaskScheduler socketScheduler) {
+        this.userName = userName;
+        this.password = password;
+        address = new InetSocketAddress(host, port);
+        this.socketScheduler = socketScheduler;
+    }
+
+    public boolean send(TousuSheet tousuSheet) {
+        // 检查连接状态
+        if (socket == null || !socket.isConnected()) {
+            connect();
+        }
+        Future<Boolean> future = socketScheduler.submit(() -> {
+            try {
+                byte[] bytes = MessageUtil.getBusinessMessage(tousuSheet).getBytes("utf-8");
+                outputStream.write(bytes);
+                outputStream.flush();
+                log.debug("已发送消息...");
+                // 并不是每次发送都会有回复,所以这里不能同步收回复消息
+                if (inputStream.available() > 0) {
+                    byte[] resp = new byte[1024];
+                    inputStream.read(resp);
+                    log.debug(new String(resp).trim());
+                }
+                return true;
+            } catch (IOException e) {
+                e.printStackTrace();
+                throw new RuntimeException(e.getMessage());
+            }
+        });
+        try {
+            return future.get();
+        } catch (InterruptedException | ExecutionException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    /**
+     * 连接到远程服务器并完成鉴权
+     */
+    synchronized public void connect() {
+        if (socket != null && socket.isConnected()) {
+            // 确认当前未在连接状态
+            return;
+        }
+        socket = new Socket();
+        try {
+            // 连接到远程服务器
+            socket.connect(address, timeout);
+            // 绑定输入输出流
+            inputStream = socket.getInputStream();
+            outputStream = socket.getOutputStream();
+            // 发送鉴权消息
+            outputStream.write(MessageUtil.getVerifyMessage(userName, password).getBytes("utf-8"));
+            outputStream.flush();
+            log.debug("已发送鉴权请求...");
+            // 接收鉴权结果 返回消息应该没有超过1024的
+            byte[] resp = new byte[1024];
+            inputStream.read(resp);
+            String parseVerifyRespone = MessageUtil.parseVerifyRespone(resp);
+            if ("0".equals(parseVerifyRespone)) {
+                // 连接成功
+                log.debug("鉴权成功,已建立连接...");
+                socketScheduler.scheduleAtFixedRate(() -> {
+                    heartbeat();
+                }, new Date(System.currentTimeMillis() + heartbeatDelay), heartbeatDelay);
+                return;
+            } else {
+                log.error("连接失败..." + parseVerifyRespone);
+                // 释放资源
+                close();
+                throw new RuntimeException("连接失败..." + parseVerifyRespone);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+            // 释放资源
+            close();
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    /**
+     * 心跳检测
+     */
+    synchronized public void heartbeat() {
+        try {
+            outputStream.write(MessageUtil.getHeartbeatMessage().getBytes("utf-8"));
+            outputStream.flush();
+            log.debug("已发送心跳消息...");
+            byte[] resp = new byte[1024];
+            inputStream.read(resp);
+            MessageUtil.parseHeartbeatRespone(resp);
+            log.debug("收到心跳回复...");
+        } catch (IOException e) {
+            e.printStackTrace();
+            // 释放资源
+            close();
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    /**
+     * 释放资源
+     */
+    private void close() {
+        // 释放资源
+        if (inputStream != null) {
+            try {
+                inputStream.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            inputStream = null;
+        }
+        if (outputStream != null) {
+            try {
+                outputStream.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            outputStream = null;
+        }
+        if (socket != null) {
+            try {
+                socket.close();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            socket = null;
+        }
+    }
+
+}

+ 9 - 0
doc/esb/EsbVerifyRespone.java

@@ -0,0 +1,9 @@
+package com.nokia.esb;
+
+import lombok.Data;
+
+@Data
+public class EsbVerifyRespone {
+    private boolean success;
+    private String message;
+}

+ 121 - 0
doc/esb/MessageUtil.java

@@ -0,0 +1,121 @@
+package com.nokia.esb;
+
+import java.io.UnsupportedEncodingException;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.nokia.pojo.TousuSheet;
+
+public class MessageUtil {
+
+    private static ObjectMapper objectMapper = new ObjectMapper();
+
+    /**
+     * 构造一条鉴权消息
+     * 
+     * @param userName
+     * @param password
+     * @return 字符串格式的鉴权消息
+     */
+    public static String getVerifyMessage(String userName, String password) {
+        StringBuffer stringBuffer = new StringBuffer();
+        stringBuffer.append("<Connect>\r\n");
+        stringBuffer.append("UserName:").append(userName).append("\r\n");
+        stringBuffer.append("Password:").append(password).append("\r\n");
+        stringBuffer.append("</Connect>\r\n");
+        return stringBuffer.toString();
+    }
+
+    /**
+     * 解析鉴权消息的返回值
+     * 
+     * @param bytes
+     * @return 字符串0表示成功,失败返回完整的鉴权消息
+     */
+    public static String parseVerifyRespone(byte[] bytes) {
+        try {
+            String resp = new String(bytes, "utf-8");
+            resp = resp.trim();
+            if (resp.startsWith("<ConnectACK>")) {
+                int index = resp.indexOf("VerifyResult:0");
+                if (index > 0) {
+                    return "0";
+                } else {
+                    return resp;
+                }
+            } else {
+                throw new RuntimeException("鉴权失败,鉴权消息格式错误");
+            }
+        } catch (UnsupportedEncodingException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    /**
+     * 心跳检测的消息
+     * 
+     * @return
+     */
+    public static String getHeartbeatMessage() {
+        StringBuffer stringBuffer = new StringBuffer();
+        stringBuffer.append("<HeartBeat>\r\n");
+        stringBuffer.append("ProvinceID:18\r\n");
+        stringBuffer.append("</HeartBeat>\r\n");
+        return stringBuffer.toString();
+    }
+
+    /**
+     * 解析心跳消息
+     * 
+     * @param bytes
+     * @return
+     */
+    public static String parseHeartbeatRespone(byte[] bytes) {
+        try {
+            String resp = new String(bytes, "utf-8");
+            resp = resp.trim();
+            if (resp.startsWith("<HeartBeatACK>")) {
+                return resp;
+            } else {
+                throw new RuntimeException("心跳失败" + resp);
+            }
+        } catch (UnsupportedEncodingException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e.getMessage());
+        }
+    }
+
+    public static String getBusinessMessage(TousuSheet tousuSheet) {
+        StringBuffer stringBuffer = new StringBuffer();
+        stringBuffer.append("<AlarmStart>\r\n");
+        try {
+            stringBuffer.append(objectMapper.writeValueAsString(tousuSheet));
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+            throw new RuntimeException(e.getMessage());
+        }
+        stringBuffer.append("\r\n");
+        // stringBuffer.append("MsgSerial:").append(tousuSheet.getKfsn()).append("\r\n");
+        // stringBuffer.append("city:").append(tousuSheet.getCity()).append("\r\n");
+        // stringBuffer.append("kfsn:").append(tousuSheet.getKfsn()).append("\r\n");
+        // stringBuffer.append("acceptNo:").append(tousuSheet.getAcceptNo()).append("\r\n");
+        // stringBuffer.append("custLevel:").append(tousuSheet.getCustLevel()).append("\r\n");
+        // stringBuffer.append("pkgName:").append(tousuSheet.getPkgName()).append("\r\n");
+        // stringBuffer.append("acceptChannel:").append(tousuSheet.getAcceptChannel()).append("\r\n");
+        // stringBuffer.append("cplLevel:").append(tousuSheet.getCplLevel()).append("\r\n");
+        // stringBuffer.append("cplStype:").append(tousuSheet.getCplStype()).append("\r\n");
+        // stringBuffer.append("cplSname:").append(tousuSheet.getCplSname()).append("\r\n");
+        // stringBuffer.append("cplSno:").append(tousuSheet.getCplSno()).append("\r\n");
+        // stringBuffer.append("cplLon:").append(tousuSheet.getCplLon()).append("\r\n");
+        // stringBuffer.append("cplLat:").append(tousuSheet.getCplLat()).append("\r\n");
+        // stringBuffer.append("yhLon:").append(tousuSheet.getYhLon()).append("\r\n");
+        // stringBuffer.append("yhLat:").append(tousuSheet.getYhLat()).append("\r\n");
+        // stringBuffer.append("yhStype:").append(tousuSheet.getYhStype()).append("\r\n");
+        // stringBuffer.append("yhNtype:").append(tousuSheet.getYhNtype()).append("\r\n");
+        // stringBuffer.append("dstaddress:").append(tousuSheet.getDstaddress()).append("\r\n");
+        // stringBuffer.append("yhOpinion:").append(tousuSheet.getYhOpinion()).append("\r\n");
+        stringBuffer.append("<AlarmEnd>\r\n");
+        return stringBuffer.toString();
+    }
+}

+ 39 - 0
doc/投诉系统环境.md

@@ -0,0 +1,39 @@
+# 投诉系统环境
+
+172.16.110.7 192.168.10.7
+172.16.110.41 192.168.70.125
+172.16.110.42 192.168.10.41
+
+```json
+{
+"kfsn":"kfsn333333333",
+"acceptNo":"1",
+"custLevel":"2",
+"pkgName":"3",
+"acceptChannel":"4",
+"cplLevel":"5",
+"cplStype":"6",
+"cplSname":"7",
+"cplSno":"8",
+"cplLon":"9",
+"cplLat":"1",
+"yhLon":"1",
+"yhLat":"1",
+"yhStype":"1",
+"yhNtype":"1",
+"dstaddress":"1",
+"yhOpinion":"1"
+}
+```
+
+```text
+<HeartBeatACK>
+ProvinceID:18
+</HeartBeatACK>
+
+<Ack>
+MsgSerial:接收到1000条告警,未获取到MsgSerial,请检查告警报文格式!ProvinceID:HEB
+</Ack>
+
+<ConnectACK>VerifyResult:0FaultReason:</ConnectACK>
+```

BIN
doc/河北联通网络运营数字化支撑系统与掌沃建设接口规范-v1.4.docx


BIN
doc/移网投诉工单管控系统版本更新内容(投诉驱动建设)v1.3.docx


+ 70 - 0
pom.xml

@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.nokia</groupId>
+    <artifactId>esb-socket</artifactId>
+    <version>1.0</version>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <!-- 跳过测试代码 -->
+        <skipTests>true</skipTests>
+        <!-- 指定java的版本 -->
+        <java.version>1.8</java.version>
+        <!-- 文件拷贝时的编码 -->
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <!-- 指定maven-compiler-plugin的配置属性开始 -->
+        <!-- 编译时的编码 -->
+        <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
+        <!-- 指定编译的版本 -->
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
+        <!-- 指定maven-compiler-plugin的配置属性结束 -->
+    </properties>
+
+    <dependencies>
+        <!-- 引入netty框架 -->
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>4.1.92.Final</version>
+        </dependency>
+        <!--starter-web-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <!--lombok-->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <!-- springboot-test -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <dependencyManagement>
+        <dependencies>
+            <!-- 当不把spring-boot-parent
+            作为parent项目时,需要使用dependencyManagement引入spring-boot-dependencies管理依赖 -->
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-dependencies</artifactId>
+                <version>2.6.14</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+</project>

+ 11 - 0
src/main/java/com/nokia/EsbSocketApplication.java

@@ -0,0 +1,11 @@
+package com.nokia;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class EsbSocketApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(EsbSocketApplication.class, args);
+    }
+}

+ 25 - 0
src/main/java/com/nokia/pojo/TousuSheet.java

@@ -0,0 +1,25 @@
+package com.nokia.pojo;
+
+import lombok.Data;
+
+@Data
+public class TousuSheet {
+    private String city;
+    private String kfsn;
+    private String acceptNo;
+    private String custLevel;
+    private String pkgName;
+    private String acceptChannel;
+    private String cplLevel;
+    private String cplStype;
+    private String cplSname;
+    private String cplSno;
+    private String cplLon;
+    private String cplLat;
+    private String yhLon;
+    private String yhLat;
+    private String yhStype;
+    private String yhNtype;
+    private String dstaddress;
+    private String yhOpinion;
+}

+ 136 - 0
src/main/java/com/nokia/service/EsbNettyService.java

@@ -0,0 +1,136 @@
+package com.nokia.service;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import javax.annotation.PreDestroy;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.nokia.pojo.TousuSheet;
+import com.nokia.service.handler.AuthHandler;
+import com.nokia.service.handler.HeartbeatHandler;
+import com.nokia.service.handler.MessagePrintHandler;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.timeout.IdleStateHandler;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Service
+public class EsbNettyService {
+
+    private String host = "10.160.84.4";
+    private int port = 9130;
+    private String userName = "Nokia-TousuSheet";
+    private String password = "mEx7^9qJE";
+    private final ObjectMapper objectMapper;
+    private final EventLoopGroup group;
+    @Getter
+    private Channel channel = null;
+
+    @Autowired
+    public EsbNettyService(ObjectMapper objectMapper) {
+        this.objectMapper = objectMapper;
+        // 线程资源在实例初始化时建立
+        group = new NioEventLoopGroup(1);
+    }
+
+    /**
+     * 发送消息
+     * 
+     * @param tousuSheet
+     */
+    public void send(TousuSheet tousuSheet) {
+        if (channel == null || !channel.isActive()) {
+            // 连接异常
+            log.info("准备连接到服务器...");
+            connect();
+        }
+        try {
+            String msg = objectMapper.writeValueAsString(tousuSheet);
+            synchronized (channel) {
+                channel.writeAndFlush("<AlarmStart>\r\n" + msg + "\r\n<AlarmEnd>\r\n");
+            }
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+            log.error("发送失败--{}", e.getMessage());
+        }
+    }
+
+    /**
+     * 建立连接
+     */
+    synchronized public void connect() {
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        Bootstrap bootstrap = new Bootstrap();
+        bootstrap.group(group)
+                .channel(NioSocketChannel.class)
+                .option(ChannelOption.SO_KEEPALIVE, true)
+                .handler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    protected void initChannel(SocketChannel socketChannel) throws Exception {
+                        socketChannel.pipeline()
+                                .addLast(new StringEncoder(StandardCharsets.UTF_8))
+                                .addLast(new IdleStateHandler(0, 60, 0))
+                                .addLast(new HeartbeatHandler())
+                                .addLast(new StringDecoder(StandardCharsets.UTF_8))
+                                .addLast(new MessagePrintHandler())
+                                .addLast(new AuthHandler(userName, password, future));
+                    }
+                });
+        try {
+            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
+            // 建立
+            channel = channelFuture.channel();
+            Boolean flag = future.get();
+            if (flag) {
+                log.info("通道已激活, 鉴权已通过...");
+            } else {
+                disconnect();
+                throw new RuntimeException("鉴权失败,已断开连接...");
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * 释放连接
+     */
+    synchronized public void disconnect() {
+        if (channel != null) {
+            try {
+                channel.closeFuture().sync();
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+            channel = null;
+        }
+    }
+
+    /**
+     * 由spring框架控制自动释放资源
+     * 
+     * @throws InterruptedException
+     */
+    @PreDestroy
+    public void tearDown() throws InterruptedException {
+        group.shutdownGracefully().sync();
+    }
+}

+ 61 - 0
src/main/java/com/nokia/service/handler/AuthHandler.java

@@ -0,0 +1,61 @@
+package com.nokia.service.handler;
+
+import java.util.concurrent.CompletableFuture;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * 处理鉴权消息
+ * 通过CompletableFuture<Boolean>
+ */
+@Slf4j
+public class AuthHandler extends SimpleChannelInboundHandler<String> {
+
+    private final String userName;
+    private final String password;
+    private final CompletableFuture<Boolean> future;
+
+    public AuthHandler(String userName, String password, CompletableFuture<Boolean> future) {
+        this.userName = userName;
+        this.password = password;
+        this.future = future;
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
+        // 处理鉴权结果
+        msg = msg.trim();
+        if (msg.startsWith("<ConnectACK>")) {
+            int index = msg.indexOf("VerifyResult:0");
+            if (index > 0) {
+                // 鉴权成功,删除鉴权处理器
+                ctx.pipeline().remove(this);
+                future.complete(true);
+            }
+        } else {
+            log.error("鉴权失败,返回消息:{}", msg.replaceAll("\r\n", ""));
+            ctx.close();
+            future.complete(false);
+        }
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        // 发送鉴权消息
+        StringBuffer stringBuffer = new StringBuffer();
+        stringBuffer.append("<Connect>\r\nUserName:")
+                .append(userName).append("\r\nPassword:")
+                .append(password).append("\r\n</Connect>\r\n");
+        ctx.writeAndFlush(stringBuffer.toString());
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+        ctx.close();
+        future.complete(false);
+    }
+
+}

+ 26 - 0
src/main/java/com/nokia/service/handler/HeartbeatHandler.java

@@ -0,0 +1,26 @@
+package com.nokia.service.handler;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+
+public class HeartbeatHandler extends ChannelDuplexHandler {
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        if (evt instanceof IdleStateEvent) {
+            // 如果60秒没有写操作,就发送心跳消息
+            if (((IdleStateEvent) evt).state().equals(IdleState.WRITER_IDLE)) {
+                // 发送心跳消息
+                ctx.writeAndFlush("<HeartBeat>\r\nProvinceID:18\r\n</HeartBeat>\r\n");
+            }
+        }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        cause.printStackTrace();
+        ctx.close();
+    }
+}

+ 16 - 0
src/main/java/com/nokia/service/handler/MessagePrintHandler.java

@@ -0,0 +1,16 @@
+package com.nokia.service.handler;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class MessagePrintHandler extends SimpleChannelInboundHandler<String> {
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
+        log.info(msg.replace("\r\n", ""));
+        ctx.fireChannelRead(msg);
+    }
+
+}

+ 38 - 0
src/test/java/com/nokia/EsbSocketApplicationTest.java

@@ -0,0 +1,38 @@
+package com.nokia;
+
+import java.io.FileReader;
+import java.io.IOException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.nokia.pojo.TousuSheet;
+import com.nokia.service.EsbNettyService;
+
+@SpringBootTest
+public class EsbSocketApplicationTest {
+
+    @Autowired
+    private EsbNettyService service;
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Test
+    void test1() throws IOException, InterruptedException {
+        TousuSheet tousuSheet = objectMapper.readValue(new FileReader("D:/src/toususheet.json"), TousuSheet.class);
+        DateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
+        for (int i = 0; i < 1; i++) {
+            tousuSheet.setKfsn("kfsn" + format.format(new Date()) + i);
+            service.send(tousuSheet);
+            Thread.sleep(10);
+            // System.out.println("发送。。。" + tousuSheet.getKfsn());
+        }
+        System.in.read();
+    }
+
+}