package chat.message;
|
|
import static chat.consts.ProtoConstants.PersistFlag.Transparent;
|
|
import java.util.Collection;
|
import java.util.List;
|
|
import chat.consts.ProtoConstants;
|
import chat.module.PrivateFriendBucket;
|
import chat.module.entity.MessageContainer;
|
import chat.module.entity.PrivateFriend;
|
import chat.server.im.IMTopic;
|
import chat.server.moquette.message.MqttFixedHeader;
|
import chat.server.moquette.message.MqttMessageType;
|
import chat.server.moquette.message.MqttPublishMessage;
|
import chat.server.moquette.message.MqttPublishVariableHeader;
|
import chat.server.moquette.message.MqttQoS;
|
import chat.user.Client;
|
import chat.user.Session;
|
import chat.user.UserStore;
|
import chat.util.MessageShardingUtil;
|
import cn.wildfirechat.proto.WFCMessage;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.Unpooled;
|
import io.netty.channel.Channel;
|
|
public class MessagesPublisher {
|
|
public void publish2Receivers(WFCMessage.Message messageT, Collection<String> receivers, int pullType) {
|
String sender = messageT.getFromUser();
|
int conversationType = messageT.getConversation().getType();
|
String target = messageT.getConversation().getTarget();
|
int line = messageT.getConversation().getLine();
|
long messageId = messageT.getMessageId();
|
int messageContentType = messageT.getContent().getType();
|
MessageContainer messageContainer;
|
|
String pushContent;
|
if (messageT.getContent().getPersistFlag() == Transparent) {
|
pushContent = null;
|
} else {
|
if (messageContentType == ProtoConstants.ContentType.Text) {
|
pushContent = messageT.getContent().getPushContent();
|
}
|
}
|
|
String pushData = messageT.getContent().getPushData();
|
|
long serverTime = messageT.getServerTimestamp();
|
int mentionType = messageT.getContent().getMentionedType();
|
List<String> mentionTargets = messageT.getContent().getMentionedTargetList();
|
int persistFlag = messageT.getContent().getPersistFlag();
|
WFCMessage.Message message = null;
|
|
if (persistFlag == Transparent) {
|
publishTransparentMessage2Receivers(messageT, receivers, pullType);
|
return;
|
}
|
|
try {
|
for (String usr : receivers) {
|
if (usr.equals(sender)) {
|
continue;
|
}
|
|
long messageSeq = MessageShardingUtil.generateId();
|
|
//根据sender、targetId 定位ChatSpaceType, 保存message数据
|
if(messageT.getConversation().getType() == 0) {
|
//PrivateFriendBucket.getInstance().saveMessage(messageT, messageSeq);
|
}
|
//群组发消息
|
else if (messageT.getConversation().getType() == 1) {
|
}
|
|
if (!usr.equals(sender)) {
|
WFCMessage.NotifyMessage notifyMessage = WFCMessage.NotifyMessage
|
.newBuilder()
|
.setType(pullType)
|
.setHead(messageSeq)
|
.build();
|
ByteBuf payload = Unpooled.buffer();
|
byte[] byteData = notifyMessage.toByteArray();
|
|
payload.ensureWritable(byteData.length).writeBytes(byteData);
|
MqttPublishMessage publishMsg;
|
publishMsg = notRetainedPublish(IMTopic.NotifyMessageTopic, MqttQoS.AT_MOST_ONCE, payload);
|
|
Client client = UserStore.getById(usr).getClient(usr);
|
|
Session sessionC = client.getSession();
|
if (sessionC != null) {
|
Channel channel = sessionC.getChannel();
|
if (publishMsg != null) {
|
channel.writeAndFlush(publishMsg);
|
}
|
}
|
}
|
}
|
} catch(Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
private void publishTransparentMessage2Receivers(WFCMessage.Message message, Collection<String> receivers, int pullType) {
|
for (String user : receivers) {
|
ByteBuf payload = Unpooled.buffer();
|
byte[] byteData = message.toByteArray();
|
payload.ensureWritable(byteData.length).writeBytes(byteData);
|
MqttPublishMessage publishMsg;
|
publishMsg = notRetainedPublish(IMTopic.SendMessageTopic, MqttQoS.AT_MOST_ONCE, payload);
|
|
Client client = UserStore.getById(user).getClient(user);
|
|
Session sessionC = client.getSession();
|
if (sessionC != null) {
|
Channel channel = sessionC.getChannel();
|
if (publishMsg != null) {
|
channel.writeAndFlush(publishMsg);
|
}
|
}
|
}
|
}
|
|
private static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) {
|
return notRetainedPublishWithMessageId(topic, qos, message, 0);
|
}
|
|
private static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message, int messageId) {
|
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
|
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId);
|
return new MqttPublishMessage(fixedHeader, varHeader, message);
|
}
|
}
|