package com.highdatas.mdm.service.impl;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.baomidou.mybatisplus.service.impl.ServiceImpl;
import com.highdatas.mdm.entity.*;
import com.highdatas.mdm.mapper.SysAssembleMapper;
import com.highdatas.mdm.mapper.TableInfoMapper;
import com.highdatas.mdm.pojo.*;
import com.highdatas.mdm.pojo.kettle.BigDataDataSourceInfo;
import com.highdatas.mdm.pojo.kettle.DataSourceInfo;
import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo;
import com.highdatas.mdm.service.*;
import com.highdatas.mdm.util.Constant;
import com.highdatas.mdm.util.ContentBuilder;
import com.highdatas.mdm.util.DbUtils;
import com.highdatas.mdm.util.RuleClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.sql.*;
import java.text.MessageFormat;
import java.util.*;
import java.util.Date;
import java.util.stream.Collectors;
/**
 * Service implementation class for assemble (data collection) tasks.
 *
 * @author kimi
 * @since 2020-02-20
 */
@Slf4j
@Service
public class SysAssembleServiceImpl extends ServiceImpl implements ISysAssembleService {
// Collaborators injected by the container (by type via @Autowired, sqlSessionTemplate via @Resource).
@Autowired
ISysAssembleRuleService ruleService;
// Looks up the source databases of a task and opens JDBC connections to them.
@Autowired
ISysAssembleDbService dbService;
@Autowired
ISysAssembleApiService apiService;
// Source tables (or SQL statements) configured under a source database.
@Autowired
ISysAssembleDbTableService tableService;
// Fields selected from each source table.
@Autowired
ISysAssembleDbFieldService fieldService;
// Task parameters that are substituted into filters and the purge SQL.
@Autowired
ISysAssembleParamsService paramsService;
@Autowired
ISysAssembleLogService logService;
// Maps a menu id to the master-data table name, audit flag and charge user.
@Autowired
IMenuMappingService menuMappingService;
@Autowired
ActivitiService activitiService;
@Autowired
IMaintainService maintainService;
@Autowired
IFlowsService flowsService;
@Resource
SqlSessionTemplate sqlSessionTemplate;
@Autowired
TableInfoMapper tableInfoMapper;
// Data source used when a task is not flagged as big data.
@Autowired
UnBigDataDataSourceInfo unBigDataDataSourceInfo;
// Data source used when a task is flagged as big data.
@Autowired
BigDataDataSourceInfo bigDataDataSourceInfo;
@Autowired
MasterDataService masterDataService;
@Autowired
RuleClient ruleClient;
@Autowired
IMasterModifiedService masterModifiedService;
/**
 * Runs one assemble task end to end.
 *
 * <p>Steps, in order: load from the configured source databases, load from APIs
 * (currently a stub), prepare the temp table, run the purge SQL, run the rule check,
 * copy temp rows to the record table, link the rows to a maintain version and finally
 * refresh the task parameters. The first failing step marks the run as failed,
 * persists the task and returns that step's result.
 *
 * <p>The task is refused (and recorded as failed) when its status is not
 * {@code working}, or when its last flow is neither {@code close} nor {@code open}.
 * A missing status is treated the same way instead of raising a
 * {@link NullPointerException} that would leave the run stuck in {@code working}.
 *
 * @param id id of the assemble task
 * @return the result of the last executed step, or an error result describing why the
 *         task could not run
 */
@Override
public Result run(String id) {
    if (StringUtils.isEmpty(id)) {
        return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
    }
    SysAssemble assemble = selectById(id);
    if (assemble == null) {
        return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
    }

    // Record that a run has started before any precondition is checked.
    assemble.setPreTime(new Date());
    assemble.setPreStatus(SysAssembleRunStatus.working);
    assemble.updateById();

    // Constant-first equals: a null status must fail the run, not throw.
    if (!SysAssembleStatus.working.equals(assemble.getStatus())) {
        String msg = "当前任务不在工作中,状态:" + assemble.getStatus();
        assemble.setPreCnt(0);
        return failRun(assemble, msg, Result.error(new CodeMsg(6009, msg)));
    }

    String flowId = assemble.getFlowId();
    if (!StringUtils.isEmpty(flowId)) {
        Flows flows = flowsService.selectById(flowId);
        if (flows != null) {
            ActivitiStatus status = flows.getStatus();
            boolean flowFinished = ActivitiStatus.close.equals(status) || ActivitiStatus.open.equals(status);
            if (!flowFinished) {
                // String concatenation renders a null status as "null" instead of throwing.
                String msg = "当前有流程正在运行,暂时无法汇集下次数据:" + status;
                assemble.setPreCnt(0);
                return failRun(assemble, msg, Result.error(new CodeMsg(6009, msg)));
            }
        }
    }

    Boolean bigData = assemble.getBigdata();
    try {
        // 1. load from the source databases (runByDb records its own message on failure)
        Result result = runByDb(assemble);
        if (!result.getSuccess()) {
            return failRun(assemble, null, result);
        }
        // 2. load from APIs
        result = runByApi(id);
        if (!result.getSuccess()) {
            return failRun(assemble, null, result);
        }
        // 3. make sure the temp table exists and is empty
        result = checkTempTable(assemble);
        if (!result.getSuccess()) {
            return failRun(assemble, "检测主数据临时表出现错误", result);
        }
        // 4. purge data
        String purgeSql = DbUtils.replaceEscape(assemble.getPurgeSql());
        result = purgeData(purgeSql, assemble.getId(), bigData);
        if (!result.getSuccess()) {
            return failRun(assemble, "运行清洗sql出现错误: " + purgeSql, result);
        }
        // 5. check the temp data against the rules
        result = checkTempData(assemble);
        if (!result.getSuccess()) {
            return failRun(assemble, "质量检验环节出现错误", result);
        }
        // 6. temp table -> record table
        result = temp2record(assemble);
        if (!result.getSuccess()) {
            return failRun(assemble, "数据搬运至主数据记录环节出现错误", result);
        }
        // 7. link the record rows to a version
        result = linkMaintain(assemble);
        if (!result.getSuccess()) {
            return failRun(assemble, "联接版本出现错误", result);
        }
        // TODO: add a transaction and log records
        // 8. refresh the task parameters
        result = updateParams(assemble);
        if (!result.getSuccess()) {
            return failRun(assemble, "更新变量环节出现错误", result);
        }
        assemble.setPreMsg("汇集成功");
        assemble.setPreStatus(SysAssembleRunStatus.success);
        assemble.updateById();
        return result;
    } catch (Exception e) {
        assemble.setPreMsg("汇集任务出现错误");
        assemble.setPreStatus(SysAssembleRunStatus.fail).updateById();
        e.printStackTrace();
        return Result.error(new CodeMsg(6009, e.getMessage()));
    }
}

/**
 * Marks the current run as failed, records {@code preMsg} when one is given, persists
 * the task and hands back {@code result} so callers can return it directly.
 */
private Result failRun(SysAssemble assemble, String preMsg, Result result) {
    assemble.setPreStatus(SysAssembleRunStatus.fail);
    if (preMsg != null) {
        assemble.setPreMsg(preMsg);
    }
    assemble.updateById();
    return result;
}
/**
 * Links the rows copied to the record table to a maintain version, then either starts
 * the approval flow (audit enabled, the default) or applies the version directly.
 *
 * @param assemble the task being run
 * @return success, the failed result of {@code temp2recordUpdate}, or an error with
 *         code 6009 when the charge user is missing or any exception is raised
 */
private Result linkMaintain(SysAssemble assemble) {
    try {
        MenuMapping menuMapping = menuMappingService.selectOne(
                new EntityWrapper<MenuMapping>().eq("menu_id", assemble.getMenuId()));
        String tableName = menuMapping.getTableName();
        SysAssembleUpdateType updateType = assemble.getUpdateType();
        // A full update is uploaded as a big version.
        boolean isBigVersion = updateType.equals(SysAssembleUpdateType.All);
        Maintain maintain = masterDataService.uploadedData(tableName, updateType, assemble.getUserId(), isBigVersion);

        Result moved = temp2recordUpdate(assemble, maintain);
        if (!moved.getSuccess()) {
            assemble.setPreStatus(SysAssembleRunStatus.fail);
            assemble.setPreMsg("搬运更新字段出现错误" );
            assemble.updateById();
            return moved;
        }

        // Approval is required unless the mapping explicitly disables it.
        Boolean auditFlag = menuMapping.getAudit();
        boolean needsAudit = auditFlag == null || auditFlag;

        if (!needsAudit) {
            // No approval: create an already-open flow and apply the version right away.
            Flows openFlow = new Flows()
                    .setStatus(ActivitiStatus.open)
                    .setBusinessId(maintain.getId())
                    .setId(DbUtils.getUUID())
                    .setCreateTime(new Date());
            openFlow.insert();
            maintain.setFlowId(openFlow.getId());
            maintain.updateById();
            maintainService.dealFlow(maintain.getId(), ActivitiStatus.open);
            masterModifiedService.dealAssemble(maintain.getId(), assemble.getUserId(), false);
            return Result.success(null);
        }

        // Approval: the flow is started on behalf of the user in charge of the menu.
        String chargeId = menuMapping.getChargeId();
        TUser chargeUser = DbUtils.getUserById(chargeId);
        if (chargeUser == null) {
            return Result.error(new CodeMsg(6009, "找不到对应的负责人:" + chargeId));
        }
        activitiService.setUser(chargeUser);
        Flows startedFlow = activitiService.start("process", null, maintain.getId(), ActivitiBusinessType.maintain);
        assemble.setFlowId(startedFlow.getId()).updateById();
        maintain.setFlowId(startedFlow.getId());
        maintain.setDesp("启动汇集流程");
        maintain.updateById();
        masterModifiedService.dealAssemble(maintain.getId(), assemble.getUserId(), true);
        return Result.success(null);
    } catch (Exception e) {
        e.printStackTrace();
        return Result.error(new CodeMsg(6009, e.getMessage()));
    }
}
/**
 * Refreshes the stored value of every parameter of the task that defines an update SQL.
 *
 * <p>Each update SQL is executed against the task's data source; when it yields at
 * least one row, the first column of the first row becomes the new parameter value.
 * The connection, statements and result sets are closed by try-with-resources, so a
 * failure while closing is reported through the same 6006 error as any other failure.
 *
 * @param assemble the task whose parameters are refreshed
 * @return success, or an error with code 6006 carrying the exception message
 */
private Result updateParams(SysAssemble assemble) {
    try {
        String id = assemble.getId();
        Boolean bigdata = assemble.getBigdata();
        DataSourceInfo dataSourceInfo;
        if (bigdata) {
            dataSourceInfo = bigDataDataSourceInfo;
        } else {
            dataSourceInfo = unBigDataDataSourceInfo;
        }
        try (Connection conn = dataSourceInfo.conn()) {
            List<SysAssembleParams> paramsList =
                    paramsService.selectList(new EntityWrapper<SysAssembleParams>().eq("parent_id", id));
            for (SysAssembleParams sysAssembleParams : paramsList) {
                String updateSql = sysAssembleParams.getUpdateSql();
                if (StringUtils.isEmpty(updateSql)) {
                    continue;
                }
                try (PreparedStatement preparedStatement = conn.prepareStatement(
                             updateSql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
                     ResultSet resultSet = preparedStatement.executeQuery()) {
                    // Only the first row is used; the query is expected to return a single value.
                    if (resultSet.first()) {
                        sysAssembleParams.setVal(resultSet.getString(1));
                        sysAssembleParams.updateById();
                    }
                }
            }
            return Result.success(null);
        }
    } catch (Exception e) {
        return Result.error(new CodeMsg(6006, e.getMessage()));
    }
}
/**
 * Makes sure the temp table of the task's master-data table exists and is empty.
 *
 * <p>The temp table is created from the field list of the formal table when it is
 * missing, and truncated in every case.
 *
 * @param assemble the task being run
 * @return success; error 6003 when the formal table's fields cannot be read; error
 *         6004 with the exception message on any exception
 */
private Result checkTempTable(SysAssemble assemble) {
    try {
        MenuMapping mapping = menuMappingService.selectOne(
                new EntityWrapper<MenuMapping>().eq("menu_id", assemble.getMenuId()));
        String masterTable = mapping.getTableName();
        String tempTable = Constant.Temp + masterTable;

        DataSourceInfo source = assemble.getBigdata() ? bigDataDataSourceInfo : unBigDataDataSourceInfo;

        if (!source.checkTableExists(tempTable)) {
            List<String> masterFields = source.getFieldsFromTable(masterTable);
            if (masterFields == null) {
                return Result.error(new CodeMsg(6003, "查询正式表的字段:" + masterTable));
            }
            source.createTable(tempTable, masterFields);
        }
        // Remove whatever a previous run left behind.
        source.truncateData(tempTable);
        return Result.success(null);
    } catch (Exception e) {
        e.printStackTrace();
        return Result.error(new CodeMsg(6004, e.getMessage()));
    }
}
/**
 * For incremental tasks, copies the rows that update existing master data from the
 * temp table into the record table, then marks the record rows as dealt for the version.
 *
 * <p>The join-update statement is only built when the table has fields besides the
 * union-code fields listed in {@code assemble.getUpdateFields()}. The template is
 * filled with: record table, record field list, {@code t1}-aliased field list, temp
 * table, formal table and the join condition. The number of affected rows is added to
 * the task's {@code preCnt}.
 *
 * <p>Big-data sources have no join-update template; an incremental task on such a
 * source now gets an explicit 6005 error instead of a {@link NullPointerException}
 * from {@link MessageFormat} with an empty message.
 *
 * @param assemble the task being run
 * @param maintain the version the record rows belong to
 * @return success, or an error with code 6005
 */
private Result temp2recordUpdate(SysAssemble assemble, Maintain maintain) {
    try {
        String menuId = assemble.getMenuId();
        Boolean bigdata = assemble.getBigdata();
        MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId));
        String tableName = menuMapping.getTableName();
        String tempTableName = Constant.Temp + tableName;
        String recordTableName = tableName + Constant.RECORD;
        DataSourceInfo dataSourceInfo;
        String updateSql = null;
        if (bigdata) {
            dataSourceInfo = bigDataDataSourceInfo;
        } else {
            dataSourceInfo = unBigDataDataSourceInfo;
            updateSql = Constant.MYSQLJoinUpdateSql;
        }
        try (Connection conn = dataSourceInfo.conn()) {
            SysAssembleUpdateType updateType = assemble.getUpdateType();
            List<String> fieldsFromTable = dataSourceInfo.getFieldsFromTable(tableName);
            // The id column is appended explicitly to the record field list below.
            fieldsFromTable.remove(Constant.ID);
            List<String> fieldFromRecordTable = new ArrayList<>(fieldsFromTable);
            fieldFromRecordTable.add(Constant.ID);
            fieldFromRecordTable.add(Constant.STD_ID);
            fieldFromRecordTable.add(Constant.DEAL);
            String recordFieldStr = StringUtils.join(fieldFromRecordTable, Constant.COMMA);

            int cnt = 0;
            if (updateType.equals(SysAssembleUpdateType.Increment)) {
                String[] split = assemble.getUpdateFields().split(Constant.SEMICOLON);
                ArrayList<String> unionCodeFields = new ArrayList<>(Arrays.asList(split));
                String joinStr = getJoinFieldParse(unionCodeFields);
                List<String> otherFields = fieldsFromTable.stream()
                        .filter(s -> !unionCodeFields.contains(s))
                        .filter(s -> !s.equalsIgnoreCase(Constant.ID))
                        .collect(Collectors.toList());
                // Nothing can be updated when every field is part of the union code.
                if (!otherFields.isEmpty()) {
                    if (updateSql == null) {
                        return Result.error(new CodeMsg(6005, "大数据源暂不支持增量更新: 缺少更新sql模板"));
                    }
                    String insertFieldStr = fieldsFromTable.stream()
                            .map(s -> MessageFormat.format(Constant.Alias, Constant.T1, s))
                            .collect(Collectors.joining(Constant.COMMA));
                    String updatedSql = MessageFormat.format(updateSql, recordTableName, recordFieldStr,
                            insertFieldStr, tempTableName, tableName, joinStr);
                    try (PreparedStatement updatedPreparedStatement = conn.prepareStatement(updatedSql)) {
                        cnt = updatedPreparedStatement.executeUpdate();
                    }
                    tableInfoMapper.insertMatintainDetailFromTemp(DbUtils.quotedStr(maintain.getId()),
                            maintain.getTableName() + Constant.RECORD, DbUtils.quotedStr(Operate.update.toString()));
                }
            }
            tableInfoMapper.tempDeal(maintain.getTableName() + Constant.RECORD, DbUtils.quotedStr(maintain.getId()));

            // preCnt may not have been set yet; count from zero in that case.
            Integer previousCnt = assemble.getPreCnt();
            assemble.setPreCnt((previousCnt == null ? 0 : previousCnt) + cnt);
            assemble.updateById();
            return Result.success(null);
        }
    } catch (Exception e) {
        e.printStackTrace();
        return Result.error(new CodeMsg(6005, e.getMessage()));
    }
}
/**
 * Copies the rows of the temp table into the record table and stores the number of
 * copied rows in the task's {@code preCnt}.
 *
 * <p>For a full update every temp row is copied with the data source's transfer
 * template. For an incremental update the join-add template is used, filled with:
 * record table, record field list, {@code t1}-aliased field list, temp table, formal
 * table and the join condition built from {@code assemble.getUpdateFields()}.
 *
 * <p>Big-data sources have no join-add template; an incremental task on such a source
 * now gets an explicit 6005 error instead of a {@link NullPointerException} from
 * {@link MessageFormat} with an empty message.
 *
 * @param assemble the task being run
 * @return success, or an error with code 6005
 */
