package com.highdatas.mdm.service.impl;
|
|
import com.highdatas.mdm.entity.*;
|
import com.highdatas.mdm.pojo.CodeMsg;
|
import com.highdatas.mdm.pojo.Page;
|
import com.highdatas.mdm.pojo.Result;
|
import com.highdatas.mdm.service.*;
|
import com.highdatas.mdm.util.Constant;
|
import com.highdatas.mdm.util.DbUtils;
|
import com.highdatas.mdm.util.pool.MqEntity;
|
import com.highdatas.mdm.util.pool.MqMessage;
|
import com.highdatas.mdm.util.pool.PassiveMqThreadPoolExecutor;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
import org.springframework.stereotype.Service;
|
|
import javax.annotation.PostConstruct;
|
import java.util.ArrayList;
|
import java.util.Date;
|
import java.util.List;
|
import java.util.concurrent.*;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
/**
|
* @author kimi
|
* @description
|
* @date 2020-04-14 12:58
|
*/
|
|
@Service
|
@Slf4j
|
@ConfigurationProperties(prefix = "pool")
|
public class DispenseServiceImpl implements DispenseService {
|
@Autowired
|
MasterDataService masterDataService;
|
@Autowired
|
IMasterAuthorService masterAuthorService;
|
@Autowired
|
ISysViewService viewService;
|
@Autowired
|
IMaintainService maintainService;
|
@Autowired
|
IMenuMappingService menuMappingService;
|
@Autowired
|
ISysDispenseLogsService dispenseLogsService;
|
|
@Value("${pool.coresize}")
|
private String coreSizeStr;
|
|
@Value("${pool.aes}")
|
private Boolean aes;
|
@Value("${pool.aesurl}")
|
private String aesUrl;
|
|
private int coreSize;
|
private Integer queueSize = 5;
|
private ExecutorService priorityExecutorService;
|
private volatile PriorityBlockingQueue<Runnable> priorityQueue;
|
private volatile LinkedBlockingQueue infiniteQueue;
|
private volatile CopyOnWriteArrayList<MqMessage> passiveRequestList;
|
private ThreadPoolExecutor infiniteExecutorService;
|
|
/**
|
*
|
* @description: 初始化的时候创建主动,被动两个排队队列
|
* @return: void
|
*
|
*/
|
|
@PostConstruct
|
public void init() {
|
|
if (StringUtils.isEmpty(coreSizeStr)) {
|
//IO密集型 设置默认 cpu数量 预留 主动和被动两组线程池
|
coreSize = Double.valueOf(Runtime.getRuntime().availableProcessors()).intValue();
|
} else {
|
coreSize = Integer.valueOf(coreSizeStr);
|
}
|
log.info("Queue_Consume_thread_size:{}",coreSize);
|
this.priorityQueue = new PriorityBlockingQueue<>(queueSize);
|
this.priorityExecutorService = new PassiveMqThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, priorityQueue);
|
this.infiniteQueue = new LinkedBlockingQueue();
|
this.infiniteExecutorService = new ThreadPoolExecutor(coreSize, coreSize, 0L, TimeUnit.MILLISECONDS, infiniteQueue,new ThreadPoolExecutor.AbortPolicy());
|
this.passiveRequestList = new CopyOnWriteArrayList<>();
|
createPassiveListenerThread();
|
}
|
/**
|
*
|
* @description: 获取是否需要
|
* @return: 是否需要aes加密
|
*
|
*/
|
@Override
|
public Boolean getAes() {
|
return aes;
|
}
|
/**
|
*
|
* @description: 获取ars加密请求的路径
|
* @return: 加密接口路径
|
*
|
*/
|
@Override
|
public String getAesUrl() {
|
return aesUrl;
|
}
|
|
/**
|
*
|
* @description: 创建主动分发队列
|
* @return: void
|
*
|
*/
|
|
private void createPassiveListenerThread() {
|
new Thread(() -> {
|
log.info("被动接口准备待命。");
|
while (true) {
|
try{
|
if (passiveRequestList.size() == 0) {
|
Thread.sleep(5 * 1000);
|
}
|
List<MqMessage> removedList = new ArrayList<>();
|
for (MqMessage mqMessage : passiveRequestList) {
|
MqEntity mqEntity = mqMessage.getMqEntity();
|
AtomicInteger pageNo = mqEntity.getPageNo();
|
if (pageNo != null && pageNo.get() == mqEntity.getPages().get()) {
|
removedList.add(mqMessage);
|
} else {
|
MqMessage nextSubMq = createNextSubMq(mqEntity);
|
infiniteExecutorService.execute(nextSubMq);
|
}
|
|
}
|
for (MqMessage mqMessage : removedList) {
|
passiveRequestList.remove(mqMessage);
|
}
|
|
}
|
catch (Exception e){
|
e.printStackTrace();
|
//log
|
}
|
}
|
}).start();
|
}
|
|
/**
|
*
|
* @description: 创建下页分发的数据包信息
|
* @param mqEntity 当前分发的entity数据
|
* @return: void
|
*
|
*/
|
private MqMessage createNextSubMq(MqEntity mqEntity) {
|
String type = mqEntity.getType();
|
AtomicInteger pageNoAI = mqEntity.getPageNo();
|
String logId = mqEntity.getLogId();
|
SysDispenseLogs logs = dispenseLogsService.selectById(logId);
|
|
if(pageNoAI == null) {
|
String userId = mqEntity.getUserId();
|
TUser user = DbUtils.getUserById(userId);
|
if (user == null && logs != null) {
|
logs.setErrorInfo(CodeMsg.USER_NOT_MATHED.getMsg()).updateById();
|
} else if (logs != null) {
|
logs.setUserName(user.getUserName());
|
logs.updateById();
|
}
|
if (type.equalsIgnoreCase(Constant.Master)) {
|
String dataId = mqEntity.getDataId();
|
String tableNameByMenu = menuMappingService.getTableNameByMenu(dataId);
|
Maintain maintain = maintainService.selectById(mqEntity.getMaintainId());
|
MasterAuthor masterAuthor = new MasterAuthor().setTableName(tableNameByMenu);
|
masterAuthor.setIncrement(mqEntity.getIncrement());
|
Page initPageInfo = masterAuthorService.getInitPageInfo(masterAuthor, maintain, user, mqEntity.getIncrement());
|
AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
|
mqEntity.setPages(pagesAI);
|
mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
|
AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
|
mqEntity.setPageSize(pageSizeAI);
|
if (logs != null) {
|
SysMenu menuByTableName = menuMappingService.getMenuByTableName(maintain.getTableName());
|
if (menuByTableName != null) {
|
logs.setName(menuByTableName.getName());
|
}
|
logs.setVersion(maintain.getVersion());
|
logs.updateById();
|
}
|
|
} else {
|
Page initPageInfo = viewService.getInitPageInfo(mqEntity.getDataId());
|
AtomicInteger pagesAI = new AtomicInteger(initPageInfo.getPages());
|
mqEntity.setPages(pagesAI);
|
AtomicInteger pageSizeAI = new AtomicInteger(initPageInfo.getPageSize());
|
mqEntity.setPageSize(pageSizeAI);
|
mqEntity.setTotalSize(new AtomicInteger(Long.valueOf(initPageInfo.getRecordCount()).intValue()));
|
if (logs != null) {
|
SysView sysView = viewService.selectById(mqEntity.getDataId());
|
if (sysView != null) {
|
logs.setName(sysView.getName());
|
logs.updateById();
|
}
|
}
|
}
|
pageNoAI = new AtomicInteger();
|
mqEntity.setPageNo(pageNoAI);
|
}
|
int i = pageNoAI.addAndGet(1);
|
MqMessage mqMessage = new MqMessage(mqEntity);
|
return mqMessage;
|
}
|
/**
|
*
|
* @description: 添加到主动分发队列
|
* @param message 数据包信息
|
* @return: 是否添加成功
|
*
|
*/
|
@Override
|
public boolean pushActiveMq(MqMessage message) {
|
return pushActiveMq(message, null);
|
}
|
@Override
|
public boolean pushActiveMq(MqMessage message, String touchType) {
|
MqEntity mqEntity = message.getMqEntity();
|
SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
|
if (preUnSuccessByMqEntity == null) {
|
SysDispenseLogs logByMqEntity = dispenseLogsService.createLogByMqEntity(mqEntity);
|
if (StringUtils.isEmpty(touchType)) {
|
logByMqEntity.setTouchType(touchType);
|
} else {
|
logByMqEntity.setTouchType("sys");
|
}
|
|
|
logByMqEntity.updateById();
|
|
mqEntity.setLogId(logByMqEntity.getId());
|
|
} else {
|
mqEntity.setLogId(preUnSuccessByMqEntity.getId());
|
mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
|
mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
|
mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize()));
|
mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages()));
|
}
|
|
this.passiveRequestList.add(message);
|
|
return true;
|
}
|
/**
|
*
|
* @description: 添加到被动分发队列
|
* @param message 数据包信息
|
* @param touchType 触发类型
|
* @param logs 日志对象
|
* @return: 是否添加成功
|
*
|
*/
|
@Override
|
public Result pushPassiveMq(MqMessage message, String touchType, SysDispenseLogs logs) {
|
try{
|
MqMessage preMsg = null;
|
if (queueSize.equals(priorityQueue.size())) {
|
logs.setResult(CodeMsg.ADDQUEUE_OVER.getMsg()).updateById();
|
return Result.error(CodeMsg.ADDQUEUE_OVER);
|
}
|
MqEntity mqEntity = message.getMqEntity();
|
for (Runnable runnable : priorityQueue) {
|
if (!(runnable instanceof MqMessage)) {
|
continue;
|
}
|
MqMessage mqMessage = (MqMessage) runnable;
|
String code = mqMessage.getMqEntity().getMsgCode();
|
if (StringUtils.isEmpty(code)){
|
continue;
|
}
|
if (code.equalsIgnoreCase(mqEntity.getMsgCode())) {
|
preMsg = (MqMessage) runnable;
|
break;
|
}
|
}
|
if (preMsg != null) {
|
int cnt = preMsg.getCnt();
|
preMsg.setCnt(cnt + 1);
|
logs.setResult(CodeMsg.REPEAT_ERROR.getMsg()).updateById();
|
return Result.error(CodeMsg.REPEAT_ERROR);
|
|
}else {
|
message.setTime(new Date());
|
message.setCnt(0);
|
}
|
priorityExecutorService.execute(message);
|
SysDispenseLogs preUnSuccessByMqEntity = dispenseLogsService.getPreUnSuccessByMqEntity(mqEntity);
|
if (preUnSuccessByMqEntity == null) {
|
logs.setResult(CodeMsg.ADDQUEUE_SUCCESS.getMsg()).updateById();
|
} else {
|
mqEntity.setMsgKey(preUnSuccessByMqEntity.getKeyId());
|
mqEntity.setPageNo(new AtomicInteger(preUnSuccessByMqEntity.getPageNo()));
|
mqEntity.setPageSize(new AtomicInteger(preUnSuccessByMqEntity.getPageSize()));
|
mqEntity.setPages(new AtomicInteger(preUnSuccessByMqEntity.getPages()));
|
}
|
|
return Result.success(CodeMsg.ADDQUEUE_SUCCESS);
|
}catch (Exception e) {
|
log.error(e.getMessage());
|
logs.setErrorInfo(e.getMessage());
|
return Result.success(CodeMsg.ADDQUEUE_FAIL);
|
}
|
}
|
|
/**
|
*
|
* @description: 获取主动分发队列中的排队数
|
* @return: 主动分发排队数
|
*
|
*/
|
|
|
@Override
|
public Integer passiveQueueSize() {
|
return priorityQueue.size();
|
}
|
/**
|
*
|
* @description: 获取被动分发队列中的排队数
|
* @return: 被动分发排队数
|
*
|
*/
|
@Override
|
public Integer avtiveQueueSize() {
|
return infiniteQueue.size();
|
}
|
|
|
}
|