package chat.module;
|
|
import java.util.ArrayList;
|
import java.util.Collection;
|
import java.util.LinkedHashSet;
|
import java.util.List;
|
import java.util.Set;
|
|
import chat.consts.ProtoConstants;
|
import chat.module.entity.ChatSpaceType;
|
import chat.module.entity.Group;
|
import chat.module.entity.Member;
|
import chat.module.entity.MessageContainer;
|
import chat.module.entity.MessageRecord;
|
import chat.server.im.IMTopic;
|
import chat.server.moquette.message.MqttFixedHeader;
|
import chat.server.moquette.message.MqttMessageType;
|
import chat.server.moquette.message.MqttPublishMessage;
|
import chat.server.moquette.message.MqttPublishVariableHeader;
|
import chat.server.moquette.message.MqttQoS;
|
import chat.user.Client;
|
import chat.user.Session;
|
import chat.user.User;
|
import chat.user.UserStore;
|
import chat.util.MessageShardingUtil;
|
import cn.wildfirechat.proto.WFCMessage;
|
import cn.wildfirechat.proto.WFCMessage.Message;
|
import frame.object.data.DataObject;
|
import frame.object.data.Entity;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.Unpooled;
|
import io.netty.channel.Channel;
|
|
import static chat.consts.ProtoConstants.PersistFlag.Transparent;
|
|
public class Bucket<T extends MessageContainer> {
|
|
private ConcurrentMapList<T> itemMap;
|
private static int pullType;
|
|
/** Creates a bucket backed by a fresh, empty item store. */
public Bucket() {
    this.itemMap = new ConcurrentMapList<>();
}
|
|
public static Bucket<?> getInstance(ChatSpaceType spaceType) {
|
if (ChatSpaceType.Private == spaceType) {
|
pullType = ProtoConstants.PullType.Pull_Normal;
|
return PrivateFriendBucket.getInstance();
|
}
|
else if (ChatSpaceType.Group == spaceType) {
|
pullType = ProtoConstants.PullType.Pull_Normal;
|
return GroupBucket.getInstance();
|
}
|
return null;
|
}
|
|
public void sendGroupNotification(User user, WFCMessage.CreateGroupRequest request, WFCMessage.MessageContent content, Group group) {
|
String targetId = group.getId();
|
List<Integer> lines = request.getToLineList();
|
|
if (lines == null) {
|
lines = new ArrayList<>();
|
} else {
|
lines = new ArrayList<>(lines);
|
}
|
|
if (lines.isEmpty()) {
|
lines.add(0);
|
}
|
|
for (int line : lines) {
|
long timestamp = System.currentTimeMillis();
|
WFCMessage.Message.Builder builder = WFCMessage.Message.newBuilder().setContent(content).setServerTimestamp(timestamp);
|
builder.setConversation(builder.getConversationBuilder().setType(ProtoConstants.ConversationType.ConversationType_Group).setTarget(targetId).setLine(line));
|
builder.setFromUser(user.getId());
|
try {
|
long messageId = MessageShardingUtil.generateId();
|
builder.setMessageId(messageId);
|
pushOneMessage(builder.build(), user, request);
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
}
|
|
public void pushOneMessage(Message message, User user) throws Exception {
|
ModuleLoader.getImBusinessScheduler().execute(() -> doPushOneMessage(message, user, null, true));
|
}
|
|
public void pushOneMessage(Message message, User user, boolean isExceptUser) throws Exception {
|
ModuleLoader.getImBusinessScheduler().execute(() -> doPushOneMessage(message, user, null, isExceptUser));
|
}
|
|
public void pushOneMessage(Message message, User user, WFCMessage.CreateGroupRequest request) throws Exception {
|
ModuleLoader.getImBusinessScheduler().execute(() -> doPushOneMessage(message, user, request, true));
|
}
|
|
public void doPushOneMessage(Message message, User user, WFCMessage.CreateGroupRequest request, boolean isExceptUser) {
|
//String containerId = getMessageContainerID(message);
|
//1.得到messageContainer 如果是私聊......,如果是群聊.....
|
|
Set<String> notifyReceivers = new LinkedHashSet<String>();
|
WFCMessage.Message.Builder messageBuilder = message.toBuilder();
|
|
int pullType = 0;
|
if (request == null) {
|
pullType = getNotifyReceivers(user.getId(), messageBuilder, notifyReceivers);
|
}
|
else {//是群组的消息
|
pullType = getGroupNotifyReceiver(request.getGroup().getMembersList(), notifyReceivers);
|
}
|
|
if (message.getContent().getPersistFlag() == Transparent) {
|
notifyReceivers.remove(messageBuilder.getFromUser());
|
}
|
|
if (message.getContent().getPersistFlag() == Transparent) {
|
publishTransparentMessage2Receivers(message, notifyReceivers, pullType);
|
return;
|
}
|
|
MessageContainer container = getMessageContainerID(message);
|
|
if (container == null) {
|
return;
|
}
|
|
String userId = user.getId();
|
String spaceCode = container.getOneMember(userId).getSpacecode();
|
String spaceId = container.getOneMember(userId).getSpaceid();
|
try {
|
MessageRecord record = MessageRecord.getInstance(spaceCode, spaceId, message);
|
container.addOneMessage(pullType, record, user, message, isExceptUser);
|
} catch(Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
private void publishTransparentMessage2Receivers(WFCMessage.Message message, Collection<String> receivers, int pullType) {
|
for (String user : receivers) {
|
ByteBuf payload = Unpooled.buffer();
|
byte[] byteData = message.toByteArray();
|
payload.ensureWritable(byteData.length).writeBytes(byteData);
|
MqttPublishMessage publishMsg;
|
publishMsg = notRetainedPublish(IMTopic.SendMessageTopic, MqttQoS.AT_MOST_ONCE, payload);
|
|
Client client = UserStore.getById(user).getClient(user);
|
|
Session sessionC = client.getSession();
|
if (sessionC != null) {
|
Channel channel = sessionC.getChannel();
|
if (publishMsg != null) {
|
channel.writeAndFlush(publishMsg);
|
}
|
}
|
}
|
}
|
|
private static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) {
|
return notRetainedPublishWithMessageId(topic, qos, message, 0);
|
}
|
|
private static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message, int messageId) {
|
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0);
|
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId);
|
return new MqttPublishMessage(fixedHeader, varHeader, message);
|
}
|
|
public int getGroupNotifyReceiver(List<WFCMessage.GroupMember> members, Set<String> notifyReceivers) {
|
int pullType = ProtoConstants.PullType.Pull_Normal;
|
for (WFCMessage.GroupMember member : members) {
|
notifyReceivers.add(member.getMemberId());
|
}
|
|
return pullType;
|
}
|
|
public int getNotifyReceivers(String userId, WFCMessage.Message.Builder messageBuilder, Set<String> notifyReceivers) {
|
int pullType = ProtoConstants.PullType.Pull_Normal;
|
if (ChatSpaceType.Group == ChatSpaceType.pase(messageBuilder.getConversation().getType())) {
|
Group friendGroup = GroupBucket.getGroup(messageBuilder.getConversation().getTarget());
|
for(Member member : friendGroup.getMemberList()) {
|
notifyReceivers.add(member.getUserId());
|
}
|
}
|
else if (ChatSpaceType.Private == ChatSpaceType.pase(messageBuilder.getConversation().getType())){
|
notifyReceivers.add(userId);
|
notifyReceivers.add(messageBuilder.getConversation().getTarget());
|
}
|
|
return pullType;
|
}
|
|
private MessageContainer getMessageContainerID(Message message) {
|
MessageContainer msgContainer = null;
|
|
if (message.getConversation().getType() == 0) {
|
//取出所有私有的MeessageContainer,根据人来查找具体的私聊id(可以想象为一个特殊的群组,只有二个人)
|
//需要比较发送者与接收者
|
for (MessageContainer messageContainer : getAll()) {
|
if (messageContainer.getOneMember(message.getFromUser()) != null
|
&& messageContainer.getOneMember(message.getConversation().getTarget()) != null) {
|
msgContainer = messageContainer;
|
break;
|
}
|
}
|
}
|
else if (message.getConversation().getType() == 1) {
|
msgContainer = GroupBucket.getGroup(message.getConversation().getTarget());//itemMap.get(message.getConversation().getTarget());
|
}
|
|
return msgContainer;
|
}
|
|
public T createOne(String tableName, T one) throws Exception {
|
DataObject dataObject = DataObject.getInstance(tableName);
|
|
Entity entity = dataObject.newEntity();
|
one.pushTo(entity);
|
|
dataObject.insertToDataBase(entity);
|
return one;
|
}
|
|
public void addOne(String key, T item) {
|
itemMap.add(key, item);
|
}
|
|
public T getOne(String key) {
|
if (key == null) {
|
return null;
|
}
|
|
return itemMap.get(key);
|
}
|
|
public T deleteOne(String key) {
|
return itemMap.delete(key);
|
}
|
|
public List<T> getAll() {
|
return itemMap.getList();
|
}
|
|
public int size() {
|
return itemMap.size();
|
}
|
|
}
|