EsbNioClient.java 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package com.nokia.esb;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SocketChannel;
  6. import java.util.Date;
  7. import java.util.concurrent.ExecutionException;
  8. import java.util.concurrent.Future;
  9. import java.util.concurrent.ScheduledFuture;
  10. import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
  11. import com.nokia.pojo.TousuSheet;
  12. import lombok.extern.slf4j.Slf4j;
  13. @Slf4j
  14. public class EsbNioClient {
  15. private SocketChannel channel = null;
  16. private volatile ScheduledFuture<?> scheduledFuture = null;
  17. private final ThreadPoolTaskScheduler socketScheduler;
  18. private final InetSocketAddress address;
  19. private final String userName;
  20. private final String password;
  21. public EsbNioClient(String host, int port, String userName, String password,
  22. ThreadPoolTaskScheduler socketScheduler) {
  23. address = new InetSocketAddress(host, port);
  24. this.userName = userName;
  25. this.password = password;
  26. this.socketScheduler = socketScheduler;
  27. }
  28. public void send(TousuSheet tousuSheet) {
  29. if (channel == null || !channel.isConnected()) {
  30. connect();
  31. }
  32. // 停止心跳检测
  33. stopHeartbeat();
  34. Future<?> future = socketScheduler.submit(() -> {
  35. try {
  36. channel.write(ByteBuffer.wrap(MessageUtil.getBusinessMessage(tousuSheet).getBytes("utf-8")));
  37. } catch (IOException e) {
  38. throw new RuntimeException(e.getMessage());
  39. }
  40. });
  41. try {
  42. future.get();
  43. log.debug("发送成功...");
  44. } catch (InterruptedException | ExecutionException e) {
  45. log.error("发送失败...");
  46. e.printStackTrace();
  47. // 释放资源
  48. close();
  49. throw new RuntimeException(e.getMessage());
  50. }
  51. startHeartbeat();
  52. }
  53. public void connect() {
  54. if (channel != null && channel.isConnected()) {
  55. return;
  56. }
  57. try {
  58. channel = SocketChannel.open(address);
  59. while (!channel.finishConnect()) {
  60. // 等待连接建立
  61. }
  62. channel.write(ByteBuffer.wrap(MessageUtil.getVerifyMessage(userName, password).getBytes("utf-8")));
  63. log.debug("已发送鉴权请求...");
  64. ByteBuffer buffer = ByteBuffer.allocate(1024);
  65. channel.read(buffer);
  66. String parseVerifyRespone = MessageUtil.parseVerifyRespone(buffer.array());
  67. if ("0".equals(parseVerifyRespone)) {
  68. // 连接成功
  69. log.debug("鉴权成功,已建立连接...");
  70. }
  71. } catch (IOException e) {
  72. e.printStackTrace();
  73. // 释放资源
  74. close();
  75. throw new RuntimeException(e.getMessage());
  76. }
  77. }
  78. /**
  79. * 释放资源
  80. */
  81. private void close() {
  82. // TODO 释放资源的实现
  83. if (scheduledFuture != null) {
  84. scheduledFuture.cancel(false);
  85. scheduledFuture = null;
  86. }
  87. if (channel != null) {
  88. try {
  89. channel.close();
  90. } catch (IOException e) {
  91. e.printStackTrace();
  92. throw new RuntimeException(e.getMessage());
  93. }
  94. channel = null;
  95. }
  96. }
  97. /**
  98. * 启动心跳
  99. */
  100. private void startHeartbeat() {
  101. if (scheduledFuture == null) {
  102. synchronized (this) {
  103. if (scheduledFuture == null) {
  104. scheduledFuture = socketScheduler.scheduleAtFixedRate(() -> {
  105. try {
  106. channel.write(ByteBuffer.wrap(MessageUtil.getHeartbeatMessage().getBytes("utf-8")));
  107. } catch (IOException e) {
  108. e.printStackTrace();
  109. // 释放资源
  110. close();
  111. throw new RuntimeException(e.getMessage());
  112. }
  113. }, new Date(System.currentTimeMillis() + 60000), 60000); // 60秒后开始,每60秒1次
  114. }
  115. }
  116. }
  117. }
  118. private void stopHeartbeat() {
  119. if (scheduledFuture == null) {
  120. return;
  121. }
  122. scheduledFuture.cancel(false);
  123. scheduledFuture = null;
  124. }
  125. }