From 5268a2b7dfa556bd6f5a2d5e446cea3ea9940c10 Mon Sep 17 00:00:00 2001
From: kimi <kimi42345@gmail.com>
Date: 星期三, 22 四月 2020 11:18:23 +0800
Subject: [PATCH] add 分发     master_author 添加字段   subscribe increment,   添加7个表  master_author_subscribe   master_author_unactive   sys_dispense_config   sys_dispense_logs  sys_view   sys_view_join   sys_view_logic

---
 src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java |  186 +++++++++++++++++++++++++++++++++++++---------
 1 files changed, 150 insertions(+), 36 deletions(-)

diff --git a/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java b/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java
index 3ae9347..8b397a7 100644
--- a/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java
+++ b/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java
@@ -1,19 +1,30 @@
 package com.highdatas.mdm.service.impl;
 
 import com.highdatas.mdm.entity.Maintain;
-import com.highdatas.mdm.service.DispenseService;
-import com.highdatas.mdm.service.MasterDataService;
+import com.highdatas.mdm.entity.MasterAuthor;
+import com.highdatas.mdm.entity.SysDispenseLogs;
+import com.highdatas.mdm.entity.TUser;
+import com.highdatas.mdm.pojo.CodeMsg;
+import com.highdatas.mdm.pojo.Page;
+import com.highdatas.mdm.pojo.Result;
+import com.highdatas.mdm.service.*;
+import com.highdatas.mdm.util.Constant;
+import com.highdatas.mdm.util.pool.MqEntity;
 import com.highdatas.mdm.util.pool.MqMessage;
 import com.highdatas.mdm.util.pool.PassiveMqThreadPoolExecutor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * @author kimi
@@ -23,91 +34,194 @@
 
 @Service
 @Slf4j
+@ConfigurationProperties(prefix = "pool")
 public class DispenseServiceImpl implements DispenseService {
     @Autowired
     MasterDataService masterDataService;
+    @Autowired
+    IMasterAuthorService masterAuthorService;
+    @Autowired
+    ISysViewService viewService;
+    @Autowired
+    IMaintainService maintainService;
+    @Autowired
+    IMenuMappingService menuMappingService;
+    @Autowired
+    ISysDispenseLogsService dispenseLogsService;
+
     @Value("${pool.coresize}")
-    private Integer coreSize;
-    private ExecutorService executorService;
-    private PriorityBlockingQueue<Runnable> queue;
-    private LinkedBlockingQueue activeQueue;
-    private ThreadPoolExecutor activeExecutorService;
+    private String coreSizeStr;
+    private int coreSize;
+    private Integer queueSize = 5;
+    private ExecutorService priorityExecutorService;
+    private volatile PriorityBlockingQueue<Runnable> priorityQueue;
+    private volatile LinkedBlockingQueue infiniteQueue;
+    private volatile CopyOnWriteArrayList<MqMessage> passiveRequestList;
+    private  ThreadPoolExecutor infiniteExecutorService;
 
     @PostConstruct
     public void init() {
-        if (coreSize == null && coreSize == 0) {
+
+        if (StringUtils.isEmpty(coreSizeStr)) {
             //IO瀵嗛泦鍨� 璁剧疆榛樿 cpu鏁伴噺   棰勭暀 涓诲姩鍜岃鍔ㄤ袱缁勭嚎绋嬫睜
             coreSize = Double.valueOf(Runtime.getRuntime().availableProcessors()).intValue();
+        } else {
+            coreSize = Integer.valueOf(coreSizeStr);
         }
         log.info("Queue_Consume_thread_size:{}",coreSize);
-        this.queue = new PriorityBlockingQueue<>();
-        this.executorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, queue);
-        this.activeQueue = new LinkedBlockingQueue();
-        this.activeExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, activeQueue,new ThreadPoolExecutor.AbortPolicy());
+        this.priorityQueue = new PriorityBlockingQueue<>(queueSize);
+        this.priorityExecutorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, priorityQueue);
+        this.infiniteQueue = new LinkedBlockingQueue();
+        this.infiniteExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, infiniteQueue,new ThreadPoolExecutor.AbortPolicy());
+        this.passiveRequestList = new CopyOnWriteArrayList<>();
+        createPassiveListenerThread();
+    }
+
+
+    private void createPassiveListenerThread() {
+        new Thread(() -> {
+            log.info("琚姩鎺ュ彛鍑嗗寰呭懡銆�");
+            while (true) {
+                try{
+                    if (passiveRequestList.size() == 0) {
+                        Thread.sleep(5 * 1000);
+                    }
+                    List<MqMessage> removedList = new ArrayList<>();
+                    for (MqMessage mqMessage : passiveRequestList) {
+                        MqEntity mqEntity = mqMessage.getMqEntity();
+                        AtomicInteger pageNo = mqEntity.getPageNo();
+                        if (pageNo != null && pageNo.get() == mqEntity.getPages().get()) {
+                            removedList.add(mqMessage);
+                        } else {
+                            MqMessage nextSubMq = createNextSubMq(mqEntity);
+                            infiniteExecutorService.execute(nextSubMq);
+                        }
+
+                    }
+                    for (MqMessage mqMessage : removedList) {
+                        passiveRequestList.remove(mqMessage);
+                    }
+
+                }
+                catch (Exception e){
+                    e.printStackTrace();
+                    //log
+                }
+            }
+        }).start();
+    }
+
+    private MqMessage createNextSubMq(MqEntity mqEntity) {
+        String type = mqEntity.getType();
+        AtomicInteger pageNoAI = mqEntity.getPageNo();
+        if(pageNoAI == null) {
+            if (type.equalsIgnoreCase(Constant.Master)) {
+                String dataId = mqEntity.getDataId();
+                String tableNameByMenu = menuMappingService.getTableNameByMenu(dataId);
+                String userId = mqEntity.getUserId();
+                TUser user = new TUser().setUserId(userId);
+                Maintain maintain = maintainService.selectById(mqEntity.getMaintainId());
+                MasterAuthor masterAuthor = new MasterAuthor().setTableName(tableNameByMenu);
+                masterAuthor.setIncrement(mqEntity.getIncrement());
+                Page initPageInfo = masterAuthorService.getInitPageInfo(masterAuthor, maintain, user, mqEntity.getIncrement());
+                AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
+                mqEntity.setPages(pagesAI);
+                AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
+                mqEntity.setPageSize(pageSizeAI);
+            } else {
+                Page initPageInfo = viewService.getInitPageInfo(mqEntity.getDataId());
+                AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
+                mqEntity.setPages(pagesAI);
+                AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
+                mqEntity.setPageSize(pageSizeAI);
+            }
+            pageNoAI = new AtomicInteger();
+            mqEntity.setPageNo(pageNoAI);
+        }
+        int i = pageNoAI.addAndGet(1);
+        MqMessage mqMessage = new MqMessage(mqEntity);
+        return mqMessage;
     }
 
     @Override
     public boolean pushActiveMq(MqMessage message) {
-        try{
-            activeExecutorService.execute(message);
-            return true;
-        }catch (Exception e) {
-            log.error(e.getMessage());
-            return false;
+        MqEntity mqEntity = message.getMqEntity();
+        SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
+        if (preUnSuccessByMqEntity == null) {
+            SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity);
+            logByMqEntity.setTouchType("sys").updateById();
+        } else {
+            mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
+            mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
+            mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize()));
+            mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages()));
         }