private Result temp2record(SysAssemble assemble) {
    try {
        String menuId = assemble.getMenuId();
        Boolean bigdata = assemble.getBigdata();
        MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId));
        String tableName = menuMapping.getTableName();
        String tempTableName = Constant.Temp + tableName;
        String recordTableName = tableName + Constant.RECORD;
        DataSourceInfo dataSourceInfo;
        String transSql;
        String insertSql = null;
        if (bigdata) {
            dataSourceInfo = bigDataDataSourceInfo;
            transSql = Constant.Temp2RecordHbaseTemplate;
        } else {
            dataSourceInfo = unBigDataDataSourceInfo;
            transSql = Constant.Temp2RecordMySQLTemplate;
            insertSql = Constant.MYSQLJoinAddSql;
        }
        try (Connection conn = dataSourceInfo.conn()) {
            SysAssembleUpdateType updateType = assemble.getUpdateType();
            List<String> fieldsFromTable = dataSourceInfo.getFieldsFromTable(tableName);
            // The id column is appended explicitly to the record field list below.
            fieldsFromTable.remove(Constant.ID);
            List<String> fieldFromRecordTable = new ArrayList<>(fieldsFromTable);
            fieldFromRecordTable.add(Constant.ID);
            fieldFromRecordTable.add(Constant.STD_ID);
            fieldFromRecordTable.add(Constant.DEAL);
            String recordFieldStr = StringUtils.join(fieldFromRecordTable, Constant.COMMA);
            String fieldStr = String.join(Constant.COMMA, fieldsFromTable);

            int cnt = 0;
            if (updateType.equals(SysAssembleUpdateType.All)) {
                String sql = MessageFormat.format(transSql, recordTableName, recordFieldStr, fieldStr, tempTableName);
                try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
                    cnt = preparedStatement.executeUpdate();
                }
            } else if (updateType.equals(SysAssembleUpdateType.Increment)) {
                if (insertSql == null) {
                    return Result.error(new CodeMsg(6005, "大数据源暂不支持增量汇集: 缺少新增sql模板"));
                }
                String[] split = assemble.getUpdateFields().split(Constant.SEMICOLON);
                ArrayList<String> unionCodeFields = new ArrayList<>(Arrays.asList(split));
                String joinStr = getJoinFieldParse(unionCodeFields);
                // insert the rows selected by the join-add template
                String insertFieldStr = fieldsFromTable.stream()
                        .map(s -> MessageFormat.format(Constant.Alias, Constant.T1, s))
                        .collect(Collectors.joining(Constant.COMMA));
                String insetSql = MessageFormat.format(insertSql, recordTableName, recordFieldStr,
                        insertFieldStr, tempTableName, tableName, joinStr);
                try (PreparedStatement preparedStatement = conn.prepareStatement(insetSql)) {
                    cnt += preparedStatement.executeUpdate();
                }
            }
            assemble.setPreCnt(cnt);
            assemble.updateById();
            return Result.success(null);
        }
    } catch (Exception e) {
        e.printStackTrace();
        return Result.error(new CodeMsg(6005, e.getMessage()));
    }
}
/**
 * Formats every field with {@code Constant.MYSQLJoinParse} and combines the pieces in
 * a {@code ContentBuilder} created with {@code Constant.AND}.
 *
 * @param unionCodeFields field names to format
 * @return the builder's string form
 */
