package chat.module.entity;
|
|
import java.util.ArrayList;
|
import java.util.Base64;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.Set;
|
|
import com.google.protobuf.ByteString;
|
|
import chat.module.ConcurrentMapList;
|
import chat.server.im.IMTopic;
|
import chat.server.moquette.message.MqttFixedHeader;
|
import chat.server.moquette.message.MqttMessage;
|
import chat.server.moquette.message.MqttMessageType;
|
import chat.server.moquette.message.MqttPublishMessage;
|
import chat.server.moquette.message.MqttPublishVariableHeader;
|
import chat.server.moquette.message.MqttQoS;
|
import chat.user.Client;
|
import chat.user.Session;
|
import chat.user.User;
|
import chat.user.UserStore;
|
import cn.wildfirechat.pojos.GroupNotificationBinaryContent;
|
import cn.wildfirechat.proto.WFCMessage;
|
import cn.wildfirechat.proto.WFCMessage.GroupInfo;
|
import cn.wildfirechat.proto.WFCMessage.Message;
|
import cn.wildfirechat.proto.WFCMessage.MessageContent;
|
import frame.object.data.DataObject;
|
import frame.object.data.Entity;
|
import frame.persist.NamedSQL;
|
import frame.persist.SQLRunner;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.Unpooled;
|
import io.netty.channel.Channel;
|
import chat.persist.*;
|
|
public abstract class MessageContainer {
|
|
protected ConcurrentMapList<MessageRecord> messageList;
|
protected ConcurrentMapList<Member> memberList;
|
protected ConcurrentMapList<MessageRelation> messageRelationList;
|
private static Map<String, MessageRecord> allLoadedMessageMap;
|
|
static {
|
allLoadedMessageMap = new HashMap<String, MessageRecord>();
|
}
|
|
/** Creates a container whose message, member and relation collections start out empty. */
public MessageContainer() {
    this.messageList = new ConcurrentMapList<>();
    this.memberList = new ConcurrentMapList<>();
    this.messageRelationList = new ConcurrentMapList<>();
}
|
|
/**
 * Creates an empty container.
 *
 * @param name    not used by this constructor
 * @param creator not used by this constructor
 */
public MessageContainer(String name, User creator) {
    // Only the default initialisation runs; both arguments are ignored here.
    this();
}
|
|
public void loadOneMember(Entity entity) throws Exception {
|
String id = entity.getString("id");
|
Member member = new Member(id, "");
|
member.load(entity);
|
|
memberList.add(member.getUserId(), member);
|
}
|
|
public void loadOneMessage(Entity entity) throws Exception {
|
MessageRecord messageRecord = new MessageRecord();
|
messageRecord.load(entity);
|
|
String id = messageRecord.getId();
|
|
messageList.add(id, messageRecord);
|
allLoadedMessageMap.put(id, messageRecord);
|
}
|
|
public void loadOneMessageRelation(Entity entity) throws Exception {
|
MessageRelation messageRelation = new MessageRelation();
|
messageRelation.load(entity);
|
|
//1.将消息关系取出
|
messageRelationList.add(messageRelation.getId(), messageRelation);
|
|
//2.取出所有没有发的消息
|
String userId = messageRelation.getTargetid();
|
User user = UserStore.getById(userId);
|
user.addOneUnreceivedMessage(messageRelation);
|
}
|
|
/**
 * Subclass hook that receives an {@code Entity}.
 * NOTE(review): presumably copies this container's state into {@code entity} for
 * persistence; confirm against the concrete implementations.
 *
 * @param entity the entity handed to the implementation
 * @throws Exception if the implementation fails
 */
public abstract void pushTo(Entity entity) throws Exception;
|
|
public List<Member> getMemberList() {
|
return memberList.getList();
|
}
|
|
public int getMemberCount() {
|
return memberList.getList().size();
|
}
|
|
public static List<WFCMessage.Message> getMessageList(MessageRecord messageRecord, User user) {
|
List<WFCMessage.Message> messageList = new ArrayList<WFCMessage.Message>();
|
|
WFCMessage.Message message = null;
|
|
try {
|
WFCMessage.Message.Builder builder = WFCMessage.Message.newBuilder();
|
|
builder.setMessageId(messageRecord.getMessageid());
|
builder.setFromUser(messageRecord.getSenderId());
|
|
WFCMessage.Conversation.Builder cb = WFCMessage.Conversation.newBuilder();
|
cb.setType(0);
|
cb.setTarget(user.getId());
|
cb.setLine(0);
|
builder.setConversation(cb.build());
|
|
WFCMessage.MessageContent.Builder contentBuilder = WFCMessage.MessageContent.newBuilder();
|
contentBuilder.setType(1);
|
String str = messageRecord.getContent();
|
byte[] bytes = str.getBytes();
|
bytes = Base64.getEncoder().encode(bytes);
|
contentBuilder.setSearchableContent(str);
|
contentBuilder.setMediaType(0);
|
contentBuilder.setPersistFlag(0);
|
contentBuilder.setExpireDuration(0);
|
contentBuilder.setMentionedType(0);
|
WFCMessage.MessageContent messageContent = contentBuilder.build();
|
builder.setContent(messageContent);
|
|
builder.setServerTimestamp(System.currentTimeMillis());
|
|
message = builder.build();
|
|
messageList.add(message);
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
}
|
|
return messageList;
|
}
|
|
public void addMembers(String parentId, List<String> userList) throws Exception {
|
for (String userId: userList) {
|
User user = User.getInstance(userId);
|
Member member = new Member(parentId, "");
|
String key = user.getId();
|
|
addOneMemberToDataBase(member);
|
memberList.add(key, member);
|
}
|
}
|
|
public void addInitMembers(String spaceId, List<String> userList) throws Exception {
|
for (String userId: userList) {
|
User user = User.getInstance(userId);
|
Member member = new Member(spaceId, user);
|
String key = user.getId();
|
|
addOneMemberToDataBase(member);
|
memberList.add(key, member);
|
}
|
}
|
|
public void addMembersWfc(String parentId, List<WFCMessage.GroupMember> membersList) throws Exception {
|
for (WFCMessage.GroupMember groupMember : membersList) {
|
User user = User.getInstance(groupMember.getMemberId());
|
Member member = new Member(parentId, "");
|
String key = user.getId();
|
|
addOneMemberToDataBase(member);
|
memberList.add(key, member);
|
}
|
}
|
|
public void kickOffMembers(List<String> userList) throws Exception {
|
for (String userId: userList) {
|
Member member = memberList.get(userId);
|
|
if (member == null) {
|
continue;
|
}
|
|
removeOneMemberToDataBase(member);
|
memberList.delete(userId);
|
}
|
}
|
|
public void addOneMember(Member member) {
|
String key = member.getUserId();
|
memberList.add(key, member);
|
}
|
|
public void deleteOneMember(String userId) throws Exception {
|
Member member = memberList.get(userId);
|
|
if (member == null) {
|
return;
|
}
|
|
removeOneMemberToDataBase(member);
|
memberList.delete(userId);
|
}
|
|
public void join(String parentId, User user) {
|
Member member = new Member(parentId, "");
|
String key = user.getId();
|
|
memberList.add(key, member);
|
}
|
|
public void leave(User user) {
|
String key = user.getId();
|
memberList.delete(key);
|
}
|
|
public Member getOneMember(String userId) {
|
return memberList.get(userId);
|
}
|
|
public List<MessageRecord> getMessageList(long timestamp) {
|
List<MessageRecord> result = new ArrayList<MessageRecord>();
|
|
List<MessageRecord> list = messageList.getList();
|
int max = list.size() - 1; MessageRecord record;
|
|
for (int i = max; i >= 0; i--) {
|
record = list.get(i);
|
|
if (record.isFreshThan(timestamp)) {
|
result.add(record);
|
}
|
else {
|
break;
|
}
|
}
|
|
return result;
|
}
|
|
public void addOneMessage(int pullType, MessageRecord record, User sender, Message message, boolean isExceptUser) throws Exception {
|
//1. set sender
|
record.setSender(sender);
|
|
//2. insert to database
|
insertOneMessage(record);
|
|
//3. add to bucket
|
messageList.add(record.getId(), record);
|
|
//4. send notify to target
|
publishOneNotify(pullType, record, sender, message, isExceptUser);
|
}
|
|
public void publishOneNotify(int pullType, MessageRecord record, User sender, Message message, boolean isExceptUser) throws Exception {
|
int line = 0;
|
String userId = sender.getId();
|
|
for (Member member: memberList) {
|
//0. 剔除自己
|
if (userId.equals(member.getUserId()) && message.getContent().getType() != 104 && isExceptUser) {
|
continue;
|
}
|
|
//1. add message relation
|
MessageRelation relation = MessageRelation.getInstance(line, member.getUserId(), record, message);
|
addOneMessageRelations(relation);
|
|
if (message.getContent().getType() == 3) {
|
relation.getMessageRecord().setMessage(createOneMessage(relation.getMessageRecord(), message));
|
} else {
|
relation.getMessageRecord().setMessage(createOneMessage(relation.getMessageRecord(), relation, message));
|
}
|
|
//2. 将消息挂到目标用户下面
|
User user = member.getUser();
|
user.pushOneMessageRelation(relation);
|
|
//3. try to notify to user
|
notifyOneClient(pullType, member.getUserId(), isExceptUser);
|
}
|
}
|
|
|
public MessageContent createGroupMessage(GroupInfo groupInfo, String fromUser) {
|
// TODO Auto-generated method stub
|
WFCMessage.MessageContent content = new GroupNotificationBinaryContent(groupInfo.getTargetId(), fromUser, groupInfo.getName(), "").getCreateGroupNotifyContent();
|
return content;
|
}
|
|
public WFCMessage.Message createOneMessage(MessageRecord message, Message messageSrc) {
|
WFCMessage.Message messageWFC = null;
|
|
try {
|
WFCMessage.Message.Builder builder = WFCMessage.Message.newBuilder();
|
builder.setMessageId(message.getMessageid());
|
builder.setServerTimestamp(message.getTimestamp());
|
builder.setFromUser(messageSrc.getFromUser());
|
|
WFCMessage.Conversation.Builder cb = WFCMessage.Conversation.newBuilder();
|
cb.setType(messageSrc.getConversation().getType());
|
cb.setTarget(messageSrc.getConversation().getTarget());
|
cb.setLine(messageSrc.getConversation().getLine());
|
builder.setConversation(cb.build());
|
|
WFCMessage.MessageContent.Builder contentBuilder = WFCMessage.MessageContent.newBuilder();
|
contentBuilder.setType(message.getType());
|
if (message.getType() == 104) {
|
contentBuilder.setData(ByteString.copyFrom(message.getContent().getBytes()));
|
}
|
else if (message.getType() == 90) {
|
contentBuilder.setContent(messageSrc.getContent().getContent());
|
System.out.println(message.getContent());
|
}
|
else if (message.getType() == 3) {
|
contentBuilder.setType(messageSrc.getContent().getType());
|
contentBuilder.setSearchableContent(messageSrc.getContent().getSearchableContent());
|
contentBuilder.setContent(messageSrc.getContent().getContent());
|
contentBuilder.setData(messageSrc.getContent().getData());
|
contentBuilder.setMediaType(messageSrc.getContent().getMediaType());
|
contentBuilder.setRemoteMediaUrl(messageSrc.getContent().getRemoteMediaUrl());
|
contentBuilder.setPersistFlag(messageSrc.getContent().getPersistFlag());
|
}
|
else {
|
contentBuilder.setSearchableContent(message.getContent());
|
contentBuilder.setMediaType(0);
|
contentBuilder.setPersistFlag(message.getPersistflag());
|
contentBuilder.setExpireDuration(0);
|
contentBuilder.setMentionedType(0);
|
}
|
|
WFCMessage.MessageContent messageContent = contentBuilder.build();
|
|
builder.setContent(messageContent);
|
|
messageWFC = builder.build();
|
|
} catch(Exception e) {
|
e.printStackTrace();
|
}
|
return messageWFC;
|
}
|
|
public WFCMessage.Message createOneMessage(MessageRecord message, MessageRelation messageRelation, WFCMessage.Message messageSend) {
|
WFCMessage.Message messageWFC = null;
|
|
try {
|
WFCMessage.Message.Builder builder = WFCMessage.Message.newBuilder();
|
builder.setMessageId(message.getMessageid());
|
builder.setServerTimestamp(message.getTimestamp());
|
if (message.getSenderId().equals(messageRelation.getTargetid())) {
|
builder.setFromUser(messageSend.getConversation().getTarget());
|
}
|
else {
|
builder.setFromUser(message.getSenderId());
|
}
|
|
WFCMessage.Conversation.Builder cb = WFCMessage.Conversation.newBuilder();
|
cb.setType(messageRelation.getType());
|
cb.setTarget(messageRelation.getTargetid());
|
cb.setLine(messageRelation.getLine());
|
builder.setConversation(cb.build());
|
|
WFCMessage.MessageContent.Builder contentBuilder = WFCMessage.MessageContent.newBuilder();
|
contentBuilder.setType(message.getType());
|
if (message.getType() == 104) {
|
contentBuilder.setData(ByteString.copyFrom(message.getContent().getBytes()));
|
}
|
else if (message.getType() == 90) {
|
contentBuilder.setContent(message.getContent());
|
}
|
else {
|
contentBuilder.setSearchableContent(message.getContent());
|
contentBuilder.setMediaType(0);
|
contentBuilder.setPersistFlag(message.getPersistflag());
|
contentBuilder.setExpireDuration(0);
|
contentBuilder.setMentionedType(0);
|
}
|
|
WFCMessage.MessageContent messageContent = contentBuilder.build();
|
|
builder.setContent(messageContent);
|
|
messageWFC = builder.build();
|
|
} catch(Exception e) {
|
e.printStackTrace();
|
}
|
return messageWFC;
|
}
|
|
public void addOneMessageRelations(MessageRelation relation) throws Exception {
|
//1. insert to database
|
relation.insertToDataBase(relation);
|
|
//2. add to bucket
|
messageRelationList.add(relation.getId(), relation);
|
}
|
|
public void setMemberAlias(String memberId, String alias) {
|
Member member = memberList.get(memberId);
|
|
if (member == null) {
|
return;
|
}
|
|
// member.set
|
|
// Friendship friendShip = mapList.get(key);
|
// friendShip.setAlias(friendId, alias);
|
}
|
|
public void notify(PublishOperator operator, String senderId) {
|
// TODO Auto-generated method stub
|
|
}
|
|
public void notify(PublishOperator operator, Object data, String senderId) {
|
// TODO Auto-generated method stub
|
|
}
|
|
public void notify(PublishOperator operator, MessageRecord record, String senderId) {
|
// TODO Auto-generated method stub
|
|
}
|
|
public void notify(PublishOperator operator, User sender) {
|
// TODO Auto-generated method stub
|
}
|
|
public void notify(PublishOperator operator, MessageRecord record, User sender) {
|
// TODO Auto-generated method stub
|
}
|
|
public boolean existsDeleteRight(String senderId) {
|
// TODO Auto-generated method stub
|
return false;
|
}
|
|
private static int insertOneMessage(MessageRecord message) throws Exception {
|
DataObject dataObject = DataObject.getInstance("message");
|
|
Entity entity = dataObject.newEntity();
|
message.pushTo(entity);
|
|
return dataObject.insertToDataBase(entity);
|
}
|
|
//向添加好友的账号,发送消息
|
public void notifyOneFriend(String topic, String receiver, long head) {
|
MqttPublishMessage publishMsg;
|
ByteBuf payload = Unpooled.buffer();
|
try {
|
payload.writeLong(head);
|
publishMsg = notRetainedPublish(topic, MqttQoS.AT_MOST_ONCE, payload);
|
|
Client client = UserStore.getById(receiver).getClient(receiver);
|
|
if (client == null) {
|
return;
|
}
|
Session sessionC = client.getSession();
|
if (sessionC != null) {
|
Channel channel = sessionC.getChannel();
|
if (publishMsg != null) {
|
channel.writeAndFlush(publishMsg);
|
}
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
//向添加好友的账号,发送消息
|
public void notifyOneFriendRequest(String topic, WFCMessage.AddFriendRequest request, long head, String fromUser) {
|
MqttPublishMessage publishMsg;
|
ByteBuf payload = Unpooled.buffer();
|
String receiver = request.getTargetUid();
|
//String pushContent = request.getReason();
|
try {
|
payload.writeLong(head);
|
publishMsg = notRetainedPublish(topic, MqttQoS.AT_MOST_ONCE, payload);
|
|
Client client = UserStore.getById(receiver).getClient(receiver);
|
|
if (client == null) {
|
return;
|
}
|
Session sessionC = client.getSession();
|
if (sessionC != null) {
|
Channel channel = sessionC.getChannel();
|
if (publishMsg != null) {
|
channel.writeAndFlush(publishMsg);
|
}
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
protected boolean notifyOneClient(int pullType, String targetId, boolean isExceptUser) {
|
boolean result = false;
|
try {
|
long messageSeq = System.currentTimeMillis();
|
WFCMessage.NotifyMessage notifyMessage = WFCMessage.NotifyMessage
|
.newBuilder()
|
.setType(pullType)
|
.setHead(messageSeq)
|
.build();
|
ByteBuf payload = Unpooled.buffer();
|
byte[] byteData = notifyMessage.toByteArray();
|
|
payload.ensureWritable(byteData.length).writeBytes(byteData);
|
MqttPublishMessage publishMsg;
|
publishMsg = notRetainedPublish(IMTopic.NotifyMessageTopic, MqttQoS.AT_MOST_ONCE, payload);
|
|
Client client = UserStore.getById(targetId).getClient(targetId);
|
|
if (client == null) {
|
return false;
|
}
|
Session sessionC = client.getSession();
|
if (sessionC != null) {
|
Channel channel = sessionC.getChannel();
|
if (publishMsg != null) {
|
channel.writeAndFlush(publishMsg);
|
}
|
}
|
result = true;
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
return result;
|
}
|
|
private void addOneMemberToDataBase(Member member) throws Exception {
|
DataObject dataObject = DataObject.getInstance("member");
|
|
Entity entity = dataObject.newEntity();
|
member.pushTo(entity);
|
|
dataObject.insertToDataBase(entity);
|
}
|
|
private void removeOneMemberToDataBase(Member member) throws Exception {
|
NamedSQL namedSQL = NamedSQL.getInstance("removeOneMember");
|
namedSQL.setParam("memberId", member.getId());
|
SQLRunner.execSQL(namedSQL);
|
}
|
|
public void saveMessage(String usr, Message messageT) {
|
// TODO Auto-generated method stub
|
}
|
|
public void saveMessageRelation(String usr, Message messageT) {
|
// TODO Auto-generated method stub
|
|
}
|
|
public static MessageRecord getLoadedMessage(String parentid) {
|
if (parentid == null) {
|
return null;
|
}
|
|
return allLoadedMessageMap.get(parentid);
|
}
|
|
private static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) {
|
return notRetainedPublishWithMessageId(topic, qos, message, 0);
|
}
|
|
private static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message, int messageId) {
|
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
|
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId);
|
return new MqttPublishMessage(fixedHeader, varHeader, message);
|
}
|
|
}
|