package chat.message; import static chat.consts.ProtoConstants.PersistFlag.Transparent; import java.util.Collection; import java.util.List; import chat.consts.ProtoConstants; import chat.module.PrivateFriendBucket; import chat.module.entity.MessageContainer; import chat.module.entity.PrivateFriend; import chat.server.im.IMTopic; import chat.server.moquette.message.MqttFixedHeader; import chat.server.moquette.message.MqttMessageType; import chat.server.moquette.message.MqttPublishMessage; import chat.server.moquette.message.MqttPublishVariableHeader; import chat.server.moquette.message.MqttQoS; import chat.user.Client; import chat.user.Session; import chat.user.UserStore; import chat.util.MessageShardingUtil; import cn.wildfirechat.proto.WFCMessage; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; public class MessagesPublisher { public void publish2Receivers(WFCMessage.Message messageT, Collection receivers, int pullType) { String sender = messageT.getFromUser(); int conversationType = messageT.getConversation().getType(); String target = messageT.getConversation().getTarget(); int line = messageT.getConversation().getLine(); long messageId = messageT.getMessageId(); int messageContentType = messageT.getContent().getType(); MessageContainer messageContainer; String pushContent; if (messageT.getContent().getPersistFlag() == Transparent) { pushContent = null; } else { if (messageContentType == ProtoConstants.ContentType.Text) { pushContent = messageT.getContent().getPushContent(); } } String pushData = messageT.getContent().getPushData(); long serverTime = messageT.getServerTimestamp(); int mentionType = messageT.getContent().getMentionedType(); List mentionTargets = messageT.getContent().getMentionedTargetList(); int persistFlag = messageT.getContent().getPersistFlag(); WFCMessage.Message message = null; if (persistFlag == Transparent) { publishTransparentMessage2Receivers(messageT, receivers, pullType); return; } try { for (String usr : receivers) { if (usr.equals(sender)) { continue; } long messageSeq = MessageShardingUtil.generateId(); //根据sender、targetId 定位ChatSpaceType, 保存message数据 if(messageT.getConversation().getType() == 0) { //PrivateFriendBucket.getInstance().saveMessage(messageT, messageSeq); } //群组发消息 else if (messageT.getConversation().getType() == 1) { } if (!usr.equals(sender)) { WFCMessage.NotifyMessage notifyMessage = WFCMessage.NotifyMessage .newBuilder() .setType(pullType) .setHead(messageSeq) .build(); ByteBuf payload = Unpooled.buffer(); byte[] byteData = notifyMessage.toByteArray(); payload.ensureWritable(byteData.length).writeBytes(byteData); MqttPublishMessage publishMsg; publishMsg = notRetainedPublish(IMTopic.NotifyMessageTopic, MqttQoS.AT_MOST_ONCE, payload); Client client = UserStore.getById(usr).getClient(usr); Session sessionC = client.getSession(); if (sessionC != null) { Channel channel = sessionC.getChannel(); if (publishMsg != null) { channel.writeAndFlush(publishMsg); } } } } } catch(Exception e) { e.printStackTrace(); } } private void publishTransparentMessage2Receivers(WFCMessage.Message message, Collection receivers, int pullType) { for (String user : receivers) { ByteBuf payload = Unpooled.buffer(); byte[] byteData = message.toByteArray(); payload.ensureWritable(byteData.length).writeBytes(byteData); MqttPublishMessage publishMsg; publishMsg = notRetainedPublish(IMTopic.SendMessageTopic, MqttQoS.AT_MOST_ONCE, payload); Client client = UserStore.getById(user).getClient(user); Session sessionC = client.getSession(); if (sessionC != null) { Channel channel = sessionC.getChannel(); if (publishMsg != null) { channel.writeAndFlush(publishMsg); } } } } private static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) { return notRetainedPublishWithMessageId(topic, qos, message, 0); } private static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message, int messageId) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId); return new MqttPublishMessage(fixedHeader, varHeader, message); } }