package chat.server.moquette;
|
|
import java.util.List;
|
|
import org.apache.log4j.Logger;
|
|
import chat.server.moquette.message.ClientID;
|
import chat.server.moquette.message.MessageID;
|
import chat.server.moquette.message.MqttMessage;
|
import chat.server.moquette.message.MqttMessageType;
|
import chat.server.moquette.message.MqttPublishMessage;
|
import chat.server.moquette.message.MqttSubAckMessage;
|
import chat.server.moquette.message.MqttSubscribeMessage;
|
import chat.server.moquette.message.MqttUnsubscribeMessage;
|
import io.netty.channel.ChannelDuplexHandler;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.channel.ChannelPromise;
|
|
public class MqttMessageLoggerHandler extends ChannelDuplexHandler {
|
|
private static Logger logger;
|
|
static {
|
logger = Logger.getLogger(MqttMessageLoggerHandler.class);
|
}
|
|
@Override
|
public void channelRead(ChannelHandlerContext ctx, Object message) {
|
logMQTTMessage(ctx, message, "C->B");
|
ctx.fireChannelRead(message);
|
}
|
|
private void logMQTTMessage(ChannelHandlerContext ctx, Object message, String direction) {
|
if (!(message instanceof MqttMessage)) {
|
return;
|
}
|
MqttMessage msg = (MqttMessage) message;
|
ClientID clientID = ClientID.valueOf(ctx.channel());
|
MqttMessageType messageType = msg.fixedHeader().messageType();
|
|
switch (messageType) {
|
case CONNECT:
|
case CONNACK:
|
case PINGREQ:
|
case PINGRESP:
|
case DISCONNECT:
|
logger.info(direction + " " + messageType + " <" + clientID + ">");
|
break;
|
case SUBSCRIBE:
|
MqttSubscribeMessage subscribe = (MqttSubscribeMessage) msg;
|
logger.info(direction + " SUBSCRIBE " + clientID + " to topics <" + subscribe.payload().topicSubscriptions() + ">");
|
break;
|
case UNSUBSCRIBE:
|
MqttUnsubscribeMessage unsubscribe = (MqttUnsubscribeMessage) msg;
|
logger.info(direction + " UNSUBSCRIBE " + clientID + " to topics <" + unsubscribe.payload().topics() + ">");
|
break;
|
case PUBLISH:
|
MqttPublishMessage publish = (MqttPublishMessage) msg;
|
logger.info(direction + " PUBLISH " + clientID + " to topics <" + publish.variableHeader().topicName() + ">");
|
break;
|
case PUBREC:
|
case PUBCOMP:
|
case PUBREL:
|
case PUBACK:
|
case UNSUBACK:
|
logger.info(direction + " " + messageType + " " + clientID + " packetID " + MessageID.valueOf(msg));
|
break;
|
case SUBACK:
|
MqttSubAckMessage suback = (MqttSubAckMessage) msg;
|
List<Integer> grantedQoSLevels = suback.payload().grantedQoSLevels();
|
|
logger.info(direction + " SUBACK " + clientID + " packetID " + MessageID.valueOf(msg) + ", grantedQoses " + grantedQoSLevels);
|
break;
|
}
|
}
|
|
@Override
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
ClientID clientID = ClientID.valueOf(ctx.channel());
|
logger.info("Channel closed " + clientID);
|
|
ctx.fireChannelInactive();
|
}
|
|
@Override
|
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
|
logMQTTMessage(ctx, msg, "C<-B");
|
ctx.write(msg, promise);
|
}
|
}
|