From c007f0ca1785db093d48f4846cda82fe8e955765 Mon Sep 17 00:00:00 2001
From: kimi <kimi42345@gmail.com>
Date: 星期三, 27 五月 2020 09:59:29 +0800
Subject: [PATCH] merage

---
 src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java |  308 ++++++++++++++++++++++++++++++++++++++++++++------
 1 files changed, 268 insertions(+), 40 deletions(-)

diff --git a/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java b/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java
index 3ae9347..694a810 100644
--- a/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java
+++ b/src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java
@@ -1,19 +1,28 @@
 package com.highdatas.mdm.service.impl;
 
-import com.highdatas.mdm.entity.Maintain;
-import com.highdatas.mdm.service.DispenseService;
-import com.highdatas.mdm.service.MasterDataService;
+import com.highdatas.mdm.entity.*;
+import com.highdatas.mdm.pojo.CodeMsg;
+import com.highdatas.mdm.pojo.Page;
+import com.highdatas.mdm.pojo.Result;
+import com.highdatas.mdm.service.*;
+import com.highdatas.mdm.util.Constant;
+import com.highdatas.mdm.util.DbUtils;
+import com.highdatas.mdm.util.pool.MqEntity;
 import com.highdatas.mdm.util.pool.MqMessage;
 import com.highdatas.mdm.util.pool.PassiveMqThreadPoolExecutor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.List;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * @author kimi
@@ -23,91 +32,310 @@
 
 @Service
 @Slf4j
+@ConfigurationProperties(prefix = "pool")
 public class DispenseServiceImpl implements DispenseService {
     @Autowired
     MasterDataService masterDataService;
+    @Autowired
+    IMasterAuthorService masterAuthorService;
+    @Autowired
+    ISysViewService viewService;
+    @Autowired
+    IMaintainService maintainService;
+    @Autowired
+    IMenuMappingService menuMappingService;
+    @Autowired
+    ISysDispenseLogsService dispenseLogsService;
+
     @Value("${pool.coresize}")
-    private Integer coreSize;
-    private ExecutorService executorService;
-    private PriorityBlockingQueue<Runnable> queue;
-    private LinkedBlockingQueue activeQueue;
-    private ThreadPoolExecutor activeExecutorService;
+    private String coreSizeStr;
+
+    @Value("${pool.aes}")
+    private Boolean aes;
+    @Value("${pool.aesurl}")
+    private String aesUrl;
+
+    private int coreSize;
+    private Integer queueSize = 5;
+    private ExecutorService priorityExecutorService;
+    private volatile PriorityBlockingQueue<Runnable> priorityQueue;
+    private volatile LinkedBlockingQueue infiniteQueue;
+    private volatile CopyOnWriteArrayList<MqMessage> passiveRequestList;
+    private  ThreadPoolExecutor infiniteExecutorService;
+
+    /**
+     *
+     * @description:  鍒濆鍖栫殑鏃跺�欏垱寤轰富鍔紝琚姩涓や釜鎺掗槦闃熷垪
+     * @return: void
+     *
+     */
 
     @PostConstruct
     public void init() {
-        if (coreSize == null && coreSize == 0) {
+
+        if (StringUtils.isEmpty(coreSizeStr)) {
             //IO瀵嗛泦鍨� 璁剧疆榛樿 cpu鏁伴噺   棰勭暀 涓诲姩鍜岃鍔ㄤ袱缁勭嚎绋嬫睜
             coreSize = Double.valueOf(Runtime.getRuntime().availableProcessors()).intValue();
+        } else {
+            coreSize = Integer.valueOf(coreSizeStr);
         }
         log.info("Queue_Consume_thread_size:{}",coreSize);
-        this.queue = new PriorityBlockingQueue<>();
-        this.executorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, queue);
-        this.activeQueue = new LinkedBlockingQueue();
-        this.activeExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, activeQueue,new ThreadPoolExecutor.AbortPolicy());
+        this.priorityQueue = new PriorityBlockingQueue<>(queueSize);
+        this.priorityExecutorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, priorityQueue);
+        this.infiniteQueue = new LinkedBlockingQueue();
+        this.infiniteExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, infiniteQueue,new ThreadPoolExecutor.AbortPolicy());
+        this.passiveRequestList = new CopyOnWriteArrayList<>();
+        createPassiveListenerThread();
+    }
+    /**
+     *
+     * @description:  鑾峰彇鏄惁闇�瑕�
+     * @return: 鏄惁闇�瑕乤es鍔犲瘑
+     *
+     */
+    @Override
+    public Boolean getAes() {
+        return aes;
+    }
+    /**
+     *
+     * @description:  鑾峰彇ars鍔犲瘑璇锋眰鐨勮矾寰�
+     * @return:  鍔犲瘑鎺ュ彛璺緞
+     *
+     */
+    @Override
+    public String getAesUrl() {
+        return aesUrl;
     }
 
