package chat.server.moquette; import java.util.List; import org.apache.log4j.Logger; import chat.server.moquette.message.ClientID; import chat.server.moquette.message.MessageID; import chat.server.moquette.message.MqttMessage; import chat.server.moquette.message.MqttMessageType; import chat.server.moquette.message.MqttPublishMessage; import chat.server.moquette.message.MqttSubAckMessage; import chat.server.moquette.message.MqttSubscribeMessage; import chat.server.moquette.message.MqttUnsubscribeMessage; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; public class MqttMessageLoggerHandler extends ChannelDuplexHandler { private static Logger logger; static { logger = Logger.getLogger(MqttMessageLoggerHandler.class); } @Override public void channelRead(ChannelHandlerContext ctx, Object message) { logMQTTMessage(ctx, message, "C->B"); ctx.fireChannelRead(message); } private void logMQTTMessage(ChannelHandlerContext ctx, Object message, String direction) { if (!(message instanceof MqttMessage)) { return; } MqttMessage msg = (MqttMessage) message; ClientID clientID = ClientID.valueOf(ctx.channel()); MqttMessageType messageType = msg.fixedHeader().messageType(); switch (messageType) { case CONNECT: case CONNACK: case PINGREQ: case PINGRESP: case DISCONNECT: logger.info(direction + " " + messageType + " <" + clientID + ">"); break; case SUBSCRIBE: MqttSubscribeMessage subscribe = (MqttSubscribeMessage) msg; logger.info(direction + " SUBSCRIBE " + clientID + " to topics <" + subscribe.payload().topicSubscriptions() + ">"); break; case UNSUBSCRIBE: MqttUnsubscribeMessage unsubscribe = (MqttUnsubscribeMessage) msg; logger.info(direction + " UNSUBSCRIBE " + clientID + " to topics <" + unsubscribe.payload().topics() + ">"); break; case PUBLISH: MqttPublishMessage publish = (MqttPublishMessage) msg; logger.info(direction + " PUBLISH " + clientID + " to topics <" + publish.variableHeader().topicName() + ">"); break; case PUBREC: case PUBCOMP: case PUBREL: case PUBACK: case UNSUBACK: logger.info(direction + " " + messageType + " " + clientID + " packetID " + MessageID.valueOf(msg)); break; case SUBACK: MqttSubAckMessage suback = (MqttSubAckMessage) msg; List grantedQoSLevels = suback.payload().grantedQoSLevels(); logger.info(direction + " SUBACK " + clientID + " packetID " + MessageID.valueOf(msg) + ", grantedQoses " + grantedQoSLevels); break; } } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ClientID clientID = ClientID.valueOf(ctx.channel()); logger.info("Channel closed " + clientID); ctx.fireChannelInactive(); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { logMQTTMessage(ctx, msg, "C<-B"); ctx.write(msg, promise); } }