123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- package com.nokia.esb;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;

import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import com.nokia.pojo.TousuSheet;

import lombok.extern.slf4j.Slf4j;
- @Slf4j
- public class EsbSocketClient {
- private Socket socket = null;
- private InputStream inputStream = null;
- private OutputStream outputStream = null;
- private final ThreadPoolTaskScheduler socketScheduler;
- private final InetSocketAddress address;
- private final String userName;
- private final String password;
- private final int timeout = 3000;
- private long heartbeatDelay = 60000L;
- /**
- * 指定远程服务器地址端口和鉴权信息
- *
- * @param host
- * @param port
- * @param userName
- * @param password
- */
- public EsbSocketClient(String host, int port, String userName, String password,
- ThreadPoolTaskScheduler socketScheduler) {
- this.userName = userName;
- this.password = password;
- address = new InetSocketAddress(host, port);
- this.socketScheduler = socketScheduler;
- }
- public boolean send(TousuSheet tousuSheet) {
- // 检查连接状态
- if (socket == null || !socket.isConnected()) {
- connect();
- }
- Future<Boolean> future = socketScheduler.submit(() -> {
- try {
- byte[] bytes = MessageUtil.getBusinessMessage(tousuSheet).getBytes("utf-8");
- outputStream.write(bytes);
- outputStream.flush();
- log.debug("已发送消息...");
- // 并不是每次发送都会有回复,所以这里不能同步收回复消息
- if (inputStream.available() > 0) {
- byte[] resp = new byte[1024];
- inputStream.read(resp);
- log.debug(new String(resp).trim());
- }
- return true;
- } catch (IOException e) {
- e.printStackTrace();
- throw new RuntimeException(e.getMessage());
- }
- });
- try {
- return future.get();
- } catch (InterruptedException | ExecutionException e) {
- e.printStackTrace();
- throw new RuntimeException(e.getMessage());
- }
- }
- /**
- * 连接到远程服务器并完成鉴权
- */
- synchronized public void connect() {
- if (socket != null && socket.isConnected()) {
- // 确认当前未在连接状态
- return;
- }
- socket = new Socket();
- try {
- // 连接到远程服务器
- socket.connect(address, timeout);
- // 绑定输入输出流
- inputStream = socket.getInputStream();
- outputStream = socket.getOutputStream();
- // 发送鉴权消息
- outputStream.write(MessageUtil.getVerifyMessage(userName, password).getBytes("utf-8"));
- outputStream.flush();
- log.debug("已发送鉴权请求...");
- // 接收鉴权结果 返回消息应该没有超过1024的
- byte[] resp = new byte[1024];
- inputStream.read(resp);
- String parseVerifyRespone = MessageUtil.parseVerifyRespone(resp);
- if ("0".equals(parseVerifyRespone)) {
- // 连接成功
- log.debug("鉴权成功,已建立连接...");
- socketScheduler.scheduleAtFixedRate(() -> {
- heartbeat();
- }, new Date(System.currentTimeMillis() + heartbeatDelay), heartbeatDelay);
- return;
- } else {
- log.error("连接失败..." + parseVerifyRespone);
- // 释放资源
- close();
- throw new RuntimeException("连接失败..." + parseVerifyRespone);
- }
- } catch (IOException e) {
- e.printStackTrace();
- // 释放资源
- close();
- throw new RuntimeException(e.getMessage());
- }
- }
- /**
- * 心跳检测
- */
- synchronized public void heartbeat() {
- try {
- outputStream.write(MessageUtil.getHeartbeatMessage().getBytes("utf-8"));
- outputStream.flush();
- log.debug("已发送心跳消息...");
- byte[] resp = new byte[1024];
- inputStream.read(resp);
- MessageUtil.parseHeartbeatRespone(resp);
- log.debug("收到心跳回复...");
- } catch (IOException e) {
- e.printStackTrace();
- // 释放资源
- close();
- throw new RuntimeException(e.getMessage());
- }
- }
- /**
- * 释放资源
- */
- private void close() {
- // 释放资源
- if (inputStream != null) {
- try {
- inputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- inputStream = null;
- }
- if (outputStream != null) {
- try {
- outputStream.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- outputStream = null;
- }
- if (socket != null) {
- try {
- socket.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- socket = null;
- }
- }
- }
|