package chat.module.entity;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.protobuf.ByteString;

import chat.module.ConcurrentMapList;
import chat.server.im.IMTopic;
import chat.server.moquette.message.MqttFixedHeader;
import chat.server.moquette.message.MqttMessage;
import chat.server.moquette.message.MqttMessageType;
import chat.server.moquette.message.MqttPublishMessage;
import chat.server.moquette.message.MqttPublishVariableHeader;
import chat.server.moquette.message.MqttQoS;
import chat.user.Client;
import chat.user.Session;
import chat.user.User;
import chat.user.UserStore;
import cn.wildfirechat.pojos.GroupNotificationBinaryContent;
import cn.wildfirechat.proto.WFCMessage;
import cn.wildfirechat.proto.WFCMessage.GroupInfo;
import cn.wildfirechat.proto.WFCMessage.Message;
import cn.wildfirechat.proto.WFCMessage.MessageContent;
import frame.object.data.DataObject;
import frame.object.data.Entity;
import frame.persist.NamedSQL;
import frame.persist.SQLRunner;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import chat.persist.*;

/**
 * Base class for something that holds members, messages and per-recipient message relations.
 *
 * <p>It keeps three in-memory collections (members, messages, message relations), persists
 * members and messages through {@link DataObject} / {@link NamedSQL}, and pushes MQTT publish
 * packets to the channel of a recipient's client session.
 *
 * <p>NOTE(review): content-type codes 104, 90 and 3 are used as bare numbers below; presumably
 * they are WildfireChat content types (group-creation notification, tip, image) - confirm and
 * replace with named constants from the protocol definition.
 */
public abstract class MessageContainer {

    // NOTE(review): these ConcurrentMapList fields carry no type arguments in this file, yet the
    // methods below depend on typed get()/iteration results (Member, MessageRecord,
    // MessageRelation). Restore the type arguments to match ConcurrentMapList's declaration.
    protected ConcurrentMapList messageList;
    protected ConcurrentMapList memberList;
    protected ConcurrentMapList messageRelationList;

    // Every message loaded through loadOneMessage, keyed by message id, across all containers.
    // NOTE(review): plain HashMap shared statically; confirm it is only touched from one thread.
    private static final Map<String, MessageRecord> allLoadedMessageMap = new HashMap<>();

    /** Creates a container with empty member, message and message-relation collections. */
    public MessageContainer() {
        messageList = new ConcurrentMapList();
        memberList = new ConcurrentMapList();
        messageRelationList = new ConcurrentMapList();
    }

    /**
     * Creates an empty container.
     *
     * @param name    currently unused
     * @param creator currently unused
     */
    public MessageContainer(String name, User creator) {
        this();
    }

    /**
     * Builds a {@link Member} from a persisted entity and registers it under its user id.
     *
     * @param entity row holding the member; its {@code "id"} field is passed to the constructor
     * @throws Exception if loading the member from the entity fails
     */
    public void loadOneMember(Entity entity) throws Exception {
        String id = entity.getString("id");
        Member member = new Member(id, "");
        member.load(entity);
        memberList.add(member.getUserId(), member);
    }

    /**
     * Builds a {@link MessageRecord} from a persisted entity, stores it in this container and in
     * the static map read by {@link #getLoadedMessage(String)}.
     *
     * @param entity row holding the message
     * @throws Exception if loading the record from the entity fails
     */
    public void loadOneMessage(Entity entity) throws Exception {
        MessageRecord messageRecord = new MessageRecord();
        messageRecord.load(entity);

        String id = messageRecord.getId();
        messageList.add(id, messageRecord);
        allLoadedMessageMap.put(id, messageRecord);
    }

    /**
     * Builds a {@link MessageRelation} from a persisted entity, stores it in this container and
     * hands it to the target user as an unreceived message.
     *
     * @param entity row holding the message relation
     * @throws Exception if loading fails or the target user cannot be resolved
     */
    public void loadOneMessageRelation(Entity entity) throws Exception {
        MessageRelation messageRelation = new MessageRelation();
        messageRelation.load(entity);

        // 1. keep the relation in this container
        messageRelationList.add(messageRelation.getId(), messageRelation);

        // 2. queue it on the target user as a message that has not been received yet
        String userId = messageRelation.getTargetid();
        User user = UserStore.getById(userId);
        user.addOneUnreceivedMessage(messageRelation);
    }

