src/main/java/com/highdatas/mdm/service/impl/DispenseServiceImpl.java
@@ -1,19 +1,30 @@
package com.highdatas.mdm.service.impl;
import com.highdatas.mdm.entity.Maintain;
import com.highdatas.mdm.service.DispenseService;
import com.highdatas.mdm.service.MasterDataService;
import com.highdatas.mdm.entity.MasterAuthor;
import com.highdatas.mdm.entity.SysDispenseLogs;
import com.highdatas.mdm.entity.TUser;
import com.highdatas.mdm.pojo.CodeMsg;
import com.highdatas.mdm.pojo.Page;
import com.highdatas.mdm.pojo.Result;
import com.highdatas.mdm.service.*;
import com.highdatas.mdm.util.Constant;
import com.highdatas.mdm.util.pool.MqEntity;
import com.highdatas.mdm.util.pool.MqMessage;
import com.highdatas.mdm.util.pool.PassiveMqThreadPoolExecutor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * @author kimi
@@ -23,91 +34,194 @@
@Service
@Slf4j
@ConfigurationProperties(prefix = "pool")
public class DispenseServiceImpl implements DispenseService {
    @Autowired
    MasterDataService masterDataService;
    @Autowired
    IMasterAuthorService masterAuthorService;
    @Autowired
    ISysViewService viewService;
    @Autowired
    IMaintainService maintainService;
    @Autowired
    IMenuMappingService menuMappingService;
    @Autowired
    ISysDispenseLogsService dispenseLogsService;
    @Value("${pool.coresize}")
    private Integer coreSize;
    private ExecutorService executorService;
    private PriorityBlockingQueue<Runnable> queue;
    private LinkedBlockingQueue activeQueue;
    private ThreadPoolExecutor activeExecutorService;
    private String coreSizeStr;
    private int coreSize;
    private Integer queueSize = 5;
    private ExecutorService priorityExecutorService;
    private volatile PriorityBlockingQueue<Runnable> priorityQueue;
    private volatile LinkedBlockingQueue infiniteQueue;
    private volatile CopyOnWriteArrayList<MqMessage> passiveRequestList;
    private  ThreadPoolExecutor infiniteExecutorService;
    @PostConstruct
    public void init() {
        if (coreSize == null && coreSize == 0) {
        if (StringUtils.isEmpty(coreSizeStr)) {
            //IO密集型 设置默认 cpu数量   预留 主动和被动两组线程池
            coreSize = Double.valueOf(Runtime.getRuntime().availableProcessors()).intValue();
        } else {
            coreSize = Integer.valueOf(coreSizeStr);
        }
        log.info("Queue_Consume_thread_size:{}",coreSize);
        this.queue = new PriorityBlockingQueue<>();
        this.executorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, queue);
        this.activeQueue = new LinkedBlockingQueue();
        this.activeExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, activeQueue,new ThreadPoolExecutor.AbortPolicy());
        this.priorityQueue = new PriorityBlockingQueue<>(queueSize);
        this.priorityExecutorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, priorityQueue);
        this.infiniteQueue = new LinkedBlockingQueue();
        this.infiniteExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, infiniteQueue,new ThreadPoolExecutor.AbortPolicy());
        this.passiveRequestList = new CopyOnWriteArrayList<>();
        createPassiveListenerThread();
    }
    private void createPassiveListenerThread() {
        new Thread(() -> {
            log.info("被动接口准备待命。");
            while (true) {
                try{
                    if (passiveRequestList.size() == 0) {
                        Thread.sleep(5 * 1000);
                    }
                    List<MqMessage> removedList = new ArrayList<>();
                    for (MqMessage mqMessage : passiveRequestList) {
                        MqEntity mqEntity = mqMessage.getMqEntity();
                        AtomicInteger pageNo = mqEntity.getPageNo();
                        if (pageNo != null && pageNo.get() == mqEntity.getPages().get()) {
                            removedList.add(mqMessage);
                        } else {
                            MqMessage nextSubMq = createNextSubMq(mqEntity);
                            infiniteExecutorService.execute(nextSubMq);
                        }
                    }
                    for (MqMessage mqMessage : removedList) {
                        passiveRequestList.remove(mqMessage);
                    }
                }
                catch (Exception e){
                    e.printStackTrace();
                    //log
                }
            }
        }).start();
    }
    private MqMessage createNextSubMq(MqEntity mqEntity) {
        String type = mqEntity.getType();
        AtomicInteger pageNoAI = mqEntity.getPageNo();
        if(pageNoAI == null) {
            if (type.equalsIgnoreCase(Constant.Master)) {
                String dataId = mqEntity.getDataId();
                String tableNameByMenu = menuMappingService.getTableNameByMenu(dataId);
                String userId = mqEntity.getUserId();
                TUser user = new TUser().setUserId(userId);
                Maintain maintain = maintainService.selectById(mqEntity.getMaintainId());
                MasterAuthor masterAuthor = new MasterAuthor().setTableName(tableNameByMenu);
                masterAuthor.setIncrement(mqEntity.getIncrement());
                Page initPageInfo = masterAuthorService.getInitPageInfo(masterAuthor, maintain, user, mqEntity.getIncrement());
                AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
                mqEntity.setPages(pagesAI);
                AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
                mqEntity.setPageSize(pageSizeAI);
            } else {
                Page initPageInfo = viewService.getInitPageInfo(mqEntity.getDataId());
                AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
                mqEntity.setPages(pagesAI);
                AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
                mqEntity.setPageSize(pageSizeAI);
            }
            pageNoAI = new AtomicInteger();
            mqEntity.setPageNo(pageNoAI);
        }
        int i = pageNoAI.addAndGet(1);
        MqMessage mqMessage = new MqMessage(mqEntity);
        return mqMessage;
    }
    @Override
    public boolean pushActiveMq(MqMessage message) {
        try{
            activeExecutorService.execute(message);
            return true;
        }catch (Exception e) {
            log.error(e.getMessage());
            return false;
        MqEntity mqEntity = message.getMqEntity();
        SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
        if (preUnSuccessByMqEntity == null) {
            SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity);
            logByMqEntity.setTouchType("sys").updateById();
        } else {
            mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
            mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
            mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize()));
            mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages()));
        }
        this.passiveRequestList.add(message);
        return true;
    }
    @Override
    public boolean pushPassiveMq(MqMessage message) {
    public Result pushPassiveMq(MqMessage message, String touchType) {
        try{
            MqMessage preMsg = null;
            for (Runnable runnable : queue) {
            if (queueSize.equals(priorityQueue.size())) {
                return Result.error(CodeMsg.ADDQUEUE_OVER);
            }
            MqEntity mqEntity = message.getMqEntity();
            for (Runnable runnable : priorityQueue) {
                if (!(runnable instanceof MqMessage)) {
                    continue;
                }
                MqMessage mqMessage = (MqMessage) runnable;
                String code = mqMessage.getCode();
                String code = mqMessage.getMqEntity().getMsgCode();
                if (StringUtils.isEmpty(code)){
                    continue;
                }
                if (code.equalsIgnoreCase(message.getCode())) {
                if (code.equalsIgnoreCase(mqEntity.getMsgCode())) {
                    preMsg = (MqMessage) runnable;
                    break;
                }
            }
            if (preMsg != null) {
                int cnt = preMsg.getCnt();
                preMsg.setCnt(cnt + 1);
                message.printRepeat();
                return Result.error(CodeMsg.REPEAT_ERROR);
            }else {
                message.setTime(new Date());
                message.setCnt(0);
            }
            executorService.execute(message);
            return true;
            priorityExecutorService.execute(message);
            SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
            if (preUnSuccessByMqEntity == null) {
                SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity);
                logByMqEntity.setTouchType(touchType).updateById();
            } else {
                mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
                mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
                mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize()));
                mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages()));
            }
            return Result.success(CodeMsg.ADDQUEUE_SUCCESS);
        }catch (Exception e) {
            log.error(e.getMessage());
            return false;
            return Result.success(CodeMsg.ADDQUEUE_FAIL);
        }
    }
    @Override
    public PriorityBlockingQueue<Runnable> getQueue() {
        return queue;
    }
    @Override
    public Integer passiveQueueSize() {
        return queue.size();
        return priorityQueue.size();
    }
    @Override
    public Integer avtiveQueueSize() {
        return activeQueue.size();
        return infiniteQueue.size();
    }
    private void  createView(Maintain baseMaintain) {
        String maintainTableName = baseMaintain.getTableName();
        String version = baseMaintain.getVersion();
    }
}