private String getOthetFieldParse(List<String> unionCodeFields) {
    ContentBuilder conditions = new ContentBuilder(Constant.AND);
    Iterator<String> fields = unionCodeFields.iterator();
    while (fields.hasNext()) {
        String condition = MessageFormat.format(Constant.MYSQLJoinParse, fields.next());
        conditions.append(condition);
    }
    return conditions.toString();
}
/**
 * Builds the join condition for the given fields: each field is formatted with
 * {@code Constant.MYSQLJoinParse} and the pieces are combined in a
 * {@code ContentBuilder} created with {@code Constant.AND}.
 *
 * @param updateFields names of the fields the join is made on
 * @return the builder's string form
 */
private String getJoinFieldParse(ArrayList<String> updateFields) {
    ContentBuilder joinCondition = new ContentBuilder(Constant.AND);
    for (int i = 0; i < updateFields.size(); i++) {
        String field = updateFields.get(i);
        joinCondition.append(MessageFormat.format(Constant.MYSQLJoinParse, field));
    }
    return joinCondition.toString();
}
/**
 * Applies the task's rule-check policy to the data in the temp table.
 *
 * <ul>
 *   <li>{@code unlimited}: no check, always success.</li>
 *   <li>{@code successAdd}: every field result returned by the rule client must be
 *       {@code true}; the failing fields are listed in the error.</li>
 *   <li>{@code partSuccessAdd}: only the fields listed in
 *       {@code assemble.getCheckFields()} must be {@code true}; a field without a
 *       result counts as failed.</li>
 * </ul>
 *
 * <p>The {@code successAdd} branch inspects the result values. The previous code
 * looked for {@code false} among the map keys (field names), which can never match,
 * so that check always passed.
 *
 * @param assemble the task being run
 * @return success, or an error with code 6009 naming the reason
 */
