package com.nokia.esb_socket.service;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import javax.annotation.PreDestroy;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.nokia.esb_socket.pojo.TousuSheet;
import com.nokia.esb_socket.service.handler.AuthHandler;
import com.nokia.esb_socket.service.handler.HeartbeatHandler;
import com.nokia.esb_socket.service.handler.MessagePrintHandler;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

/**
 * Socket client for the complaint-driven construction interface.
 *
 * <p>Keeps a single Netty channel to the configured host/port and lazily (re)connects
 * when {@link #send(TousuSheet)} finds no active channel. The {@code channel} field is
 * only read and written while holding this instance's monitor.
 */
@Slf4j
@Service
public class EsbNettyService {

    @Value("${toususheet.host}")
    private String host;

    @Value("${toususheet.port}")
    private int port;

    @Value("${toususheet.userName}")
    private String userName;

    @Value("${toususheet.password}")
    private String password;

    private final ObjectMapper objectMapper;
    private final EventLoopGroup group;

    /** Current connection, or {@code null} when disconnected. Guarded by {@code this}. */
    private Channel channel = null;

    @Autowired
    public EsbNettyService(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        // Thread resources are created once, when the instance is initialised.
        group = new NioEventLoopGroup(1);
    }

    /**
     * Sends a message, connecting first if there is no active channel.
     *
     * <p>The sheet is serialised to JSON and framed as
     * {@code "\r\n" + json + "\r\n\r\n"}. A serialisation failure is logged and the
     * message is dropped; a failed write is logged asynchronously.
     *
     * @param tousuSheet the sheet to serialise and send
     * @throws IllegalStateException if a connection cannot be established or authenticated
     */
    public void send(TousuSheet tousuSheet) {
        // Capture the channel under the lock so a concurrent disconnect() cannot null it
        // between the check and the write.
        Channel active = ensureConnected();
        try {
            String msg = objectMapper.writeValueAsString(tousuSheet);
            // Netty channel writes are thread-safe, so no external locking is needed here.
            active.writeAndFlush("\r\n" + msg + "\r\n\r\n")
                    .addListener((ChannelFutureListener) writeFuture -> {
                        if (!writeFuture.isSuccess()) {
                            log.error("发送失败--{}", String.valueOf(writeFuture.cause()), writeFuture.cause());
                        }
                    });
        } catch (JsonProcessingException e) {
            log.error("发送失败--{}", e.getMessage(), e);
        }
    }

    /**
     * Returns the active channel, connecting first when there is none. Performing the
     * check under the lock prevents two concurrent senders from each opening a connection.
     */
    private synchronized Channel ensureConnected() {
        if (channel == null || !channel.isActive()) {
            // Connection missing or broken.
            log.info("准备连接到服务器...");
            connect();
        }
        return channel;
    }

    /**
     * Establishes the connection and waits for the authentication result.
     *
     * <p>Blocks until the TCP connection is up and the future handed to
     * {@link AuthHandler} completes (presumably completed by that handler once the
     * server answers the login; its implementation is not visible here).
     *
     * @throws IllegalStateException if authentication is rejected, fails with an error,
     *         or the calling thread is interrupted while waiting; the channel is closed
     *         in all of these cases
     */
    synchronized public void connect() {
        // Release any previous channel so replacing the field does not orphan it.
        if (channel != null) {
            channel.close();
            channel = null;
        }

        final CompletableFuture<Boolean> authResult = new CompletableFuture<>();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new StringEncoder(StandardCharsets.UTF_8))
                                // Writer-idle event after 60 seconds without an outbound write.
                                .addLast(new IdleStateHandler(0, 60, 0))
                                .addLast(new HeartbeatHandler())
                                .addLast(new StringDecoder(StandardCharsets.UTF_8))
                                .addLast(new MessagePrintHandler())
                                .addLast(new AuthHandler(userName, password, authResult));
                    }
                });

        try {
            ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
            // Connection established.
            channel = channelFuture.channel();
            if (Boolean.TRUE.equals(authResult.get())) {
                log.info("通道已激活, 鉴权已通过...");
                return;
            }
        } catch (InterruptedException e) {
            // Close first (the interrupt flag is clear at this point), then restore the flag.
            disconnect();
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while connecting to " + host + ":" + port, e);
        } catch (ExecutionException e) {
            // Never keep an unauthenticated channel around for later sends.
            disconnect();
            throw new IllegalStateException("Authentication failed with an error", e);
        }

        disconnect();
        throw new IllegalStateException("鉴权失败,已断开连接...");
    }

    /**
     * Closes the current connection, if any, and waits for the close to finish.
     */
    synchronized public void disconnect() {
        if (channel != null) {
            try {
                // close() actively closes the channel; closeFuture() would only wait for
                // a close triggered elsewhere and could block indefinitely.
                channel.close().sync();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("Interrupted while closing the channel", e);
            }
            channel = null;
        }
    }

    /**
     * Invoked by the Spring container on shutdown to release the event loop threads.
     *
     * @throws InterruptedException if interrupted while waiting for the shutdown to finish
     */
    @PreDestroy
    public void tearDown() throws InterruptedException {
        group.shutdownGracefully().sync();
    }
}