kimi
2020-05-27 c007f0ca1785db093d48f4846cda82fe8e955765
src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java
@@ -1,14 +1,12 @@
package com.highdatas.mdm.service.impl;
import com.highdatas.mdm.entity.Maintain;
import com.highdatas.mdm.entity.MasterAuthor;
import com.highdatas.mdm.entity.SysDispenseLogs;
import com.highdatas.mdm.entity.TUser;
import com.highdatas.mdm.entity.*;
import com.highdatas.mdm.pojo.CodeMsg;
import com.highdatas.mdm.pojo.Page;
import com.highdatas.mdm.pojo.Result;
import com.highdatas.mdm.service.*;
import com.highdatas.mdm.util.Constant;
import com.highdatas.mdm.util.DbUtils;
import com.highdatas.mdm.util.pool.MqEntity;
import com.highdatas.mdm.util.pool.MqMessage;
import com.highdatas.mdm.util.pool.PassiveMqThreadPoolExecutor;
@@ -51,6 +49,12 @@
    @Value("${pool.coresize}")
    private String coreSizeStr;
    @Value("${pool.aes}")
    private Boolean aes;
    @Value("${pool.aesurl}")
    private String aesUrl;
    private int coreSize;
    private Integer queueSize = 5;
    private ExecutorService priorityExecutorService;
@@ -58,6 +62,13 @@
    private volatile LinkedBlockingQueue infiniteQueue;
    private volatile CopyOnWriteArrayList<MqMessage> passiveRequestList;
    private  ThreadPoolExecutor infiniteExecutorService;
    /**
     *
     * @description:  初始化的时候创建主动,被动两个排队队列
     * @return: void
     *
     */
    @PostConstruct
    public void init() {
@@ -76,7 +87,33 @@
        this.passiveRequestList = new CopyOnWriteArrayList<>();
        createPassiveListenerThread();
    }
    /**
     *
     * @description:  获取是否需要
     * @return: 是否需要aes加密
     *
     */
    @Override
    public Boolean getAes() {
        return aes;
    }
    /**
     *
     * @description:  获取ars加密请求的路径
     * @return:  加密接口路径
     *
     */
    @Override
    public String getAesUrl() {
        return aesUrl;
    }
    /**
     *
     * @description:  创建主动分发队列
     * @return: void
     *
     */
    private void createPassiveListenerThread() {
        new Thread(() -> {
@@ -111,29 +148,63 @@
        }).start();
    }
    /**
     *
     * @description:  创建下页分发的数据包信息
     * @param  mqEntity 当前分发的entity数据
     * @return: void
     *
     */
    private MqMessage createNextSubMq(MqEntity mqEntity) {
        String type = mqEntity.getType();
        AtomicInteger pageNoAI = mqEntity.getPageNo();
        String logId = mqEntity.getLogId();
        SysDispenseLogs logs = dispenseLogsService.selectById(logId);
        if(pageNoAI == null) {
            String userId = mqEntity.getUserId();
            TUser user = DbUtils.getUserById(userId);
            if (user == null && logs != null) {
                logs.setErrorInfo(CodeMsg.USER_NOT_MATHED.getMsg()).updateById();
            } else if (logs != null) {
                logs.setUserName(user.getUserName());
                logs.updateById();
            }
            if (type.equalsIgnoreCase(Constant.Master)) {
                String dataId = mqEntity.getDataId();
                String tableNameByMenu = menuMappingService.getTableNameByMenu(dataId);
                String userId = mqEntity.getUserId();
                TUser user = new TUser().setUserId(userId);
                Maintain maintain = maintainService.selectById(mqEntity.getMaintainId());
                MasterAuthor masterAuthor = new MasterAuthor().setTableName(tableNameByMenu);
                masterAuthor.setIncrement(mqEntity.getIncrement());
                Page initPageInfo = masterAuthorService.getInitPageInfo(masterAuthor, maintain, user, mqEntity.getIncrement());
                AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
                mqEntity.setPages(pagesAI);
                mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
                AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
                mqEntity.setPageSize(pageSizeAI);
                if (logs != null) {
                    SysMenu menuByTableName = menuMappingService.getMenuByTableName(maintain.getTableName());
                    if (menuByTableName != null) {
                        logs.setName(menuByTableName.getName());
                    }
                    logs.setVersion(maintain.getVersion());
                    logs.updateById();
                }
            } else {
                Page initPageInfo = viewService.getInitPageInfo(mqEntity.getDataId());
                AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
                mqEntity.setPages(pagesAI);
                AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
                mqEntity.setPageSize(pageSizeAI);
                mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
                if (logs != null) {
                    SysView sysView = viewService.selectById(mqEntity.getDataId());
                    if (sysView != null) {
                        logs.setName(sysView.getName());
                        logs.updateById();
                    }
                }
            }
            pageNoAI = new AtomicInteger();
            mqEntity.setPageNo(pageNoAI);
@@ -142,15 +213,36 @@
        MqMessage mqMessage = new MqMessage(mqEntity);
        return mqMessage;
    }
    /**
     *
     * @description:  添加到主动分发队列
     * @param  message 数据包信息
     * @return: 是否添加成功
     *
     */
    @Override
    public boolean pushActiveMq(MqMessage message) {
        return pushActiveMq(message, null);
    }
    @Override
    public boolean pushActiveMq(MqMessage message, String touchType) {
        MqEntity mqEntity = message.getMqEntity();
        SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
        if (preUnSuccessByMqEntity == null) {
            SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity);
            logByMqEntity.setTouchType("sys").updateById();
            if (StringUtils.isEmpty(touchType)) {
                logByMqEntity.setTouchType(touchType);
            } else {
                logByMqEntity.setTouchType("sys");
            }
            logByMqEntity.updateById();
            mqEntity.setLogId(logByMqEntity.getId());
        } else {
            mqEntity.setLogId(preUnSuccessByMqEntity.getId());
            mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
            mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
            mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize()));
