package chat.module; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import chat.consts.ProtoConstants; import chat.module.entity.ChatSpaceType; import chat.module.entity.Group; import chat.module.entity.Member; import chat.module.entity.MessageContainer; import chat.module.entity.MessageRecord; import chat.server.im.IMTopic; import chat.server.moquette.message.MqttFixedHeader; import chat.server.moquette.message.MqttMessageType; import chat.server.moquette.message.MqttPublishMessage; import chat.server.moquette.message.MqttPublishVariableHeader; import chat.server.moquette.message.MqttQoS; import chat.user.Client; import chat.user.Session; import chat.user.User; import chat.user.UserStore; import chat.util.MessageShardingUtil; import cn.wildfirechat.proto.WFCMessage; import cn.wildfirechat.proto.WFCMessage.Message; import frame.object.data.DataObject; import frame.object.data.Entity; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import static chat.consts.ProtoConstants.PersistFlag.Transparent; public class Bucket { private ConcurrentMapList itemMap; private static int pullType; public Bucket() { itemMap = new ConcurrentMapList(); } public static Bucket getInstance(ChatSpaceType spaceType) { if (ChatSpaceType.Private == spaceType) { pullType = ProtoConstants.PullType.Pull_Normal; return PrivateFriendBucket.getInstance(); } else if (ChatSpaceType.Group == spaceType) { pullType = ProtoConstants.PullType.Pull_Normal; return GroupBucket.getInstance(); } return null; } public void sendGroupNotification(User user, WFCMessage.CreateGroupRequest request, WFCMessage.MessageContent content, Group group) { String targetId = group.getId(); List lines = request.getToLineList(); if (lines == null) { lines = new ArrayList<>(); } else { lines = new ArrayList<>(lines); } if (lines.isEmpty()) { lines.add(0); } for (int line : lines) { long timestamp = System.currentTimeMillis(); WFCMessage.Message.Builder builder = WFCMessage.Message.newBuilder().setContent(content).setServerTimestamp(timestamp); builder.setConversation(builder.getConversationBuilder().setType(ProtoConstants.ConversationType.ConversationType_Group).setTarget(targetId).setLine(line)); builder.setFromUser(user.getId()); try { long messageId = MessageShardingUtil.generateId(); builder.setMessageId(messageId); pushOneMessage(builder.build(), user, request); } catch (Exception e) { e.printStackTrace(); } } } public void pushOneMessage(Message message, User user) throws Exception { ModuleLoader.getImBusinessScheduler().execute(() -> doPushOneMessage(message, user, null, true)); } public void pushOneMessage(Message message, User user, boolean isExceptUser) throws Exception { ModuleLoader.getImBusinessScheduler().execute(() -> doPushOneMessage(message, user, null, isExceptUser)); } public void pushOneMessage(Message message, User user, WFCMessage.CreateGroupRequest request) throws Exception { ModuleLoader.getImBusinessScheduler().execute(() -> doPushOneMessage(message, user, request, true)); } public void doPushOneMessage(Message message, User user, WFCMessage.CreateGroupRequest request, boolean isExceptUser) { //String containerId = getMessageContainerID(message); //1.得到messageContainer 如果是私聊......,如果是群聊..... Set notifyReceivers = new LinkedHashSet(); WFCMessage.Message.Builder messageBuilder = message.toBuilder(); int pullType = 0; if (request == null) { pullType = getNotifyReceivers(user.getId(), messageBuilder, notifyReceivers); } else {//是群组的消息 pullType = getGroupNotifyReceiver(request.getGroup().getMembersList(), notifyReceivers); } if (message.getContent().getPersistFlag() == Transparent) { notifyReceivers.remove(messageBuilder.getFromUser()); } if (message.getContent().getPersistFlag() == Transparent) { publishTransparentMessage2Receivers(message, notifyReceivers, pullType); return; } MessageContainer container = getMessageContainerID(message); if (container == null) { return; } String userId = user.getId(); String spaceCode = container.getOneMember(userId).getSpacecode(); String spaceId = container.getOneMember(userId).getSpaceid(); try { MessageRecord record = MessageRecord.getInstance(spaceCode, spaceId, message); container.addOneMessage(pullType, record, user, message, isExceptUser); } catch(Exception e) { e.printStackTrace(); } } private void publishTransparentMessage2Receivers(WFCMessage.Message message, Collection receivers, int pullType) { for (String user : receivers) { ByteBuf payload = Unpooled.buffer(); byte[] byteData = message.toByteArray(); payload.ensureWritable(byteData.length).writeBytes(byteData); MqttPublishMessage publishMsg; publishMsg = notRetainedPublish(IMTopic.SendMessageTopic, MqttQoS.AT_MOST_ONCE, payload); Client client = UserStore.getById(user).getClient(user); Session sessionC = client.getSession(); if (sessionC != null) { Channel channel = sessionC.getChannel(); if (publishMsg != null) { channel.writeAndFlush(publishMsg); } } } } private static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) { return notRetainedPublishWithMessageId(topic, qos, message, 0); } private static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message, int messageId) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId); return new MqttPublishMessage(fixedHeader, varHeader, message); } public int getGroupNotifyReceiver(List members, Set notifyReceivers) { int pullType = ProtoConstants.PullType.Pull_Normal; for (WFCMessage.GroupMember member : members) { notifyReceivers.add(member.getMemberId()); } return pullType; } public int getNotifyReceivers(String userId, WFCMessage.Message.Builder messageBuilder, Set notifyReceivers) { int pullType = ProtoConstants.PullType.Pull_Normal; if (ChatSpaceType.Group == ChatSpaceType.pase(messageBuilder.getConversation().getType())) { Group friendGroup = GroupBucket.getGroup(messageBuilder.getConversation().getTarget()); for(Member member : friendGroup.getMemberList()) { notifyReceivers.add(member.getUserId()); } } else if (ChatSpaceType.Private == ChatSpaceType.pase(messageBuilder.getConversation().getType())){ notifyReceivers.add(userId); notifyReceivers.add(messageBuilder.getConversation().getTarget()); } return pullType; } private MessageContainer getMessageContainerID(Message message) { MessageContainer msgContainer = null; if (message.getConversation().getType() == 0) { //取出所有私有的MeessageContainer,根据人来查找具体的私聊id(可以想象为一个特殊的群组,只有二个人) //需要比较发送者与接收者 for (MessageContainer messageContainer : getAll()) { if (messageContainer.getOneMember(message.getFromUser()) != null && messageContainer.getOneMember(message.getConversation().getTarget()) != null) { msgContainer = messageContainer; break; } } } else if (message.getConversation().getType() == 1) { msgContainer = GroupBucket.getGroup(message.getConversation().getTarget());//itemMap.get(message.getConversation().getTarget()); } return msgContainer; } public T createOne(String tableName, T one) throws Exception { DataObject dataObject = DataObject.getInstance(tableName); Entity entity = dataObject.newEntity(); one.pushTo(entity); dataObject.insertToDataBase(entity); return one; } public void addOne(String key, T item) { itemMap.add(key, item); } public T getOne(String key) { if (key == null) { return null; } return itemMap.get(key); } public T deleteOne(String key) { return itemMap.delete(key); } public List getAll() { return itemMap.getList(); } public int size() { return itemMap.size(); } }