|
@@ -0,0 +1,358 @@
|
|
|
+package com.nokia.sms.sgip.mt.client;
|
|
|
+
|
|
|
+import java.io.UnsupportedEncodingException;
|
|
|
+import java.net.InetSocketAddress;
|
|
|
+import java.text.SimpleDateFormat;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.FutureTask;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
+
|
|
|
+import org.apache.commons.configuration.CompositeConfiguration;
|
|
|
+import org.apache.mina.core.RuntimeIoException;
|
|
|
+import org.apache.mina.core.filterchain.IoFilter;
|
|
|
+import org.apache.mina.core.future.ConnectFuture;
|
|
|
+import org.apache.mina.core.future.WriteFuture;
|
|
|
+import org.apache.mina.core.service.IoConnector;
|
|
|
+import org.apache.mina.core.session.IdleStatus;
|
|
|
+import org.apache.mina.core.session.IoSession;
|
|
|
+import org.apache.mina.filter.logging.LogLevel;
|
|
|
+import org.apache.mina.transport.socket.nio.NioSocketConnector;
|
|
|
+import org.apache.mina.filter.codec.ProtocolCodecFilter;
|
|
|
+import org.apache.mina.filter.executor.ExecutorFilter;
|
|
|
+import org.apache.mina.filter.executor.UnorderedThreadPoolExecutor;
|
|
|
+
|
|
|
+import com.nokia.sms.sgip.RespCaller;
|
|
|
+import com.nokia.sms.sgip.SgipConfig;
|
|
|
+import com.nokia.sms.sgip.codec.MessageUtil;
|
|
|
+import com.nokia.sms.sgip.codec.SgipCodecFactory;
|
|
|
+import com.nokia.sms.sgip.filter.SGIPLoggingFilter;
|
|
|
+import com.nokia.sms.sgip.message.BindMessage;
|
|
|
+import com.nokia.sms.sgip.message.SubmitMessage;
|
|
|
+import com.nokia.sms.sgip.message.UnBindMessage;
|
|
|
+import com.nokia.sms.sgip.message.SendResult;
|
|
|
+
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+public class MTClient {
|
|
|
+ private IoConnector connector;
|
|
|
+ private CompositeConfiguration config = SgipConfig.getConfig();
|
|
|
+ private static MTClient sgipClient;
|
|
|
+ private static final SimpleDateFormat dateForamt = new SimpleDateFormat("yyMMddHHmmss032+");
|
|
|
+ private IoSession session;
|
|
|
+
|
|
|
+ public static synchronized MTClient getInstance() {
|
|
|
+ if (sgipClient == null) {
|
|
|
+ sgipClient = new MTClient();
|
|
|
+ }
|
|
|
+ return sgipClient;
|
|
|
+ }
|
|
|
+
|
|
|
+ private MTClient() {
|
|
|
+ this.connector = new NioSocketConnector();
|
|
|
+ this.connector.setConnectTimeoutMillis(20000L);
|
|
|
+ this.connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE,
|
|
|
+ this.config.getInt("sgip_mt_client_max_idle_time", 50));
|
|
|
+ if (log.isDebugEnabled()) {
|
|
|
+ SGIPLoggingFilter logFilter = new SGIPLoggingFilter("MTClienter");
|
|
|
+
|
|
|
+ logFilter.setExceptionCaughtLogLevel(LogLevel.NONE);
|
|
|
+ logFilter.setSessionCreatedLogLevel(LogLevel.NONE);
|
|
|
+ logFilter.setSessionOpenedLogLevel(LogLevel.NONE);
|
|
|
+ logFilter.setSessionIdleLogLevel(LogLevel.NONE);
|
|
|
+ logFilter.setMessageSentLogLevel(LogLevel.NONE);
|
|
|
+ logFilter.setMessageReceivedLogLevel(LogLevel.DEBUG);
|
|
|
+ logFilter.setSessionClosedLogLevel(LogLevel.NONE);
|
|
|
+ this.connector.getFilterChain().addLast("MTClienter1", logFilter);
|
|
|
+ }
|
|
|
+ IoFilter coderFilter = new ProtocolCodecFilter(new SgipCodecFactory());
|
|
|
+ this.connector.getFilterChain().addLast("coderFilter", coderFilter);
|
|
|
+ this.connector.getFilterChain().addLast("logFilter2", new SGIPLoggingFilter("MTClienter"));
|
|
|
+
|
|
|
+ this.connector.setHandler(new MTClientHandler());
|
|
|
+ ThreadPoolExecutor executor = new UnorderedThreadPoolExecutor(4, 16);
|
|
|
+ this.connector.getFilterChain().addLast("exceutor", new ExecutorFilter(executor));
|
|
|
+
|
|
|
+ MessageUtil.setNodeId((int) this.config.getLong("sgip_mt_node_id"));
|
|
|
+ log.info("Inited!");
|
|
|
+ }
|
|
|
+
|
|
|
+ private synchronized IoSession getSession() {
|
|
|
+ byte connectResult = -1;
|
|
|
+ if ((this.session == null) || (!this.session.isConnected()) || (this.session.isClosing())) {
|
|
|
+ connectResult = connect();
|
|
|
+ if (connectResult != 0) {
|
|
|
+ if (this.session != null) {
|
|
|
+ this.session.closeOnFlush();
|
|
|
+ }
|
|
|
+ log.warn("Fail to connect!");
|
|
|
+ for (int i = 0; i < 2; i++) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(1000L);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ connectResult = connect();
|
|
|
+ if (connectResult == 0) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if ((connectResult != 0) &&
|
|
|
+ (this.session != null)) {
|
|
|
+ this.session.closeOnFlush();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return this.session;
|
|
|
+ }
|
|
|
+
|
|
|
+ private byte connect() {
|
|
|
+ byte result = -99;
|
|
|
+ String ip = this.config.getString("sgip_mt_server_ip", "127.0.0.1");
|
|
|
+ int port = this.config.getInt("sgip_mt_server_port", 9999);
|
|
|
+ InetSocketAddress address = new InetSocketAddress(ip, port);
|
|
|
+ boolean connectResult = false;
|
|
|
+ try {
|
|
|
+ ConnectFuture connectFuture = this.connector.connect(address);
|
|
|
+ connectResult = connectFuture.awaitUninterruptibly(20L, TimeUnit.SECONDS);
|
|
|
+ if (connectResult) {
|
|
|
+ this.session = connectFuture.getSession();
|
|
|
+
|
|
|
+ BindMessage bind = new BindMessage();
|
|
|
+ bind.setLoginType((byte) 1);
|
|
|
+ bind.setLoginName(SgipConfig.getConfig().getString("sgip_mt_login_name", ""));
|
|
|
+
|
|
|
+ bind.setLoginPassword(SgipConfig.getConfig().getString("sgip_mt_login_password", ""));
|
|
|
+
|
|
|
+ WriteFuture write = this.session.write(bind);
|
|
|
+ write.awaitUninterruptibly();
|
|
|
+ log.info("Write:" + bind);
|
|
|
+ RespCaller caller = new RespCaller();
|
|
|
+ FutureTask<Byte> task = new FutureTask<>(caller);
|
|
|
+ String flowId = bind.getHead().getStrFlowId();
|
|
|
+ this.session.setAttribute("Call" + flowId, caller);
|
|
|
+ this.session.setAttribute("Future" + flowId, task);
|
|
|
+ try {
|
|
|
+ result = ((Byte) task.get(SgipConfig.getConfig().getLong("submit_time_out", 20L), TimeUnit.SECONDS))
|
|
|
+ .byteValue();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ this.session.removeAttribute("Call" + flowId);
|
|
|
+ this.session.removeAttribute("Future" + flowId);
|
|
|
+ log.error("", e);
|
|
|
+ if (this.session.isConnected()) {
|
|
|
+ this.session.closeOnFlush();
|
|
|
+ }
|
|
|
+ } catch (ExecutionException e) {
|
|
|
+ this.session.removeAttribute("Call" + flowId);
|
|
|
+ this.session.removeAttribute("Future" + flowId);
|
|
|
+ log.error("", e);
|
|
|
+ if (this.session.isConnected()) {
|
|
|
+ this.session.closeOnFlush();
|
|
|
+ }
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ this.session.removeAttribute("Call" + flowId);
|
|
|
+ this.session.removeAttribute("Future" + flowId);
|
|
|
+ log.error("", e);
|
|
|
+ result = -2;
|
|
|
+ if (this.session.isConnected()) {
|
|
|
+ this.session.closeOnFlush();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ result = -1;
|
|
|
+ log.warn("Fail to connect:" + address.toString());
|
|
|
+ }
|
|
|
+ } catch (RuntimeIoException e) {
|
|
|
+ log.error(e + address.toString());
|
|
|
+ result = -3;
|
|
|
+ if (this.session.isConnected()) {
|
|
|
+ this.session.closeOnFlush();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ log.debug("result=" + result);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ public SendResult sendSubmitMsg(SubmitMessage message) {
|
|
|
+ byte result = -99;
|
|
|
+ IoSession session = getSession();
|
|
|
+ if (session == null) {
|
|
|
+ result = -13;
|
|
|
+ log.error("Fail to get session!");
|
|
|
+ } else {
|
|
|
+ session.write(message);
|
|
|
+ log.info("{}", message);
|
|
|
+ RespCaller caller = new RespCaller();
|
|
|
+ FutureTask<Byte> task = new FutureTask<>(caller);
|
|
|
+ String flowId = message.getHead().getStrFlowId();
|
|
|
+ session.setAttribute("Call" + flowId, caller);
|
|
|
+ session.setAttribute("Future" + flowId, task);
|
|
|
+ try {
|
|
|
+ result = ((Byte) task.get(SgipConfig.getConfig().getLong("submit_time_out", 20L), TimeUnit.SECONDS))
|
|
|
+ .byteValue();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ session.removeAttribute("Call" + flowId);
|
|
|
+ session.removeAttribute("Future" + flowId);
|
|
|
+ log.error("", e);
|
|
|
+ if (this.session.isConnected()) {
|
|
|
+ session.closeOnFlush();
|
|
|
+ }
|
|
|
+ } catch (ExecutionException e) {
|
|
|
+ session.removeAttribute("Call" + flowId);
|
|
|
+ session.removeAttribute("Future" + flowId);
|
|
|
+ log.error("", e);
|
|
|
+ if (this.session.isConnected()) {
|
|
|
+ session.closeOnFlush();
|
|
|
+ }
|
|
|
+ } catch (TimeoutException e) {
|
|
|
+ session.removeAttribute("Call" + flowId);
|
|
|
+ session.removeAttribute("Future" + flowId);
|
|
|
+ log.error("", e);
|
|
|
+ result = -2;
|
|
|
+ if (this.session.isConnected()) {
|
|
|
+ session.closeOnFlush();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ SendResult messageResult = new SendResult(message.getHead().getFlowId(), result);
|
|
|
+
|
|
|
+ log.info("{}", messageResult);
|
|
|
+ return messageResult;
|
|
|
+ }
|
|
|
+
|
|
|
+ public SendResult[] sendMessage(String userNumber, String content) {
|
|
|
+ return sendMessage(null, new String[] { userNumber }, content, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ public SendResult[] sendMessage(String extendPort, String userNumber, String content, Date scheduleTime) {
|
|
|
+ return sendMessage(extendPort, new String[] { userNumber }, content, scheduleTime);
|
|
|
+ }
|
|
|
+
|
|
|
+ public SendResult[] sendMessage(String extendPort, String[] userNumber, String content, Date scheduleTime) {
|
|
|
+ String signature = this.config.getString("sms_signature", null);
|
|
|
+ if ((signature != null) && (!"".equals(signature))) {
|
|
|
+ content = content + signature;
|
|
|
+ }
|
|
|
+ byte[] messageContent = null;
|
|
|
+ try {
|
|
|
+ messageContent = content.getBytes("UnicodeBigUnmarked");
|
|
|
+ } catch (UnsupportedEncodingException e) {
|
|
|
+ log.error("Fail to getByte by UnicodeBigUnmarked!" + e.getMessage());
|
|
|
+ }
|
|
|
+ String strScheduleTime = null;
|
|
|
+ if (scheduleTime != null) {
|
|
|
+ strScheduleTime = dateForamt.format(scheduleTime);
|
|
|
+ }
|
|
|
+ SubmitMessage message = new SubmitMessage();
|
|
|
+ initSumitMessage(message);
|
|
|
+ if (extendPort == null) {
|
|
|
+ message.setSpNumber(this.config.getString("sgip_mt_sp_number"));
|
|
|
+ } else {
|
|
|
+ message.setSpNumber(this.config.getString("sgip_mt_sp_number") + extendPort);
|
|
|
+ }
|
|
|
+ message.setUserCount((byte) userNumber.length);
|
|
|
+ message.setUserNumber(userNumber);
|
|
|
+ message.setScheduleTime(strScheduleTime);
|
|
|
+
|
|
|
+ SendResult[] messageResluts = null;
|
|
|
+ message.setMessageCoding((byte) 8);
|
|
|
+ if (messageContent.length <= 140) {
|
|
|
+ message.setTpudhi((byte) 0);
|
|
|
+ message.setMessageContent(messageContent);
|
|
|
+ messageResluts = new SendResult[] { sendSubmitMsg(message) };
|
|
|
+ } else {
|
|
|
+ message.setTpudhi((byte) 1);
|
|
|
+ byte[][] bytes = MessageUtil.subMessage(messageContent, 140);
|
|
|
+ messageResluts = new SendResult[bytes.length];
|
|
|
+ for (int i = 0; i < bytes.length; i++) {
|
|
|
+ message.setHead(null);
|
|
|
+ message.setMessageContent(bytes[i]);
|
|
|
+ messageResluts[i] = sendSubmitMsg(message);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return messageResluts;
|
|
|
+ }
|
|
|
+
|
|
|
+ public SendResult[] sendWapush(String extendPort, String userNumber, String url, String content,
|
|
|
+ Date scheduleTime) {
|
|
|
+ return sendWapush(extendPort, new String[] { userNumber }, url, content, scheduleTime);
|
|
|
+ }
|
|
|
+
|
|
|
+ public SendResult[] sendWapush(String extendPort, String[] userNumber, String url, String content,
|
|
|
+ Date scheduleTime) {
|
|
|
+ SubmitMessage message = new SubmitMessage();
|
|
|
+ initSumitMessage(message);
|
|
|
+ if (extendPort == null) {
|
|
|
+ message.setSpNumber(this.config.getString("sgip_mt_sp_number"));
|
|
|
+ } else {
|
|
|
+ message.setSpNumber(this.config.getString("sgip_mt_sp_number") + extendPort);
|
|
|
+ }
|
|
|
+ message.setUserCount((byte) userNumber.length);
|
|
|
+ message.setUserNumber(userNumber);
|
|
|
+ message.setMessageCoding((byte) 4);
|
|
|
+ message.setTpudhi((byte) 1);
|
|
|
+ String strScheduleTime = null;
|
|
|
+ if (scheduleTime != null) {
|
|
|
+ strScheduleTime = dateForamt.format(scheduleTime);
|
|
|
+ message.setScheduleTime(strScheduleTime);
|
|
|
+ }
|
|
|
+ SendResult[] messageResluts = null;
|
|
|
+ try {
|
|
|
+ byte[][] bytes = MessageUtil.buildWAPPush(content, url);
|
|
|
+ messageResluts = new SendResult[bytes.length];
|
|
|
+ for (int i = 0; i < bytes.length; i++) {
|
|
|
+ message.setHead(null);
|
|
|
+ message.setMessageContent(bytes[i]);
|
|
|
+ messageResluts[i] = sendSubmitMsg(message);
|
|
|
+ }
|
|
|
+ } catch (UnsupportedEncodingException e) {
|
|
|
+ log.error("Fail to getByte by UTF-8!" + e.getMessage());
|
|
|
+ }
|
|
|
+ return messageResluts;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initSumitMessage(SubmitMessage message) {
|
|
|
+ message.setChargeNumber(this.config.getString("sgip_mt_charge_number"));
|
|
|
+ message.setCorpId(this.config.getString("sgip_mt_corp_id"));
|
|
|
+ message.setServiceType(this.config.getString("sgip_mt_service_type"));
|
|
|
+ message.setFeeType(this.config.getByte("sgip_mt_fee_type"));
|
|
|
+ message.setFeeValue(this.config.getString("sgip_mt_fee_value"));
|
|
|
+ message.setGivenValue(this.config.getString("sgip_mt_given_value"));
|
|
|
+ message.setAgentFlag(this.config.getByte("sgip_mt_agen_flag"));
|
|
|
+ message.setMorelatetoMTFlag(this.config.getByte("sgip_mt_morelateto_mt_flag"));
|
|
|
+
|
|
|
+ message.setPriority(this.config.getByte("sgip_mt_priority"));
|
|
|
+ message.setReportFlag(this.config.getByte("sgip_mt_report_flag"));
|
|
|
+ message.setTppid(this.config.getByte("sgip_mt_tppid"));
|
|
|
+ message.setMessageType(this.config.getByte("sgip_mt_message_type"));
|
|
|
+ }
|
|
|
+
|
|
|
+ public void dispose() {
|
|
|
+ ExecutorFilter exceutor = (ExecutorFilter) this.connector.getFilterChain().get("exceutor");
|
|
|
+
|
|
|
+ this.connector.dispose();
|
|
|
+ try {
|
|
|
+ Thread.sleep(100L);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("", e);
|
|
|
+ }
|
|
|
+ if (exceutor != null) {
|
|
|
+ ThreadPoolExecutor service = (ThreadPoolExecutor) exceutor.getExecutor();
|
|
|
+
|
|
|
+ service.shutdownNow();
|
|
|
+ }
|
|
|
+ sgipClient = null;
|
|
|
+ log.info("Disposed!");
|
|
|
+ }
|
|
|
+
|
|
|
+ public void sendUnBindMessage() {
|
|
|
+ getSession().write(new UnBindMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ public static void main(String[] args)
|
|
|
+ throws UnsupportedEncodingException {
|
|
|
+ }
|
|
|
+}
|