package chat.server.moquette;
|
|
import org.apache.log4j.Logger;
|
|
import chat.server.ServerInstance;
|
import chat.server.im.IMDispatcher;
|
import chat.server.moquette.message.ClientID;
|
import chat.server.moquette.message.MqttConnAckMessage;
|
import chat.server.moquette.message.MqttConnAckVariableHeader;
|
import chat.server.moquette.message.MqttConnectAckPayload;
|
import chat.server.moquette.message.MqttConnectMessage;
|
import chat.server.moquette.message.MqttConnectPayload;
|
import chat.server.moquette.message.MqttConnectReturnCode;
|
import chat.server.moquette.message.MqttConnectVariableHeader;
|
import chat.server.moquette.message.MqttFixedHeader;
|
import chat.server.moquette.message.MqttMessage;
|
import chat.server.moquette.message.MqttMessageType;
|
import chat.server.moquette.message.MqttPublishMessage;
|
import chat.server.moquette.message.MqttQoS;
|
import chat.user.Session;
|
import chat.user.SessionStore;
|
import cn.wildfirechat.proto.WFCMessage;
|
import cn.wildfirechat.proto.WFCMessage.ConnectAckPayload;
|
import io.netty.channel.Channel;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelInboundHandlerAdapter;
|
import io.netty.channel.ChannelPipeline;
|
import io.netty.handler.timeout.IdleStateHandler;
|
import io.netty.util.AttributeKey;
|
import io.netty.util.ReferenceCountUtil;
|
|
public class MqttDispatcher extends ChannelInboundHandlerAdapter {
|
|
private static AttributeKey<Object> ATTR_KEEPALIVE = AttributeKey.valueOf("keepAlive");
|
private static AttributeKey<Object> ATTR_CLEANSESSION = AttributeKey.valueOf("cleanSession");
|
private static AttributeKey<Object> ATTR_CLIENTID = AttributeKey.valueOf("ClientID");
|
|
private static Logger logger;
|
private static IMDispatcher imDispatcher;
|
|
static {
|
logger = Logger.getLogger(MqttDispatcher.class);
|
imDispatcher = new IMDispatcher();
|
}
|
|
public MqttDispatcher() {
|
|
}
|
|
@Override
|
public void channelRead(ChannelHandlerContext ctx, Object message) {
|
try {
|
MqttMessage msg = (MqttMessage) message;
|
MqttMessageType messageType = msg.messageType();
|
|
logger.info("Processing MQTT message, type=" + messageType);
|
|
Channel channel = ctx.channel();
|
|
switch (messageType) {
|
case CONNECT:
|
processConnect(channel, msg);
|
break;
|
case DISCONNECT:
|
processDisconnect(channel, msg);
|
break;
|
case PUBLISH:
|
MqttPublishMessage mess = (MqttPublishMessage) msg;
|
String topic = mess.variableHeader().topicName();
|
if (topic.equals("MP")) {
|
System.out.println("MP");
|
}
|
processPublish(channel, msg);
|
break;
|
case PINGREQ:
|
processPing(channel, msg);
|
break;
|
default:
|
logger.error("Unkonwn MessageType:" + messageType);
|
break;
|
}
|
}
|
catch (Throwable ex) {
|
logger.error("Exception was caught while processing MQTT message, " + ex.getCause(), ex);
|
ctx.fireExceptionCaught(ex);
|
ctx.close();
|
}
|
finally {
|
ReferenceCountUtil.release(message);
|
}
|
}
|
|
public void processConnect(Channel channel, MqttMessage msg) {
|
if (logger.isDebugEnabled()) {
|
logger.debug("receive connect(channel=)" + channel.id());
|
}
|
|
MqttConnectMessage message = (MqttConnectMessage) msg;
|
MqttConnectPayload payload = message.payload();
|
|
ClientID clientID = ClientID.valueOf(payload);
|
|
//1. if starting or terminated
|
if (!ServerInstance.isStart()) {
|
channel.close();
|
return;
|
}
|
|
//2. if empty client id
|
if (clientID.isEmpty()) {
|
MqttConnAckMessage badIdMessage = createConnAckMessageFalse(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, null);
|
channel.writeAndFlush(badIdMessage);
|
channel.close();
|
return;
|
}
|
|
//3. set channel attribute
|
MqttConnectVariableHeader header = message.variableHeader();
|
customizeChannel(channel, clientID, header);
|
|
//4. session
|
clientID.setValueTo(channel);
|
|
Session session = SessionStore.get(clientID);
|
|
if (session != null) {
|
session.setChannel(channel);
|
session.refreshLastActiveTime();
|
}
|
|
//5. write ACK message
|
ConnectAckPayload ackPayload = createConnAckPayload(session);
|
MqttConnAckMessage successMessage = createConnAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED, ackPayload.toByteArray());
|
channel.writeAndFlush(successMessage);
|
|
//6. notify user state
|
// User user = session.getUser();
|
// user.notify(PublishedOperator.Online);
|
}
|
|
private ConnectAckPayload createConnAckPayload(Session session) {
|
long timestamp = System.currentTimeMillis();
|
long messageHead = timestamp; //session.getMessageHead();
|
long friendHead = timestamp;//session.getFriendHead();
|
long friendRqHead = timestamp;//session.getFriendRqHead();
|
long settingHead = timestamp;//session.getSettingHead();
|
|
ConnectAckPayload.Builder builder = WFCMessage.ConnectAckPayload.newBuilder();
|
builder.setMsgHead(messageHead);
|
builder.setFriendHead(friendHead);
|
builder.setFriendRqHead(friendRqHead);
|
builder.setSettingHead(settingHead);
|
builder.setServerTime(System.currentTimeMillis());
|
|
ConnectAckPayload payload = builder.build();
|
return payload;
|
}
|
|
private MqttConnAckMessage createConnAckMessage(MqttConnectReturnCode code, byte[] data) {
|
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(code, true);
|
|
MqttConnAckMessage result = new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader, new MqttConnectAckPayload(data));
|
return result;
|
}
|
|
private MqttConnAckMessage createConnAckMessageFalse(MqttConnectReturnCode code, byte[] data) {
|
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(code, false);
|
|
MqttConnAckMessage result = new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader, new MqttConnectAckPayload(data));
|
return result;
|
}
|
|
private void customizeChannel(Channel channel, ClientID clientID, MqttConnectVariableHeader header) {
|
//1. set clientID
|
channel.attr(ATTR_CLIENTID).set(clientID.getValue());
|
|
//2. live time
|
int keepAliveTimeSeconds = header.keepAliveTimeSeconds();
|
channel.attr(ATTR_KEEPALIVE).set(keepAliveTimeSeconds);
|
|
//3. clean session
|
boolean cleanSession = header.isCleanSession();
|
channel.attr(ATTR_CLEANSESSION).set(cleanSession);
|
|
//4. idle time
|
ChannelPipeline pipeline = channel.pipeline();
|
|
if (pipeline.names().contains("idleStateHandler")) {
|
pipeline.remove("idleStateHandler");
|
}
|
|
int idleTime = Math.round(keepAliveTimeSeconds * 1.5f);
|
pipeline.addFirst("idleStateHandler", new IdleStateHandler(idleTime, 0, 0));
|
}
|
|
public void processDisconnect(Channel channel, MqttMessage msg) throws InterruptedException {
|
if (logger.isDebugEnabled()) {
|
logger.debug("receive disconnect(channel=)" + channel.id());
|
}
|
|
ClientID clientID = ClientID.valueOf(channel);
|
|
//1. flush
|
channel.flush();
|
|
//2. if empty
|
if (clientID.isEmpty()) {
|
channel.close();
|
return;
|
}
|
|
//3. close session channel
|
Session session = SessionStore.get(clientID);
|
|
if (session == null) {
|
return;
|
}
|
|
session.refreshLastActiveTime();
|
session.closeChannel();
|
|
//4. delete session
|
SessionStore.delete(clientID);
|
}
|
|
public void processPublish(Channel channel, MqttMessage message) throws Exception {
|
if (logger.isDebugEnabled()) {
|
logger.debug("receive request(channel=)" + channel.id() + ": publish");
|
}
|
|
MqttPublishMessage publishMessage = (MqttPublishMessage) message;
|
imDispatcher.channelRead0(channel, publishMessage);
|
}
|
|
private void processPing(Channel channel, MqttMessage message) {
|
if (logger.isDebugEnabled()) {
|
logger.debug("receive ping(channel=)" + channel.id());
|
}
|
|
MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
MqttMessage result = new MqttMessage(pingHeader);
|
channel.writeAndFlush(result);
|
}
|
|
|
|
}
|