package com.highdatas.mdm.util.pool;

import com.alibaba.fastjson.JSONObject;
import com.highdatas.mdm.entity.*;
import com.highdatas.mdm.pojo.Page;
import com.highdatas.mdm.pojo.Result;
import com.highdatas.mdm.util.Constant;
import com.highdatas.mdm.util.DbUtils;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * Priority task that pages through master data or view data and sends each
 * page to the message queue through the wrapped {@link MqEntity}.
 *
 * <p>The paging state (page number, page size, page count, total size) lives
 * on the {@code MqEntity}. When the page size is missing it is initialised
 * from the corresponding service; when the page number is missing every page
 * is sent in sequence, otherwise only the requested page is sent.
 *
 * @author kimi
 * @date 2020-04-14 12:49
 */
@Slf4j
public class MqMessage extends PriorityTask {

    /**
     * @param mqEntity the message descriptor handed to {@link PriorityTask}
     */
    public MqMessage(MqEntity mqEntity) {
        super(mqEntity);
    }

    /**
     * Dispatches on the entity type ({@code Constant.Master} or
     * {@code Constant.View}, case-insensitive) and sends the matching data.
     * Any exception is logged and, when a dispense log row exists for this
     * entity, written to that row's error info.
     */
    @Override
    public void run() {
        log.info("mq: {} ---------- run", mqEntity.toString());
        String type = mqEntity.getType();
        SysDispenseLogs logByMqEntity = DbUtils.dispenseLogsService.getLogByMqEntity(mqEntity);
        try {
            if (type == null) {
                // Fail with a readable message instead of a message-less NPE.
                throw new IllegalStateException("mq entity type is null");
            }
            if (type.equalsIgnoreCase(Constant.Master)) {
                sendMasterData();
            } else if (type.equalsIgnoreCase(Constant.View)) {
                sendViewData();
            } else {
                log.warn("mq: unsupported type {}, nothing sent", type);
            }
        } catch (Exception e) {
            log.error("mq: {} ---------- failed", mqEntity, e);
            if (logByMqEntity != null) {
                // Fall back to toString() so a failure without a message is still recorded.
                String errorInfo = e.getMessage() != null ? e.getMessage() : e.toString();
                logByMqEntity.setErrorInfo(errorInfo).updateById();
            }
        }
    }

    /**
     * Sends master data for the maintain record referenced by the entity.
     * Initialises the paging state on the entity when the page size is absent,
     * then sends either all pages (page number absent) or the single requested page.
     *
     * @throws Exception if the maintain record is missing or a page cannot be built or sent
     */
    private void sendMasterData() throws Exception {
        AtomicInteger pageNo = mqEntity.getPageNo();
        AtomicInteger pageSize = mqEntity.getPageSize();
        AtomicInteger pages = mqEntity.getPages();
        String maintainId = mqEntity.getMaintainId();
        TUser user = new TUser().setUserId(mqEntity.getUserId());

        Maintain maintain = DbUtils.maintainService.selectById(maintainId);
        if (maintain == null) {
            throw new IllegalStateException("maintain not found, id: " + maintainId);
        }

        // NOTE(review): totalCnt is never updated, so the incremental branch always builds its
        // Page from a record count of 0 — confirm whether the entity's total size was intended.
        long totalCnt = 0;

        if (pageSize == null) {
            MasterAuthor masterAuthor = new MasterAuthor().setTableName(maintain.getTableName());
            Page initPageInfo = DbUtils.masterAuthorService.getInitPageInfo(
                    masterAuthor, maintain, user, mqEntity.getIncrement());
            pages = new AtomicInteger(initPageInfo.getPages());
            pageSize = new AtomicInteger(initPageInfo.getPageSize());
            mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
            mqEntity.setPages(pages);
            mqEntity.setPageSize(pageSize);
        }

        if (pageNo == null) {
            // No page requested: walk every page, starting from the first.
            pageNo = new AtomicInteger(1);
            mqEntity.setPageNo(pageNo);
            while (pageNo.get() <= pages.get()) {
                sendOneMasterPackage(pageNo, pageSize, totalCnt, maintain, user);
                pageNo.incrementAndGet();
            }
        } else {
            // A specific page was requested: send only that one.
            sendOneMasterPackage(pageNo, pageSize, totalCnt, maintain, user);
        }
    }

    /**
     * Builds the JSON payload for one page of master data and sends it.
     *
     * @param pageNo    page to send
     * @param pageSize  rows per page; also passed on as the message total
     * @param totalCnt  record count used to build the page of the incremental query
     * @param maintain  maintain record whose table, id and version select the data
     * @param user      user whose filter restricts the rows
     * @throws Exception if querying, encoding or sending fails
     */
    private void sendOneMasterPackage(AtomicInteger pageNo, AtomicInteger pageSize, long totalCnt,
                                      Maintain maintain, TUser user) throws Exception {
        Boolean increment = mqEntity.getIncrement();
        List<String> fieldList = DbUtils.fieldService.getFieldByMaintain(maintain.getId())
                .stream()
                .map(sysField -> sysField.getField())
                .collect(Collectors.toList());
        String filter = DbUtils.masterAuthorService.getFilter(user, maintain.getId());

        String content;
        if (increment) {
            String fields = String.join(Constant.COMMA, fieldList);
            content = buildIncrementContent(totalCnt, maintain, fields, filter);
        } else {
            content = buildVersionContent(pageNo, pageSize, maintain, user, fieldList, filter);
        }
        sendMqBody(pageSize.get(), content);
    }

