package chat.server.moquette; import org.apache.log4j.Logger; import chat.server.ServerInstance; import chat.server.im.IMDispatcher; import chat.server.moquette.message.ClientID; import chat.server.moquette.message.MqttConnAckMessage; import chat.server.moquette.message.MqttConnAckVariableHeader; import chat.server.moquette.message.MqttConnectAckPayload; import chat.server.moquette.message.MqttConnectMessage; import chat.server.moquette.message.MqttConnectPayload; import chat.server.moquette.message.MqttConnectReturnCode; import chat.server.moquette.message.MqttConnectVariableHeader; import chat.server.moquette.message.MqttFixedHeader; import chat.server.moquette.message.MqttMessage; import chat.server.moquette.message.MqttMessageType; import chat.server.moquette.message.MqttPublishMessage; import chat.server.moquette.message.MqttQoS; import chat.user.Session; import chat.user.SessionStore; import cn.wildfirechat.proto.WFCMessage; import cn.wildfirechat.proto.WFCMessage.ConnectAckPayload; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelPipeline; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.AttributeKey; import io.netty.util.ReferenceCountUtil; public class MqttDispatcher extends ChannelInboundHandlerAdapter { private static AttributeKey ATTR_KEEPALIVE = AttributeKey.valueOf("keepAlive"); private static AttributeKey ATTR_CLEANSESSION = AttributeKey.valueOf("cleanSession"); private static AttributeKey ATTR_CLIENTID = AttributeKey.valueOf("ClientID"); private static Logger logger; private static IMDispatcher imDispatcher; static { logger = Logger.getLogger(MqttDispatcher.class); imDispatcher = new IMDispatcher(); } public MqttDispatcher() { } @Override public void channelRead(ChannelHandlerContext ctx, Object message) { try { MqttMessage msg = (MqttMessage) message; MqttMessageType messageType = msg.messageType(); logger.info("Processing MQTT message, type=" + messageType); Channel channel = ctx.channel(); switch (messageType) { case CONNECT: processConnect(channel, msg); break; case DISCONNECT: processDisconnect(channel, msg); break; case PUBLISH: MqttPublishMessage mess = (MqttPublishMessage) msg; String topic = mess.variableHeader().topicName(); if (topic.equals("MP")) { System.out.println("MP"); } processPublish(channel, msg); break; case PINGREQ: processPing(channel, msg); break; default: logger.error("Unkonwn MessageType:" + messageType); break; } } catch (Throwable ex) { logger.error("Exception was caught while processing MQTT message, " + ex.getCause(), ex); ctx.fireExceptionCaught(ex); ctx.close(); } finally { ReferenceCountUtil.release(message); } } public void processConnect(Channel channel, MqttMessage msg) { if (logger.isDebugEnabled()) { logger.debug("receive connect(channel=)" + channel.id()); } MqttConnectMessage message = (MqttConnectMessage) msg; MqttConnectPayload payload = message.payload(); ClientID clientID = ClientID.valueOf(payload); //1. if starting or terminated if (!ServerInstance.isStart()) { channel.close(); return; } //2. if empty client id if (clientID.isEmpty()) { MqttConnAckMessage badIdMessage = createConnAckMessageFalse(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED, null); channel.writeAndFlush(badIdMessage); channel.close(); return; } //3. set channel attribute MqttConnectVariableHeader header = message.variableHeader(); customizeChannel(channel, clientID, header); //4. session clientID.setValueTo(channel); Session session = SessionStore.get(clientID); if (session != null) { session.setChannel(channel); session.refreshLastActiveTime(); } //5. write ACK message ConnectAckPayload ackPayload = createConnAckPayload(session); MqttConnAckMessage successMessage = createConnAckMessage(MqttConnectReturnCode.CONNECTION_ACCEPTED, ackPayload.toByteArray()); channel.writeAndFlush(successMessage); //6. notify user state // User user = session.getUser(); // user.notify(PublishedOperator.Online); } private ConnectAckPayload createConnAckPayload(Session session) { long timestamp = System.currentTimeMillis(); long messageHead = timestamp; //session.getMessageHead(); long friendHead = timestamp;//session.getFriendHead(); long friendRqHead = timestamp;//session.getFriendRqHead(); long settingHead = timestamp;//session.getSettingHead(); ConnectAckPayload.Builder builder = WFCMessage.ConnectAckPayload.newBuilder(); builder.setMsgHead(messageHead); builder.setFriendHead(friendHead); builder.setFriendRqHead(friendRqHead); builder.setSettingHead(settingHead); builder.setServerTime(System.currentTimeMillis()); ConnectAckPayload payload = builder.build(); return payload; } private MqttConnAckMessage createConnAckMessage(MqttConnectReturnCode code, byte[] data) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(code, true); MqttConnAckMessage result = new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader, new MqttConnectAckPayload(data)); return result; } private MqttConnAckMessage createConnAckMessageFalse(MqttConnectReturnCode code, byte[] data) { MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(code, false); MqttConnAckMessage result = new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader, new MqttConnectAckPayload(data)); return result; } private void customizeChannel(Channel channel, ClientID clientID, MqttConnectVariableHeader header) { //1. set clientID channel.attr(ATTR_CLIENTID).set(clientID.getValue()); //2. live time int keepAliveTimeSeconds = header.keepAliveTimeSeconds(); channel.attr(ATTR_KEEPALIVE).set(keepAliveTimeSeconds); //3. clean session boolean cleanSession = header.isCleanSession(); channel.attr(ATTR_CLEANSESSION).set(cleanSession); //4. idle time ChannelPipeline pipeline = channel.pipeline(); if (pipeline.names().contains("idleStateHandler")) { pipeline.remove("idleStateHandler"); } int idleTime = Math.round(keepAliveTimeSeconds * 1.5f); pipeline.addFirst("idleStateHandler", new IdleStateHandler(idleTime, 0, 0)); } public void processDisconnect(Channel channel, MqttMessage msg) throws InterruptedException { if (logger.isDebugEnabled()) { logger.debug("receive disconnect(channel=)" + channel.id()); } ClientID clientID = ClientID.valueOf(channel); //1. flush channel.flush(); //2. if empty if (clientID.isEmpty()) { channel.close(); return; } //3. close session channel Session session = SessionStore.get(clientID); if (session == null) { return; } session.refreshLastActiveTime(); session.closeChannel(); //4. delete session SessionStore.delete(clientID); } public void processPublish(Channel channel, MqttMessage message) throws Exception { if (logger.isDebugEnabled()) { logger.debug("receive request(channel=)" + channel.id() + ": publish"); } MqttPublishMessage publishMessage = (MqttPublishMessage) message; imDispatcher.channelRead0(channel, publishMessage); } private void processPing(Channel channel, MqttMessage message) { if (logger.isDebugEnabled()) { logger.debug("receive ping(channel=)" + channel.id()); } MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttMessage result = new MqttMessage(pingHeader); channel.writeAndFlush(result); } }