package chat.server.im;
|
|
import org.apache.log4j.Logger;
|
|
import chat.message.Callback;
|
import chat.server.call.CallObject;
|
import chat.server.call.CallObjectMap;
|
import chat.server.call.Operator;
|
import chat.server.call.ResultCode;
|
import chat.server.moquette.message.ClientID;
|
import chat.server.moquette.message.MqttPublishMessage;
|
import chat.user.Session;
|
import chat.user.SessionStore;
|
import io.netty.channel.Channel;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.SimpleChannelInboundHandler;
|
import io.netty.handler.codec.http.FullHttpRequest;
|
|
public class IMDispatcher extends SimpleChannelInboundHandler<FullHttpRequest> {
|
|
protected static Logger logger;
|
|
|
static {
|
logger = Logger.getLogger(IMDispatcher.class);
|
}
|
|
public IMDispatcher() {
|
|
}
|
|
//read short connection
|
public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpRequest) throws Exception {
|
//1. session
|
ClientID clientID = ClientID.valueOf(httpRequest);
|
Session session = SessionStore.get(clientID);
|
//2. data pool
|
DataPool dataPool = new DataPool(RequestType.Short, session, httpRequest);
|
|
//3. operator, dataPool, resultPool
|
String uri = dataPool.getRequestURI();
|
String topic = dataPool.getRequestTopic();
|
|
Operator operator = Operator.getInstance(uri, topic);
|
int messageId = dataPool.getMessageId();
|
String secret = session != null ? session.getSecret() : "";
|
|
ResultPool resultPool = new ResultPool(RequestType.Short, topic, messageId, secret);
|
|
try {
|
//4. execute
|
if (operator != null) {
|
if (logger.isDebugEnabled()) {
|
logger.debug("receive request(short): " + httpRequest.uri() + ", method: " + operator.getMethod() + ", topic: " + topic);
|
}
|
|
CallObject callObject = CallObjectMap.get(operator);
|
|
if (callObject == null) {
|
logger.error("unknown request(short) path: " + uri);
|
resultPool.error(ResultCode.Error_Path_NotExists);
|
resultPool.add("error", "Path_NotExists" + uri);
|
}
|
|
System.out.println(operator.getTopic());
|
callObject.exec(session, operator, dataPool, resultPool);
|
|
if (logger.isDebugEnabled()) {
|
logger.debug("return " + operator.getMethod() + " short(" + topic + ")");
|
}
|
}
|
else {
|
logger.error("unknown request(short) path: " + uri + ", topic: " + topic);
|
resultPool.error(ResultCode.Error_Path_NotExists);
|
resultPool.add("error", "Path_NotExists" + uri);
|
}
|
}
|
finally {
|
//5. write back
|
Object data = resultPool.getResultData();
|
ctx.writeAndFlush(data);
|
}
|
|
//6. callback
|
Callback callback = resultPool.getCallback();
|
|
if (callback != null) {
|
callback.exec();
|
}
|
}
|
|
//read long connection
|
public void channelRead0(Channel channel, MqttPublishMessage message) throws Exception {
|
ClientID clientID = ClientID.valueOf(channel);
|
Session session = SessionStore.get(clientID);
|
|
if (session == null) {
|
return;
|
}
|
|
System.out.println("long:" + channel);
|
if (session != null) {
|
session.setChannel(channel);
|
session.refreshLastActiveTime();
|
}
|
|
String topic = message.variableHeader().topicName();
|
System.out.println("Topic===============>:" + topic);
|
|
Operator operator = Operator.getInstance(null, topic);
|
|
if (operator == null) {
|
logger.error("unknown publish(long): " + topic);
|
return;
|
}
|
|
if (logger.isDebugEnabled()) {
|
logger.debug("receive request(long): method: " + operator.getMethod() + ", topic: " + topic);
|
}
|
|
//2. do dispatch
|
DataPool dataPool = new DataPool(RequestType.Long, session, message);
|
|
int messageId = message.variableHeader().packetId();//dataPool.getMessageId();
|
String secret = session.getSecret();
|
ResultPool resultPool = new ResultPool(RequestType.Long, topic, messageId, secret);
|
|
//3. execute
|
CallObject callObject = CallObjectMap.get(operator);
|
|
if (callObject == null) {
|
logger.error("unknown request(long): " + topic);
|
return;
|
}
|
|
callObject.exec(session, operator, dataPool, resultPool);
|
|
//4. write back
|
Object data = resultPool.getResultData();
|
|
System.out.println("send message:" + channel);
|
|
channel.writeAndFlush(data);
|
|
if (logger.isDebugEnabled()) {
|
logger.debug("return " + operator.getMethod() + " long(" + topic + ")");
|
}
|
|
//5. callback
|
Callback callback = resultPool.getCallback();
|
|
if (callback != null) {
|
callback.exec();
|
}
|
}
|
|
}
|