package chat.handler; import java.util.List; import java.util.concurrent.ThreadFactory; import com.google.protobuf.InvalidProtocolBufferException; import chat.message.SendMessageCallback; import chat.module.GroupBucket; import chat.module.entity.Group; import chat.module.entity.GroupType; import chat.module.entity.MessageRecord; import chat.module.entity.PublishOperator; import chat.server.call.CallObject; import chat.server.im.IMTopic; import chat.server.moquette.message.MqttFixedHeader; import chat.server.moquette.message.MqttMessageType; import chat.server.moquette.message.MqttPublishMessage; import chat.server.moquette.message.MqttPublishVariableHeader; import chat.server.moquette.message.MqttQoS; import chat.user.Client; import chat.user.Session; import chat.user.User; import cn.wildfirechat.proto.WFCMessage; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; public class GroupHandler extends CallObject { @Override protected void publishMethod() { addOneMethod("createGroup"); addOneMethod("quitGroup"); addOneMethod("getInfo"); addOneMethod("addMembers"); addOneMethod("getMemberList"); addOneMethod("setGroupManager"); addOneMethod("friendUnread"); addOneMethod("transferGroup"); addOneMethod("join"); addOneMethod("leave"); addOneMethod("kickoff"); addOneMethod("changeGroupName"); addOneMethod("changeMemberAlias"); addOneMethod("modifyGroupAlias"); addOneMethod("modifyGroup"); addOneMethod("pull"); addOneMethod("publish"); } public void createGroup() throws Exception { WFCMessage.CreateGroupRequest request = dataPool.getWFCMessage(WFCMessage.CreateGroupRequest.class); //1. create WFCMessage.Group wfcGroup = request.getGroup(); GroupType type = GroupType.parse(wfcGroup.getGroupInfo().getExtra()); GroupBucket groupBucket = GroupBucket.getInstance(type); Group group = groupBucket.getOrCreate(wfcGroup, user); //2.发送通知消息 //WFCMessage.GroupInfo groupInfo = group.createGroupInfo(user.getId(), request.getGroup().getGroupInfo(), request.getGroup().getMembersList()); WFCMessage.MessageContent content = group.createGroupMessage(wfcGroup.getGroupInfo(), user.getId()); groupBucket.sendGroupNotification(user, request, content, group); //3. return String groupId = group.getId(); byte[] data = groupId.getBytes(); resultPool.setBytes(data); //3. publish --hefeixia去掉 // SendMessageCallback callback = new SendMessageCallback(); // callback.setSender(user); // callback.setMemberList(group.getMemberList()); // callback.setMessage("hello "); // // resultPool.setCallback(callback); } public void quitGroup() throws Exception { //头 WFCMessage.QuitGroupRequest request = dataPool.getWFCMessage(WFCMessage.QuitGroupRequest.class); String groupId = request.getGroupId(); Group group = GroupBucket.getGroup(groupId); if (group == null) { return; } group.quit(); //2. return resultPool.setBytes(null); } public void getInfo() throws InvalidProtocolBufferException { //1. WFCMessage.PullUserRequest request = dataPool.getWFCMessage(WFCMessage.PullUserRequest.class); List groupList = request.getRequestList(); //2. List friendGroupList = GroupBucket.getGroupList(groupList); WFCMessage.PullGroupInfoResult.Builder builder = WFCMessage.PullGroupInfoResult.newBuilder(); builder.addAllInfo(friendGroupList); byte[] resBytes = builder.build().toByteArray(); resultPool.setBytes(resBytes); } public void addMembers() throws Exception { WFCMessage.AddGroupMemberRequest request = dataPool.getWFCMessage(WFCMessage.AddGroupMemberRequest.class); //1. add members String groupId = request.getGroupId(); Group group = GroupBucket.getGroup(groupId); group.addMembersWfc(groupId, request.getAddedMemberList()); //2. return resultPool.setBytes(null); //3. 发消息 SendMessageCallback callback = new SendMessageCallback(); callback.setSender(user); callback.setMemberList(group.getMemberList()); callback.setMessage("hello "); resultPool.setCallback(callback); } public void getMemberList() throws InvalidProtocolBufferException { //1. WFCMessage.PullGroupMemberRequest request = dataPool.getWFCMessage(WFCMessage.PullGroupMemberRequest.class); String groupId = request.getTarget(); Group friendGroup = GroupBucket.getGroup(groupId); if (friendGroup == null) { return; } //1. get member list List members = friendGroup.getWFCMemberList(); WFCMessage.PullGroupMemberResult.Builder builder = WFCMessage.PullGroupMemberResult.newBuilder(); builder.addAllMember(members); //2. result byte[] resBytes = builder.build().toByteArray(); resultPool.setBytes(resBytes); } // public void addMember() { // String userId = user.getId(); // String groupId = dataPool.getString("groupId"); // List memberList = dataPool.getList("memberlist"); // // Group FriendGroup = GroupBucket.getGroup(groupId); // // if (FriendGroup == null) { // return; // } // // //1. add members // FriendGroup.addMembers(groupId, memberList); // // //2. notify // FriendGroup.notify(PublishOperator.AddMembers, memberList, userId); // } // public void leave() { // String userId = user.getId(); // String groupId = dataPool.getString("groupId"); // // Group FriendGroup = GroupBucket.getGroup(groupId); // // if (FriendGroup == null) { // return; // } // // //1. notify // FriendGroup.notify(PublishOperator.Leave, userId); // // //2. delete member // FriendGroup.deleteOneMember(userId); // } public void kickoff() throws Exception { String userId = user.getId(); String groupId = dataPool.getString("groupId"); String memberId = dataPool.getString("memberId"); Group FriendGroup = GroupBucket.getGroup(groupId); if (FriendGroup == null) { return; } //1. notify FriendGroup.notify(PublishOperator.KickOff, memberId, userId); //2. delete member FriendGroup.deleteOneMember(memberId); } public void changeGroupName() { String userId = user.getId(); String groupId = dataPool.getString("groupId"); String groupName = dataPool.getString("groupName"); Group FriendGroup = GroupBucket.getGroup(groupId); if (FriendGroup == null) { return; } //1. change group name FriendGroup. setName(groupName); //2. notify FriendGroup.notify(PublishOperator.ChangeName, groupName, userId); } public void changeMemberAlias() { String groupId = dataPool.getString("groupId"); String memberId = dataPool.getString("memberId"); String alias = dataPool.getString("alias"); Group FriendGroup = GroupBucket.getGroup(groupId); if (FriendGroup == null) { return; } //1. change member alias FriendGroup.setMemberAlias(memberId, alias); } protected void pull() { String groupId = dataPool.getString("groupId"); long timeStamp = Long.parseLong(dataPool.getString("timePoint")); Group FriendGroup = GroupBucket.getGroup(groupId); if (FriendGroup == null) { return; } //1. get message list List messageList = FriendGroup.getMessageList(timeStamp); //2. result resultPool.add(messageList); } protected void publish() { String groupId = dataPool.getString("groupId"); MessageRecord record = dataPool.getMessageRecord(); Group FriendGroup = GroupBucket.getGroup(groupId); if (FriendGroup == null) { return; } //1. add to message list //FriendGroup.publishOneMessage(record, user, true, null); //2. publish FriendGroup.notify(PublishOperator.PublishMessage, record, user); } public void exeMessage(User user) { try { long messageSeq = MessageRecord.newMessageId(); WFCMessage.NotifyMessage notifyMessage = WFCMessage.NotifyMessage .newBuilder() .setType(0) .setHead(messageSeq) .build(); ByteBuf payload = Unpooled.buffer(); byte[] byteData = notifyMessage.toByteArray(); payload.ensureWritable(byteData.length).writeBytes(byteData); MqttPublishMessage publishMsg = notRetainedPublish(IMTopic.NotifyMessageTopic, MqttQoS.AT_MOST_ONCE, payload); Client client = user.getClient(user.getId()); Session sessionC = client.getSession(); if (sessionC != null) { Channel channel = sessionC.getChannel(); if (publishMsg != null) { channel.writeAndFlush(publishMsg); } } } catch(Exception e) { e.printStackTrace(); } } private ThreadFactory threadFactory = new ThreadFactory() { @Override public Thread newThread(final Runnable r) { return new Thread() { @Override public void run() { r.run(); } }; } }; public static MqttPublishMessage notRetainedPublish(String topic, MqttQoS qos, ByteBuf message) { return notRetainedPublishWithMessageId(topic, qos, message, 0); } private static MqttPublishMessage notRetainedPublishWithMessageId(String topic, MqttQoS qos, ByteBuf message, int messageId) { MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, qos, false, 0); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId); return new MqttPublishMessage(fixedHeader, varHeader, message); } public void modifyGroupAlias() throws Exception { //头 WFCMessage.ModifyGroupMemberAlias request = (WFCMessage.ModifyGroupMemberAlias) dataPool.getWFCMessage(WFCMessage.ModifyGroupMemberAlias.class); //尾 resultPool.setBytes(null); } public void modifyGroup() throws Exception { //头 WFCMessage.ModifyGroupInfoRequest request = (WFCMessage.ModifyGroupInfoRequest) dataPool.getWFCMessage(WFCMessage.ModifyGroupInfoRequest.class); //尾 resultPool.setBytes(null); } public void setGroupManager() throws Exception { //头 WFCMessage.SetGroupManagerRequest request = (WFCMessage.SetGroupManagerRequest) dataPool.getWFCMessage(WFCMessage.SetGroupManagerRequest.class); //尾 resultPool.setBytes(null); } public void friendUnread() throws Exception { long[] head = new long[1]; //头 WFCMessage.Version request = (WFCMessage.Version) dataPool.getWFCMessage(WFCMessage.Version.class); //尾 resultPool.setBytes(null); } public void transferGroup() throws Exception { //头 WFCMessage.TransferGroupRequest request = (WFCMessage.TransferGroupRequest) dataPool.getWFCMessage(WFCMessage.TransferGroupRequest.class); //尾 resultPool.setBytes(null); } }