From 5268a2b7dfa556bd6f5a2d5e446cea3ea9940c10 Mon Sep 17 00:00:00 2001 From: kimi <kimi42345@gmail.com> Date: 星期三, 22 四月 2020 11:18:23 +0800 Subject: [PATCH] add 分发 master_author 添加字段 subscribe increment, 添加7个表 master_author_subscribe master_author_unactive sys_dispense_config sys_dispense_logs sys_view sys_view_join sys_view_logic --- src/main/java/com/highdatas/mdm/util/pool/MqMessage.java | 178 +++++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 160 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/highdatas/mdm/util/pool/MqMessage.java b/src/main/java/com/highdatas/mdm/util/pool/MqMessage.java index a0d4746..10d9797 100644 --- a/src/main/java/com/highdatas/mdm/util/pool/MqMessage.java +++ b/src/main/java/com/highdatas/mdm/util/pool/MqMessage.java @@ -1,14 +1,18 @@ package com.highdatas.mdm.util.pool; import com.alibaba.fastjson.JSONObject; -import com.highdatas.mdm.pojo.CodeMsg; +import com.highdatas.mdm.entity.*; +import com.highdatas.mdm.pojo.Page; import com.highdatas.mdm.pojo.Result; +import com.highdatas.mdm.util.Constant; +import com.highdatas.mdm.util.DbUtils; import lombok.extern.slf4j.Slf4j; -import org.springframework.http.MediaType; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.io.PrintWriter; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; /** * @author kimi @@ -18,23 +22,161 @@ @Slf4j public class MqMessage extends PriorityTask{ - public MqMessage(String code, HttpServletResponse response) { - super(code,response); + + + public MqMessage(MqEntity mqEntity) { + super(mqEntity); } @Override public void run() { - try { - log.info("code {} ---------- run", code); - Thread.sleep(1000 * 30 * 1); - response.setCharacterEncoding("UTF-8"); - response.setContentType(MediaType.APPLICATION_JSON_VALUE); - PrintWriter writer = response.getWriter(); - writer.write(JSONObject.toJSONString(Result.success(CodeMsg.SUCCESS))); - } catch (InterruptedException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); + log.info("mq: {} ---------- run", mqEntity.toString()); + String type = mqEntity.getType(); + try{ + if (type.equalsIgnoreCase(Constant.Master)) { + sendMasterData(); + } else if (type.equalsIgnoreCase(Constant.View)){ + sendViewData(); + } } + catch (Exception e) { + e.printStackTrace(); + //log + } + + } + + private void sendMasterData() throws Exception { + AtomicInteger pageNo = mqEntity.getPageNo(); + AtomicInteger pageSize = mqEntity.getPageSize(); + AtomicInteger pages = mqEntity.getPages(); + String maintainId = mqEntity.getMaintainId(); + String userId = mqEntity.getUserId(); + TUser user = new TUser().setUserId(userId); + Maintain maintain = DbUtils.maintainService.selectById(maintainId); + long totalCnt = 0; + + if (pageSize == null) { + MasterAuthor masterAuthor = new MasterAuthor().setTableName(maintain.getTableName()); + Page initPageInfo = DbUtils.masterAuthorService.getInitPageInfo(masterAuthor, maintain, user, mqEntity.getIncrement()); + pages = new AtomicInteger(initPageInfo.getPages()); + pageSize = new AtomicInteger(initPageInfo.getPageSize()); + mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue())); + mqEntity.setPages(pages); + mqEntity.setPageSize(pageSize); + } + + if (pageNo == null) { + pageNo = new AtomicInteger(1); + mqEntity.setPageNo(pageNo); + for (pageNo.get(); pageNo.get() <= pages.get(); pageNo.addAndGet(1)){ + sendOneMasterPackage(pageNo, pageSize,totalCnt, maintain, user); + } + } else { + sendOneMasterPackage(pageNo, pageSize, totalCnt, maintain, user); + } + } + + private void sendOneMasterPackage(AtomicInteger pageNo, AtomicInteger pageSize, long totalCnt, Maintain maintain, TUser user) throws Exception { + Boolean increment = mqEntity.getIncrement(); + List<SysField> fieldByMaintain = DbUtils.fieldService.getFieldByMaintain(maintain.getId()); + List<String> fieldList = fieldByMaintain.stream().map(sysField -> sysField.getField()).collect(Collectors.toList()); + String fields = fieldByMaintain.stream().map(sysField -> sysField.getField()).collect(Collectors.joining(Constant.COMMA)); + + String filter = DbUtils.masterAuthorService.getFilter(user, maintain.getId()); + String content; + if (increment) { + String maintainTableName = maintain.getTableName(); + String tempTableName = maintainTableName + Constant.RECORD; + Page page = new Page(totalCnt); + page.setPageSize(mqEntity.getPageSize().get()); + page.setPageNo(mqEntity.getPageNo().get()); + List<Map<String, Object>> maps = DbUtils.maintainDetailMapper.selectMaintainDetail(fields, tempTableName, maintain.getId(), filter, page.getLimitSQL()); + JSONObject object = new JSONObject(); + object.fluentPut("total", page.getRecordCount()); + object.fluentPut("size", page.getPageSize()); + object.fluentPut("pages", page.getPageCount()); + object.fluentPut("current", page.getPageNo()); + object.fluentPut("records", maps); + content = JSONObject.toJSONString(object); + } else { + Result result = DbUtils.masterDataService.selectListByPageByVersion(user, maintain.getTableName(), fieldList, filter, pageNo.get(), pageSize.get(), maintain.getVersion(), false); + JSONObject data = (JSONObject) result.getData(); + JSONObject grid = data.getJSONObject("grid"); + JSONObject object = new JSONObject(); + object.fluentPut("total", grid.get("total")); + object.fluentPut("size", grid.get("size")); + object.fluentPut("pages", grid.get("pages")); + object.fluentPut("current", grid.get("current")); + object.fluentPut("records", grid.get("record")); + + content = JSONObject.toJSONString(object); + + } + sendMqBody(pageSize.get(), content); + } + + private void sendViewData() throws Exception { + //logs + String dataId = mqEntity.getDataId(); + AtomicInteger pageNo = mqEntity.getPageNo(); + AtomicInteger pageSize = mqEntity.getPageSize(); + AtomicInteger pages = mqEntity.getPages(); + SysView sysView = DbUtils.viewService.selectById(dataId); + ViewStatus status = sysView.getStatus(); + if (!ViewStatus.working.equals(status)) { + return; + } + if (pageSize == null) { + Page initPageInfo = DbUtils.viewService.getInitPageInfo(sysView.getId()); + pages = new AtomicInteger(initPageInfo.getPages()); + pageSize = new AtomicInteger(initPageInfo.getPageSize()); + mqEntity.setPages(pages); + mqEntity.setPageSize(pageSize); + mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue())); + } + if (pageNo == null) { + //涓诲姩 + pageNo =new AtomicInteger(1); + mqEntity.setPageNo(pageNo); + for (pageNo.get(); pageNo.get() <= pages.get(); pageNo.addAndGet(1)) { + sendOneViewPackage(pageNo, pageSize, sysView); + } + + } else { + //琚姩 + sendOneViewPackage(pageNo, pageSize, sysView); + } + + } + + private void sendOneViewPackage(AtomicInteger pageNo, AtomicInteger pageSize, SysView sysView) throws Exception { + Result viewData = DbUtils.viewService.getViewData(sysView, pageNo.get(), pageSize.get()); + JSONObject object = (JSONObject) viewData.getData(); + String content = JSONObject.toJSONString(object); + sendMqBody(pageSize.get(), content); + } + + private void sendMqBody(Integer pageSize, String content) throws Exception { + byte[] bytes = content.getBytes("UTF-8"); + String size = (bytes.length / 1024) + "k"; + log.info(size); + Date now = new Date(); + +// String mqBody = com.datacvg.common.AESUtil.getInstance().encoder(content, mqEntity.getUserId(),"http://180.169.94.250:9107"); + String mqBody = content; + bytes = mqBody.getBytes("UTF-8"); + size = (bytes.length / 1024) + "k"; + log.info(size); + Date after = new Date(); + long l = after.getTime() - now.getTime(); + log.info("time:"+l); +// String mqBody = content; + MqEntity.MsgBodyBean msgBody = mqEntity.getMsgBody(); + msgBody.setData(mqBody); + msgBody.setTotal(pageSize); + mqEntity.setMsgBody(msgBody); + Thread.sleep(1000 * 60 * 3); + mqEntity.send2Mq(); } } -- Gitblit v1.8.0