hefeixia
2021-02-18 5b8c95c760840f09910730943b21391e47187315
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package chat.message;
 
import static chat.consts.ProtoConstants.PersistFlag.Transparent;
 
import java.util.Collection;
import java.util.List;
 
import chat.consts.ProtoConstants;
import chat.module.PrivateFriendBucket;
import chat.module.entity.MessageContainer;
import chat.module.entity.PrivateFriend;
import chat.server.im.IMTopic;
import chat.server.moquette.message.MqttFixedHeader;
import chat.server.moquette.message.MqttMessageType;
import chat.server.moquette.message.MqttPublishMessage;
import chat.server.moquette.message.MqttPublishVariableHeader;
import chat.server.moquette.message.MqttQoS;
import chat.user.Client;
import chat.user.Session;
import chat.user.UserStore;
import chat.util.MessageShardingUtil;
import cn.wildfirechat.proto.WFCMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
 
public class MessagesPublisher {
 
    public void publish2Receivers(WFCMessage.Message messageT, Collection<String> receivers, int pullType) {
        String sender = messageT.getFromUser();
        int conversationType = messageT.getConversation().getType();
        String target = messageT.getConversation().getTarget();
        int line = messageT.getConversation().getLine();
        long messageId = messageT.getMessageId();
        int messageContentType = messageT.getContent().getType();
        MessageContainer messageContainer;
        
        String pushContent;
        if (messageT.getContent().getPersistFlag() == Transparent) {
            pushContent = null;
        } else {
            if (messageContentType == ProtoConstants.ContentType.Text) {
                pushContent = messageT.getContent().getPushContent();
            }
        }
 
        String pushData = messageT.getContent().getPushData();
        
        long serverTime = messageT.getServerTimestamp();
        int mentionType = messageT.getContent().getMentionedType();
        List<String> mentionTargets = messageT.getContent().getMentionedTargetList();
        int persistFlag = messageT.getContent().getPersistFlag();
        WFCMessage.Message message = null;
        
        if (persistFlag == Transparent) {
            publishTransparentMessage2Receivers(messageT, receivers, pullType);
            return;
        }
        
        try {
            for (String usr : receivers) {
                 if (usr.equals(sender)) {
                     continue;
                 }
                 
                 long messageSeq = MessageShardingUtil.generateId();
                 
                 //根据sender、targetId 定位ChatSpaceType, 保存message数据
                 if(messageT.getConversation().getType() == 0) {
                    //PrivateFriendBucket.getInstance().saveMessage(messageT, messageSeq);
                 }
                 //群组发消息
                 else if (messageT.getConversation().getType() == 1) {
                 }
                 
                 if (!usr.equals(sender)) {
                     WFCMessage.NotifyMessage notifyMessage = WFCMessage.NotifyMessage
                                                             .newBuilder()
                                                             .setType(pullType)
                                                             .setHead(messageSeq)
                                                             .build();
                      ByteBuf payload = Unpooled.buffer();
                     byte[] byteData = notifyMessage.toByteArray();
                     
                     payload.ensureWritable(byteData.length).writeBytes(byteData);
                     MqttPublishMessage publishMsg;
                     publishMsg = notRetainedPublish(IMTopic.NotifyMessageTopic, MqttQoS.AT_MOST_ONCE, payload);
                     
                     Client client = UserStore.getById(usr).getClient(usr);
                     
                     Session sessionC = client.getSession();
                     if (sessionC != null) {
                          Channel channel = sessionC.getChannel();
                          if (publishMsg != null) {
                               channel.writeAndFlush(publishMsg);
                          }
                     }
                 }
            }
        } catch(Exception e) {
            e.printStackTrace();
        }        
    }
    
    private void publishTransparentMessage2Receivers(WFCMessage.Message message, Collection<String> receivers, int pullType) {
        for (String user : receivers) {
             ByteBuf payload = Unpooled.buffer();
             byte[] byteData = message.toByteArray();
             payload.ensureWritable(byteData.length).writeBytes(byteData);
             MqttPublishMessage publishMsg;
             publishMsg = notRetainedPublish(IMTopic.SendMessageTopic, MqttQoS.AT_MOST_ONCE, payload);
             
             Client client = UserStore.getById(user).getClient(user);
             
              Session sessionC = client.getSession();
              if (sessionC != null) {
                  Channel channel = sessionC.getChannel();
                  if (publishMsg != null) {
                      channel.writeAndFlush(publishMsg);
                  }
              }             
        }
    }
    
    private static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) {
        return notRetainedPublishWithMessageId(topic, qos, message, 0);
    }
 
    private static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message, int messageId) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
        MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId);
        return new MqttPublishMessage(fixedHeader, varHeader, message);
    }    
}