IOS
hefeixia
2021-02-18 49f3c1374873f73dbde2983ca0fcf1fb10bfedbf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package chat.server.im;
 
import org.apache.log4j.Logger;
 
import chat.message.Callback;
import chat.server.call.CallObject;
import chat.server.call.CallObjectMap;
import chat.server.call.Operator;
import chat.server.call.ResultCode;
import chat.server.moquette.message.ClientID;
import chat.server.moquette.message.MqttPublishMessage;
import chat.user.Session;
import chat.user.SessionStore;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
 
public class IMDispatcher extends SimpleChannelInboundHandler<FullHttpRequest> {
    
    protected static Logger logger;
 
    
    static {
        logger = Logger.getLogger(IMDispatcher.class);
    }
    
    public IMDispatcher() {
 
    }
    
    //read short connection
    public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest httpRequest) throws Exception {
        //1. session
        ClientID clientID = ClientID.valueOf(httpRequest);
        Session session = SessionStore.get(clientID);
        //2. data pool
        DataPool dataPool = new DataPool(RequestType.Short, session, httpRequest);
        
        //3. operator, dataPool, resultPool
        String uri = dataPool.getRequestURI();
        String topic = dataPool.getRequestTopic();
        
        Operator operator = Operator.getInstance(uri, topic);
        int messageId = dataPool.getMessageId();
        String secret = session != null ? session.getSecret() : "";
        
        ResultPool resultPool = new ResultPool(RequestType.Short, topic, messageId, secret);
        
        try {
            //4. execute 
            if (operator != null) {
                if (logger.isDebugEnabled()) {
                    logger.debug("receive request(short): " + httpRequest.uri() + ", method: " + operator.getMethod() + ", topic: " + topic);
                }
                
                CallObject callObject = CallObjectMap.get(operator);
                
                if (callObject == null) {
                    logger.error("unknown request(short) path: " + uri);
                    resultPool.error(ResultCode.Error_Path_NotExists);
                    resultPool.add("error", "Path_NotExists" + uri);
                }
                
                System.out.println(operator.getTopic());
                callObject.exec(session, operator, dataPool, resultPool);
                
                if (logger.isDebugEnabled()) {
                    logger.debug("return " + operator.getMethod() + " short(" + topic + ")");
                }
            }
            else {
                logger.error("unknown request(short) path: " + uri + ", topic: " + topic);
                resultPool.error(ResultCode.Error_Path_NotExists);
                resultPool.add("error", "Path_NotExists" + uri);
            }
        }
        finally {
            //5. write back
            Object data = resultPool.getResultData();
            ctx.writeAndFlush(data);
        }
        
        //6. callback
        Callback callback = resultPool.getCallback();
        
        if (callback != null) {
            callback.exec();
        }
    }
    
    //read long connection
    public void channelRead0(Channel channel, MqttPublishMessage message) throws Exception {
        ClientID clientID = ClientID.valueOf(channel);
        Session session = SessionStore.get(clientID);
        
        if (session == null) {
            return;
        }
        
        System.out.println("long:" + channel);
        if (session != null) {
            session.setChannel(channel);
            session.refreshLastActiveTime();
        }        
        
        String topic = message.variableHeader().topicName();
        System.out.println("Topic===============>:" + topic);
        
        Operator operator = Operator.getInstance(null, topic);
        
        if (operator == null) {
            logger.error("unknown publish(long): " + topic);
            return;
        }
        
        if (logger.isDebugEnabled()) {
            logger.debug("receive request(long): method: " + operator.getMethod() + ", topic: " + topic);
        }
        
        //2. do dispatch
        DataPool dataPool = new DataPool(RequestType.Long, session, message);
        
        int messageId = message.variableHeader().packetId();//dataPool.getMessageId();
        String secret = session.getSecret();
        ResultPool resultPool = new ResultPool(RequestType.Long, topic, messageId, secret);
        
        //3. execute
        CallObject callObject = CallObjectMap.get(operator);
        
        if (callObject == null) {
            logger.error("unknown request(long): " + topic);
            return;
        }
        
        callObject.exec(session, operator, dataPool, resultPool);
        
        //4. write back
        Object data = resultPool.getResultData();
        
        System.out.println("send message:" + channel);
        
        channel.writeAndFlush(data);
        
        if (logger.isDebugEnabled()) {
            logger.debug("return " + operator.getMethod() + " long(" + topic + ")");
        }
        
        //5. callback
        Callback callback = resultPool.getCallback();
        
        if (callback != null) {
            callback.exec();
        }
    }
 
}