    /**
     * Writes this container's state into the given entity.
     *
     * @param entity entity to fill
     * @throws Exception if the state cannot be written
     */
    public abstract void pushTo(Entity entity) throws Exception;

    /** @return the list view of the member collection */
    public List<Member> getMemberList() {
        return memberList.getList();
    }

    /** @return the number of members currently registered */
    public int getMemberCount() {
        return memberList.getList().size();
    }

    /**
     * Wraps one stored record into a single-element list of protocol messages addressed to
     * {@code user}. The message uses conversation type 0, line 0, content type 1 and the current
     * time as server timestamp.
     *
     * @param messageRecord record supplying message id, sender id and text
     * @param user          user whose id becomes the conversation target
     * @return a list with the built message, or an empty list if building it failed
     */
    public static List<WFCMessage.Message> getMessageList(MessageRecord messageRecord, User user) {
        List<WFCMessage.Message> messageList = new ArrayList<>();

        try {
            WFCMessage.Conversation conversation = WFCMessage.Conversation.newBuilder()
                    .setType(0)
                    .setTarget(user.getId())
                    .setLine(0)
                    .build();

            WFCMessage.MessageContent messageContent = WFCMessage.MessageContent.newBuilder()
                    .setType(1)
                    .setSearchableContent(messageRecord.getContent())
                    .setMediaType(0)
                    .setPersistFlag(0)
                    .setExpireDuration(0)
                    .setMentionedType(0)
                    .build();

            WFCMessage.Message message = WFCMessage.Message.newBuilder()
                    .setMessageId(messageRecord.getMessageid())
                    .setFromUser(messageRecord.getSenderId())
                    .setConversation(conversation)
                    .setContent(messageContent)
                    .setServerTimestamp(System.currentTimeMillis())
                    .build();

            messageList.add(message);
        } catch (Exception e) {
            // Best effort: a record that cannot be converted yields an empty list.
            e.printStackTrace();
        }

        return messageList;
    }

    /**
     * Persists and registers one member per user id.
     *
     * <p>NOTE(review): the member is created with an empty second constructor argument, unlike
     * {@link #addInitMembers(String, List)} which passes the user - confirm this is intended.
     *
     * @param parentId id passed as first argument to each new {@link Member}
     * @param userList ids of the users to add
     * @throws Exception if a member cannot be persisted
     */
    public void addMembers(String parentId, List<String> userList) throws Exception {
        for (String userId : userList) {
            User user = User.getInstance(userId);
            registerNewMember(user, new Member(parentId, ""));
        }
    }

    /**
     * Persists and registers one member per user id, constructing each member from the user.
     *
     * @param spaceId  id passed as first argument to each new {@link Member}
     * @param userList ids of the users to add
     * @throws Exception if a member cannot be persisted
     */
    public void addInitMembers(String spaceId, List<String> userList) throws Exception {
        for (String userId : userList) {
            User user = User.getInstance(userId);
            registerNewMember(user, new Member(spaceId, user));
        }
    }

    /**
     * Persists and registers one member per protocol-level group member.
     *
     * <p>NOTE(review): as in {@link #addMembers(String, List)}, the member is created with an
     * empty second constructor argument - confirm this is intended.
     *
     * @param parentId    id passed as first argument to each new {@link Member}
     * @param membersList protocol group members whose member ids identify the users to add
     * @throws Exception if a member cannot be persisted
     */
    public void addMembersWfc(String parentId, List<WFCMessage.GroupMember> membersList) throws Exception {
        for (WFCMessage.GroupMember groupMember : membersList) {
            User user = User.getInstance(groupMember.getMemberId());
            registerNewMember(user, new Member(parentId, ""));
        }
    }

    /** Inserts the member into the database, then registers it under the user's id. */
    private void registerNewMember(User user, Member member) throws Exception {
        String key = user.getId();
        addOneMemberToDataBase(member);
        memberList.add(key, member);
    }

    /**
     * Removes the given users from the database and from this container; unknown ids are skipped.
     *
     * @param userList ids of the users to remove
     * @throws Exception if a member cannot be removed from the database
     */
    public void kickOffMembers(List<String> userList) throws Exception {
        for (String userId : userList) {
            removeMember(userId);
        }
    }

    /**
     * Registers an already-built member under its user id without touching the database.
     *
     * @param member member to register
     */
    public void addOneMember(Member member) {
        String key = member.getUserId();
        memberList.add(key, member);
    }

