package com.highdatas.mdm.util.pool;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.highdatas.mdm.entity.*;
|
import com.highdatas.mdm.pojo.Page;
|
import com.highdatas.mdm.pojo.Result;
|
import com.highdatas.mdm.util.Constant;
|
import com.highdatas.mdm.util.DbUtils;
|
import lombok.extern.slf4j.Slf4j;
|
|
import java.util.Date;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.stream.Collectors;
|
|
/**
|
* @author kimi
|
* @description
|
* @date 2020-04-14 12:49
|
*/
|
|
@Slf4j
|
public class MqMessage extends PriorityTask{
|
|
|
public MqMessage(MqEntity mqEntity) {
|
super(mqEntity);
|
}
|
|
@Override
|
public void run() {
|
log.info("mq: {} ---------- run", mqEntity.toString());
|
String type = mqEntity.getType();
|
try{
|
if (type.equalsIgnoreCase(Constant.Master)) {
|
sendMasterData();
|
} else if (type.equalsIgnoreCase(Constant.View)){
|
sendViewData();
|
}
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
//log
|
}
|
|
}
|
|
private void sendMasterData() throws Exception {
|
AtomicInteger pageNo = mqEntity.getPageNo();
|
AtomicInteger pageSize = mqEntity.getPageSize();
|
AtomicInteger pages = mqEntity.getPages();
|
String maintainId = mqEntity.getMaintainId();
|
String userId = mqEntity.getUserId();
|
TUser user = new TUser().setUserId(userId);
|
Maintain maintain = DbUtils.maintainService.selectById(maintainId);
|
long totalCnt = 0;
|
|
if (pageSize == null) {
|
MasterAuthor masterAuthor = new MasterAuthor().setTableName(maintain.getTableName());
|
Page initPageInfo = DbUtils.masterAuthorService.getInitPageInfo(masterAuthor, maintain, user, mqEntity.getIncrement());
|
pages = new AtomicInteger(initPageInfo.getPages());
|
pageSize = new AtomicInteger(initPageInfo.getPageSize());
|
mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
|
mqEntity.setPages(pages);
|
mqEntity.setPageSize(pageSize);
|
}
|
|
if (pageNo == null) {
|
pageNo = new AtomicInteger(1);
|
mqEntity.setPageNo(pageNo);
|
for (pageNo.get(); pageNo.get() <= pages.get(); pageNo.addAndGet(1)){
|
sendOneMasterPackage(pageNo, pageSize,totalCnt, maintain, user);
|
}
|
} else {
|
sendOneMasterPackage(pageNo, pageSize, totalCnt, maintain, user);
|
}
|
}
|
|
private void sendOneMasterPackage(AtomicInteger pageNo, AtomicInteger pageSize, long totalCnt, Maintain maintain, TUser user) throws Exception {
|
Boolean increment = mqEntity.getIncrement();
|
List<SysField> fieldByMaintain = DbUtils.fieldService.getFieldByMaintain(maintain.getId());
|
List<String> fieldList = fieldByMaintain.stream().map(sysField -> sysField.getField()).collect(Collectors.toList());
|
String fields = fieldByMaintain.stream().map(sysField -> sysField.getField()).collect(Collectors.joining(Constant.COMMA));
|
|
String filter = DbUtils.masterAuthorService.getFilter(user, maintain.getId());
|
String content;
|
if (increment) {
|
String maintainTableName = maintain.getTableName();
|
String tempTableName = maintainTableName + Constant.RECORD;
|
Page page = new Page(totalCnt);
|
page.setPageSize(mqEntity.getPageSize().get());
|
page.setPageNo(mqEntity.getPageNo().get());
|
List<Map<String, Object>> maps = DbUtils.maintainDetailMapper.selectMaintainDetail(fields, tempTableName, maintain.getId(), filter, page.getLimitSQL());
|
JSONObject object = new JSONObject();
|
object.fluentPut("total", page.getRecordCount());
|
object.fluentPut("size", page.getPageSize());
|
object.fluentPut("pages", page.getPageCount());
|
object.fluentPut("current", page.getPageNo());
|
object.fluentPut("records", maps);
|
content = JSONObject.toJSONString(object);
|
} else {
|
Result result = DbUtils.masterDataService.selectListByPageByVersion(user, maintain.getTableName(), fieldList, filter, pageNo.get(), pageSize.get(), maintain.getVersion(), false);
|
JSONObject data = (JSONObject) result.getData();
|
JSONObject grid = data.getJSONObject("grid");
|
JSONObject object = new JSONObject();
|
object.fluentPut("total", grid.get("total"));
|
object.fluentPut("size", grid.get("size"));
|
object.fluentPut("pages", grid.get("pages"));
|
object.fluentPut("current", grid.get("current"));
|
object.fluentPut("records", grid.get("record"));
|
|
content = JSONObject.toJSONString(object);
|
|
}
|
sendMqBody(pageSize.get(), content);
|
}
|
|
private void sendViewData() throws Exception {
|
//logs
|
String dataId = mqEntity.getDataId();
|
AtomicInteger pageNo = mqEntity.getPageNo();
|
AtomicInteger pageSize = mqEntity.getPageSize();
|
AtomicInteger pages = mqEntity.getPages();
|
SysView sysView = DbUtils.viewService.selectById(dataId);
|
ViewStatus status = sysView.getStatus();
|
if (!ViewStatus.working.equals(status)) {
|
return;
|
}
|
if (pageSize == null) {
|
Page initPageInfo = DbUtils.viewService.getInitPageInfo(sysView.getId());
|
pages = new AtomicInteger(initPageInfo.getPages());
|
pageSize = new AtomicInteger(initPageInfo.getPageSize());
|
mqEntity.setPages(pages);
|
mqEntity.setPageSize(pageSize);
|
mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
|
}
|
if (pageNo == null) {
|
//主动
|
pageNo =new AtomicInteger(1);
|
mqEntity.setPageNo(pageNo);
|
for (pageNo.get(); pageNo.get() <= pages.get(); pageNo.addAndGet(1)) {
|
sendOneViewPackage(pageNo, pageSize, sysView);
|
}
|
|
} else {
|
//被动
|
sendOneViewPackage(pageNo, pageSize, sysView);
|
}
|
|
}
|
|
private void sendOneViewPackage(AtomicInteger pageNo, AtomicInteger pageSize, SysView sysView) throws Exception {
|
Result viewData = DbUtils.viewService.getViewData(sysView, pageNo.get(), pageSize.get());
|
JSONObject object = (JSONObject) viewData.getData();
|
String content = JSONObject.toJSONString(object);
|
sendMqBody(pageSize.get(), content);
|
}
|
|
private void sendMqBody(Integer pageSize, String content) throws Exception {
|
byte[] bytes = content.getBytes("UTF-8");
|
String size = (bytes.length / 1024) + "k";
|
log.info(size);
|
Date now = new Date();
|
|
// String mqBody = com.datacvg.common.AESUtil.getInstance().encoder(content, mqEntity.getUserId(),"http://180.169.94.250:9107");
|
String mqBody = content;
|
bytes = mqBody.getBytes("UTF-8");
|
size = (bytes.length / 1024) + "k";
|
log.info(size);
|
Date after = new Date();
|
long l = after.getTime() - now.getTime();
|
log.info("time:"+l);
|
// String mqBody = content;
|
MqEntity.MsgBodyBean msgBody = mqEntity.getMsgBody();
|
msgBody.setData(mqBody);
|
msgBody.setTotal(pageSize);
|
mqEntity.setMsgBody(msgBody);
|
Thread.sleep(1000 * 60 * 3);
|
mqEntity.send2Mq();
|
}
|
}
|