private Result checkTempData(SysAssemble assemble) {
    SysAssembleCheckType checkType = assemble.getCheckType();
    if (checkType == null) {
        return Result.error(new CodeMsg(6009,"规则校验类型为空"));
    }
    if (checkType.equals(SysAssembleCheckType.unlimited)) {
        return Result.success(null);
    }
    String menuId = assemble.getMenuId();
    MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId));
    String tableName = menuMapping.getTableName();
    String tempTableName = Constant.Temp + tableName;
    // field name -> whether the field passed its rules
    Map<String, Boolean> fieldResultSet = ruleClient.execuImmeForCollect(tempTableName, assemble.getUserId());
    switch (checkType) {
        case successAdd:
            // A missing (null) result is treated as a failure rather than unboxed.
            List<String> failedFields = fieldResultSet.entrySet().stream()
                    .filter(entry -> !Boolean.TRUE.equals(entry.getValue()))
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
            if (!failedFields.isEmpty()) {
                return Result.error(new CodeMsg(6009,"规则校验不通过 字段:" + String.join(Constant.COMMA, failedFields)));
            }
            break;
        case partSuccessAdd:
            String checkFields = assemble.getCheckFields();
            String[] split = checkFields.split(Constant.SEMICOLON);
            for (String s : split) {
                Boolean checked = fieldResultSet.get(s);
                if (checked == null || !checked) {
                    return Result.error(new CodeMsg(6009,"规则校验不通过 字段:" + s));
                }
            }
            break;
        default:
            break;
    }
    return Result.success(null);
}
/**
 * Substitutes the task parameters into the purge SQL and executes it.
 *
 * <p>Every parameter code found in {@code purgeSql} by {@code DbUtils.matcher} must
 * match exactly one parameter of the task (case-insensitive). A parameter without a
 * stored value is initialised from its init SQL: the first column of the first row.
 * The values are inserted into the SQL text by plain string replacement, so they must
 * come from trusted configuration.
 *
 * <p>Fixes over the previous version: the init query's cursor is advanced with
 * {@code next()} before the value is read; a query without rows or with a null value
 * yields the "init failed" error instead of an exception; statements and result sets
 * are closed; a parameter with a null code no longer raises a
 * {@link NullPointerException}.
 *
 * @param purgeSql   SQL with parameter placeholders
 * @param assembleId id of the task owning the parameters
 * @param bigData    whether the big-data source is used
 * @return success; error 6006 for parameter problems; error 6004 when SQL execution
 *         fails; error 6003 when the connection cannot be closed
 */
