package com.highdatas.mdm.service.impl;
|
|
import com.highdatas.mdm.entity.Maintain;
|
import com.highdatas.mdm.service.DispenseService;
|
import com.highdatas.mdm.service.MasterDataService;
|
import com.highdatas.mdm.util.pool.MqMessage;
|
import com.highdatas.mdm.util.pool.PassiveMqThreadPoolExecutor;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.stereotype.Service;
|
|
import javax.annotation.PostConstruct;
|
import java.util.Date;
|
import java.util.concurrent.*;
|
|
/**
|
* @author kimi
|
* @description
|
* @date 2020-04-14 12:58
|
*/
|
|
@Service
|
@Slf4j
|
public class DispenseServiceImpl implements DispenseService {
|
@Autowired
|
MasterDataService masterDataService;
|
@Value("${pool.coresize}")
|
private Integer coreSize;
|
private ExecutorService executorService;
|
private PriorityBlockingQueue<Runnable> queue;
|
private LinkedBlockingQueue activeQueue;
|
private ThreadPoolExecutor activeExecutorService;
|
|
@PostConstruct
|
public void init() {
|
if (coreSize == null && coreSize == 0) {
|
//IO密集型 设置默认 cpu数量 预留 主动和被动两组线程池
|
coreSize = Double.valueOf(Runtime.getRuntime().availableProcessors()).intValue();
|
}
|
log.info("Queue_Consume_thread_size:{}",coreSize);
|
this.queue = new PriorityBlockingQueue<>();
|
this.executorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, queue);
|
this.activeQueue = new LinkedBlockingQueue();
|
this.activeExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, activeQueue,new ThreadPoolExecutor.AbortPolicy());
|
}
|
|
@Override
|
public boolean pushActiveMq(MqMessage message) {
|
try{
|
activeExecutorService.execute(message);
|
return true;
|
}catch (Exception e) {
|
log.error(e.getMessage());
|
return false;
|
}
|
}
|
|
@Override
|
public boolean pushPassiveMq(MqMessage message) {
|
try{
|
MqMessage preMsg = null;
|
for (Runnable runnable : queue) {
|
if (!(runnable instanceof MqMessage)) {
|
continue;
|
}
|
MqMessage mqMessage = (MqMessage) runnable;
|
String code = mqMessage.getCode();
|
if (StringUtils.isEmpty(code)){
|
continue;
|
}
|
if (code.equalsIgnoreCase(message.getCode())) {
|
preMsg = (MqMessage) runnable;
|
}
|
}
|
if (preMsg != null) {
|
int cnt = preMsg.getCnt();
|
preMsg.setCnt(cnt + 1);
|
message.printRepeat();
|
}else {
|
message.setTime(new Date());
|
message.setCnt(0);
|
}
|
executorService.execute(message);
|
return true;
|
}catch (Exception e) {
|
log.error(e.getMessage());
|
return false;
|
}
|
}
|
|
@Override
|
public PriorityBlockingQueue<Runnable> getQueue() {
|
return queue;
|
}
|
|
@Override
|
public Integer passiveQueueSize() {
|
return queue.size();
|
}
|
|
@Override
|
public Integer avtiveQueueSize() {
|
return activeQueue.size();
|
}
|
|
private void createView(Maintain baseMaintain) {
|
String maintainTableName = baseMaintain.getTableName();
|
String version = baseMaintain.getVersion();
|
|
}
|
}
|