@@ -161,12 +253,21 @@
        return true;
    }
    /**
     *
     * @description:  添加到被动分发队列
     * @param  message 数据包信息
     * @param  touchType 触发类型
     * @param  logs 日志对象
     * @return: 是否添加成功
     *
     */
    @Override
    public Result pushPassiveMq(MqMessage message, String touchType) {
    public Result pushPassiveMq(MqMessage message, String touchType, SysDispenseLogs logs) {
        try{
            MqMessage preMsg = null;
            if (queueSize.equals(priorityQueue.size())) {
                logs.setResult(CodeMsg.ADDQUEUE_OVER.getMsg()).updateById();
                return Result.error(CodeMsg.ADDQUEUE_OVER);
            }
            MqEntity mqEntity = message.getMqEntity();
@@ -187,6 +288,7 @@
            if (preMsg != null) {
                int cnt = preMsg.getCnt();
                preMsg.setCnt(cnt + 1);
                logs.setResult(CodeMsg.REPEAT_ERROR.getMsg()).updateById();
                return Result.error(CodeMsg.REPEAT_ERROR);
            }else {
@@ -196,8 +298,7 @@
            priorityExecutorService.execute(message);
            SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
            if (preUnSuccessByMqEntity == null) {
                SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity);
                logByMqEntity.setTouchType(touchType).updateById();
               logs.setResult(CodeMsg.ADDQUEUE_SUCCESS.getMsg()).updateById();
            } else {
                mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
                mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
@@ -208,16 +309,29 @@
            return Result.success(CodeMsg.ADDQUEUE_SUCCESS);
        }catch (Exception e) {
            log.error(e.getMessage());
            logs.setErrorInfo(e.getMessage());
            return Result.success(CodeMsg.ADDQUEUE_FAIL);
        }
    }
    /**
     *
     * @description:  获取主动分发队列中的排队数
     * @return: 主动分发排队数
     *
     */
    @Override
    public Integer passiveQueueSize() {
        return priorityQueue.size();
    }
    /**
     *
     * @description:  获取被动分发队列中的排队数
     * @return: 被动分发排队数
     *
     */
    @Override
    public Integer avtiveQueueSize() {
        return infiniteQueue.size();