private Result purgeData(String purgeSql, String assembleId, boolean bigData) {
    Connection connection = null;
    try {
        DataSourceInfo dataSourceInfo;
        if (bigData) {
            dataSourceInfo = bigDataDataSourceInfo;
        } else {
            dataSourceInfo = unBigDataDataSourceInfo;
        }
        connection = dataSourceInfo.conn();
        List<SysAssembleParams> paramsList =
                paramsService.selectList(new EntityWrapper<SysAssembleParams>().eq("parent_id", assembleId));
        HashMap<String, String> paramsMap = new HashMap<>();
        Set<String> matcher = DbUtils.matcher(purgeSql);
        for (String s : matcher) {
            List<SysAssembleParams> collect = paramsList.stream()
                    .filter(candidate -> s.equalsIgnoreCase(candidate.getCode()))
                    .collect(Collectors.toList());
            if (collect.size() == 0) {
                return Result.error(new CodeMsg(6006, "未匹配到参数:" + s));
            }
            if (collect.size() > 1) {
                return Result.error(new CodeMsg(6006, "匹配到多个参数:" + s));
            }
            SysAssembleParams sysAssembleParams = collect.get(0);
            String val = sysAssembleParams.getVal();
            if (StringUtils.isEmpty(val)) {
                String initSql = sysAssembleParams.getInitSql();
                if (StringUtils.isEmpty(initSql)) {
                    return Result.error(new CodeMsg(6006, "初始化参数:" + s + "失败"));
                }
                try (PreparedStatement initStatement = connection.prepareStatement(initSql);
                     ResultSet resultSet = initStatement.executeQuery()) {
                    // Only the first row is used; the cursor must be advanced before reading.
                    val = resultSet.next() ? resultSet.getString(1) : null;
                }
                if (val == null) {
                    return Result.error(new CodeMsg(6006, "初始化参数:" + s + "失败"));
                }
            }
            paramsMap.put(s, val);
        }
        for (Map.Entry<String, String> param : paramsMap.entrySet()) {
            String preParams = MessageFormat.format(Constant.ParamsShell, param.getKey());
            purgeSql = purgeSql.replace(preParams, param.getValue());
        }
        try (PreparedStatement preparedStatement = connection.prepareStatement(purgeSql)) {
            preparedStatement.execute();
        }
        return Result.success(null);
    } catch (SQLException e) {
        e.printStackTrace();
        return Result.error(new CodeMsg(6004, "运行清洗sql出错," + e.getSQLState()));
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
                // Kept from the original: a close failure is reported with its own code.
                return Result.error(new CodeMsg(6003, "本地connection 无法关闭"));
            }
        }
    }
}
/**
 * Loads source data from API sources. Not implemented yet: always reports success.
 *
 * @param id id of the assemble task
 * @return a success result without payload
 */
