package com.highdatas.mdm.service.impl; import com.highdatas.mdm.entity.*; import com.highdatas.mdm.pojo.CodeMsg; import com.highdatas.mdm.pojo.Page; import com.highdatas.mdm.pojo.Result; import com.highdatas.mdm.service.*; import com.highdatas.mdm.util.Constant; import com.highdatas.mdm.util.DbUtils; import com.highdatas.mdm.util.pool.MqEntity; import com.highdatas.mdm.util.pool.MqMessage; import com.highdatas.mdm.util.pool.PassiveMqThreadPoolExecutor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * @author kimi * @description * @date 2020-04-14 12:58 */ @Service @Slf4j @ConfigurationProperties(prefix = "pool") public class DispenseServiceImpl implements DispenseService { @Autowired MasterDataService masterDataService; @Autowired IMasterAuthorService masterAuthorService; @Autowired ISysViewService viewService; @Autowired IMaintainService maintainService; @Autowired IMenuMappingService menuMappingService; @Autowired ISysDispenseLogsService dispenseLogsService; @Value("${pool.coresize}") private String coreSizeStr; @Value("${pool.aes}") private Boolean aes; @Value("${pool.aesurl}") private String aesUrl; private int coreSize; private Integer queueSize = 5; private ExecutorService priorityExecutorService; private volatile PriorityBlockingQueue priorityQueue; private volatile LinkedBlockingQueue infiniteQueue; private volatile CopyOnWriteArrayList passiveRequestList; private ThreadPoolExecutor infiniteExecutorService; /** * * @description: 初始化的时候创建主动,被动两个排队队列 * @return: void * */ @PostConstruct public void init() { if (StringUtils.isEmpty(coreSizeStr)) { //IO密集型 设置默认 cpu数量 预留 主动和被动两组线程池 coreSize = Double.valueOf(Runtime.getRuntime().availableProcessors()).intValue(); } else { coreSize = Integer.valueOf(coreSizeStr); } log.info("Queue_Consume_thread_size:{}",coreSize); this.priorityQueue = new PriorityBlockingQueue<>(queueSize); this.priorityExecutorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, priorityQueue); this.infiniteQueue = new LinkedBlockingQueue(); this.infiniteExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, infiniteQueue,new ThreadPoolExecutor.AbortPolicy()); this.passiveRequestList = new CopyOnWriteArrayList<>(); createPassiveListenerThread(); } /** * * @description: 获取是否需要 * @return: 是否需要aes加密 * */ @Override public Boolean getAes() { return aes; } /** * * @description: 获取ars加密请求的路径 * @return: 加密接口路径 * */ @Override public String getAesUrl() { return aesUrl; } /** * * @description: 创建主动分发队列 * @return: void * */ private void createPassiveListenerThread() { new Thread(() -> { log.info("被动接口准备待命。"); while (true) { try{ if (passiveRequestList.size() == 0) { Thread.sleep(5 * 1000); } List removedList = new ArrayList<>(); for (MqMessage mqMessage : passiveRequestList) { MqEntity mqEntity = mqMessage.getMqEntity(); AtomicInteger pageNo = mqEntity.getPageNo(); if (pageNo != null && pageNo.get() == mqEntity.getPages().get()) { removedList.add(mqMessage); } else { MqMessage nextSubMq = createNextSubMq(mqEntity); infiniteExecutorService.execute(nextSubMq); } } for (MqMessage mqMessage : removedList) { passiveRequestList.remove(mqMessage); } } catch (Exception e){ e.printStackTrace(); //log } } }).start(); } /** * * @description: 创建下页分发的数据包信息 * @param mqEntity 当前分发的entity数据 * @return: void * */ private MqMessage createNextSubMq(MqEntity mqEntity) { String type = mqEntity.getType(); AtomicInteger pageNoAI = mqEntity.getPageNo(); String logId = mqEntity.getLogId(); SysDispenseLogs logs = dispenseLogsService.selectById(logId); if(pageNoAI == null) { String userId = mqEntity.getUserId(); TUser user = DbUtils.getUserById(userId); if (user == null && logs != null) { logs.setErrorInfo(CodeMsg.USER_NOT_MATHED.getMsg()).updateById(); } else if (logs != null) { logs.setUserName(user.getUserName()); logs.updateById(); } if (type.equalsIgnoreCase(Constant.Master)) { String dataId = mqEntity.getDataId(); String tableNameByMenu = menuMappingService.getTableNameByMenu(dataId); Maintain maintain = maintainService.selectById(mqEntity.getMaintainId()); MasterAuthor masterAuthor = new MasterAuthor().setTableName(tableNameByMenu); masterAuthor.setIncrement(mqEntity.getIncrement()); Page initPageInfo = masterAuthorService.getInitPageInfo(masterAuthor, maintain, user, mqEntity.getIncrement()); AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages()); mqEntity.setPages(pagesAI); mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue())); AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize()); mqEntity.setPageSize(pageSizeAI); if (logs != null) { SysMenu menuByTableName = menuMappingService.getMenuByTableName(maintain.getTableName()); if (menuByTableName != null) { logs.setName(menuByTableName.getName()); } logs.setVersion(maintain.getVersion()); logs.updateById(); } } else { Page initPageInfo = viewService.getInitPageInfo(mqEntity.getDataId()); AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages()); mqEntity.setPages(pagesAI); AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize()); mqEntity.setPageSize(pageSizeAI); mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue())); if (logs != null) { SysView sysView = viewService.selectById(mqEntity.getDataId()); if (sysView != null) { logs.setName(sysView.getName()); logs.updateById(); } } } pageNoAI = new AtomicInteger(); mqEntity.setPageNo(pageNoAI); } int i = pageNoAI.addAndGet(1); MqMessage mqMessage = new MqMessage(mqEntity); return mqMessage; } /** * * @description: 添加到主动分发队列 * @param message 数据包信息 * @return: 是否添加成功 * */ @Override public boolean pushActiveMq(MqMessage message) { return pushActiveMq(message, null); } @Override public boolean pushActiveMq(MqMessage message, String touchType) { MqEntity mqEntity = message.getMqEntity(); SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity); if (preUnSuccessByMqEntity == null) { SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity); if (StringUtils.isEmpty(touchType)) { logByMqEntity.setTouchType(touchType); } else { logByMqEntity.setTouchType("sys"); } logByMqEntity.updateById(); mqEntity.setLogId(logByMqEntity.getId()); } else { mqEntity.setLogId(preUnSuccessByMqEntity.getId()); mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId()); mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo())); mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize())); mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages())); } this.passiveRequestList.add(message); return true; } /** * * @description: 添加到被动分发队列 * @param message 数据包信息 * @param touchType 触发类型 * @param logs 日志对象 * @return: 是否添加成功 * */ @Override public Result pushPassiveMq(MqMessage message, String touchType, SysDispenseLogs logs) { try{ MqMessage preMsg = null; if (queueSize.equals(priorityQueue.size())) { logs.setResult(CodeMsg.ADDQUEUE_OVER.getMsg()).updateById(); return Result.error(CodeMsg.ADDQUEUE_OVER); } MqEntity mqEntity = message.getMqEntity(); for (Runnable runnable : priorityQueue) { if (!(runnable instanceof MqMessage)) { continue; } MqMessage mqMessage = (MqMessage) runnable; String code = mqMessage.getMqEntity().getMsgCode(); if (StringUtils.isEmpty(code)){ continue; } if (code.equalsIgnoreCase(mqEntity.getMsgCode())) { preMsg = (MqMessage) runnable; break; } } if (preMsg != null) { int cnt = preMsg.getCnt(); preMsg.setCnt(cnt + 1); logs.setResult(CodeMsg.REPEAT_ERROR.getMsg()).updateById(); return Result.error(CodeMsg.REPEAT_ERROR); }else { message.setTime(new Date()); message.setCnt(0); } priorityExecutorService.execute(message); SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity); if (preUnSuccessByMqEntity == null) { logs.setResult(CodeMsg.ADDQUEUE_SUCCESS.getMsg()).updateById(); } else { mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId()); mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo())); mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize())); mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages())); } return Result.success(CodeMsg.ADDQUEUE_SUCCESS); }catch (Exception e) { log.error(e.getMessage()); logs.setErrorInfo(e.getMessage()); return Result.success(CodeMsg.ADDQUEUE_FAIL); } } /** * * @description: 获取主动分发队列中的排队数 * @return: 主动分发排队数 * */ @Override public Integer passiveQueueSize() { return priorityQueue.size(); } /** * * @description: 获取被动分发队列中的排队数 * @return: 被动分发排队数 * */ @Override public Integer avtiveQueueSize() { return infiniteQueue.size(); } }