    /**
     * Removes one user from the database and from this container; does nothing for unknown ids.
     *
     * @param userId id of the user to remove
     * @throws Exception if the member cannot be removed from the database
     */
    public void deleteOneMember(String userId) throws Exception {
        removeMember(userId);
    }

    /** Deletes the member registered under {@code userId}, database first, if there is one. */
    private void removeMember(String userId) throws Exception {
        Member member = memberList.get(userId);
        if (member == null) {
            return;
        }

        removeOneMemberToDataBase(member);
        memberList.delete(userId);
    }

    /**
     * Registers a new in-memory member under the user's id without persisting it.
     *
     * @param parentId id passed as first argument to the new {@link Member}
     * @param user     user whose id is used as the key
     */
    public void join(String parentId, User user) {
        Member member = new Member(parentId, "");
        String key = user.getId();
        memberList.add(key, member);
    }

    /**
     * Drops the user's in-memory membership without touching the database.
     *
     * @param user user to remove
     */
    public void leave(User user) {
        String key = user.getId();
        memberList.delete(key);
    }

    /**
     * @param userId id of the user to look up
     * @return whatever the member collection holds under that id
     */
    public Member getOneMember(String userId) {
        return memberList.get(userId);
    }

    /**
     * Collects the trailing run of stored messages that are fresher than {@code timestamp}.
     * The stored list is walked from its last element backwards and the walk stops at the first
     * record that is not fresher, so the result is ordered last-stored first.
     *
     * @param timestamp threshold handed to {@link MessageRecord#isFreshThan}
     * @return the matching records, possibly empty
     */
    public List<MessageRecord> getMessageList(long timestamp) {
        List<MessageRecord> result = new ArrayList<>();
        List<MessageRecord> list = messageList.getList();

        for (int i = list.size() - 1; i >= 0; i--) {
            MessageRecord record = list.get(i);
            if (!record.isFreshThan(timestamp)) {
                break;
            }
            result.add(record);
        }

        return result;
    }

    /**
     * Stores a new message and fans it out to the members.
     *
     * @param pullType     notify type forwarded to {@link #publishOneNotify}
     * @param record       record to persist and keep
     * @param sender       user set as the record's sender
     * @param message      protocol message the record was created from
     * @param isExceptUser forwarded to {@link #publishOneNotify}
     * @throws Exception if persisting the record or a relation fails
     */
    public void addOneMessage(int pullType, MessageRecord record, User sender, Message message, boolean isExceptUser) throws Exception {
        // 1. set sender
        record.setSender(sender);

        // 2. insert to database
        insertOneMessage(record);

        // 3. add to bucket
        messageList.add(record.getId(), record);

        // 4. send notify to target
        publishOneNotify(pullType, record, sender, message, isExceptUser);
    }

    /**
     * Creates, persists and delivers one message relation per member, then notifies the member's
     * client.
     *
     * @param pullType     notify type sent to each client
     * @param record       stored record the relations refer to
     * @param sender       sending user
     * @param message      protocol message being distributed
     * @param isExceptUser when true, the sender's own membership is skipped unless the content
     *                     type is 104
     * @throws Exception if persisting a relation fails
     */
    public void publishOneNotify(int pullType, MessageRecord record, User sender, Message message, boolean isExceptUser) throws Exception {
        int line = 0;
        String userId = sender.getId();

        for (Member member : memberList) {
            // 0. skip the sender themself
            if (userId.equals(member.getUserId()) && message.getContent().getType() != 104 && isExceptUser) {
                continue;
            }

            // 1. add message relation
            MessageRelation relation = MessageRelation.getInstance(line, member.getUserId(), record, message);
            addOneMessageRelations(relation);

            // Type 3 copies its content from the source message; every other type is rebuilt
            // from the stored record and the relation.
            MessageRecord relationRecord = relation.getMessageRecord();
            if (message.getContent().getType() == 3) {
                relationRecord.setMessage(createOneMessage(relationRecord, message));
            } else {
                relationRecord.setMessage(createOneMessage(relationRecord, relation, message));
            }

            // 2. attach the message to the target user
            User user = member.getUser();
            user.pushOneMessageRelation(relation);

            // 3. try to notify to user
            notifyOneClient(pullType, member.getUserId(), isExceptUser);
        }
    }

