From c007f0ca1785db093d48f4846cda82fe8e955765 Mon Sep 17 00:00:00 2001 From: kimi <kimi42345@gmail.com> Date: 星期三, 27 五月 2020 09:59:29 +0800 Subject: [PATCH] merage --- src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java | 308 ++++++++++++++++++++++++++++++++++++++++++++------ 1 files changed, 268 insertions(+), 40 deletions(-) diff --git a/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java b/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java index 3ae9347..694a810 100644 --- a/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java +++ b/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java @@ -1,19 +1,28 @@ package com.highdatas.mdm.service.impl; -import com.highdatas.mdm.entity.Maintain; -import com.highdatas.mdm.service.DispenseService; -import com.highdatas.mdm.service.MasterDataService; +import com.highdatas.mdm.entity.*; +import com.highdatas.mdm.pojo.CodeMsg; +import com.highdatas.mdm.pojo.Page; +import com.highdatas.mdm.pojo.Result; +import com.highdatas.mdm.service.*; +import com.highdatas.mdm.util.Constant; +import com.highdatas.mdm.util.DbUtils; +import com.highdatas.mdm.util.pool.MqEntity; import com.highdatas.mdm.util.pool.MqMessage; import com.highdatas.mdm.util.pool.PassiveMqThreadPoolExecutor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** * @author kimi @@ -23,91 +32,310 @@ @Service @Slf4j +@ConfigurationProperties(prefix = "pool") public class DispenseServiceImpl implements DispenseService { @Autowired MasterDataService masterDataService; + @Autowired + IMasterAuthorService masterAuthorService; + @Autowired + ISysViewService viewService; + @Autowired + IMaintainService maintainService; + @Autowired + IMenuMappingService menuMappingService; + @Autowired + ISysDispenseLogsService dispenseLogsService; + @Value("${pool.coresize}") - private Integer coreSize; - private ExecutorService executorService; - private PriorityBlockingQueue<Runnable> queue; - private LinkedBlockingQueue activeQueue; - private ThreadPoolExecutor activeExecutorService; + private String coreSizeStr; + + @Value("${pool.aes}") + private Boolean aes; + @Value("${pool.aesurl}") + private String aesUrl; + + private int coreSize; + private Integer queueSize = 5; + private ExecutorService priorityExecutorService; + private volatile PriorityBlockingQueue<Runnable> priorityQueue; + private volatile LinkedBlockingQueue infiniteQueue; + private volatile CopyOnWriteArrayList<MqMessage> passiveRequestList; + private ThreadPoolExecutor infiniteExecutorService; + + /** + * + * @description: 鍒濆鍖栫殑鏃跺�欏垱寤轰富鍔紝琚姩涓や釜鎺掗槦闃熷垪 + * @return: void + * + */ @PostConstruct public void init() { - if (coreSize == null && coreSize == 0) { + + if (StringUtils.isEmpty(coreSizeStr)) { //IO瀵嗛泦鍨� 璁剧疆榛樿 cpu鏁伴噺 棰勭暀 涓诲姩鍜岃鍔ㄤ袱缁勭嚎绋嬫睜 coreSize = Double.valueOf(Runtime.getRuntime().availableProcessors()).intValue(); + } else { + coreSize = Integer.valueOf(coreSizeStr); } log.info("Queue_Consume_thread_size:{}",coreSize); - this.queue = new PriorityBlockingQueue<>(); - this.executorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, queue); - this.activeQueue = new LinkedBlockingQueue(); - this.activeExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, activeQueue,new ThreadPoolExecutor.AbortPolicy()); + this.priorityQueue = new PriorityBlockingQueue<>(queueSize); + this.priorityExecutorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, priorityQueue); + this.infiniteQueue = new LinkedBlockingQueue(); + this.infiniteExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, infiniteQueue,new ThreadPoolExecutor.AbortPolicy()); + this.passiveRequestList = new CopyOnWriteArrayList<>(); + createPassiveListenerThread(); + } + /** + * + * @description: 鑾峰彇鏄惁闇�瑕� + * @return: 鏄惁闇�瑕乤es鍔犲瘑 + * + */ + @Override + public Boolean getAes() { + return aes; + } + /** + * + * @description: 鑾峰彇ars鍔犲瘑璇锋眰鐨勮矾寰� + * @return: 鍔犲瘑鎺ュ彛璺緞 + * + */ + @Override + public String getAesUrl() { + return aesUrl; } + /** + * + * @description: 鍒涘缓涓诲姩鍒嗗彂闃熷垪 + * @return: void + * + */ + + private void createPassiveListenerThread() { + new Thread(() -> { + log.info("琚姩鎺ュ彛鍑嗗寰呭懡銆�"); + while (true) { + try{ + if (passiveRequestList.size() == 0) { + Thread.sleep(5 * 1000); + } + List<MqMessage> removedList = new ArrayList<>(); + for (MqMessage mqMessage : passiveRequestList) { + MqEntity mqEntity = mqMessage.getMqEntity(); + AtomicInteger pageNo = mqEntity.getPageNo(); + if (pageNo != null && pageNo.get() == mqEntity.getPages().get()) { + removedList.add(mqMessage); + } else { + MqMessage nextSubMq = createNextSubMq(mqEntity); + infiniteExecutorService.execute(nextSubMq); + } + + } + for (MqMessage mqMessage : removedList) { + passiveRequestList.remove(mqMessage); + } + + } + catch (Exception e){ + e.printStackTrace(); + //log + } + } + }).start(); + } + + /** + * + * @description: 鍒涘缓涓嬮〉鍒嗗彂鐨勬暟鎹寘淇℃伅 + * @param mqEntity 褰撳墠鍒嗗彂鐨別ntity鏁版嵁 + * @return: void + * + */ + private MqMessage createNextSubMq(MqEntity mqEntity) { + String type = mqEntity.getType(); + AtomicInteger pageNoAI = mqEntity.getPageNo(); + String logId = mqEntity.getLogId(); + SysDispenseLogs logs = dispenseLogsService.selectById(logId); + + if(pageNoAI == null) { + String userId = mqEntity.getUserId(); + TUser user = DbUtils.getUserById(userId); + if (user == null && logs != null) { + logs.setErrorInfo(CodeMsg.USER_NOT_MATHED.getMsg()).updateById(); + } else if (logs != null) { + logs.setUserName(user.getUserName()); + logs.updateById(); + } + if (type.equalsIgnoreCase(Constant.Master)) { + String dataId = mqEntity.getDataId(); + String tableNameByMenu = menuMappingService.getTableNameByMenu(dataId); + Maintain maintain = maintainService.selectById(mqEntity.getMaintainId()); + MasterAuthor masterAuthor = new MasterAuthor().setTableName(tableNameByMenu); + masterAuthor.setIncrement(mqEntity.getIncrement()); + Page initPageInfo = masterAuthorService.getInitPageInfo(masterAuthor, maintain, user, mqEntity.getIncrement()); + AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages()); + mqEntity.setPages(pagesAI); + mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue())); + AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize()); + mqEntity.setPageSize(pageSizeAI); + if (logs != null) { + SysMenu menuByTableName = menuMappingService.getMenuByTableName(maintain.getTableName()); + if (menuByTableName != null) { + logs.setName(menuByTableName.getName()); + } + logs.setVersion(maintain.getVersion()); + logs.updateById(); + } + + } else { + Page initPageInfo = viewService.getInitPageInfo(mqEntity.getDataId()); + AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages()); + mqEntity.setPages(pagesAI); + AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize()); + mqEntity.setPageSize(pageSizeAI); + mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue())); + if (logs != null) { + SysView sysView = viewService.selectById(mqEntity.getDataId()); + if (sysView != null) { + logs.setName(sysView.getName()); + logs.updateById(); + } + } + } + pageNoAI = new AtomicInteger(); + mqEntity.setPageNo(pageNoAI); + } + int i = pageNoAI.addAndGet(1); + MqMessage mqMessage = new MqMessage(mqEntity); + return mqMessage; + } + /** + * + * @description: 娣诲姞鍒颁富鍔ㄥ垎鍙戦槦鍒� + * @param message 鏁版嵁鍖呬俊鎭� + * @return: 鏄惁娣诲姞鎴愬姛 + * + */ @Override public boolean pushActiveMq(MqMessage message) { - try{ - activeExecutorService.execute(message); - return true; - }catch (Exception e) { - log.error(e.getMessage()); - return false; - } + return pushActiveMq(message, null); } - @Override - public boolean pushPassiveMq(MqMessage message) { + public boolean pushActiveMq(MqMessage message, String touchType) { + MqEntity mqEntity = message.getMqEntity(); + SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity); + if (preUnSuccessByMqEntity == null) { + SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity); + if (StringUtils.isEmpty(touchType)) { + logByMqEntity.setTouchType(touchType); + } else { + logByMqEntity.setTouchType("sys"); + } + + + logByMqEntity.updateById(); + + mqEntity.setLogId(logByMqEntity.getId()); + + } else { + mqEntity.setLogId(preUnSuccessByMqEntity.getId()); + mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId()); + mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo())); + mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize())); + mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages())); + } + + this.passiveRequestList.add(message); + + return true; + } + /** + * + * @description: 娣诲姞鍒拌鍔ㄥ垎鍙戦槦鍒� + * @param message 鏁版嵁鍖呬俊鎭� + * @param touchType 瑙﹀彂绫诲瀷 + * @param logs 鏃ュ織瀵硅薄 + * @return: 鏄惁娣诲姞鎴愬姛 + * + */ + @Override + public Result pushPassiveMq(MqMessage message, String touchType, SysDispenseLogs logs) { try{ MqMessage preMsg = null; - for (Runnable runnable : queue) { + if (queueSize.equals(priorityQueue.size())) { + logs.setResult(CodeMsg.ADDQUEUE_OVER.getMsg()).updateById(); + return Result.error(CodeMsg.ADDQUEUE_OVER); + } + MqEntity mqEntity = message.getMqEntity(); + for (Runnable runnable : priorityQueue) { if (!(runnable instanceof MqMessage)) { continue; } MqMessage mqMessage = (MqMessage) runnable; - String code = mqMessage.getCode(); + String code = mqMessage.getMqEntity().getMsgCode(); if (StringUtils.isEmpty(code)){ continue; } - if (code.equalsIgnoreCase(message.getCode())) { + if (code.equalsIgnoreCase(mqEntity.getMsgCode())) { preMsg = (MqMessage) runnable; + break; } } if (preMsg != null) { int cnt = preMsg.getCnt(); preMsg.setCnt(cnt + 1); - message.printRepeat(); + logs.setResult(CodeMsg.REPEAT_ERROR.getMsg()).updateById(); + return Result.error(CodeMsg.REPEAT_ERROR); + }else { message.setTime(new Date()); message.setCnt(0); } - executorService.execute(message); - return true; + priorityExecutorService.execute(message); + SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity); + if (preUnSuccessByMqEntity == null) { + logs.setResult(CodeMsg.ADDQUEUE_SUCCESS.getMsg()).updateById(); + } else { + mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId()); + mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo())); + mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize())); + mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages())); + } + + return Result.success(CodeMsg.ADDQUEUE_SUCCESS); }catch (Exception e) { log.error(e.getMessage()); - return false; + logs.setErrorInfo(e.getMessage()); + return Result.success(CodeMsg.ADDQUEUE_FAIL); } } - @Override - public PriorityBlockingQueue<Runnable> getQueue() { - return queue; - } + /** + * + * @description: 鑾峰彇涓诲姩鍒嗗彂闃熷垪涓殑鎺掗槦鏁� + * @return: 涓诲姩鍒嗗彂鎺掗槦鏁� + * + */ + @Override public Integer passiveQueueSize() { - return queue.size(); + return priorityQueue.size(); } - + /** + * + * @description: 鑾峰彇琚姩鍒嗗彂闃熷垪涓殑鎺掗槦鏁� + * @return: 琚姩鍒嗗彂鎺掗槦鏁� + * + */ @Override public Integer avtiveQueueSize() { - return activeQueue.size(); + return infiniteQueue.size(); } - private void createView(Maintain baseMaintain) { - String maintainTableName = baseMaintain.getTableName(); - String version = baseMaintain.getVersion(); - } } -- Gitblit v1.8.0