| | |
| | | package com.highdatas.mdm.service.impl;
|
| | |
|
| | | import com.highdatas.mdm.entity.Maintain;
|
| | | import com.highdatas.mdm.service.DispenseService;
|
| | | import com.highdatas.mdm.service.MasterDataService;
|
| | | import com.highdatas.mdm.entity.*;
|
| | | import com.highdatas.mdm.pojo.CodeMsg;
|
| | | import com.highdatas.mdm.pojo.Page;
|
| | | import com.highdatas.mdm.pojo.Result;
|
| | | import com.highdatas.mdm.service.*;
|
| | | import com.highdatas.mdm.util.Constant;
|
| | | import com.highdatas.mdm.util.DbUtils;
|
| | | import com.highdatas.mdm.util.pool.MqEntity;
|
| | | import com.highdatas.mdm.util.pool.MqMessage;
|
| | | import com.highdatas.mdm.util.pool.PassiveMqThreadPoolExecutor;
|
| | | import lombok.extern.slf4j.Slf4j;
|
| | | import org.apache.commons.lang3.StringUtils;
|
| | | import org.springframework.beans.factory.annotation.Autowired;
|
| | | import org.springframework.beans.factory.annotation.Value;
|
| | | import org.springframework.boot.context.properties.ConfigurationProperties;
|
| | | import org.springframework.stereotype.Service;
|
| | |
|
| | | import javax.annotation.PostConstruct;
|
| | | import java.util.ArrayList;
|
| | | import java.util.Date;
|
| | | import java.util.List;
|
| | | import java.util.concurrent.*;
|
| | | import java.util.concurrent.atomic.AtomicInteger;
|
| | |
|
| | | /**
|
| | | * @author kimi
|
| | |
| | |
|
| | | @Service
|
| | | @Slf4j
|
| | | @ConfigurationProperties(prefix = "pool")
|
| | | public class DispenseServiceImpl implements DispenseService {
|
| | | @Autowired
|
| | | MasterDataService masterDataService;
|
| | | @Autowired
|
| | | IMasterAuthorService masterAuthorService;
|
| | | @Autowired
|
| | | ISysViewService viewService;
|
| | | @Autowired
|
| | | IMaintainService maintainService;
|
| | | @Autowired
|
| | | IMenuMappingService menuMappingService;
|
| | | @Autowired
|
| | | ISysDispenseLogsService dispenseLogsService;
|
| | |
|
| | | @Value("${pool.coresize}")
|
| | | private Integer coreSize;
|
| | | private ExecutorService executorService;
|
| | | private PriorityBlockingQueue<Runnable> queue;
|
| | | private LinkedBlockingQueue activeQueue;
|
| | | private ThreadPoolExecutor activeExecutorService;
|
| | | private String coreSizeStr;
|
| | |
|
| | | @Value("${pool.aes}")
|
| | | private Boolean aes;
|
| | | @Value("${pool.aesurl}")
|
| | | private String aesUrl;
|
| | |
|
| | | private int coreSize;
|
| | | private Integer queueSize = 5;
|
| | | private ExecutorService priorityExecutorService;
|
| | | private volatile PriorityBlockingQueue<Runnable> priorityQueue;
|
| | | private volatile LinkedBlockingQueue infiniteQueue;
|
| | | private volatile CopyOnWriteArrayList<MqMessage> passiveRequestList;
|
| | | private ThreadPoolExecutor infiniteExecutorService;
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: 初始化的时候创建主动,被动两个排队队列
|
| | | * @return: void
|
| | | *
|
| | | */
|
| | |
|
| | | @PostConstruct
|
| | | public void init() {
|
| | | if (coreSize == null && coreSize == 0) {
|
| | |
|
| | | if (StringUtils.isEmpty(coreSizeStr)) {
|
| | | //IO密集型 设置默认 cpu数量 预留 主动和被动两组线程池
|
| | | coreSize = Double.valueOf(Runtime.getRuntime().availableProcessors()).intValue();
|
| | | } else {
|
| | | coreSize = Integer.valueOf(coreSizeStr);
|
| | | }
|
| | | log.info("Queue_Consume_thread_size:{}",coreSize);
|
| | | this.queue = new PriorityBlockingQueue<>();
|
| | | this.executorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, queue);
|
| | | this.activeQueue = new LinkedBlockingQueue();
|
| | | this.activeExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, activeQueue,new ThreadPoolExecutor.AbortPolicy());
|
| | | this.priorityQueue = new PriorityBlockingQueue<>(queueSize);
|
| | | this.priorityExecutorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, priorityQueue);
|
| | | this.infiniteQueue = new LinkedBlockingQueue();
|
| | | this.infiniteExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, infiniteQueue,new ThreadPoolExecutor.AbortPolicy());
|
| | | this.passiveRequestList = new CopyOnWriteArrayList<>();
|
| | | createPassiveListenerThread();
|
| | | }
|
| | | /**
|
| | | *
|
| | | * @description: 获取是否需要
|
| | | * @return: 是否需要aes加密
|
| | | *
|
| | | */
|
| | | @Override
|
| | | public Boolean getAes() {
|
| | | return aes;
|
| | | }
|
| | | /**
|
| | | *
|
| | | * @description: 获取ars加密请求的路径
|
| | | * @return: 加密接口路径
|
| | | *
|
| | | */
|
| | | @Override
|
| | | public String getAesUrl() {
|
| | | return aesUrl;
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: 创建主动分发队列
|
| | | * @return: void
|
| | | *
|
| | | */
|
| | |
|
| | | private void createPassiveListenerThread() {
|
| | | new Thread(() -> {
|
| | | log.info("被动接口准备待命。");
|
| | | while (true) {
|
| | | try{
|
| | | if (passiveRequestList.size() == 0) {
|
| | | Thread.sleep(5 * 1000);
|
| | | }
|
| | | List<MqMessage> removedList = new ArrayList<>();
|
| | | for (MqMessage mqMessage : passiveRequestList) {
|
| | | MqEntity mqEntity = mqMessage.getMqEntity();
|
| | | AtomicInteger pageNo = mqEntity.getPageNo();
|
| | | if (pageNo != null && pageNo.get() == mqEntity.getPages().get()) {
|
| | | removedList.add(mqMessage);
|
| | | } else {
|
| | | MqMessage nextSubMq = createNextSubMq(mqEntity);
|
| | | infiniteExecutorService.execute(nextSubMq);
|
| | | }
|
| | |
|
| | | }
|
| | | for (MqMessage mqMessage : removedList) {
|
| | | passiveRequestList.remove(mqMessage);
|
| | | }
|
| | |
|
| | | }
|
| | | catch (Exception e){
|
| | | e.printStackTrace();
|
| | | //log
|
| | | }
|
| | | }
|
| | | }).start();
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: 创建下页分发的数据包信息
|
| | | * @param mqEntity 当前分发的entity数据
|
| | | * @return: void
|
| | | *
|
| | | */
|
| | | private MqMessage createNextSubMq(MqEntity mqEntity) {
|
| | | String type = mqEntity.getType();
|
| | | AtomicInteger pageNoAI = mqEntity.getPageNo();
|
| | | String logId = mqEntity.getLogId();
|
| | | SysDispenseLogs logs = dispenseLogsService.selectById(logId);
|
| | |
|
| | | if(pageNoAI == null) {
|
| | | String userId = mqEntity.getUserId();
|
| | | TUser user = DbUtils.getUserById(userId);
|
| | | if (user == null && logs != null) {
|
| | | logs.setErrorInfo(CodeMsg.USER_NOT_MATHED.getMsg()).updateById();
|
| | | } else if (logs != null) {
|
| | | logs.setUserName(user.getUserName());
|
| | | logs.updateById();
|
| | | }
|
| | | if (type.equalsIgnoreCase(Constant.Master)) {
|
| | | String dataId = mqEntity.getDataId();
|
| | | String tableNameByMenu = menuMappingService.getTableNameByMenu(dataId);
|
| | | Maintain maintain = maintainService.selectById(mqEntity.getMaintainId());
|
| | | MasterAuthor masterAuthor = new MasterAuthor().setTableName(tableNameByMenu);
|
| | | masterAuthor.setIncrement(mqEntity.getIncrement());
|
| | | Page initPageInfo = masterAuthorService.getInitPageInfo(masterAuthor, maintain, user, mqEntity.getIncrement());
|
| | | AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
|
| | | mqEntity.setPages(pagesAI);
|
| | | mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
|
| | | AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
|
| | | mqEntity.setPageSize(pageSizeAI);
|
| | | if (logs != null) {
|
| | | SysMenu menuByTableName = menuMappingService.getMenuByTableName(maintain.getTableName());
|
| | | if (menuByTableName != null) {
|
| | | logs.setName(menuByTableName.getName());
|
| | | }
|
| | | logs.setVersion(maintain.getVersion());
|
| | | logs.updateById();
|
| | | }
|
| | |
|
| | | } else {
|
| | | Page initPageInfo = viewService.getInitPageInfo(mqEntity.getDataId());
|
| | | AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
|
| | | mqEntity.setPages(pagesAI);
|
| | | AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
|
| | | mqEntity.setPageSize(pageSizeAI);
|
| | | mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
|
| | | if (logs != null) {
|
| | | SysView sysView = viewService.selectById(mqEntity.getDataId());
|
| | | if (sysView != null) {
|
| | | logs.setName(sysView.getName());
|
| | | logs.updateById();
|
| | | }
|
| | | }
|
| | | }
|
| | | pageNoAI = new AtomicInteger();
|
| | | mqEntity.setPageNo(pageNoAI);
|
| | | }
|
| | | int i = pageNoAI.addAndGet(1);
|
| | | MqMessage mqMessage = new MqMessage(mqEntity);
|
| | | return mqMessage;
|
| | | }
|
| | | /**
|
| | | *
|
| | | * @description: 添加到主动分发队列
|
| | | * @param message 数据包信息
|
| | | * @return: 是否添加成功
|
| | | *
|
| | | */
|
| | | @Override
|
| | | public boolean pushActiveMq(MqMessage message) {
|
| | | try{
|
| | | activeExecutorService.execute(message);
|
| | | return true;
|
| | | }catch (Exception e) {
|
| | | log.error(e.getMessage());
|
| | | return false;
|
| | | }
|
| | | return pushActiveMq(message, null);
|
| | | }
|
| | |
|
| | | @Override
|
| | | public boolean pushPassiveMq(MqMessage message) {
|
| | | public boolean pushActiveMq(MqMessage message, String touchType) {
|
| | | MqEntity mqEntity = message.getMqEntity();
|
| | | SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
|
| | | if (preUnSuccessByMqEntity == null) {
|
| | | SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity);
|
| | | if (StringUtils.isEmpty(touchType)) {
|
| | | logByMqEntity.setTouchType(touchType);
|
| | | } else {
|
| | | logByMqEntity.setTouchType("sys");
|
| | | }
|
| | |
|
| | |
|
| | | logByMqEntity.updateById();
|
| | |
|
| | | mqEntity.setLogId(logByMqEntity.getId());
|
| | |
|
| | | } else {
|
| | | mqEntity.setLogId(preUnSuccessByMqEntity.getId());
|
| | | mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
|
| | | mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
|
| | | mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize()));
|
| | | mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages()));
|
| | | }
|
| | |
|
| | | this.passiveRequestList.add(message);
|
| | |
|
| | | return true;
|
| | | }
|
| | | /**
|
| | | *
|
| | | * @description: 添加到被动分发队列
|
| | | * @param message 数据包信息
|
| | | * @param touchType 触发类型
|
| | | * @param logs 日志对象
|
| | | * @return: 是否添加成功
|
| | | *
|
| | | */
|
| | | @Override
|
| | | public Result pushPassiveMq(MqMessage message, String touchType, SysDispenseLogs logs) {
|
| | | try{
|
| | | MqMessage preMsg = null;
|
| | | for (Runnable runnable : queue) {
|
| | | if (queueSize.equals(priorityQueue.size())) {
|
| | | logs.setResult(CodeMsg.ADDQUEUE_OVER.getMsg()).updateById();
|
| | | return Result.error(CodeMsg.ADDQUEUE_OVER);
|
| | | }
|
| | | MqEntity mqEntity = message.getMqEntity();
|
| | | for (Runnable runnable : priorityQueue) {
|
| | | if (!(runnable instanceof MqMessage)) {
|
| | | continue;
|
| | | }
|
| | | MqMessage mqMessage = (MqMessage) runnable;
|
| | | String code = mqMessage.getCode();
|
| | | String code = mqMessage.getMqEntity().getMsgCode();
|
| | | if (StringUtils.isEmpty(code)){
|
| | | continue;
|
| | | }
|
| | | if (code.equalsIgnoreCase(message.getCode())) {
|
| | | if (code.equalsIgnoreCase(mqEntity.getMsgCode())) {
|
| | | preMsg = (MqMessage) runnable;
|
| | | break;
|
| | | }
|
| | | }
|
| | | if (preMsg != null) {
|
| | | int cnt = preMsg.getCnt();
|
| | | preMsg.setCnt(cnt + 1);
|
| | | message.printRepeat();
|
| | | logs.setResult(CodeMsg.REPEAT_ERROR.getMsg()).updateById();
|
| | | return Result.error(CodeMsg.REPEAT_ERROR);
|
| | |
|
| | | }else {
|
| | | message.setTime(new Date());
|
| | | message.setCnt(0);
|
| | | }
|
| | | executorService.execute(message);
|
| | | return true;
|
| | | priorityExecutorService.execute(message);
|
| | | SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
|
| | | if (preUnSuccessByMqEntity == null) {
|
| | | logs.setResult(CodeMsg.ADDQUEUE_SUCCESS.getMsg()).updateById();
|
| | | } else {
|
| | | mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
|
| | | mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
|
| | | mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize()));
|
| | | mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages()));
|
| | | }
|
| | |
|
| | | return Result.success(CodeMsg.ADDQUEUE_SUCCESS);
|
| | | }catch (Exception e) {
|
| | | log.error(e.getMessage());
|
| | | return false;
|
| | | logs.setErrorInfo(e.getMessage());
|
| | | return Result.success(CodeMsg.ADDQUEUE_FAIL);
|
| | | }
|
| | | }
|
| | |
|
| | | @Override
|
| | | public PriorityBlockingQueue<Runnable> getQueue() {
|
| | | return queue;
|
| | | }
|
| | | /**
|
| | | *
|
| | | * @description: 获取主动分发队列中的排队数
|
| | | * @return: 主动分发排队数
|
| | | *
|
| | | */
|
| | |
|
| | |
|
| | | @Override
|
| | | public Integer passiveQueueSize() {
|
| | | return queue.size();
|
| | | return priorityQueue.size();
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: 获取被动分发队列中的排队数
|
| | | * @return: 被动分发排队数
|
| | | *
|
| | | */
|
| | | @Override
|
| | | public Integer avtiveQueueSize() {
|
| | | return activeQueue.size();
|
| | | return infiniteQueue.size();
|
| | | }
|
| | |
|
| | | private void createView(Maintain baseMaintain) {
|
| | | String maintainTableName = baseMaintain.getTableName();
|
| | | String version = baseMaintain.getVersion();
|
| | |
|
| | | }
|
| | | }
|