    /**
     * Builds the group-creation notification content for a group.
     *
     * @param groupInfo group whose target id and name are put into the notification
     * @param fromUser  id of the acting user
     * @return the content produced by {@link GroupNotificationBinaryContent#getCreateGroupNotifyContent()}
     */
    public MessageContent createGroupMessage(GroupInfo groupInfo, String fromUser) {
        return new GroupNotificationBinaryContent(groupInfo.getTargetId(), fromUser, groupInfo.getName(), "")
                .getCreateGroupNotifyContent();
    }

    /**
     * Builds a protocol message from a stored record, taking sender and conversation from the
     * source message.
     *
     * <p>Content by record type: 104 carries the record text as UTF-8 bytes in {@code data};
     * 90 copies {@code content} from the source; 3 copies type, searchable content, content,
     * data, media type, remote media URL and persist flag from the source; anything else carries
     * the record text as searchable content.
     *
     * @param message    stored record supplying id, timestamp, type and text
     * @param messageSrc protocol message supplying sender, conversation and source content
     * @return the built message, or {@code null} if building it failed
     */
    public WFCMessage.Message createOneMessage(MessageRecord message, Message messageSrc) {
        WFCMessage.Message messageWFC = null;

        try {
            WFCMessage.Conversation sourceConversation = messageSrc.getConversation();
            WFCMessage.Conversation conversation = WFCMessage.Conversation.newBuilder()
                    .setType(sourceConversation.getType())
                    .setTarget(sourceConversation.getTarget())
                    .setLine(sourceConversation.getLine())
                    .build();

            WFCMessage.MessageContent.Builder contentBuilder = WFCMessage.MessageContent.newBuilder();
            contentBuilder.setType(message.getType());

            if (message.getType() == 104) {
                // Explicit charset: the platform default would make the payload host-dependent.
                contentBuilder.setData(ByteString.copyFrom(message.getContent().getBytes(StandardCharsets.UTF_8)));
            } else if (message.getType() == 90) {
                contentBuilder.setContent(messageSrc.getContent().getContent());
            } else if (message.getType() == 3) {
                WFCMessage.MessageContent sourceContent = messageSrc.getContent();
                contentBuilder.setType(sourceContent.getType());
                contentBuilder.setSearchableContent(sourceContent.getSearchableContent());
                contentBuilder.setContent(sourceContent.getContent());
                contentBuilder.setData(sourceContent.getData());
                contentBuilder.setMediaType(sourceContent.getMediaType());
                contentBuilder.setRemoteMediaUrl(sourceContent.getRemoteMediaUrl());
                contentBuilder.setPersistFlag(sourceContent.getPersistFlag());
            } else {
                fillPlainContent(contentBuilder, message);
            }

            messageWFC = WFCMessage.Message.newBuilder()
                    .setMessageId(message.getMessageid())
                    .setServerTimestamp(message.getTimestamp())
                    .setFromUser(messageSrc.getFromUser())
                    .setConversation(conversation)
                    .setContent(contentBuilder.build())
                    .build();
        } catch (Exception e) {
            // Best effort: callers receive null when the message cannot be built.
            e.printStackTrace();
        }

        return messageWFC;
    }

    /**
     * Builds a protocol message from a stored record, taking the conversation from the
     * recipient's message relation.
     *
     * <p>When the relation targets the record's own sender, the from-user is set to the sent
     * message's conversation target; otherwise it is the record's sender id.
     *
     * <p>Content by record type: 104 carries the record text as UTF-8 bytes in {@code data};
     * 90 carries it as {@code content}; anything else carries it as searchable content.
     *
     * @param message         stored record supplying id, timestamp, sender, type and text
     * @param messageRelation relation supplying conversation type, target and line
     * @param messageSend     the protocol message that was sent
     * @return the built message, or {@code null} if building it failed
     */
    public WFCMessage.Message createOneMessage(MessageRecord message, MessageRelation messageRelation, WFCMessage.Message messageSend) {
        WFCMessage.Message messageWFC = null;

        try {
            String fromUser;
            if (message.getSenderId().equals(messageRelation.getTargetid())) {
                fromUser = messageSend.getConversation().getTarget();
            } else {
                fromUser = message.getSenderId();
            }

            WFCMessage.Conversation conversation = WFCMessage.Conversation.newBuilder()
                    .setType(messageRelation.getType())
                    .setTarget(messageRelation.getTargetid())
                    .setLine(messageRelation.getLine())
                    .build();

            WFCMessage.MessageContent.Builder contentBuilder = WFCMessage.MessageContent.newBuilder();
            contentBuilder.setType(message.getType());

            if (message.getType() == 104) {
                // Explicit charset: the platform default would make the payload host-dependent.
                contentBuilder.setData(ByteString.copyFrom(message.getContent().getBytes(StandardCharsets.UTF_8)));
            } else if (message.getType() == 90) {
                contentBuilder.setContent(message.getContent());
            } else {
                fillPlainContent(contentBuilder, message);
            }

            messageWFC = WFCMessage.Message.newBuilder()
                    .setMessageId(message.getMessageid())
                    .setServerTimestamp(message.getTimestamp())
                    .setFromUser(fromUser)
                    .setConversation(conversation)
                    .setContent(contentBuilder.build())
                    .build();
        } catch (Exception e) {
            // Best effort: callers receive null when the message cannot be built.
            e.printStackTrace();
        }

        return messageWFC;
    }

