package com.nokia.esb; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Date; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import com.nokia.pojo.TousuSheet; import lombok.extern.slf4j.Slf4j; @Slf4j public class EsbNioClient { private SocketChannel channel = null; private volatile ScheduledFuture scheduledFuture = null; private final ThreadPoolTaskScheduler socketScheduler; private final InetSocketAddress address; private final String userName; private final String password; public EsbNioClient(String host, int port, String userName, String password, ThreadPoolTaskScheduler socketScheduler) { address = new InetSocketAddress(host, port); this.userName = userName; this.password = password; this.socketScheduler = socketScheduler; } public void send(TousuSheet tousuSheet) { if (channel == null || !channel.isConnected()) { connect(); } // 停止心跳检测 stopHeartbeat(); Future future = socketScheduler.submit(() -> { try { channel.write(ByteBuffer.wrap(MessageUtil.getBusinessMessage(tousuSheet).getBytes("utf-8"))); } catch (IOException e) { throw new RuntimeException(e.getMessage()); } }); try { future.get(); log.debug("发送成功..."); } catch (InterruptedException | ExecutionException e) { log.error("发送失败..."); e.printStackTrace(); // 释放资源 close(); throw new RuntimeException(e.getMessage()); } startHeartbeat(); } public void connect() { if (channel != null && channel.isConnected()) { return; } try { channel = SocketChannel.open(address); while (!channel.finishConnect()) { // 等待连接建立 } channel.write(ByteBuffer.wrap(MessageUtil.getVerifyMessage(userName, password).getBytes("utf-8"))); log.debug("已发送鉴权请求..."); ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer); String parseVerifyRespone = MessageUtil.parseVerifyRespone(buffer.array()); if ("0".equals(parseVerifyRespone)) { // 连接成功 log.debug("鉴权成功,已建立连接..."); } } catch (IOException e) { e.printStackTrace(); // 释放资源 close(); throw new RuntimeException(e.getMessage()); } } /** * 释放资源 */ private void close() { // TODO 释放资源的实现 if (scheduledFuture != null) { scheduledFuture.cancel(false); scheduledFuture = null; } if (channel != null) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e.getMessage()); } channel = null; } } /** * 启动心跳 */ private void startHeartbeat() { if (scheduledFuture == null) { synchronized (this) { if (scheduledFuture == null) { scheduledFuture = socketScheduler.scheduleAtFixedRate(() -> { try { channel.write(ByteBuffer.wrap(MessageUtil.getHeartbeatMessage().getBytes("utf-8"))); } catch (IOException e) { e.printStackTrace(); // 释放资源 close(); throw new RuntimeException(e.getMessage()); } }, new Date(System.currentTimeMillis() + 60000), 60000); // 60秒后开始,每60秒1次 } } } } private void stopHeartbeat() { if (scheduledFuture == null) { return; } scheduledFuture.cancel(false); scheduledFuture = null; } }