| | |
| | | package com.highdatas.mdm.service.impl;
|
| | |
|
| | | import com.highdatas.mdm.entity.Maintain;
|
| | | import com.highdatas.mdm.entity.MasterAuthor;
|
| | | import com.highdatas.mdm.entity.SysDispenseLogs;
|
| | | import com.highdatas.mdm.entity.TUser;
|
| | | import com.highdatas.mdm.entity.*;
|
| | | import com.highdatas.mdm.pojo.CodeMsg;
|
| | | import com.highdatas.mdm.pojo.Page;
|
| | | import com.highdatas.mdm.pojo.Result;
|
| | | import com.highdatas.mdm.service.*;
|
| | | import com.highdatas.mdm.util.Constant;
|
| | | import com.highdatas.mdm.util.DbUtils;
|
| | | import com.highdatas.mdm.util.pool.MqEntity;
|
| | | import com.highdatas.mdm.util.pool.MqMessage;
|
| | | import com.highdatas.mdm.util.pool.PassiveMqThreadPoolExecutor;
|
| | |
| | |
|
| | | @Value("${pool.coresize}")
|
| | | private String coreSizeStr;
|
| | |
|
| | | @Value("${pool.aes}")
|
| | | private Boolean aes;
|
| | | @Value("${pool.aesurl}")
|
| | | private String aesUrl;
|
| | |
|
| | | private int coreSize;
|
| | | private Integer queueSize = 5;
|
| | | private ExecutorService priorityExecutorService;
|
| | |
| | | private volatile LinkedBlockingQueue infiniteQueue;
|
| | | private volatile CopyOnWriteArrayList<MqMessage> passiveRequestList;
|
| | | private ThreadPoolExecutor infiniteExecutorService;
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: 初始化的时候创建主动,被动两个排队队列
|
| | | * @return: void
|
| | | *
|
| | | */
|
| | |
|
| | | @PostConstruct
|
| | | public void init() {
|
| | |
| | | this.passiveRequestList = new CopyOnWriteArrayList<>();
|
| | | createPassiveListenerThread();
|
| | | }
|
| | | /**
|
| | | *
|
| | | * @description: 获取是否需要
|
| | | * @return: 是否需要aes加密
|
| | | *
|
| | | */
|
| | | @Override
|
| | | public Boolean getAes() {
|
| | | return aes;
|
| | | }
|
| | | /**
|
| | | *
|
| | | * @description: 获取ars加密请求的路径
|
| | | * @return: 加密接口路径
|
| | | *
|
| | | */
|
| | | @Override
|
| | | public String getAesUrl() {
|
| | | return aesUrl;
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: 创建主动分发队列
|
| | | * @return: void
|
| | | *
|
| | | */
|
| | |
|
| | | private void createPassiveListenerThread() {
|
| | | new Thread(() -> {
|
| | |
| | | }).start();
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: 创建下页分发的数据包信息
|
| | | * @param mqEntity 当前分发的entity数据
|
| | | * @return: void
|
| | | *
|
| | | */
|
| | | private MqMessage createNextSubMq(MqEntity mqEntity) {
|
| | | String type = mqEntity.getType();
|
| | | AtomicInteger pageNoAI = mqEntity.getPageNo();
|
| | | String logId = mqEntity.getLogId();
|
| | | SysDispenseLogs logs = dispenseLogsService.selectById(logId);
|
| | |
|
| | | if(pageNoAI == null) {
|
| | | String userId = mqEntity.getUserId();
|
| | | TUser user = DbUtils.getUserById(userId);
|
| | | if (user == null && logs != null) {
|
| | | logs.setErrorInfo(CodeMsg.USER_NOT_MATHED.getMsg()).updateById();
|
| | | } else if (logs != null) {
|
| | | logs.setUserName(user.getUserName());
|
| | | logs.updateById();
|
| | | }
|
| | | if (type.equalsIgnoreCase(Constant.Master)) {
|
| | | String dataId = mqEntity.getDataId();
|
| | | String tableNameByMenu = menuMappingService.getTableNameByMenu(dataId);
|
| | | String userId = mqEntity.getUserId();
|
| | | TUser user = new TUser().setUserId(userId);
|
| | | Maintain maintain = maintainService.selectById(mqEntity.getMaintainId());
|
| | | MasterAuthor masterAuthor = new MasterAuthor().setTableName(tableNameByMenu);
|
| | | masterAuthor.setIncrement(mqEntity.getIncrement());
|
| | | Page initPageInfo = masterAuthorService.getInitPageInfo(masterAuthor, maintain, user, mqEntity.getIncrement());
|
| | | AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
|
| | | mqEntity.setPages(pagesAI);
|
| | | mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
|
| | | AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
|
| | | mqEntity.setPageSize(pageSizeAI);
|
| | | if (logs != null) {
|
| | | SysMenu menuByTableName = menuMappingService.getMenuByTableName(maintain.getTableName());
|
| | | if (menuByTableName != null) {
|
| | | logs.setName(menuByTableName.getName());
|
| | | }
|
| | | logs.setVersion(maintain.getVersion());
|
| | | logs.updateById();
|
| | | }
|
| | |
|
| | | } else {
|
| | | Page initPageInfo = viewService.getInitPageInfo(mqEntity.getDataId());
|
| | | AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
|
| | | mqEntity.setPages(pagesAI);
|
| | | AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
|
| | | mqEntity.setPageSize(pageSizeAI);
|
| | | mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
|
| | | if (logs != null) {
|
| | | SysView sysView = viewService.selectById(mqEntity.getDataId());
|
| | | if (sysView != null) {
|
| | | logs.setName(sysView.getName());
|
| | | logs.updateById();
|
| | | }
|
| | | }
|
| | | }
|
| | | pageNoAI = new AtomicInteger();
|
| | | mqEntity.setPageNo(pageNoAI);
|
| | |
| | | MqMessage mqMessage = new MqMessage(mqEntity);
|
| | | return mqMessage;
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: 添加到主动分发队列
|
| | | * @param message 数据包信息
|
| | | * @return: 是否添加成功
|
| | | *
|
| | | */
|
| | | @Override
|
| | | public boolean pushActiveMq(MqMessage message) {
|
| | | return pushActiveMq(message, null);
|
| | | }
|
| | | @Override
|
| | | public boolean pushActiveMq(MqMessage message, String touchType) {
|
| | | MqEntity mqEntity = message.getMqEntity();
|
| | | SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
|
| | | if (preUnSuccessByMqEntity == null) {
|
| | | SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity);
|
| | | logByMqEntity.setTouchType("sys").updateById();
|
| | | if (StringUtils.isEmpty(touchType)) {
|
| | | logByMqEntity.setTouchType(touchType);
|
| | | } else {
|
| | | logByMqEntity.setTouchType("sys");
|
| | | }
|
| | |
|
| | |
|
| | | logByMqEntity.updateById();
|
| | |
|
| | | mqEntity.setLogId(logByMqEntity.getId());
|
| | |
|
| | | } else {
|
| | | mqEntity.setLogId(preUnSuccessByMqEntity.getId());
|
| | | mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
|
| | | mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
|
| | | mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize()));
|
| | |
| | |
|
| | | return true;
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: 添加到被动分发队列
|
| | | * @param message 数据包信息
|
| | | * @param touchType 触发类型
|
| | | * @param logs 日志对象
|
| | | * @return: 是否添加成功
|
| | | *
|
| | | */
|
| | | @Override
|
| | | public Result pushPassiveMq(MqMessage message, String touchType) {
|
| | | public Result pushPassiveMq(MqMessage message, String touchType, SysDispenseLogs logs) {
|
| | | try{
|
| | | MqMessage preMsg = null;
|
| | | if (queueSize.equals(priorityQueue.size())) {
|
| | | logs.setResult(CodeMsg.ADDQUEUE_OVER.getMsg()).updateById();
|
| | | return Result.error(CodeMsg.ADDQUEUE_OVER);
|
| | | }
|
| | | MqEntity mqEntity = message.getMqEntity();
|
| | |
| | | if (preMsg != null) {
|
| | | int cnt = preMsg.getCnt();
|
| | | preMsg.setCnt(cnt + 1);
|
| | | logs.setResult(CodeMsg.REPEAT_ERROR.getMsg()).updateById();
|
| | | return Result.error(CodeMsg.REPEAT_ERROR);
|
| | |
|
| | | }else {
|
| | |
| | | priorityExecutorService.execute(message);
|
| | | SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
|
| | | if (preUnSuccessByMqEntity == null) {
|
| | | SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity);
|
| | | logByMqEntity.setTouchType(touchType).updateById();
|
| | | logs.setResult(CodeMsg.ADDQUEUE_SUCCESS.getMsg()).updateById();
|
| | | } else {
|
| | | mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
|
| | | mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
|
| | |
| | | return Result.success(CodeMsg.ADDQUEUE_SUCCESS);
|
| | | }catch (Exception e) {
|
| | | log.error(e.getMessage());
|
| | | logs.setErrorInfo(e.getMessage());
|
| | | return Result.success(CodeMsg.ADDQUEUE_FAIL);
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: 获取主动分发队列中的排队数
|
| | | * @return: 主动分发排队数
|
| | | *
|
| | | */
|
| | |
|
| | |
|
| | | @Override
|
| | | public Integer passiveQueueSize() {
|
| | | return priorityQueue.size();
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: 获取被动分发队列中的排队数
|
| | | * @return: 被动分发排队数
|
| | | *
|
| | | */
|
| | | @Override
|
| | | public Integer avtiveQueueSize() {
|
| | | return infiniteQueue.size();
|