    /**
     * Builds the payload for the incremental case by reading the record table
     * (maintain table name plus {@code Constant.RECORD}) for the entity's current page.
     */
    private String buildIncrementContent(long totalCnt, Maintain maintain, String fields, String filter) {
        String tempTableName = maintain.getTableName() + Constant.RECORD;

        Page page = new Page(totalCnt);
        page.setPageSize(mqEntity.getPageSize().get());
        page.setPageNo(mqEntity.getPageNo().get());

        List<Map<String, Object>> maps = DbUtils.maintainDetailMapper.selectMaintainDetail(
                fields, tempTableName, maintain.getId(), filter, page.getLimitSQL());

        JSONObject object = new JSONObject();
        object.fluentPut("total", page.getRecordCount());
        object.fluentPut("size", page.getPageSize());
        object.fluentPut("pages", page.getPageCount());
        object.fluentPut("current", page.getPageNo());
        object.fluentPut("records", maps);
        return JSONObject.toJSONString(object);
    }

    /**
     * Builds the payload for the non-incremental case from the {@code grid}
     * object returned by the versioned master data query.
     */
    private String buildVersionContent(AtomicInteger pageNo, AtomicInteger pageSize, Maintain maintain,
                                       TUser user, List<String> fieldList, String filter) {
        Result result = DbUtils.masterDataService.selectListByPageByVersion(
                user, maintain.getTableName(), fieldList, filter,
                pageNo.get(), pageSize.get(), maintain.getVersion(), false);
        JSONObject data = (JSONObject) result.getData();
        JSONObject grid = data.getJSONObject("grid");

        JSONObject object = new JSONObject();
        object.fluentPut("total", grid.get("total"));
        object.fluentPut("size", grid.get("size"));
        object.fluentPut("pages", grid.get("pages"));
        object.fluentPut("current", grid.get("current"));
        // NOTE(review): source key is "record" while the emitted key is "records" — confirm the grid schema.
        object.fluentPut("records", grid.get("record"));
        return JSONObject.toJSONString(object);
    }

    /**
     * Sends view data for the view referenced by the entity's data id.
     * Returns without sending when the view status is not {@code ViewStatus.working}.
     *
     * @throws Exception if the view is missing or a page cannot be built or sent
     */
    private void sendViewData() throws Exception {
        String dataId = mqEntity.getDataId();
        AtomicInteger pageNo = mqEntity.getPageNo();
        AtomicInteger pageSize = mqEntity.getPageSize();
        AtomicInteger pages = mqEntity.getPages();

        SysView sysView = DbUtils.viewService.selectById(dataId);
        if (sysView == null) {
            throw new IllegalStateException("view not found, id: " + dataId);
        }
        ViewStatus status = sysView.getStatus();
        if (!ViewStatus.working.equals(status)) {
            return;
        }

        if (pageSize == null) {
            Page initPageInfo = DbUtils.viewService.getInitPageInfo(sysView.getId());
            pages = new AtomicInteger(initPageInfo.getPages());
            pageSize = new AtomicInteger(initPageInfo.getPageSize());
            mqEntity.setPages(pages);
            mqEntity.setPageSize(pageSize);
            mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
        }

        if (pageNo == null) {
            // Active mode: no page requested, send every page in order.
            pageNo = new AtomicInteger(1);
            mqEntity.setPageNo(pageNo);
            while (pageNo.get() <= pages.get()) {
                sendOneViewPackage(pageNo, pageSize, sysView);
                pageNo.incrementAndGet();
            }
        } else {
            // Passive mode: send only the requested page.
            sendOneViewPackage(pageNo, pageSize, sysView);
        }
    }

    /**
     * Fetches one page of view data, serialises it to JSON and sends it.
     *
     * @param pageNo   page to send
     * @param pageSize rows per page; also passed on as the message total
     * @param sysView  view to read from
     * @throws Exception if querying, encoding or sending fails
     */
    private void sendOneViewPackage(AtomicInteger pageNo, AtomicInteger pageSize, SysView sysView)
            throws Exception {
        Result viewData = DbUtils.viewService.getViewData(sysView, pageNo.get(), pageSize.get());
        JSONObject object = (JSONObject) viewData.getData();
        sendMqBody(pageSize.get(), JSONObject.toJSONString(object));
    }

    /**
     * Puts the payload (AES-encoded when the dispense service says so) into the
     * entity's message body and sends the entity to the queue. Logs the payload
     * size before and after encoding and the elapsed time in milliseconds.
     *
     * @param pageSize value stored as the message body's total
     * @param content  JSON payload to send
     * @throws Exception if encoding or sending fails
     */
    private void sendMqBody(Integer pageSize, String content) throws Exception {
        log.info(sizeInKb(content));
        long start = System.currentTimeMillis();

        Boolean aes = DbUtils.dispenseService.getAes();
        String mqBody;
        if (aes) {
            String aesUrl = DbUtils.dispenseService.getAesUrl();
            mqBody = com.datacvg.common.AESUtil.getInstance().encoder(content, mqEntity.getUserId(), aesUrl);
        } else {
            mqBody = content;
        }

        log.info(sizeInKb(mqBody));
        long elapsed = System.currentTimeMillis() - start;
        log.info("time:" + elapsed);

        MqEntity.MsgBodyBean msgBody = mqEntity.getMsgBody();
        msgBody.setData(mqBody);
        msgBody.setTotal(pageSize);
        mqEntity.setMsgBody(msgBody);
        mqEntity.send2Mq();
    }

    /** Returns the UTF-8 encoded size of {@code text} in whole kilobytes, suffixed with "k". */
    private static String sizeInKb(String text) {
        return (text.getBytes(StandardCharsets.UTF_8).length / 1024) + "k";
    }
}