package com.highdatas.mdm.service.impl;
|
|
import com.baomidou.mybatisplus.mapper.EntityWrapper;
|
import com.baomidou.mybatisplus.service.impl.ServiceImpl;
|
import com.highdatas.mdm.entity.*;
|
import com.highdatas.mdm.mapper.SysAssembleMapper;
|
import com.highdatas.mdm.mapper.TableInfoMapper;
|
import com.highdatas.mdm.pojo.*;
|
import com.highdatas.mdm.pojo.kettle.BigDataDataSourceInfo;
|
import com.highdatas.mdm.pojo.kettle.DataSourceInfo;
|
import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo;
|
import com.highdatas.mdm.service.*;
|
import com.highdatas.mdm.util.Constant;
|
import com.highdatas.mdm.util.ContentBuilder;
|
import com.highdatas.mdm.util.DbUtils;
|
import com.highdatas.mdm.util.RuleClient;
|
import com.xxl.job.core.log.XxlJobLogger;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.mybatis.spring.SqlSessionTemplate;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
import org.springframework.transaction.annotation.Transactional;
|
|
import javax.annotation.Resource;
|
import java.io.ByteArrayInputStream;
|
import java.io.InputStream;
|
import java.io.UnsupportedEncodingException;
|
import java.sql.*;
|
import java.text.MessageFormat;
|
import java.util.*;
|
import java.util.Date;
|
import java.util.stream.Collectors;
|
|
/**
 * <p>
 * Service implementation class.
 * </p>
 *
 * @author kimi
 * @since 2020-02-20
 */
