package chat.server.im; import org.apache.log4j.Logger; import chat.message.Callback; import chat.server.call.CallObject; import chat.server.call.CallObjectMap; import chat.server.call.Operator; import chat.server.call.ResultCode; import chat.server.moquette.message.ClientID; import chat.server.moquette.message.MqttPublishMessage; import chat.user.Session; import chat.user.SessionStore; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.FullHttpRequest; public class IMDispatcher extends SimpleChannelInboundHandler { protected static Logger logger; static { logger = Logger.getLogger(IMDispatcher.class); } public IMDispatcher() { } //read short connection public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpRequest) throws Exception { //1. session ClientID clientID = ClientID.valueOf(httpRequest); Session session = SessionStore.get(clientID); //2. data pool DataPool dataPool = new DataPool(RequestType.Short, session, httpRequest); //3. operator, dataPool, resultPool String uri = dataPool.getRequestURI(); String topic = dataPool.getRequestTopic(); Operator operator = Operator.getInstance(uri, topic); int messageId = dataPool.getMessageId(); String secret = session != null ? session.getSecret() : ""; ResultPool resultPool = new ResultPool(RequestType.Short, topic, messageId, secret); try { //4. execute if (operator != null) { if (logger.isDebugEnabled()) { logger.debug("receive request(short): " + httpRequest.uri() + ", method: " + operator.getMethod() + ", topic: " + topic); } CallObject callObject = CallObjectMap.get(operator); if (callObject == null) { logger.error("unknown request(short) path: " + uri); resultPool.error(ResultCode.Error_Path_NotExists); resultPool.add("error", "Path_NotExists" + uri); } System.out.println(operator.getTopic()); callObject.exec(session, operator, dataPool, resultPool); if (logger.isDebugEnabled()) { logger.debug("return " + operator.getMethod() + " short(" + topic + ")"); } } else { logger.error("unknown request(short) path: " + uri + ", topic: " + topic); resultPool.error(ResultCode.Error_Path_NotExists); resultPool.add("error", "Path_NotExists" + uri); } } finally { //5. write back Object data = resultPool.getResultData(); ctx.writeAndFlush(data); } //6. callback Callback callback = resultPool.getCallback(); if (callback != null) { callback.exec(); } } //read long connection public void channelRead0(Channel channel, MqttPublishMessage message) throws Exception { ClientID clientID = ClientID.valueOf(channel); Session session = SessionStore.get(clientID); if (session == null) { return; } System.out.println("long:" + channel); if (session != null) { session.setChannel(channel); session.refreshLastActiveTime(); } String topic = message.variableHeader().topicName(); System.out.println("Topic===============>:" + topic); Operator operator = Operator.getInstance(null, topic); if (operator == null) { logger.error("unknown publish(long): " + topic); return; } if (logger.isDebugEnabled()) { logger.debug("receive request(long): method: " + operator.getMethod() + ", topic: " + topic); } //2. do dispatch DataPool dataPool = new DataPool(RequestType.Long, session, message); int messageId = message.variableHeader().packetId();//dataPool.getMessageId(); String secret = session.getSecret(); ResultPool resultPool = new ResultPool(RequestType.Long, topic, messageId, secret); //3. execute CallObject callObject = CallObjectMap.get(operator); if (callObject == null) { logger.error("unknown request(long): " + topic); return; } callObject.exec(session, operator, dataPool, resultPool); //4. write back Object data = resultPool.getResultData(); System.out.println("send message:" + channel); channel.writeAndFlush(data); if (logger.isDebugEnabled()) { logger.debug("return " + operator.getMethod() + " long(" + topic + ")"); } //5. callback Callback callback = resultPool.getCallback(); if (callback != null) { callback.exec(); } } }