hefeixia
2021-02-18 5b8c95c760840f09910730943b21391e47187315
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package chat.server.moquette;
 
import java.util.List;
 
import org.apache.log4j.Logger;
 
import chat.server.moquette.message.ClientID;
import chat.server.moquette.message.MessageID;
import chat.server.moquette.message.MqttMessage;
import chat.server.moquette.message.MqttMessageType;
import chat.server.moquette.message.MqttPublishMessage;
import chat.server.moquette.message.MqttSubAckMessage;
import chat.server.moquette.message.MqttSubscribeMessage;
import chat.server.moquette.message.MqttUnsubscribeMessage;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
 
public class MqttMessageLoggerHandler extends ChannelDuplexHandler {
 
    private static Logger logger;
 
    static {
        logger = Logger.getLogger(MqttMessageLoggerHandler.class);
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object message) {
        logMQTTMessage(ctx, message, "C->B");
        ctx.fireChannelRead(message);
    }
 
    private void logMQTTMessage(ChannelHandlerContext ctx, Object message, String direction) {
        if (!(message instanceof MqttMessage)) {
            return;
        }
        MqttMessage msg = (MqttMessage) message;
        ClientID clientID = ClientID.valueOf(ctx.channel());
        MqttMessageType messageType = msg.fixedHeader().messageType();
        
        switch (messageType) {
            case CONNECT:
            case CONNACK:
            case PINGREQ:
            case PINGRESP:
            case DISCONNECT:
                logger.info(direction + " " + messageType + " <" + clientID + ">");
                break;
            case SUBSCRIBE:
                MqttSubscribeMessage subscribe = (MqttSubscribeMessage) msg;
                logger.info(direction + " SUBSCRIBE " + clientID + " to topics <" + subscribe.payload().topicSubscriptions() + ">");
                break;
            case UNSUBSCRIBE:
                MqttUnsubscribeMessage unsubscribe = (MqttUnsubscribeMessage) msg;
                logger.info(direction + " UNSUBSCRIBE " + clientID + " to topics <" + unsubscribe.payload().topics() + ">");
                break;
            case PUBLISH:
                MqttPublishMessage publish = (MqttPublishMessage) msg;
                logger.info(direction + " PUBLISH " + clientID + " to topics <" + publish.variableHeader().topicName() + ">");
                break;
            case PUBREC:
            case PUBCOMP:
            case PUBREL:
            case PUBACK:
            case UNSUBACK:
                logger.info(direction + " " + messageType + " " + clientID + " packetID " + MessageID.valueOf(msg));
                break;
            case SUBACK:
                MqttSubAckMessage suback = (MqttSubAckMessage) msg;
                List<Integer> grantedQoSLevels = suback.payload().grantedQoSLevels();
 
                logger.info(direction + " SUBACK " + clientID + " packetID " + MessageID.valueOf(msg) + ", grantedQoses " + grantedQoSLevels);
                break;
        }
    }
 
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ClientID clientID = ClientID.valueOf(ctx.channel());
        logger.info("Channel closed " + clientID);
        
        ctx.fireChannelInactive();
    }
 
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        logMQTTMessage(ctx, msg, "C<-B");
        ctx.write(msg, promise);
    }
}