+
+        this.passiveRequestList.add(message);
+
+        return true;
     }
 
     @Override
-    public boolean pushPassiveMq(MqMessage message) {
+    public Result pushPassiveMq(MqMessage message, String touchType) {
         try{
             MqMessage preMsg = null;
-            for (Runnable runnable : queue) {
+            if (queueSize.equals(priorityQueue.size())) {
+                return Result.error(CodeMsg.ADDQUEUE_OVER);
+            }
+            MqEntity mqEntity = message.getMqEntity();
+            for (Runnable runnable : priorityQueue) {
                 if (!(runnable instanceof MqMessage)) {
                     continue;
                 }
                 MqMessage mqMessage = (MqMessage) runnable;
-                String code = mqMessage.getCode();
+                String code = mqMessage.getMqEntity().getMsgCode();
                 if (StringUtils.isEmpty(code)){
                     continue;
                 }
-                if (code.equalsIgnoreCase(message.getCode())) {
+                if (code.equalsIgnoreCase(mqEntity.getMsgCode())) {
                     preMsg = (MqMessage) runnable;
+                    break;
                 }
             }
             if (preMsg != null) {
                 int cnt = preMsg.getCnt();
                 preMsg.setCnt(cnt + 1);
-                message.printRepeat();
+                return Result.error(CodeMsg.REPEAT_ERROR);
+
             }else {
                 message.setTime(new Date());
                 message.setCnt(0);
             }
-            executorService.execute(message);
-            return true;
+            priorityExecutorService.execute(message);
+            SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
+            if (preUnSuccessByMqEntity == null) {
+                SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity);
+                logByMqEntity.setTouchType(touchType).updateById();
+            } else {
+                mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
+                mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
+                mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize()));
+                mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages()));
+            }
+
+            return Result.success(CodeMsg.ADDQUEUE_SUCCESS);
         }catch (Exception e) {
             log.error(e.getMessage());
-            return false;
+            return Result.success(CodeMsg.ADDQUEUE_FAIL);
         }
     }
 
-    @Override
-    public PriorityBlockingQueue<Runnable> getQueue() {
-        return queue;
-    }
 
     @Override
     public Integer passiveQueueSize() {
-        return queue.size();
+        return priorityQueue.size();
     }
 
     @Override
     public Integer avtiveQueueSize() {
-        return activeQueue.size();
+        return infiniteQueue.size();
     }
 
-    private void  createView(Maintain baseMaintain) {
-        String maintainTableName = baseMaintain.getTableName();
-        String version = baseMaintain.getVersion();
 
-    }
 }

--
Gitblit v1.8.0