+    /**
+     *
+     * @description:  鍒涘缓涓诲姩鍒嗗彂闃熷垪
+     * @return: void
+     *
+     */
+
+    private void createPassiveListenerThread() {
+        new Thread(() -> {
+            log.info("琚姩鎺ュ彛鍑嗗寰呭懡銆�");
+            while (true) {
+                try{
+                    if (passiveRequestList.size() == 0) {
+                        Thread.sleep(5 * 1000);
+                    }
+                    List<MqMessage> removedList = new ArrayList<>();
+                    for (MqMessage mqMessage : passiveRequestList) {
+                        MqEntity mqEntity = mqMessage.getMqEntity();
+                        AtomicInteger pageNo = mqEntity.getPageNo();
+                        if (pageNo != null && pageNo.get() == mqEntity.getPages().get()) {
+                            removedList.add(mqMessage);
+                        } else {
+                            MqMessage nextSubMq = createNextSubMq(mqEntity);
+                            infiniteExecutorService.execute(nextSubMq);
+                        }
+
+                    }
+                    for (MqMessage mqMessage : removedList) {
+                        passiveRequestList.remove(mqMessage);
+                    }
+
+                }
+                catch (Exception e){
+                    e.printStackTrace();
+                    //log
+                }
+            }
+        }).start();
+    }
+
+    /**
+     *
+     * @description:  鍒涘缓涓嬮〉鍒嗗彂鐨勬暟鎹寘淇℃伅
+     * @param  mqEntity 褰撳墠鍒嗗彂鐨別ntity鏁版嵁
+     * @return: void
+     *
+     */
+    private MqMessage createNextSubMq(MqEntity mqEntity) {
+        String type = mqEntity.getType();
+        AtomicInteger pageNoAI = mqEntity.getPageNo();
+        String logId = mqEntity.getLogId();
+        SysDispenseLogs logs = dispenseLogsService.selectById(logId);
+
+        if(pageNoAI == null) {
+            String userId = mqEntity.getUserId();
+            TUser user = DbUtils.getUserById(userId);
+            if (user == null && logs != null) {
+                logs.setErrorInfo(CodeMsg.USER_NOT_MATHED.getMsg()).updateById();
+            } else if (logs != null) {
+                logs.setUserName(user.getUserName());
+                logs.updateById();
+            }
+            if (type.equalsIgnoreCase(Constant.Master)) {
+                String dataId = mqEntity.getDataId();
+                String tableNameByMenu = menuMappingService.getTableNameByMenu(dataId);
+                Maintain maintain = maintainService.selectById(mqEntity.getMaintainId());
+                MasterAuthor masterAuthor = new MasterAuthor().setTableName(tableNameByMenu);
+                masterAuthor.setIncrement(mqEntity.getIncrement());
+                Page initPageInfo = masterAuthorService.getInitPageInfo(masterAuthor, maintain, user, mqEntity.getIncrement());
+                AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
+                mqEntity.setPages(pagesAI);
+                mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
+                AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
+                mqEntity.setPageSize(pageSizeAI);
+                if (logs != null) {
+                    SysMenu menuByTableName = menuMappingService.getMenuByTableName(maintain.getTableName());
+                    if (menuByTableName != null) {
+                        logs.setName(menuByTableName.getName());
+                    }
+                    logs.setVersion(maintain.getVersion());
+                    logs.updateById();
+                }
+
+            } else {
+                Page initPageInfo = viewService.getInitPageInfo(mqEntity.getDataId());
+                AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
+                mqEntity.setPages(pagesAI);
+                AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
+                mqEntity.setPageSize(pageSizeAI);
+                mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
+                if (logs != null) {
+                    SysView sysView = viewService.selectById(mqEntity.getDataId());
+                    if (sysView != null) {
+                        logs.setName(sysView.getName());
+                        logs.updateById();
+                    }
+                }
+            }
+            pageNoAI = new AtomicInteger();
+            mqEntity.setPageNo(pageNoAI);
+        }
+        int i = pageNoAI.addAndGet(1);
+        MqMessage mqMessage = new MqMessage(mqEntity);
+        return mqMessage;
+    }
+    /**
+     *
+     * @description:  娣诲姞鍒颁富鍔ㄥ垎鍙戦槦鍒�
+     * @param  message 鏁版嵁鍖呬俊鎭�
+     * @return: 鏄惁娣诲姞鎴愬姛
+     *
+     */
     @Override
     public boolean pushActiveMq(MqMessage message) {
-        try{
-            activeExecutorService.execute(message);
-            return true;
-        }catch (Exception e) {
-            log.error(e.getMessage());
-            return false;
-        }
+        return pushActiveMq(message, null);
     }
