package com.nokia.esb; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.util.Date; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import com.nokia.pojo.TousuSheet; import lombok.extern.slf4j.Slf4j; @Slf4j public class EsbSocketClient { private Socket socket = null; private InputStream inputStream = null; private OutputStream outputStream = null; private final ThreadPoolTaskScheduler socketScheduler; private final InetSocketAddress address; private final String userName; private final String password; private final int timeout = 3000; private long heartbeatDelay = 60000L; /** * 指定远程服务器地址端口和鉴权信息 * * @param host * @param port * @param userName * @param password */ public EsbSocketClient(String host, int port, String userName, String password, ThreadPoolTaskScheduler socketScheduler) { this.userName = userName; this.password = password; address = new InetSocketAddress(host, port); this.socketScheduler = socketScheduler; } public boolean send(TousuSheet tousuSheet) { // 检查连接状态 if (socket == null || !socket.isConnected()) { connect(); } Future future = socketScheduler.submit(() -> { try { byte[] bytes = MessageUtil.getBusinessMessage(tousuSheet).getBytes("utf-8"); outputStream.write(bytes); outputStream.flush(); log.debug("已发送消息..."); // 并不是每次发送都会有回复,所以这里不能同步收回复消息 if (inputStream.available() > 0) { byte[] resp = new byte[1024]; inputStream.read(resp); log.debug(new String(resp).trim()); } return true; } catch (IOException e) { e.printStackTrace(); throw new RuntimeException(e.getMessage()); } }); try { return future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); throw new RuntimeException(e.getMessage()); } } /** * 连接到远程服务器并完成鉴权 */ synchronized public void connect() { if (socket != null && socket.isConnected()) { // 确认当前未在连接状态 return; } socket = new Socket(); try { // 连接到远程服务器 socket.connect(address, timeout); // 绑定输入输出流 inputStream = socket.getInputStream(); outputStream = socket.getOutputStream(); // 发送鉴权消息 outputStream.write(MessageUtil.getVerifyMessage(userName, password).getBytes("utf-8")); outputStream.flush(); log.debug("已发送鉴权请求..."); // 接收鉴权结果 返回消息应该没有超过1024的 byte[] resp = new byte[1024]; inputStream.read(resp); String parseVerifyRespone = MessageUtil.parseVerifyRespone(resp); if ("0".equals(parseVerifyRespone)) { // 连接成功 log.debug("鉴权成功,已建立连接..."); socketScheduler.scheduleAtFixedRate(() -> { heartbeat(); }, new Date(System.currentTimeMillis() + heartbeatDelay), heartbeatDelay); return; } else { log.error("连接失败..." + parseVerifyRespone); // 释放资源 close(); throw new RuntimeException("连接失败..." + parseVerifyRespone); } } catch (IOException e) { e.printStackTrace(); // 释放资源 close(); throw new RuntimeException(e.getMessage()); } } /** * 心跳检测 */ synchronized public void heartbeat() { try { outputStream.write(MessageUtil.getHeartbeatMessage().getBytes("utf-8")); outputStream.flush(); log.debug("已发送心跳消息..."); byte[] resp = new byte[1024]; inputStream.read(resp); MessageUtil.parseHeartbeatRespone(resp); log.debug("收到心跳回复..."); } catch (IOException e) { e.printStackTrace(); // 释放资源 close(); throw new RuntimeException(e.getMessage()); } } /** * 释放资源 */ private void close() { // 释放资源 if (inputStream != null) { try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); } inputStream = null; } if (outputStream != null) { try { outputStream.close(); } catch (IOException e) { e.printStackTrace(); } outputStream = null; } if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } }