// EsbSocketClient.java
  1. package com.nokia.esb;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;

import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import com.nokia.pojo.TousuSheet;

import lombok.extern.slf4j.Slf4j;
  13. @Slf4j
  14. public class EsbSocketClient {
  15. private Socket socket = null;
  16. private InputStream inputStream = null;
  17. private OutputStream outputStream = null;
  18. private final ThreadPoolTaskScheduler socketScheduler;
  19. private final InetSocketAddress address;
  20. private final String userName;
  21. private final String password;
  22. private final int timeout = 3000;
  23. private long heartbeatDelay = 60000L;
  24. /**
  25. * 指定远程服务器地址端口和鉴权信息
  26. *
  27. * @param host
  28. * @param port
  29. * @param userName
  30. * @param password
  31. */
  32. public EsbSocketClient(String host, int port, String userName, String password,
  33. ThreadPoolTaskScheduler socketScheduler) {
  34. this.userName = userName;
  35. this.password = password;
  36. address = new InetSocketAddress(host, port);
  37. this.socketScheduler = socketScheduler;
  38. }
  39. public boolean send(TousuSheet tousuSheet) {
  40. // 检查连接状态
  41. if (socket == null || !socket.isConnected()) {
  42. connect();
  43. }
  44. Future<Boolean> future = socketScheduler.submit(() -> {
  45. try {
  46. byte[] bytes = MessageUtil.getBusinessMessage(tousuSheet).getBytes("utf-8");
  47. outputStream.write(bytes);
  48. outputStream.flush();
  49. log.debug("已发送消息...");
  50. // 并不是每次发送都会有回复,所以这里不能同步收回复消息
  51. if (inputStream.available() > 0) {
  52. byte[] resp = new byte[1024];
  53. inputStream.read(resp);
  54. log.debug(new String(resp).trim());
  55. }
  56. return true;
  57. } catch (IOException e) {
  58. e.printStackTrace();
  59. throw new RuntimeException(e.getMessage());
  60. }
  61. });
  62. try {
  63. return future.get();
  64. } catch (InterruptedException | ExecutionException e) {
  65. e.printStackTrace();
  66. throw new RuntimeException(e.getMessage());
  67. }
  68. }
  69. /**
  70. * 连接到远程服务器并完成鉴权
  71. */
  72. synchronized public void connect() {
  73. if (socket != null && socket.isConnected()) {
  74. // 确认当前未在连接状态
  75. return;
  76. }
  77. socket = new Socket();
  78. try {
  79. // 连接到远程服务器
  80. socket.connect(address, timeout);
  81. // 绑定输入输出流
  82. inputStream = socket.getInputStream();
  83. outputStream = socket.getOutputStream();
  84. // 发送鉴权消息
  85. outputStream.write(MessageUtil.getVerifyMessage(userName, password).getBytes("utf-8"));
  86. outputStream.flush();
  87. log.debug("已发送鉴权请求...");
  88. // 接收鉴权结果 返回消息应该没有超过1024的
  89. byte[] resp = new byte[1024];
  90. inputStream.read(resp);
  91. String parseVerifyRespone = MessageUtil.parseVerifyRespone(resp);
  92. if ("0".equals(parseVerifyRespone)) {
  93. // 连接成功
  94. log.debug("鉴权成功,已建立连接...");
  95. socketScheduler.scheduleAtFixedRate(() -> {
  96. heartbeat();
  97. }, new Date(System.currentTimeMillis() + heartbeatDelay), heartbeatDelay);
  98. return;
  99. } else {
  100. log.error("连接失败..." + parseVerifyRespone);
  101. // 释放资源
  102. close();
  103. throw new RuntimeException("连接失败..." + parseVerifyRespone);
  104. }
  105. } catch (IOException e) {
  106. e.printStackTrace();
  107. // 释放资源
  108. close();
  109. throw new RuntimeException(e.getMessage());
  110. }
  111. }
  112. /**
  113. * 心跳检测
  114. */
  115. synchronized public void heartbeat() {
  116. try {
  117. outputStream.write(MessageUtil.getHeartbeatMessage().getBytes("utf-8"));
  118. outputStream.flush();
  119. log.debug("已发送心跳消息...");
  120. byte[] resp = new byte[1024];
  121. inputStream.read(resp);
  122. MessageUtil.parseHeartbeatRespone(resp);
  123. log.debug("收到心跳回复...");
  124. } catch (IOException e) {
  125. e.printStackTrace();
  126. // 释放资源
  127. close();
  128. throw new RuntimeException(e.getMessage());
  129. }
  130. }
  131. /**
  132. * 释放资源
  133. */
  134. private void close() {
  135. // 释放资源
  136. if (inputStream != null) {
  137. try {
  138. inputStream.close();
  139. } catch (IOException e) {
  140. e.printStackTrace();
  141. }
  142. inputStream = null;
  143. }
  144. if (outputStream != null) {
  145. try {
  146. outputStream.close();
  147. } catch (IOException e) {
  148. e.printStackTrace();
  149. }
  150. outputStream = null;
  151. }
  152. if (socket != null) {
  153. try {
  154. socket.close();
  155. } catch (IOException e) {
  156. e.printStackTrace();
  157. }
  158. socket = null;
  159. }
  160. }
  161. }