private Result runByApi(String id) {
    // TODO: implement loading from API sources
    Result nothingLoaded = Result.success(null);
    return nothingLoaded;
}
/**
 * Loads the data of every active source database configured for the task.
 *
 * <p>Stops at the first database that fails, records the database name in the task's
 * {@code preMsg} and returns that failure.
 *
 * @param assemble the task being run
 * @return success when every source was loaded, otherwise the first failed result
 */
private Result runByDb(SysAssemble assemble) {
    String id = assemble.getId();
    Boolean bigdata = assemble.getBigdata();
    List<SysAssembleDb> dbList = dbService.selectList(
            new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id).eq(Constant.Active, true));
    for (SysAssembleDb sysAssembleDb : dbList) {
        Result result = loadOneDb(sysAssembleDb, bigdata, id);
        if (!result.getSuccess()) {
            assemble.setPreMsg("导入源数据错误:" + sysAssembleDb.getDatabaseName()).updateById();
            return result;
        }
    }
    return Result.success(null);
}
/**
 * Copies the active tables (or SQL results) of one source database into their
 * assemble temp tables.
 *
 * <p>For every table: the temp table is truncated and checked, task parameters are
 * substituted into the filter, the source rows are read through the source
 * connection, rendered as tab-separated lines and bulk-loaded in batches.
 *
 * <p>Fixes over the previous version: the batch counter is reset after each flush, so
 * a batch is loaded every 5000 rows (before, only the first 5000 rows were flushed
 * early and everything after them was buffered in memory as one batch); the source
 * statement and result set are closed per table; an unsupported table type is
 * reported explicitly instead of failing on a null SQL template.
 *
 * @param sysAssembleDb the source database
 * @param bigData       whether the big-data target source is used
 * @param assembleId    id of the task, used to resolve filter parameters
 * @return success; error 6002 when the source cannot be connected; error 6009 for
 *         unresolved filter parameters; error 6004 for anything else
 */
