123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- package com.nokia.esb;
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SocketChannel;
- import java.util.Date;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.Future;
- import java.util.concurrent.ScheduledFuture;
- import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
- import com.nokia.pojo.TousuSheet;
- import lombok.extern.slf4j.Slf4j;
- @Slf4j
- public class EsbNioClient {
- private SocketChannel channel = null;
- private volatile ScheduledFuture<?> scheduledFuture = null;
- private final ThreadPoolTaskScheduler socketScheduler;
- private final InetSocketAddress address;
- private final String userName;
- private final String password;
- public EsbNioClient(String host, int port, String userName, String password,
- ThreadPoolTaskScheduler socketScheduler) {
- address = new InetSocketAddress(host, port);
- this.userName = userName;
- this.password = password;
- this.socketScheduler = socketScheduler;
- }
- public void send(TousuSheet tousuSheet) {
- if (channel == null || !channel.isConnected()) {
- connect();
- }
- // 停止心跳检测
- stopHeartbeat();
- Future<?> future = socketScheduler.submit(() -> {
- try {
- channel.write(ByteBuffer.wrap(MessageUtil.getBusinessMessage(tousuSheet).getBytes("utf-8")));
- } catch (IOException e) {
- throw new RuntimeException(e.getMessage());
- }
- });
- try {
- future.get();
- log.debug("发送成功...");
- } catch (InterruptedException | ExecutionException e) {
- log.error("发送失败...");
- e.printStackTrace();
- // 释放资源
- close();
- throw new RuntimeException(e.getMessage());
- }
- startHeartbeat();
- }
- public void connect() {
- if (channel != null && channel.isConnected()) {
- return;
- }
- try {
- channel = SocketChannel.open(address);
- while (!channel.finishConnect()) {
- // 等待连接建立
- }
- channel.write(ByteBuffer.wrap(MessageUtil.getVerifyMessage(userName, password).getBytes("utf-8")));
- log.debug("已发送鉴权请求...");
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- channel.read(buffer);
- String parseVerifyRespone = MessageUtil.parseVerifyRespone(buffer.array());
- if ("0".equals(parseVerifyRespone)) {
- // 连接成功
- log.debug("鉴权成功,已建立连接...");
- }
- } catch (IOException e) {
- e.printStackTrace();
- // 释放资源
- close();
- throw new RuntimeException(e.getMessage());
- }
- }
- /**
- * 释放资源
- */
- private void close() {
- // TODO 释放资源的实现
- if (scheduledFuture != null) {
- scheduledFuture.cancel(false);
- scheduledFuture = null;
- }
- if (channel != null) {
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- throw new RuntimeException(e.getMessage());
- }
- channel = null;
- }
- }
- /**
- * 启动心跳
- */
- private void startHeartbeat() {
- if (scheduledFuture == null) {
- synchronized (this) {
- if (scheduledFuture == null) {
- scheduledFuture = socketScheduler.scheduleAtFixedRate(() -> {
- try {
- channel.write(ByteBuffer.wrap(MessageUtil.getHeartbeatMessage().getBytes("utf-8")));
- } catch (IOException e) {
- e.printStackTrace();
- // 释放资源
- close();
- throw new RuntimeException(e.getMessage());
- }
- }, new Date(System.currentTimeMillis() + 60000), 60000); // 60秒后开始,每60秒1次
- }
- }
- }
- }
- private void stopHeartbeat() {
- if (scheduledFuture == null) {
- return;
- }
- scheduledFuture.cancel(false);
- scheduledFuture = null;
- }
- }
|