    /** Fills the builder for the default case: record text as searchable content, zeroed extras. */
    private static void fillPlainContent(WFCMessage.MessageContent.Builder contentBuilder, MessageRecord record) {
        contentBuilder.setSearchableContent(record.getContent());
        contentBuilder.setMediaType(0);
        contentBuilder.setPersistFlag(record.getPersistflag());
        contentBuilder.setExpireDuration(0);
        contentBuilder.setMentionedType(0);
    }

    /**
     * Persists a message relation and keeps it in this container.
     *
     * @param relation relation to store
     * @throws Exception if the database insert fails
     */
    public void addOneMessageRelations(MessageRelation relation) throws Exception {
        // 1. insert to database
        relation.insertToDataBase(relation);

        // 2. add to bucket
        messageRelationList.add(relation.getId(), relation);
    }

    /**
     * Placeholder for setting a member's alias; currently only looks the member up.
     *
     * @param memberId id of the member
     * @param alias    alias to set (not applied yet)
     */
    public void setMemberAlias(String memberId, String alias) {
        Member member = memberList.get(memberId);

        if (member == null) {
            return;
        }

        // TODO: apply the alias to the member; nothing is stored yet.
    }

    /** Hook with an empty default implementation. */
    public void notify(PublishOperator operator, String senderId) {
        // TODO Auto-generated method stub
    }

    /** Hook with an empty default implementation. */
    public void notify(PublishOperator operator, Object data, String senderId) {
        // TODO Auto-generated method stub
    }

    /** Hook with an empty default implementation. */
    public void notify(PublishOperator operator, MessageRecord record, String senderId) {
        // TODO Auto-generated method stub
    }

    /** Hook with an empty default implementation. */
    public void notify(PublishOperator operator, User sender) {
        // TODO Auto-generated method stub
    }

    /** Hook with an empty default implementation. */
    public void notify(PublishOperator operator, MessageRecord record, User sender) {
        // TODO Auto-generated method stub
    }

    /**
     * @param senderId id of the user asking to delete
     * @return always {@code false} in this base implementation
     */
    public boolean existsDeleteRight(String senderId) {
        // TODO Auto-generated method stub
        return false;
    }

    /** Inserts the record into the {@code "message"} data object and returns the insert result. */
    private static int insertOneMessage(MessageRecord message) throws Exception {
        DataObject dataObject = DataObject.getInstance("message");
        Entity entity = dataObject.newEntity();

        message.pushTo(entity);
        return dataObject.insertToDataBase(entity);
    }

    /**
     * Sends a publish packet whose payload is the 8-byte {@code head} to the receiver's client.
     * Failures are printed and otherwise ignored.
     *
     * @param topic    topic of the publish packet
     * @param receiver id of the user to notify
     * @param head     value written as the payload
     */
    public void notifyOneFriend(String topic, String receiver, long head) {
        sendHeadNotification(topic, receiver, head);
    }

    /**
     * Sends a publish packet whose payload is the 8-byte {@code head} to the target of a friend
     * request. Failures are printed and otherwise ignored.
     *
     * @param topic    topic of the publish packet
     * @param request  friend request whose target uid identifies the receiver
     * @param head     value written as the payload
     * @param fromUser currently unused
     */
    public void notifyOneFriendRequest(String topic, WFCMessage.AddFriendRequest request, long head, String fromUser) {
        String receiver = request.getTargetUid();
        sendHeadNotification(topic, receiver, head);
    }

