From 5268a2b7dfa556bd6f5a2d5e446cea3ea9940c10 Mon Sep 17 00:00:00 2001 From: kimi <kimi42345@gmail.com> Date: 星期三, 22 四月 2020 11:18:23 +0800 Subject: [PATCH] add 分发 master_author 添加字段 subscribe increment, 添加7个表 master_author_subscribe master_author_unactive sys_dispense_config sys_dispense_logs sys_view sys_view_join sys_view_logic --- src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java | 186 +++++++++++++++++++++++++++++++++++++--------- 1 files changed, 150 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java b/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java index 3ae9347..8b397a7 100644 --- a/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java +++ b/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java @@ -1,19 +1,30 @@ package com.highdatas.mdm.service.impl; import com.highdatas.mdm.entity.Maintain; -import com.highdatas.mdm.service.DispenseService; -import com.highdatas.mdm.service.MasterDataService; +import com.highdatas.mdm.entity.MasterAuthor; +import com.highdatas.mdm.entity.SysDispenseLogs; +import com.highdatas.mdm.entity.TUser; +import com.highdatas.mdm.pojo.CodeMsg; +import com.highdatas.mdm.pojo.Page; +import com.highdatas.mdm.pojo.Result; +import com.highdatas.mdm.service.*; +import com.highdatas.mdm.util.Constant; +import com.highdatas.mdm.util.pool.MqEntity; import com.highdatas.mdm.util.pool.MqMessage; import com.highdatas.mdm.util.pool.PassiveMqThreadPoolExecutor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** * @author kimi @@ -23,91 +34,194 @@ @Service @Slf4j +@ConfigurationProperties(prefix = "pool") public class DispenseServiceImpl implements DispenseService { @Autowired MasterDataService masterDataService; + @Autowired + IMasterAuthorService masterAuthorService; + @Autowired + ISysViewService viewService; + @Autowired + IMaintainService maintainService; + @Autowired + IMenuMappingService menuMappingService; + @Autowired + ISysDispenseLogsService dispenseLogsService; + @Value("${pool.coresize}") - private Integer coreSize; - private ExecutorService executorService; - private PriorityBlockingQueue<Runnable> queue; - private LinkedBlockingQueue activeQueue; - private ThreadPoolExecutor activeExecutorService; + private String coreSizeStr; + private int coreSize; + private Integer queueSize = 5; + private ExecutorService priorityExecutorService; + private volatile PriorityBlockingQueue<Runnable> priorityQueue; + private volatile LinkedBlockingQueue infiniteQueue; + private volatile CopyOnWriteArrayList<MqMessage> passiveRequestList; + private ThreadPoolExecutor infiniteExecutorService; @PostConstruct public void init() { - if (coreSize == null && coreSize == 0) { + + if (StringUtils.isEmpty(coreSizeStr)) { //IO瀵嗛泦鍨� 璁剧疆榛樿 cpu鏁伴噺 棰勭暀 涓诲姩鍜岃鍔ㄤ袱缁勭嚎绋嬫睜 coreSize = Double.valueOf(Runtime.getRuntime().availableProcessors()).intValue(); + } else { + coreSize = Integer.valueOf(coreSizeStr); } log.info("Queue_Consume_thread_size:{}",coreSize); - this.queue = new PriorityBlockingQueue<>(); - this.executorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, queue); - this.activeQueue = new LinkedBlockingQueue(); - this.activeExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, activeQueue,new ThreadPoolExecutor.AbortPolicy()); + this.priorityQueue = new PriorityBlockingQueue<>(queueSize); + this.priorityExecutorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, priorityQueue); + this.infiniteQueue = new LinkedBlockingQueue(); + this.infiniteExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, infiniteQueue,new ThreadPoolExecutor.AbortPolicy()); + this.passiveRequestList = new CopyOnWriteArrayList<>(); + createPassiveListenerThread(); + } + + + private void createPassiveListenerThread() { + new Thread(() -> { + log.info("琚姩鎺ュ彛鍑嗗寰呭懡銆�"); + while (true) { + try{ + if (passiveRequestList.size() == 0) { + Thread.sleep(5 * 1000); + } + List<MqMessage> removedList = new ArrayList<>(); + for (MqMessage mqMessage : passiveRequestList) { + MqEntity mqEntity = mqMessage.getMqEntity(); + AtomicInteger pageNo = mqEntity.getPageNo(); + if (pageNo != null && pageNo.get() == mqEntity.getPages().get()) { + removedList.add(mqMessage); + } else { + MqMessage nextSubMq = createNextSubMq(mqEntity); + infiniteExecutorService.execute(nextSubMq); + } + + } + for (MqMessage mqMessage : removedList) { + passiveRequestList.remove(mqMessage); + } + + } + catch (Exception e){ + e.printStackTrace(); + //log + } + } + }).start(); + } + + private MqMessage createNextSubMq(MqEntity mqEntity) { + String type = mqEntity.getType(); + AtomicInteger pageNoAI = mqEntity.getPageNo(); + if(pageNoAI == null) { + if (type.equalsIgnoreCase(Constant.Master)) { + String dataId = mqEntity.getDataId(); + String tableNameByMenu = menuMappingService.getTableNameByMenu(dataId); + String userId = mqEntity.getUserId(); + TUser user = new TUser().setUserId(userId); + Maintain maintain = maintainService.selectById(mqEntity.getMaintainId()); + MasterAuthor masterAuthor = new MasterAuthor().setTableName(tableNameByMenu); + masterAuthor.setIncrement(mqEntity.getIncrement()); + Page initPageInfo = masterAuthorService.getInitPageInfo(masterAuthor, maintain, user, mqEntity.getIncrement()); + AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages()); + mqEntity.setPages(pagesAI); + AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize()); + mqEntity.setPageSize(pageSizeAI); + } else { + Page initPageInfo = viewService.getInitPageInfo(mqEntity.getDataId()); + AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages()); + mqEntity.setPages(pagesAI); + AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize()); + mqEntity.setPageSize(pageSizeAI); + } + pageNoAI = new AtomicInteger(); + mqEntity.setPageNo(pageNoAI); + } + int i = pageNoAI.addAndGet(1); + MqMessage mqMessage = new MqMessage(mqEntity); + return mqMessage; } @Override public boolean pushActiveMq(MqMessage message) { - try{ - activeExecutorService.execute(message); - return true; - }catch (Exception e) { - log.error(e.getMessage()); - return false; + MqEntity mqEntity = message.getMqEntity(); + SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity); + if (preUnSuccessByMqEntity == null) { + SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity); + logByMqEntity.setTouchType("sys").updateById(); + } else { + mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId()); + mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo())); + mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize())); + mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages())); } + + this.passiveRequestList.add(message); + + return true; } @Override - public boolean pushPassiveMq(MqMessage message) { + public Result pushPassiveMq(MqMessage message, String touchType) { try{ MqMessage preMsg = null; - for (Runnable runnable : queue) { + if (queueSize.equals(priorityQueue.size())) { + return Result.error(CodeMsg.ADDQUEUE_OVER); + } + MqEntity mqEntity = message.getMqEntity(); + for (Runnable runnable : priorityQueue) { if (!(runnable instanceof MqMessage)) { continue; } MqMessage mqMessage = (MqMessage) runnable; - String code = mqMessage.getCode(); + String code = mqMessage.getMqEntity().getMsgCode(); if (StringUtils.isEmpty(code)){ continue; } - if (code.equalsIgnoreCase(message.getCode())) { + if (code.equalsIgnoreCase(mqEntity.getMsgCode())) { preMsg = (MqMessage) runnable; + break; } } if (preMsg != null) { int cnt = preMsg.getCnt(); preMsg.setCnt(cnt + 1); - message.printRepeat(); + return Result.error(CodeMsg.REPEAT_ERROR); + }else { message.setTime(new Date()); message.setCnt(0); } - executorService.execute(message); - return true; + priorityExecutorService.execute(message); + SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity); + if (preUnSuccessByMqEntity == null) { + SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity); + logByMqEntity.setTouchType(touchType).updateById(); + } else { + mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId()); + mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo())); + mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize())); + mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages())); + } + + return Result.success(CodeMsg.ADDQUEUE_SUCCESS); }catch (Exception e) { log.error(e.getMessage()); - return false; + return Result.success(CodeMsg.ADDQUEUE_FAIL); } } - @Override - public PriorityBlockingQueue<Runnable> getQueue() { - return queue; - } @Override public Integer passiveQueueSize() { - return queue.size(); + return priorityQueue.size(); } @Override public Integer avtiveQueueSize() { - return activeQueue.size(); + return infiniteQueue.size(); } - private void createView(Maintain baseMaintain) { - String maintainTableName = baseMaintain.getTableName(); - String version = baseMaintain.getVersion(); - } } -- Gitblit v1.8.0