package com.highdatas.mdm.service.impl; import com.highdatas.mdm.entity.Maintain; import com.highdatas.mdm.service.DispenseService; import com.highdatas.mdm.service.MasterDataService; import com.highdatas.mdm.util.pool.MqMessage; import com.highdatas.mdm.util.pool.PassiveMqThreadPoolExecutor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.Date; import java.util.concurrent.*; /** * @author kimi * @description * @date 2020-04-14 12:58 */ @Service @Slf4j public class DispenseServiceImpl implements DispenseService { @Autowired MasterDataService masterDataService; @Value("${pool.coresize}") private Integer coreSize; private ExecutorService executorService; private PriorityBlockingQueue queue; private LinkedBlockingQueue activeQueue; private ThreadPoolExecutor activeExecutorService; @PostConstruct public void init() { if (coreSize == null && coreSize == 0) { //IO密集型 设置默认 cpu数量 预留 主动和被动两组线程池 coreSize = Double.valueOf(Runtime.getRuntime().availableProcessors()).intValue(); } log.info("Queue_Consume_thread_size:{}",coreSize); this.queue = new PriorityBlockingQueue<>(); this.executorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, queue); this.activeQueue = new LinkedBlockingQueue(); this.activeExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, activeQueue,new ThreadPoolExecutor.AbortPolicy()); } @Override public boolean pushActiveMq(MqMessage message) { try{ activeExecutorService.execute(message); return true; }catch (Exception e) { log.error(e.getMessage()); return false; } } @Override public boolean pushPassiveMq(MqMessage message) { try{ MqMessage preMsg = null; for (Runnable runnable : queue) { if (!(runnable instanceof MqMessage)) { continue; } MqMessage mqMessage = (MqMessage) runnable; String code = mqMessage.getCode(); if (StringUtils.isEmpty(code)){ continue; } if (code.equalsIgnoreCase(message.getCode())) { preMsg = (MqMessage) runnable; } } if (preMsg != null) { int cnt = preMsg.getCnt(); preMsg.setCnt(cnt + 1); message.printRepeat(); }else { message.setTime(new Date()); message.setCnt(0); } executorService.execute(message); return true; }catch (Exception e) { log.error(e.getMessage()); return false; } } @Override public PriorityBlockingQueue getQueue() { return queue; } @Override public Integer passiveQueueSize() { return queue.size(); } @Override public Integer avtiveQueueSize() { return activeQueue.size(); } private void createView(Maintain baseMaintain) { String maintainTableName = baseMaintain.getTableName(); String version = baseMaintain.getVersion(); } }