package chat.module; import java.util.ArrayList; import java.util.Collection; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import chat.consts.ProtoConstants; import chat.module.entity.ChatSpaceType; import chat.module.entity.Group; import chat.module.entity.Member; import chat.module.entity.MessageContainer; import chat.module.entity.MessageRecord; import chat.server.im.IMTopic; import chat.server.moquette.message.MqttFixedHeader; import chat.server.moquette.message.MqttMessageType; import chat.server.moquette.message.MqttPublishMessage; import chat.server.moquette.message.MqttPublishVariableHeader; import chat.server.moquette.message.MqttQoS; import chat.user.Client; import chat.user.Session; import chat.user.User; import chat.user.UserStore; import chat.util.MessageShardingUtil; import cn.wildfirechat.proto.WFCMessage; import cn.wildfirechat.proto.WFCMessage.Message; import frame.object.data.DataObject; import frame.object.data.Entity; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import static chat.consts.ProtoConstants.PersistFlag.Transparent; public class Bucket<T extends MessageContainer> { private ConcurrentMapList<T> itemMap; private static int pullType; public Bucket() { itemMap = new ConcurrentMapList<T>(); } public static Bucket<?> getInstance(ChatSpaceType spaceType) { if (ChatSpaceType.Private == spaceType) { pullType = ProtoConstants.PullType.Pull_Normal; return PrivateFriendBucket.getInstance(); } else if (ChatSpaceType.Group == spaceType) { pullType = ProtoConstants.PullType.Pull_Normal; return GroupBucket.getInstance(); } return null; } public void sendGroupNotification(User user, WFCMessage.CreateGroupRequest request, WFCMessage.MessageContent content, Group group) { String targetId = group.getId(); List<Integer> lines = request.getToLineList(); if (lines == null) { lines = new ArrayList<>(); } else { lines = new ArrayList<>(lines); } if (lines.isEmpty()) { lines.add(0); } for (int line : lines) { long timestamp = System.currentTimeMillis(); WFCMessage.Message.Builder builder = WFCMessage.Message.newBuilder().setContent(content).setServerTimestamp(timestamp); builder.setConversation(builder.getConversationBuilder().setType(ProtoConstants.ConversationType.ConversationType_Group).setTarget(targetId).setLine(line)); builder.setFromUser(user.getId()); try { long messageId = MessageShardingUtil.generateId(); builder.setMessageId(messageId); pushOneMessage(builder.build(), user, request); } catch (Exception e) { e.printStackTrace(); } } } public void pushOneMessage(Message message, User user) throws Exception { ModuleLoader.getImBusinessScheduler().execute(() -> doPushOneMessage(message, user, null, true)); } public void pushOneMessage(Message message, User user, boolean isExceptUser) throws Exception { ModuleLoader.getImBusinessScheduler().execute(() -> doPushOneMessage(message, user, null, isExceptUser)); } public void pushOneMessage(Message message, User user, WFCMessage.CreateGroupRequest request) throws Exception { ModuleLoader.getImBusinessScheduler().execute(() -> doPushOneMessage(message, user, request, true)); } public void doPushOneMessage(Message message, User user, WFCMessage.CreateGroupRequest request, boolean isExceptUser) { //String containerId = getMessageContainerID(message); //1.得到messageContainer 如果是ç§èŠ......,如果是群èŠ..... Set<String> notifyReceivers = new LinkedHashSet<String>(); WFCMessage.Message.Builder messageBuilder = message.toBuilder(); int pullType = 0; if (request == null) { pullType = getNotifyReceivers(user.getId(), messageBuilder, notifyReceivers); } else {//æ˜¯ç¾¤ç»„çš„æ¶ˆæ¯ pullType = getGroupNotifyReceiver(request.getGroup().getMembersList(), notifyReceivers); } if (message.getContent().getPersistFlag() == Transparent) { notifyReceivers.remove(messageBuilder.getFromUser()); } if (message.getContent().getPersistFlag() == Transparent) { publishTransparentMessage2Receivers(message, notifyReceivers, pullType); return; } MessageContainer container = getMessageContainerID(message); if (container == null) { return; } String userId = user.getId(); String spaceCode = container.getOneMember(userId).getSpacecode(); String spaceId = container.getOneMember(userId).getSpaceid(); try { MessageRecord record = MessageRecord.getInstance(spaceCode, spaceId, message); container.addOneMessage(pullType, record, user, message, isExceptUser); } catch(Exception e) { e.printStackTrace(); } } private void publishTransparentMessage2Receivers(WFCMessage.Message message, Collection<String> receivers, int pullType) { for (String user : receivers) { ByteBuf payload = Unpooled.buffer(); byte[] byteData = message.toByteArray(); payload.ensureWritable(byteData.length).writeBytes(byteData); MqttPublishMessage publishMsg; publishMsg = notRetainedPublish(IMTopic.SendMessageTopic, MqttQoS.AT_MOST_ONCE, payload); Client client = UserStore.getById(user).getClient(user); Session sessionC = client.getSession(); if (sessionC != null) { Channel channel = sessionC.getChannel(); if (publishMsg != null) { channel.writeAndFlush(publishMsg); } } } } private static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) { return notRetainedPublishWithMessageId(topic, qos, message, 0); } private static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message, int messageId) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId); return new MqttPublishMessage(fixedHeader, varHeader, message); } public int getGroupNotifyReceiver(List<WFCMessage.GroupMember> members, Set<String> notifyReceivers) { int pullType = ProtoConstants.PullType.Pull_Normal; for (WFCMessage.GroupMember member : members) { notifyReceivers.add(member.getMemberId()); } return pullType; } public int getNotifyReceivers(String userId, WFCMessage.Message.Builder messageBuilder, Set<String> notifyReceivers) { int pullType = ProtoConstants.PullType.Pull_Normal; if (ChatSpaceType.Group == ChatSpaceType.pase(messageBuilder.getConversation().getType())) { Group friendGroup = GroupBucket.getGroup(messageBuilder.getConversation().getTarget()); for(Member member : friendGroup.getMemberList()) { notifyReceivers.add(member.getUserId()); } } else if (ChatSpaceType.Private == ChatSpaceType.pase(messageBuilder.getConversation().getType())){ notifyReceivers.add(userId); notifyReceivers.add(messageBuilder.getConversation().getTarget()); } return pullType; } private MessageContainer getMessageContainerID(Message message) { MessageContainer msgContainer = null; if (message.getConversation().getType() == 0) { //å–å‡ºæ‰€æœ‰ç§æœ‰çš„MeessageContainerï¼Œæ ¹æ®äººæ¥æŸ¥æ‰¾å…·ä½“çš„ç§èŠid(å¯ä»¥æƒ³è±¡ä¸ºä¸€ä¸ªç‰¹æ®Šçš„ç¾¤ç»„ï¼Œåªæœ‰äºŒä¸ªäºº) //éœ€è¦æ¯”较å‘é€è€…与接收者 for (MessageContainer messageContainer : getAll()) { if (messageContainer.getOneMember(message.getFromUser()) != null && messageContainer.getOneMember(message.getConversation().getTarget()) != null) { msgContainer = messageContainer; break; } } } else if (message.getConversation().getType() == 1) { msgContainer = GroupBucket.getGroup(message.getConversation().getTarget());//itemMap.get(message.getConversation().getTarget()); } return msgContainer; } public T createOne(String tableName, T one) throws Exception { DataObject dataObject = DataObject.getInstance(tableName); Entity entity = dataObject.newEntity(); one.pushTo(entity); dataObject.insertToDataBase(entity); return one; } public void addOne(String key, T item) { itemMap.add(key, item); } public T getOne(String key) { if (key == null) { return null; } return itemMap.get(key); } public T deleteOne(String key) { return itemMap.delete(key); } public List<T> getAll() { return itemMap.getList(); } public int size() { return itemMap.size(); } }