-
     @Override
-    public boolean pushPassiveMq(MqMessage message) {
+    public boolean pushActiveMq(MqMessage message, String touchType) {
+        MqEntity mqEntity = message.getMqEntity();
+        SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
+        if (preUnSuccessByMqEntity == null) {
+            SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity);
+            if (StringUtils.isEmpty(touchType)) {
+                logByMqEntity.setTouchType(touchType);
+            } else {
+                logByMqEntity.setTouchType("sys");
+            }
+
+
+            logByMqEntity.updateById();
+
+            mqEntity.setLogId(logByMqEntity.getId());
+
+        } else {
+            mqEntity.setLogId(preUnSuccessByMqEntity.getId());
+            mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
+            mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
+            mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize()));
+            mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages()));
+        }
+
+        this.passiveRequestList.add(message);
+
+        return true;
+    }
+    /**
+     *
+     * @description:  娣诲姞鍒拌鍔ㄥ垎鍙戦槦鍒�
+     * @param  message 鏁版嵁鍖呬俊鎭�
+     * @param  touchType 瑙﹀彂绫诲瀷
+     * @param  logs 鏃ュ織瀵硅薄
+     * @return: 鏄惁娣诲姞鎴愬姛
+     *
+     */
+    @Override
+    public Result pushPassiveMq(MqMessage message, String touchType, SysDispenseLogs logs) {
         try{
             MqMessage preMsg = null;
-            for (Runnable runnable : queue) {
+            if (queueSize.equals(priorityQueue.size())) {
+                logs.setResult(CodeMsg.ADDQUEUE_OVER.getMsg()).updateById();
+                return Result.error(CodeMsg.ADDQUEUE_OVER);
+            }
+            MqEntity mqEntity = message.getMqEntity();
+            for (Runnable runnable : priorityQueue) {
                 if (!(runnable instanceof MqMessage)) {
                     continue;
                 }
                 MqMessage mqMessage = (MqMessage) runnable;
-                String code = mqMessage.getCode();
+                String code = mqMessage.getMqEntity().getMsgCode();
                 if (StringUtils.isEmpty(code)){
                     continue;
                 }
-                if (code.equalsIgnoreCase(message.getCode())) {
+                if (code.equalsIgnoreCase(mqEntity.getMsgCode())) {
                     preMsg = (MqMessage) runnable;
+                    break;
                 }
             }
             if (preMsg != null) {
                 int cnt = preMsg.getCnt();
                 preMsg.setCnt(cnt + 1);
-                message.printRepeat();
+                logs.setResult(CodeMsg.REPEAT_ERROR.getMsg()).updateById();
+                return Result.error(CodeMsg.REPEAT_ERROR);
+
             }else {
                 message.setTime(new Date());
                 message.setCnt(0);
             }
-            executorService.execute(message);
-            return true;
+            priorityExecutorService.execute(message);
+            SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
+            if (preUnSuccessByMqEntity == null) {
+               logs.setResult(CodeMsg.ADDQUEUE_SUCCESS.getMsg()).updateById();
+            } else {
+                mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
+                mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
+                mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize()));
+                mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages()));
+            }
+
+            return Result.success(CodeMsg.ADDQUEUE_SUCCESS);
         }catch (Exception e) {
             log.error(e.getMessage());
-            return false;
+            logs.setErrorInfo(e.getMessage());
+            return Result.success(CodeMsg.ADDQUEUE_FAIL);
         }
     }
 
-    @Override
-    public PriorityBlockingQueue<Runnable> getQueue() {
-        return queue;
-    }
+    /**
+     *
+     * @description:  鑾峰彇涓诲姩鍒嗗彂闃熷垪涓殑鎺掗槦鏁�
+     * @return: 涓诲姩鍒嗗彂鎺掗槦鏁�
+     *
+     */
+
 
     @Override
     public Integer passiveQueueSize() {
-        return queue.size();
+        return priorityQueue.size();
     }
-
+    /**
+     *
+     * @description:  鑾峰彇琚姩鍒嗗彂闃熷垪涓殑鎺掗槦鏁�
+     * @return: 琚姩鍒嗗彂鎺掗槦鏁�
+     *
+     */
     @Override
     public Integer avtiveQueueSize() {
-        return activeQueue.size();
+        return infiniteQueue.size();
     }
 
-    private void  createView(Maintain baseMaintain) {
-        String maintainTableName = baseMaintain.getTableName();
-        String version = baseMaintain.getVersion();
 
-    }
 }

--
Gitblit v1.8.0