|
@Slf4j
|
@Service
|
public class SysAssembleServiceImpl extends ServiceImpl<SysAssembleMapper, SysAssemble> implements ISysAssembleService {
|
// Injected collaborators (field injection through @Autowired / @Resource).
// NOTE(review): ruleService is not referenced anywhere else in this class.
@Autowired
|
ISysAssembleRuleService ruleService;
|
// Used by runByDb (list active source dbs) and loadOneDb (open the source connection).
@Autowired
|
ISysAssembleDbService dbService;
|
// NOTE(review): apiService is not referenced anywhere else in this class.
@Autowired
|
ISysAssembleApiService apiService;
|
// Used by loadOneDb (list active tables of a db) and reSetTableName (load one table row).
@Autowired
|
ISysAssembleDbTableService tableService;
|
// Used by loadOneDb to list the fields configured for a source table.
@Autowired
|
ISysAssembleDbFieldService fieldService;
|
// Used by updateParams, purgeData and loadOneDb to read the parameters of an assemble.
@Autowired
|
ISysAssembleParamsService paramsService;
|
// NOTE(review): logService is not referenced anywhere else in this class.
@Autowired
|
ISysAssembleLogService logService;
|
// Used to resolve the table name that belongs to the menu id of an assemble.
@Autowired
|
IMenuMappingService menuMappingService;
|
// Used by linkMaintain to start the approval flow.
@Autowired
|
ActivitiService activitiService;
|
// Used by run (getCanAct) and linkMaintain (dealFlow).
@Autowired
|
IMaintainService maintainService;
|
// Used by run to look up the flow referenced by the assemble.
@Autowired
|
IFlowsService flowsService;
|
// NOTE(review): sqlSessionTemplate is not referenced anywhere else in this class.
@Resource
|
SqlSessionTemplate sqlSessionTemplate;
|
// Used by temp2recordUpdate (insertMatintainDetailFromTemp, tempDeal).
@Autowired
|
TableInfoMapper tableInfoMapper;
|
// Data source selected when the assemble's bigdata flag is false.
@Autowired
|
UnBigDataDataSourceInfo unBigDataDataSourceInfo;
|
// Data source selected when the assemble's bigdata flag is true.
@Autowired
|
BigDataDataSourceInfo bigDataDataSourceInfo;
|
// Used by linkMaintain (uploadedData) to obtain the maintain version.
@Autowired
|
MasterDataService masterDataService;
|
|
// Used by checkTempData to run the rule check on the temp table.
@Autowired
|
RuleClient ruleClient;
|
// Used by linkMaintain (dealAssemble).
@Autowired
|
IMasterModifiedService masterModifiedService;
|
// The three services below are used by linkMaintain (dealFlow) when no approval is required.
@Autowired
|
ISysViewService viewService;
|
@Autowired
|
ISysMenuService menuService;
|
@Autowired
|
IMasterAuthorSubscribeService subscribeService;
|
/**
|
*
|
* @description: 通过汇集id执行汇集任务
|
* @param id 汇集id
|
* @return: 执行结果
|
*
|
*/
|
@Override
|
public Result run(String id) {
|
XxlJobLogger.log("log info: run" );
|
if (StringUtils.isEmpty(id)) {
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
|
SysAssemble assemble = selectById(id);
|
if (assemble == null) {
|
XxlJobLogger.log("assemble not found" );
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
|
assemble.setPreTime(new Date());
|
assemble.setPreStatus(SysAssembleRunStatus.working);
|
assemble.updateById();
|
|
if (!assemble.getStatus().equals(SysAssembleStatus.working)) {
|
assemble.setPreStatus(SysAssembleRunStatus.fail);
|
assemble.setPreCnt(0);
|
XxlJobLogger.log("当前任务不在工作中,状态:" + assemble.getStatus() );
|
assemble.setPreMsg("当前任务不在工作中,状态:" + assemble.getStatus());
|
assemble.updateById();
|
return Result.error(new CodeMsg(6009,"当前任务不在工作中,状态:" + assemble.getStatus()));
|
|
}
|
|
String flowId = assemble.getFlowId();
|
if (!StringUtils.isEmpty(flowId)){
|
Flows flows = flowsService.selectById(flowId);
|
if (flows != null) {
|
ActivitiStatus status = flows.getStatus();
|
if (!status.equals(ActivitiStatus.close) && !status.equals(ActivitiStatus.open)) {
|
assemble.setPreStatus(SysAssembleRunStatus.fail);
|
assemble.setPreCnt(0);
|
XxlJobLogger.log("当前有流程正在运行,暂时无法汇集下次数据:" + status.toString() );
|
assemble.setPreMsg("当前有流程正在运行,暂时无法汇集下次数据:" + status.toString());
|
assemble.updateById();
|
return Result.error(new CodeMsg(6009,"当前有流程正在运行,暂时无法汇集下次数据:" + status.toString()));
|
|
}
|
}
|
}
|
|
String tableNameByMenu = menuMappingService.getTableNameByMenu(assemble.getMenuId());
|
boolean canAct = maintainService.getCanAct(tableNameByMenu);
|
if (!canAct) {
|
XxlJobLogger.log("当前有流程正在运行,暂时无法汇集下次数据");
|
assemble.setPreMsg("当前有流程正在运行,暂时无法汇集下次数据");
|
assemble.updateById();
|
return Result.error(new CodeMsg(6009,"当前有流程正在运行,暂时无法汇集下次数据"));
|
}
|
|
Boolean bigData = assemble.getBigdata();
|
Date scheduleDate = new Date();
|
XxlJobLogger.log("log info: start assemble");
|
//1 load from db
|
try{
|
Result result = runByDb(assemble);
|
Date dbDate = new Date();
|
XxlJobLogger.log("log info db time:" + (dbDate.getTime() - scheduleDate.getTime()) + "ms");
|
if (!result.getSuccess()) {
|
assemble.setPreStatus(SysAssembleRunStatus.fail);
|
assemble.updateById();
|
return result;
|
}
|
|
//2 load from api
|
result = runByApi(id);
|
Date apiDate = new Date();
|
XxlJobLogger.log("log info api time:" + (apiDate.getTime() - dbDate.getTime()) + "ms");
|
|
if (!result.getSuccess()) {
|
assemble.setPreStatus(SysAssembleRunStatus.fail);
|
assemble.updateById();
|
return result;
|
}
|
//3check temp table;
|
result = checkTempTable(assemble);
|
Date checkDate = new Date();
|
XxlJobLogger.log("log info check time:" + (checkDate.getTime() - apiDate.getTime()) + "ms");
|
|
if (!result.getSuccess()) {
|
assemble.setPreStatus(SysAssembleRunStatus.fail);
|
XxlJobLogger.log("检测主数据临时表出现错误"+ result.getMessage());
|
|
assemble.setPreMsg("检测主数据临时表出现错误:" + result.getMessage());
|
assemble.updateById();
|
return result;
|
}
|
|
//4 purge data
|
String purgeSql = assemble.getPurgeSql();
|
purgeSql = DbUtils.replaceEscape(purgeSql);
|
XxlJobLogger.log("assemble db purgeSql sql:" + purgeSql);
|
result = purgeData(purgeSql, assemble.getId(), bigData);
|
Date purgeDate = new Date();
|
XxlJobLogger.log("log info purge time:" + (purgeDate.getTime() - checkDate.getTime()) + "ms");
|
|
if (!result.getSuccess()) {
|
assemble.setPreStatus(SysAssembleRunStatus.fail);
|
assemble.setPreMsg("运行清洗sql出现错误: "+purgeSql + "," + result.getMessage());
|
XxlJobLogger.log("运行清洗sql出现错误: "+purgeSql + "," + result.getMessage() );
|
assemble.updateById();
|
return result;
|
}
|
|
//5 check temp data
|
result = checkTempData(assemble);
|
Date checkTempDate = new Date();
|
XxlJobLogger.log("log info check temp time:" + (checkTempDate.getTime() - purgeDate.getTime()) + "ms");
|
|
if (!result.getSuccess()) {
|
assemble.setPreStatus(SysAssembleRunStatus.fail);
|
assemble.setPreMsg("质量检验环节出现错误:" + result.getMessage());
|
XxlJobLogger.log("质量检验环节出现错误" + result.getMessage());
|
assemble.updateById();
|
return result;
|
}
|
//6 temp 2 record
|
result = needRollBack(assemble, result, checkTempDate);
|
|
if (!result.getSuccess()) {
|
assemble.setPreStatus(SysAssembleRunStatus.fail);
|
XxlJobLogger.log("临时数据链接主数据系统错误:" +result.getMessage());
|
assemble.setPreMsg("临时数据链接主数据系统错误:" +result.getMessage());
|
}else {
|
assemble.setPreMsg("汇集成功");
|
XxlJobLogger.log("汇集成功");
|
assemble.setPreStatus(SysAssembleRunStatus.success);
|
}
|
assemble.updateById();
|
return result;
|
}
|
catch (Exception e) {
|
XxlJobLogger.log("汇集任务出现错误:"+e.getMessage());
|
assemble.setPreMsg("汇集任务出现错误:"+e.getMessage());
|
assemble.setPreStatus(SysAssembleRunStatus.fail).updateById();
|
e.printStackTrace();
|
|
return Result.error(new CodeMsg(6009, e.getMessage()));
|
}
|
}
|
|
@Transactional(rollbackFor = {Exception.class, Error.class})
|
private Result needRollBack(SysAssemble assemble, Result result, Date checkTempDate) {
|
result = temp2record(assemble);
|
Date temp2RecordDate = new Date();
|
XxlJobLogger.log("log info temp2Record time:" + (temp2RecordDate.getTime() - checkTempDate.getTime()) + "ms");
|
|
if (!result.getSuccess()) {
|
assemble.setPreStatus(SysAssembleRunStatus.fail);
|
XxlJobLogger.log("数据搬运至主数据记录环节出现错误;" + result.getMessage() );
|
assemble.setPreMsg("数据搬运至主数据记录环节出现错误:" + result.getMessage());
|
assemble.updateById();
|
return result;
|
}
|
|
|
//7 record 链接 版本
|
|
result = linkMaintain(assemble);
|
Date linkMaintainDate = new Date();
|
XxlJobLogger.log("log info link maintain time:" + (linkMaintainDate.getTime() - temp2RecordDate.getTime()) + "ms");
|
|
if (!result.getSuccess()) {
|
assemble.setPreStatus(SysAssembleRunStatus.fail);
|
XxlJobLogger.log("联接版本出现错误:" + result.getMessage());
|
|
assemble.setPreMsg("联接版本出现错误:" + result.getMessage());
|
assemble.updateById();
|
return result;
|
}
|
|
//8 更新 参数
|
result = updateParams(assemble);
|
if (!result.getSuccess()) {
|
assemble.setPreStatus(SysAssembleRunStatus.fail);
|
XxlJobLogger.log("更新参数错误:" + result.getMessage());
|
|
assemble.setPreMsg("更新参数错误:" + result.getMessage());
|
assemble.updateById();
|
return result;
|
}
|
return result;
|
}
|
|
|
private Result linkMaintain(SysAssemble assemble) {
|
try{
|
String menuId = assemble.getMenuId();
|
MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId));
|
String tableName = menuMapping.getTableName();
|
SysAssembleUpdateType updateType = assemble.getUpdateType();
|
boolean isBigVersion = false;
|
if (updateType.equals(SysAssembleUpdateType.All)) {
|
isBigVersion = true;
|
}
|
Maintain maintain = masterDataService.uploadedData(tableName, updateType, assemble.getUserId(),isBigVersion);
|
XxlJobLogger.log("log info create now maintain:" + maintain.getId());
|
Date before2Record = new Date();
|
|
Result result = temp2recordUpdate(assemble, maintain);
|
Date after2Record = new Date();
|
XxlJobLogger.log("log info temp2Record time:" + (after2Record.getTime() - before2Record.getTime()) + "ms");
|
|
if (!result.getSuccess()) {
|
assemble.setPreStatus(SysAssembleRunStatus.fail);
|
assemble.setPreMsg("搬运更新字段出现错误" );
|
assemble.updateById();
|
return result;
|
}
|
Boolean audit = menuMapping.getAudit();
|
if (audit == null){
|
//默认需要审批
|
audit = true;
|
}
|
// 处理关联人
|
|
|
if (audit) {
|
String chargeId = menuMapping.getChargeId();
|
TUser user = DbUtils.getUserById(chargeId);
|
if (user == null) {
|
return Result.error(new CodeMsg(6009, "找不到对应的负责人:" + chargeId));
|
}
|
activitiService.setUser(user);
|
Flows flows = activitiService.start("process", null, maintain.getId(), ActivitiBusinessType.maintain);
|
assemble.setFlowId(flows.getId()).updateById();
|
maintain.setFlowId(flows.getId());
|
maintain.setDesp("启动汇集流程");
|
maintain.updateById();
|
masterModifiedService.dealAssemble(maintain.getId(), assemble.getUserId(), true);
|
}else {
|
//直接运行
|
Flows flows = new Flows().setStatus(ActivitiStatus.open).setBusinessId(maintain.getId()).setId(DbUtils.getUUID()).setCreateTime(new Date());
|
flows.setBusinessType(ActivitiBusinessType.maintain);
|
flows.insert();
|
maintain.setFlowId(flows.getId());
|
maintain.updateById();
|
maintainService.dealFlow(maintain.getId(), flows.getStatus());
|
log.info("flow-maintainService end");
|
viewService.dealFlow(maintain.getId(), flows.getStatus());
|
log.info("flow-viewService end");
|
subscribeService.dealFlow(maintain.getId(), flows.getStatus());
|
log.info("flow-subscribeService end");
|
menuService.dealFlow(maintain.getId(), flows.getStatus(), flows.getUserId());
|
log.info("flow-menuService end");
|
masterModifiedService.dealAssemble(maintain.getId(), assemble.getUserId(), false);
|
}
|
return Result.success(null);
|
}catch (Exception e) {
|
e.printStackTrace();
|
return Result.error(new CodeMsg(6009, e.getMessage()));
|
}
|
|
}
|
|
private Result updateParams(SysAssemble assemble) {
|
Connection conn = null;
|
try{
|
String id = assemble.getId();
|
Boolean bigdata = assemble.getBigdata();
|
|
DataSourceInfo dataSourceInfo;
|
if (bigdata) {
|
dataSourceInfo = bigDataDataSourceInfo;
|
}else {
|
dataSourceInfo = unBigDataDataSourceInfo;
|
}
|
conn = dataSourceInfo.conn();
|
List<SysAssembleParams> paramsList = paramsService.selectList(new EntityWrapper<SysAssembleParams>().eq("parent_id", id));
|
|
for (SysAssembleParams sysAssembleParams : paramsList) {
|
String updateSql = sysAssembleParams.getUpdateSql();
|
if (StringUtils.isEmpty(updateSql)){
|
continue;
|
}
|
|
PreparedStatement preparedStatement = conn.prepareStatement(updateSql, ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY);
|
ResultSet resultSet = preparedStatement.executeQuery();
|
//默认只有一个结果
|
if (resultSet.first()) {
|
String val = resultSet.getString(1);
|
sysAssembleParams.setVal(val);
|
sysAssembleParams.updateById();
|
}
|
|
}
|
return Result.success(null) ;
|
}
|
catch (Exception e) {
|
|
return Result.error(new CodeMsg(6006, e.getMessage())) ;
|
}
|
finally {
|
if (conn != null) {
|
try {
|
conn.close();
|
} catch (SQLException e) {
|
e.printStackTrace();
|
return Result.error(new CodeMsg(6006, e.getMessage())) ;
|
}
|
}
|
}
|
|
}
|
|
private Result checkTempTable(SysAssemble assemble) {
|
try {
|
String menuId = assemble.getMenuId();
|
MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId));
|
String tableName = menuMapping.getTableName();
|
String tempTableName = Constant.Temp + tableName;
|
Boolean bigdata = assemble.getBigdata();
|
DataSourceInfo dataSourceInfo;
|
if (bigdata) {
|
dataSourceInfo = bigDataDataSourceInfo;
|
}else {
|
dataSourceInfo = unBigDataDataSourceInfo;
|
}
|
boolean exists = dataSourceInfo.checkTableExists(tempTableName);
|
if (!exists){
|
List<String> fieldsFromTable = dataSourceInfo.getFieldsFromTable(tableName);
|
if (fieldsFromTable == null) {
|
return Result.error(new CodeMsg(6003, "查询正式表的字段:" + tableName));
|
}
|
dataSourceInfo.createTable(tempTableName, fieldsFromTable);
|
}
|
//drop temp data
|
dataSourceInfo.truncateData(tempTableName);
|
return Result.success(null);
|
}catch (Exception e) {
|
e.printStackTrace();
|
return Result.error(new CodeMsg(6004, e.getMessage()));
|
}
|
}
|
private Result temp2recordUpdate(SysAssemble assemble, Maintain maintain) {
|
Connection conn = null;
|
try {
|
String menuId = assemble.getMenuId();
|
Boolean bigdata = assemble.getBigdata();
|
MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId));
|
String tableName = menuMapping.getTableName();
|
String tempTableName = Constant.Temp + tableName;
|
String recordTableName = tableName + Constant.RECORD;
|
DataSourceInfo dataSourceInfo;
|
String updateSql = null;
|
if (bigdata) {
|
dataSourceInfo = bigDataDataSourceInfo;
|
}else {
|
dataSourceInfo = unBigDataDataSourceInfo;
|
updateSql = Constant.MYSQLJoinUpdateSql;
|
}
|
conn = dataSourceInfo.conn();
|
SysAssembleUpdateType updateType = assemble.getUpdateType();
|
List<String> fieldsFromTable = dataSourceInfo.getFieldsFromTable(tableName);
|
if (fieldsFromTable.contains(Constant.ID)) {
|
fieldsFromTable.remove(Constant.ID);
|
}
|
|
List<String> fieldFromRecordTable = new ArrayList<>(fieldsFromTable);
|
fieldFromRecordTable.add(Constant.ID);
|
fieldFromRecordTable.add(Constant.STD_ID);
|
fieldFromRecordTable.add(Constant.DEAL);
|
|
String recordFieldStr = StringUtils.join(fieldFromRecordTable, Constant.COMMA);
|
int cnt = 0;
|
if(updateType.equals(SysAssembleUpdateType.Increment)) {
|
String updateFields = assemble.getUpdateFields();
|
String[] split = updateFields.split(Constant.SEMICOLON);
|
ArrayList<String> unionCodeFields = new ArrayList<String>(Arrays.asList(split));
|
String joinStr = getJoinFieldParse(unionCodeFields);
|
List<String> otherFields = fieldsFromTable.stream().filter(s -> !unionCodeFields.contains(s)).filter(s -> !s.equalsIgnoreCase(Constant.ID)).collect(Collectors.toList());
|
if (otherFields.size() != 0) {
|
String insertFieldStr = fieldsFromTable.stream()
|
.map(s -> MessageFormat.format(Constant.Alias,Constant.T1,s))
|
.collect(Collectors.joining(Constant.COMMA));
|
|
String updatedSql = MessageFormat.format(updateSql, recordTableName, recordFieldStr,insertFieldStr, tempTableName, tableName, joinStr);
|
PreparedStatement updatedPreparedStatement = conn.prepareStatement(updatedSql);
|
int updateCnt = updatedPreparedStatement.executeUpdate();
|
cnt = updateCnt;
|
tableInfoMapper.insertMatintainDetailFromTemp(DbUtils.quotedStr(maintain.getId()), maintain.getTableName() + Constant.RECORD , DbUtils.quotedStr(Operate.update.toString()));
|
}
|
}
|
|
//tableInfoMapper.updateStdId( maintain.getTableName() + Constant.RECORD);
|
|
tableInfoMapper.tempDeal(maintain.getTableName() + Constant.RECORD, DbUtils.quotedStr(maintain.getId()));
|
|
assemble.setPreCnt(assemble.getPreCnt() + cnt);
|
assemble.updateById();
|
return Result.success(null);
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
return Result.error(new CodeMsg(6005, e.getMessage()));
|
}
|
finally {
|
if (conn != null) {
|
try {
|
conn.close();
|
} catch (SQLException e) {
|
e.printStackTrace();
|
return Result.error(new CodeMsg(6005, e.getMessage()));
|
}
|
}
|
}
|
}
|
|
private Result temp2record(SysAssemble assemble) {
|
Connection conn = null;
|
try {
|
String menuId = assemble.getMenuId();
|
Boolean bigdata = assemble.getBigdata();
|
MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId));
|
String tableName = menuMapping.getTableName();
|
String tempTableName = Constant.Temp + tableName;
|
String recordTableName = tableName + Constant.RECORD;
|
DataSourceInfo dataSourceInfo;
|
String transSql;
|
String updateSql = null;
|
String insertSql = null;
|
if (bigdata) {
|
dataSourceInfo = bigDataDataSourceInfo;
|
transSql = Constant.Temp2RecordHbaseTemplate;
|
}else {
|
dataSourceInfo = unBigDataDataSourceInfo;
|
transSql = Constant.Temp2RecordMySQLTemplate;
|
updateSql = Constant.MYSQLJoinUpdateSql;
|
insertSql = Constant.MYSQLJoinAddSql;
|
}
|
conn = dataSourceInfo.conn();
|
SysAssembleUpdateType updateType = assemble.getUpdateType();
|
List<String> fieldsFromTable = dataSourceInfo.getFieldsFromTable(tableName);
|
if (fieldsFromTable.contains(Constant.ID)) {
|
fieldsFromTable.remove(Constant.ID);
|
}
|
List<String> fieldFromRecordTable = new ArrayList<>(fieldsFromTable);
|
fieldFromRecordTable.add(Constant.ID);
|
fieldFromRecordTable.add(Constant.STD_ID);
|
fieldFromRecordTable.add(Constant.DEAL);
|
|
String recordFieldStr = StringUtils.join(fieldFromRecordTable, Constant.COMMA);
|
String fieldStr = fieldsFromTable.stream()
|
.collect(Collectors.joining(Constant.COMMA));
|
|
int cnt = 0;
|
if (updateType.equals(SysAssembleUpdateType.All)) {
|
String sql = MessageFormat.format(transSql, recordTableName, recordFieldStr, fieldStr, tempTableName);
|
XxlJobLogger.log("update sql:" + sql);
|
PreparedStatement preparedStatement = conn.prepareStatement(sql);
|
cnt = preparedStatement.executeUpdate();
|
|
}else if(updateType.equals(SysAssembleUpdateType.Increment)) {
|
String updateFields = assemble.getUpdateFields();
|
String[] split = updateFields.split(Constant.SEMICOLON);
|
ArrayList<String> unionCodeFields = new ArrayList<String>(Arrays.asList(split));
|
String joinStr = getJoinFieldParse(unionCodeFields);
|
|
// insert
|
String insertFieldStr = fieldsFromTable.stream()
|
.map(s -> MessageFormat.format(Constant.Alias,Constant.T1,s))
|
.collect(Collectors.joining(Constant.COMMA));
|
|
String insetSql = MessageFormat.format(insertSql, recordTableName, recordFieldStr,insertFieldStr, tempTableName, tableName, joinStr);
|
log.info(insertSql);
|
XxlJobLogger.log("update Increment sql:" + insertSql);
|
PreparedStatement preparedStatement = conn.prepareStatement(insetSql);
|
int insertCnt = preparedStatement.executeUpdate();
|
cnt += insertCnt;
|
|
}
|
XxlJobLogger.log("update sql cnt:" + cnt);
|
assemble.setPreCnt(cnt);
|
assemble.updateById();
|
return Result.success(null);
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
|
return Result.error(new CodeMsg(6005, e.getMessage()));
|
}
|
finally {
|
if (conn != null) {
|
try {
|
conn.close();
|
} catch (SQLException e) {
|
e.printStackTrace();
|
return Result.error(new CodeMsg(6005, e.getMessage()));
|
}
|
}
|
}
|
}
|
|
private String getOthetFieldParse(List<String> unionCodeFields) {
|
ContentBuilder builder = new ContentBuilder(Constant.AND);
|
for (String s : unionCodeFields) {
|
builder.append(MessageFormat.format(Constant.MYSQLJoinParse, s));
|
}
|
return builder.toString();
|
}
|
|
private String getJoinFieldParse(ArrayList<String> updateFields) {
|
|
ContentBuilder builder = new ContentBuilder(Constant.AND);
|
for (String s : updateFields) {
|
builder.append(MessageFormat.format(Constant.MYSQLJoinParse, s));
|
}
|
return builder.toString();
|
}
|
|
private Result checkTempData(SysAssemble assemble) {
|
try {
|
SysAssembleCheckType checkType = assemble.getCheckType();
|
if (checkType == null) {
|
return Result.error(new CodeMsg(6009, "规则校验类型为空"));
|
}
|
if (checkType.equals(SysAssembleCheckType.unlimited)) {
|
return Result.success(null);
|
}
|
|
String menuId = assemble.getMenuId();
|
MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId));
|
String tableName = menuMapping.getTableName();
|
String tempTableName = Constant.Temp + tableName;
|
HashMap<String, Boolean> fieldResultSet = ruleClient.execuImmeForCollect(tempTableName, assemble.getUserId());
|
switch (checkType) {
|
case successAdd:
|
String unSuccessFields = fieldResultSet.keySet().stream().filter(s -> !fieldResultSet.get(s)).collect(Collectors.joining(Constant.COMMA));
|
if (fieldResultSet.keySet().contains(false)) {
|
return Result.error(new CodeMsg(6009, "规则校验不通过 字段:" + unSuccessFields));
|
}
|
break;
|
case partSuccessAdd:
|
String checkFields = assemble.getCheckFields();
|
String[] split = checkFields.split(Constant.SEMICOLON);
|
for (String s : split) {
|
Boolean checked = fieldResultSet.get(s);
|
if (checked == null || !checked) {
|
return Result.error(new CodeMsg(6009, "规则校验不通过 字段:" + s));
|
}
|
}
|
break;
|
}
|
|
|
return Result.success(null);
|
} catch (Exception e) {
|
e.printStackTrace();
|
return Result.error(new CodeMsg(3009, e.getMessage()));
|
}
|
}
|
|
|
private Result purgeData(String purgeSql, String assembleId, boolean bigData) {
|
Connection connection = null;
|
Savepoint purge = null;
|
try {
|
DataSourceInfo dataSourceInfo = null;
|
if (bigData) {
|
dataSourceInfo = bigDataDataSourceInfo;
|
}else {
|
dataSourceInfo = unBigDataDataSourceInfo;
|
}
|
connection = dataSourceInfo.conn();
|
connection.setAutoCommit(false);
|
purge = connection.setSavepoint("purge");
|
List<SysAssembleParams> paramsList = paramsService.selectList(new EntityWrapper<SysAssembleParams>().eq("parent_id", assembleId));
|
HashMap<String, String> paramsMap = new HashMap<>();
|
Set<String> matcher = DbUtils.matcher(purgeSql);
|
for (String s : matcher) {
|
List<SysAssembleParams> collect = paramsList.stream().filter(sysAssembleParams -> sysAssembleParams.getCode().equalsIgnoreCase(s)).collect(Collectors.toList());
|
if (collect.size() == 0) {
|
return Result.error(new CodeMsg(6006, "未匹配到参数:" + s));
|
}else if (collect.size() > 1) {
|
return Result.error(new CodeMsg(6006, "匹配到多个参数:" + s));
|
}else {
|
SysAssembleParams sysAssembleParams = collect.get(0);
|
String val = sysAssembleParams.getVal();
|
if (StringUtils.isEmpty(val)) {
|
String initSql = sysAssembleParams.getInitSql();
|
if (StringUtils.isEmpty(initSql)) {
|
return Result.error(new CodeMsg(6006, "初始化参数:" + s + "失败"));
|
}
|
PreparedStatement preparedStatement = connection.prepareStatement(initSql);
|
ResultSet resultSet = preparedStatement.executeQuery();
|
//默认只有一个结果
|
val = resultSet.getString(1);
|
}
|
paramsMap.put(s, val);
|
}
|
}
|
for (String key : paramsMap.keySet()) {
|
String val = paramsMap.get(key);
|
String preParams = MessageFormat.format(Constant.ParamsShell, key);
|
purgeSql = purgeSql.replace(preParams, val);
|
}
|
XxlJobLogger.log("purgeSql:" +purgeSql);
|
List<String> split = DbUtils.split(purgeSql, Constant.SEMICOLON);
|
XxlJobLogger.log("split size:" + split.size());
|
PreparedStatement preparedStatement = null;
|
if (!split.isEmpty()) {
|
for (String oneSql : split) {
|
XxlJobLogger.log("one sql:" + oneSql);
|
if (preparedStatement == null) {
|
preparedStatement = connection.prepareStatement(oneSql);
|
preparedStatement.addBatch(oneSql);
|
} else {
|
preparedStatement.addBatch(oneSql);
|
}
|
}
|
}
|
int[] ints = preparedStatement.executeBatch();
|
for (int i = 0; i < ints.length; i++) {
|
XxlJobLogger.log("sqls count:" + ints[i]);
|
}
|
|
connection.commit();
|
|
return Result.success(null);
|
|
} catch (SQLException e) {
|
e.printStackTrace();
|
if (connection != null) {
|
try {
|
if (purge != null) {
|
connection.rollback(purge);
|
} else {
|
connection.rollback();
|
}
|
|
} catch (SQLException e1) {
|
e1.printStackTrace();
|
return Result.error(new CodeMsg(6004, "回滚sql出错," + e.getMessage()));
|
}
|
}
|
return Result.error(new CodeMsg(6004, "运行清洗sql出错," + e.getMessage()));
|
} finally {
|
if (connection != null) {
|
try {
|
connection.close();
|
} catch (SQLException e) {
|
e.printStackTrace();
|
return Result.error(new CodeMsg(6003, "本地connection 无法关闭"));
|
}
|
}
|
}
|
}
|
|
private Result runByApi(String id) {
|
//TODO
|
return Result.success(null);
|
}
|
|
private Result runByDb(SysAssemble assemble) {
|
try {
|
String id = assemble.getId();
|
Boolean bigdata = assemble.getBigdata();
|
|
List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id).eq(Constant.Active, true));
|
for (SysAssembleDb sysAssembleDb : dbList) {
|
String dbId = sysAssembleDb.getId();
|
Result result = loadOneDb(sysAssembleDb, bigdata, id);
|
XxlJobLogger.log("assemble db load one db:" + sysAssembleDb.getDatasourceUrl());
|
if (!result.getSuccess()) {
|
assemble.setPreMsg("导入源数据错误:" + sysAssembleDb.getDatabaseName()).updateById();
|
return result;
|
}
|
}
|
return Result.success(null);
|
} catch (Exception e) {
|
return Result.error(new CodeMsg(3009, e.getMessage()));
|
}
|
}
|
|
private Result loadOneDb(SysAssembleDb sysAssembleDb, boolean bigData, String assembleId) {
|
String dbId = sysAssembleDb.getId();
|
Connection conn = dbService.getConnection(dbId);
|
if (conn == null) {
|
return Result.error(new CodeMsg(6002, MessageFormat.format("未能连接到源 id: {0}", dbId)));
|
}
|
Connection dataSourceConn = null;
|
PreparedStatement statement = null;
|
try {
|
List<SysAssembleDbTable> tableList = tableService.selectList(new EntityWrapper<SysAssembleDbTable>().eq(Constant.PARENT_ID, dbId).eq(Constant.Active, true));
|
for (SysAssembleDbTable dbTable : tableList) {
|
String tableId = dbTable.getId();
|
String tempTableName = dbTable.getTempTableName();
|
XxlJobLogger.log("assemble table" + tempTableName);
|
List<SysAssembleDbField> fieldList = fieldService.selectList(new EntityWrapper<SysAssembleDbField>().eq(Constant.PARENT_ID, tableId));
|
String fields = fieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.joining(Constant.COMMA));
|
XxlJobLogger.log("assemble fields" + fields);
|
SysAssembleTableType type = dbTable.getType();
|
String tableName = null;
|
String assembleTempTableName = tempTableName;
|
if (type.equals(SysAssembleTableType.sql)) {
|
tableName = dbTable.getSql();
|
} else if (type.equals(SysAssembleTableType.table)) {
|
tableName = dbTable.getTableName();
|
}
|
XxlJobLogger.log("assemble source table:" + tableName);
|
//TODO assembleTempTableName 可能会超长 64字节 后续修改
|
DataSourceInfo dataSourceInfo = null;
|
if (!bigData) {
|
dataSourceInfo = unBigDataDataSourceInfo;
|
}else {
|
dataSourceInfo = bigDataDataSourceInfo;
|
}
|
|
// drop temp data
|
boolean b = dataSourceInfo.truncateData(tempTableName);
|
XxlJobLogger.log("assemble truncateData :" + b);
|
fixAssembleTempTable(dataSourceInfo, assembleTempTableName, fieldList, dbId);
|
XxlJobLogger.log("assemble fixAssembleTempTable :" );
|
//checkAssembleTempTableExists(assembleTempTableName);
|
String filter = dbTable.getFilter();
|
XxlJobLogger.log("assemble raw filter:" + filter);
|
if (!StringUtils.isEmpty(filter)) {
|
Set<String> matcher = DbUtils.matcher(filter);
|
for (String code : matcher) {
|
SysAssembleParams sysAssembleParams = paramsService.selectOne(new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, assembleId).eq(Constant.Code, code));
|
if (sysAssembleParams == null){
|
return Result.error(new CodeMsg(6009, assembleId + "有变量未匹配到:"+ code));
|
}
|
String val = sysAssembleParams.getVal();
|
if (StringUtils.isEmpty(val)) {
|
return Result.error(new CodeMsg(6009, assembleId + "有变量未获取到值:"+ code));
|
}
|
val = DbUtils.quotedStr(val);
|
filter = filter.replace(DbUtils.assemblParam(code), val);
|
}
|
}else {
|
filter = Constant.WHERE_DEFAULT;
|
}
|
|
XxlJobLogger.log("assemble db filter:" + filter);
|
String runSqlTemplate = null;
|
if (type.equals(SysAssembleTableType.table)){
|
runSqlTemplate = Constant.selectFieldTableTemplate;
|
}else if(type.equals(SysAssembleTableType.sql)){
|
runSqlTemplate = Constant.selectFieldSqlTemplate;
|
}
|
XxlJobLogger.log("assemble db select sql template :" + runSqlTemplate);
|
String sql = MessageFormat.format(runSqlTemplate, fields, tableName, filter);
|
XxlJobLogger.log("assemble db select sql:" + sql);
|
PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
ps.setFetchSize(Integer.MIN_VALUE);
|
ps.setFetchDirection(ResultSet.FETCH_REVERSE);
|
ResultSet resultSet = ps.executeQuery();
|
ResultSetMetaData metaData = resultSet.getMetaData();
|
int columnCount = metaData.getColumnCount();
|
|
String insertMySqlSql = assembleSql(unBigDataDataSourceInfo.getDbName(), assembleTempTableName, fieldList);
|
XxlJobLogger.log("assemble db insert sql:" + insertMySqlSql);
|
int cnt = 0;
|
dataSourceConn = dataSourceInfo.conn();
|
dataSourceConn.setAutoCommit(false);
|
statement = dataSourceConn.prepareStatement(insertMySqlSql);
|
int count = 0;
|
while (resultSet.next()) {
|
count++;
|
for (int i = 1; i <= columnCount; i++) {
|
statement.setString(i, resultSet.getString(i));
|
}
|
statement.addBatch();
|
cnt++;
|
if (cnt == 5000) {
|
statement.executeBatch();
|
}
|
}
|
statement.executeBatch();
|
XxlJobLogger.log("assemble db select sql count:" + count);
|
}
|
|
dataSourceConn.commit();
|
return Result.success(null);
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
return Result.error(new CodeMsg(6004, e.getMessage()));
|
}finally {
|
if (conn != null) {
|
try {
|
conn.close();
|
} catch (SQLException e) {
|
e.printStackTrace();
|
}
|
}
|
if (statement != null) {
|
try {
|
statement.close();
|
} catch (SQLException e) {
|
e.printStackTrace();
|
}
|
}
|
if (dataSourceConn != null) {
|
try {
|
dataSourceConn.close();
|
} catch (SQLException e) {
|
e.printStackTrace();
|
}
|
}
|
}
|
|
}
|
|
private void setOneBatch(boolean bigData, String insertMySqlSql, List<List<String>> branchList) {
|
Connection conn = null;
|
try {
|
DataSourceInfo dataSourceInfo = null;
|
if (bigData) {
|
dataSourceInfo = bigDataDataSourceInfo;
|
}else {
|
dataSourceInfo = unBigDataDataSourceInfo;
|
}
|
|
conn = dataSourceInfo.conn();
|
PreparedStatement statement = conn.prepareStatement(insertMySqlSql);
|
int result = 0;
|
if (branchList.isEmpty()) {
|
return;
|
}
|
for (List<String> stringList : branchList) {
|
for (int i = 0; i < stringList.size(); i++) {
|
statement.setString(i + 1, stringList.get(i));
|
}
|
statement.addBatch();
|
}
|
statement.executeBatch();
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
}
|
finally {
|
if (conn != null) {
|
try {
|
conn.close();
|
} catch (SQLException e) {
|
e.printStackTrace();
|
}
|
}
|
}
|
}
|
|
private void runOneBatch(boolean bigData, String insertMySqlSql, StringBuilder builder) throws UnsupportedEncodingException, SQLException {
|
byte[] bytes = builder.toString().getBytes("UTF-8");
|
if (bytes.length > 0) {
|
InputStream stream = new ByteArrayInputStream(bytes);
|
bulkLoadFromInputStream(insertMySqlSql, stream, bigData);
|
}
|
//清空bulider
|
builder.delete(0, builder.length());
|
}
|
|
private void fixAssembleTempTable(DataSourceInfo dataSourceInfo, String assembleTempTableName, List<SysAssembleDbField> dbFieldList, String dbId) {
|
boolean exists = dataSourceInfo.checkTableExists(assembleTempTableName);
|
List<String> fieldList = dbFieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.toList());
|
if (!exists) {
|
dataSourceInfo.createTable(assembleTempTableName, fieldList);
|
}else {
|
//check 字段是否相同
|
boolean checked = dataSourceInfo.checkFieldList(assembleTempTableName, fieldList);
|
if (!checked){
|
assembleTempTableName = reSetTableName(assembleTempTableName, dbId);
|
dataSourceInfo.createTable(assembleTempTableName, fieldList);
|
|
}
|
}
|
}
|
|
private String reSetTableName(String assembleTempTableName, String tableId) {
|
//TODO Db
|
SysAssembleDbTable dbTable = tableService.selectById(tableId);
|
String suffix = null;
|
suffix = DbUtils.getUUID(16);
|
String tempTableName = null;
|
if (dbTable.getType().equals(SysAssembleTableType.table)) {
|
tempTableName = Constant.AssembleTempTable + dbTable.getTableName() + suffix;
|
}else {
|
tempTableName = Constant.AssembleTempSql+ DbUtils.getUUID(5)+ suffix;
|
}
|
|
dbTable.setTempTableName(tempTableName).updateById();
|
return tempTableName;
|
}
|
|
public void builderEnd(StringBuilder builder, Object object) {
|
builder.append(object);
|
builder.append("\n");
|
}
|
|
|
public void builderAppend(StringBuilder builder, Object object) {
|
builder.append(object);
|
builder.append(",");
|
}
|
|
|
public String assembleSql(String dataBaseName, String tableName, List<SysAssembleDbField> fieldList) {
|
String fields = fieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.joining(Constant.COMMA));
|
ContentBuilder builder = new ContentBuilder(Constant.COMMA);
|
for (int i = 0; i < fieldList.size(); i++) {
|
builder.append(Constant.QUESTION);
|
}
|
String sql = "INSERT INTO " + dataBaseName + "." + tableName + "(" + fields + ") values(" + builder.toString() + ")";
|
return sql;
|
}
|
|
private int bulkLoadFromInputStream(String sql, InputStream dataStream, boolean bigData) {
|
if (null == dataStream) {
|
log.error("输入流为NULL,没有数据导入。");
|
return 0;
|
}
|
Connection conn = null;
|
try {
|
DataSourceInfo dataSourceInfo = null;
|
if (bigData) {
|
dataSourceInfo = bigDataDataSourceInfo;
|
}else {
|
dataSourceInfo = unBigDataDataSourceInfo;
|
}
|
|
conn = dataSourceInfo.conn();
|
PreparedStatement statement = conn.prepareStatement(sql);
|
int result = 0;
|
if (statement.isWrapperFor(java.sql.Statement.class)) {
|
// com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class);
|
// mysqlStatement.setLocalInfileInputStream(dataStream);
|
// statement.
|
// result = mysqlStatement.executeUpdate();
|
}
|
return result;
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
}
|
finally {
|
if (conn != null) {
|
try {
|
conn.close();
|
} catch (SQLException e) {
|
e.printStackTrace();
|
}
|
}
|
}
|
return 0;
|
}
|
}
|