private Result loadOneDb(SysAssembleDb sysAssembleDb, boolean bigData, String assembleId) {
    String dbId = sysAssembleDb.getId();
    Connection conn = dbService.getConnection(dbId);
    if (conn == null) {
        return Result.error(new CodeMsg(6002, MessageFormat.format("未能连接到源 id: {0}", dbId)));
    }
    try {
        List<SysAssembleDbTable> tableList = tableService.selectList(
                new EntityWrapper<SysAssembleDbTable>().eq(Constant.PARENT_ID, dbId).eq(Constant.Active, true));
        for (SysAssembleDbTable dbTable : tableList) {
            String tableId = dbTable.getId();
            String tempTableName = dbTable.getTempTableName();
            List<SysAssembleDbField> fieldList = fieldService.selectList(
                    new EntityWrapper<SysAssembleDbField>().eq(Constant.PARENT_ID, tableId));
            String fields = fieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.joining(Constant.COMMA));

            // The "table" is either a plain table name or a SQL statement, each with its own template.
            SysAssembleTableType type = dbTable.getType();
            String tableName;
            String runSqlTemplate;
            if (type.equals(SysAssembleTableType.sql)) {
                tableName = dbTable.getSql();
                runSqlTemplate = Constant.selectFieldSqlTemplate;
            } else if (type.equals(SysAssembleTableType.table)) {
                tableName = dbTable.getTableName();
                runSqlTemplate = Constant.selectFieldTableTemplate;
            } else {
                return Result.error(new CodeMsg(6004, "不支持的表类型:" + type));
            }

            // TODO: assembleTempTableName may exceed the 64-byte limit; to be changed later
            String assembleTempTableName = tempTableName;
            DataSourceInfo dataSourceInfo;
            if (!bigData) {
                dataSourceInfo = unBigDataDataSourceInfo;
            } else {
                dataSourceInfo = bigDataDataSourceInfo;
            }
            // drop the data of a previous run
            dataSourceInfo.truncateData(tempTableName);
            // NOTE(review): fixAssembleTempTable may create a renamed table, but the new
            // name is not returned, so the load below still targets the old name - confirm.
            fixAssembleTempTable(dataSourceInfo, assembleTempTableName, fieldList, dbId);

            // TODO: no paging yet
            String filter = dbTable.getFilter();
            Set<String> matcher = DbUtils.matcher(filter);
            for (String code : matcher) {
                SysAssembleParams sysAssembleParams = paramsService.selectOne(
                        new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, assembleId).eq(Constant.Code, code));
                if (sysAssembleParams == null) {
                    return Result.error(new CodeMsg(6009, assembleId + "有变量未匹配到:" + code));
                }
                String val = sysAssembleParams.getVal();
                if (StringUtils.isEmpty(val)) {
                    return Result.error(new CodeMsg(6009, assembleId + "有变量未获取到值:" + code));
                }
                val = DbUtils.quotedStr(val);
                filter = filter.replace(DbUtils.assemblParam(code), val);
            }
            if (StringUtils.isEmpty(filter)) {
                filter = Constant.WHERE_DEFAULT;
            }

            String sql = MessageFormat.format(runSqlTemplate, fields, tableName, filter);
            // NOTE(review): the load statement always names unBigDataDataSourceInfo's database,
            // even when bigData is true - confirm this is intended.
            String insertMySqlSql = assembleSql(unBigDataDataSourceInfo.getDbName(), assembleTempTableName, fields);
            try (PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
                ps.setFetchSize(Integer.MIN_VALUE);
                ps.setFetchDirection(ResultSet.FETCH_REVERSE);
                try (ResultSet resultSet = ps.executeQuery()) {
                    int columnCount = resultSet.getMetaData().getColumnCount();
                    int pendingRows = 0;
                    StringBuilder builder = new StringBuilder();
                    while (resultSet.next()) {
                        for (int i = 1; i <= columnCount; i++) {
                            if (i == columnCount) {
                                builderEnd(builder, resultSet.getObject(i));
                            } else {
                                builderAppend(builder, resultSet.getObject(i));
                            }
                        }
                        pendingRows++;
                        if (pendingRows == 5000) {
                            runOneBatch(bigData, insertMySqlSql, builder);
                            // Start counting the next batch; without this only one early flush happens.
                            pendingRows = 0;
                        }
                    }
                    // Flush the rows of the last, partially filled batch.
                    if (builder.length() > 0) {
                        runOneBatch(bigData, insertMySqlSql, builder);
                    }
                }
            }
        }
        return Result.success(null);
    } catch (Exception e) {
        e.printStackTrace();
        return Result.error(new CodeMsg(6004, e.getMessage()));
    } finally {
        try {
            conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}
/**
 * Bulk-loads the text accumulated in {@code builder} (encoded as UTF-8) with the given
 * load statement, then empties the builder. Nothing is loaded when the builder is empty.
 *
 * @param bigData        whether the big-data source is the target
 * @param insertMySqlSql the load statement to execute
 * @param builder        buffer holding the rows of the batch; cleared on return
 */
private void runOneBatch(boolean bigData, String insertMySqlSql, StringBuilder builder) throws UnsupportedEncodingException, SQLException {
    byte[] payload = builder.toString().getBytes("UTF-8");
    if (payload.length != 0) {
        bulkLoadFromInputStream(insertMySqlSql, new ByteArrayInputStream(payload), bigData);
    }
    // empty the builder for the next batch
    builder.setLength(0);
}
/**
 * Makes sure a temp table with the given field list exists.
 *
 * <p>A missing table is created. An existing table whose fields differ from
 * {@code dbFieldList} is left alone and a table with a newly generated name is
 * created instead.
 *
 * @param dataSourceInfo        data source holding the temp table
 * @param assembleTempTableName name of the temp table to check
 * @param dbFieldList           fields the temp table must have
 * @param dbId                  id handed to {@code reSetTableName} when a new name is needed
 */
private void fixAssembleTempTable(DataSourceInfo dataSourceInfo, String assembleTempTableName, List<SysAssembleDbField> dbFieldList, String dbId) {
    boolean tablePresent = dataSourceInfo.checkTableExists(assembleTempTableName);
    List<String> fieldNames = new ArrayList<>();
    for (SysAssembleDbField dbField : dbFieldList) {
        fieldNames.add(dbField.getField());
    }
    if (!tablePresent) {
        dataSourceInfo.createTable(assembleTempTableName, fieldNames);
        return;
    }
    // check whether the existing table has the same fields
    if (dataSourceInfo.checkFieldList(assembleTempTableName, fieldNames)) {
        return;
    }
    // NOTE(review): reSetTableName names its second parameter tableId and looks up a
    // table by it, while a database id is passed here - confirm which id is expected.
    // NOTE(review): the new name is not handed back to the caller - confirm.
    String renamedTable = reSetTableName(assembleTempTableName, dbId);
    dataSourceInfo.createTable(renamedTable, fieldNames);
}
/**
 * Generates a new temp table name for the table with the given id, stores it on that
 * table's configuration and returns it.
 *
 * <p>The name is {@code Constant.AssembleTempTable + tableName + suffix} for plain
 * tables and {@code Constant.AssembleTempSql + uuid(5) + suffix} otherwise, where
 * {@code suffix} comes from {@code DbUtils.getUUID(16)}.
 *
 * @param assembleTempTableName the current temp table name (not used)
 * @param tableId               id used to load the table configuration
 * @return the newly generated temp table name
 */
private String reSetTableName(String assembleTempTableName, String tableId) {
    // TODO Db
    SysAssembleDbTable dbTable = tableService.selectById(tableId);
    String suffix = DbUtils.getUUID(16);
    boolean plainTable = dbTable.getType().equals(SysAssembleTableType.table);
    String renamed = plainTable
            ? Constant.AssembleTempTable + dbTable.getTableName() + suffix
            : Constant.AssembleTempSql + DbUtils.getUUID(5) + suffix;
    dbTable.setTempTableName(renamed).updateById();
    return renamed;
}
/**
 * Appends the last value of a row followed by a line break.
 *
 * @param builder buffer the row is written to
 * @param object  value to append; {@code null} is written as the text "null"
 */
public void builderEnd(StringBuilder builder, Object object) {
    builder.append(object).append("\n");
}
/**
 * Appends a value of a row followed by a tab separator.
 *
 * @param builder buffer the row is written to
 * @param object  value to append; {@code null} is written as the text "null"
 */
public void builderAppend(StringBuilder builder, Object object) {
    builder.append(object).append("\t");
}
/**
 * Builds the {@code LOAD DATA LOCAL INFILE} statement for the given table and fields.
 *
 * @param dataBaseName database holding the target table
 * @param tableName    target table
 * @param fields       comma-separated list of target columns
 * @return the statement text
 */
public String assembleSql(String dataBaseName, String tableName, String fields) {
    StringBuilder statement = new StringBuilder("LOAD DATA LOCAL INFILE 'sql.csv' IGNORE INTO TABLE ");
    statement.append(dataBaseName).append(".").append(tableName);
    statement.append("(").append(fields).append(")");
    return statement.toString();
}
/**
 * Executes a load statement on the selected data source, feeding it the given stream
 * as the local infile.
 *
 * <p>The connection and the statement are closed by try-with-resources (the statement
 * was previously never closed). Failures are logged and reported as 0 loaded rows;
 * they are not propagated to the caller. A failure while closing is handled the same
 * way, so 0 is returned in that case as well.
 *
 * @param sql        the load statement
 * @param dataStream the data to load; {@code null} means nothing to load
 * @param bigData    whether the big-data source is used
 * @return the number of affected rows, or 0 when nothing was loaded or the load failed
 */
private int bulkLoadFromInputStream(String sql, InputStream dataStream, boolean bigData) {
    if (null == dataStream) {
        log.error("输入流为NULL,没有数据导入。");
        return 0;
    }
    DataSourceInfo dataSourceInfo;
    if (bigData) {
        dataSourceInfo = bigDataDataSourceInfo;
    } else {
        dataSourceInfo = unBigDataDataSourceInfo;
    }
    try (Connection conn = dataSourceInfo.conn();
         PreparedStatement statement = conn.prepareStatement(sql)) {
        if (!statement.isWrapperFor(java.sql.Statement.class)) {
            return 0;
        }
        // The local-infile stream can only be set on the driver's own statement class.
        com.mysql.cj.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.cj.jdbc.PreparedStatement.class);
        mysqlStatement.setLocalInfileInputStream(dataStream);
        return mysqlStatement.executeUpdate();
    } catch (Exception e) {
        log.error("批量导入数据失败, sql: {}", sql, e);
        return 0;
    }
}
}