    /** Builds a publish packet carrying {@code head} and delivers it; errors are printed only. */
    private static void sendHeadNotification(String topic, String receiver, long head) {
        try {
            ByteBuf payload = Unpooled.buffer();
            payload.writeLong(head);

            MqttPublishMessage publishMsg = notRetainedPublish(topic, MqttQoS.AT_MOST_ONCE, payload);
            deliverToClient(receiver, publishMsg);
        } catch (Exception e) {
            // Best effort: a failed notification must not break the caller.
            e.printStackTrace();
        }
    }

    /**
     * Sends a {@link WFCMessage.NotifyMessage} with the given type and the current time as head
     * to the target user's client on {@link IMTopic#NotifyMessageTopic}.
     *
     * <p>NOTE(review): the result is {@code true} whenever a client exists, even if it has no
     * session and nothing was written - confirm callers expect that.
     *
     * @param pullType     type placed into the notify message
     * @param targetId     id of the user to notify
     * @param isExceptUser currently unused
     * @return {@code false} if the user or client is missing or an error occurred, else {@code true}
     */
    protected boolean notifyOneClient(int pullType, String targetId, boolean isExceptUser) {
        try {
            long messageSeq = System.currentTimeMillis();
            WFCMessage.NotifyMessage notifyMessage = WFCMessage.NotifyMessage
                    .newBuilder()
                    .setType(pullType)
                    .setHead(messageSeq)
                    .build();

            byte[] byteData = notifyMessage.toByteArray();
            ByteBuf payload = Unpooled.buffer();
            payload.ensureWritable(byteData.length).writeBytes(byteData);

            MqttPublishMessage publishMsg = notRetainedPublish(IMTopic.NotifyMessageTopic, MqttQoS.AT_MOST_ONCE, payload);
            return deliverToClient(targetId, publishMsg);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Writes the packet to the channel of the target user's client session, if there is one.
     *
     * @return {@code false} when the user or its client is missing, {@code true} otherwise
     */
    private static boolean deliverToClient(String targetId, MqttPublishMessage publishMsg) {
        User user = UserStore.getById(targetId);
        if (user == null) {
            return false;
        }

        Client client = user.getClient(targetId);
        if (client == null) {
            return false;
        }

        Session session = client.getSession();
        if (session != null) {
            Channel channel = session.getChannel();
            channel.writeAndFlush(publishMsg);
        }

        return true;
    }

    /** Inserts the member into the {@code "member"} data object. */
    private void addOneMemberToDataBase(Member member) throws Exception {
        DataObject dataObject = DataObject.getInstance("member");
        Entity entity = dataObject.newEntity();

        member.pushTo(entity);
        dataObject.insertToDataBase(entity);
    }

    /** Runs the {@code removeOneMember} named SQL with the member's id. */
    private void removeOneMemberToDataBase(Member member) throws Exception {
        NamedSQL namedSQL = NamedSQL.getInstance("removeOneMember");
        namedSQL.setParam("memberId", member.getId());
        SQLRunner.execSQL(namedSQL);
    }

    /** Hook with an empty default implementation. */
    public void saveMessage(String usr, Message messageT) {
        // TODO Auto-generated method stub
    }

    /** Hook with an empty default implementation. */
    public void saveMessageRelation(String usr, Message messageT) {
        // TODO Auto-generated method stub
    }

    /**
     * Looks up a message previously registered by {@link #loadOneMessage(Entity)}.
     *
     * @param parentid message id to look up; may be {@code null}
     * @return the loaded record, or {@code null} if the id is {@code null} or unknown
     */
    public static MessageRecord getLoadedMessage(String parentid) {
        if (parentid == null) {
            return null;
        }

        return allLoadedMessageMap.get(parentid);
    }

    /** Builds a PUBLISH packet for the topic and payload with message id 0. */
    private static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) {
        return notRetainedPublishWithMessageId(topic, qos, message, 0);
    }

    /** Builds a PUBLISH packet for the topic, QoS, payload and message id. */
    private static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message, int messageId) {
        // Presumably the two false flags are dup and retain and the trailing 0 is the remaining
        // length - confirm against MqttFixedHeader's constructor.
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
        MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId);
        return new MqttPublishMessage(fixedHeader, varHeader, message);
    }
}