package com.highdatas.mdm.service.impl; import com.baomidou.mybatisplus.mapper.EntityWrapper; import com.baomidou.mybatisplus.service.impl.ServiceImpl; import com.highdatas.mdm.entity.*; import com.highdatas.mdm.mapper.SysAssembleMapper; import com.highdatas.mdm.mapper.TableInfoMapper; import com.highdatas.mdm.pojo.*; import com.highdatas.mdm.pojo.kettle.BigDataDataSourceInfo; import com.highdatas.mdm.pojo.kettle.DataSourceInfo; import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo; import com.highdatas.mdm.service.*; import com.highdatas.mdm.util.Constant; import com.highdatas.mdm.util.ContentBuilder; import com.highdatas.mdm.util.DbUtils; import com.highdatas.mdm.util.RuleClient; import com.xxl.job.core.log.XxlJobLogger; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.mybatis.spring.SqlSessionTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.io.UnsupportedEncodingException; import java.sql.*; import java.text.MessageFormat; import java.util.*; import java.util.Date; import java.util.stream.Collectors; /** * <p> * æœåŠ¡å®žçŽ°ç±» * </p> * * @author kimi * @since 2020-02-20 */ @Slf4j @Service public class SysAssembleServiceImpl extends ServiceImpl<SysAssembleMapper, SysAssemble> implements ISysAssembleService { @Autowired ISysAssembleRuleService ruleService; @Autowired ISysAssembleDbService dbService; @Autowired ISysAssembleApiService apiService; @Autowired ISysAssembleDbTableService tableService; @Autowired ISysAssembleDbFieldService fieldService; @Autowired ISysAssembleParamsService paramsService; @Autowired ISysAssembleLogService logService; @Autowired IMenuMappingService menuMappingService; @Autowired ActivitiService activitiService; @Autowired IMaintainService maintainService; @Autowired IFlowsService flowsService; @Resource SqlSessionTemplate sqlSessionTemplate; @Autowired TableInfoMapper tableInfoMapper; @Autowired UnBigDataDataSourceInfo unBigDataDataSourceInfo; @Autowired BigDataDataSourceInfo bigDataDataSourceInfo; @Autowired MasterDataService masterDataService; @Autowired RuleClient ruleClient; @Autowired IMasterModifiedService masterModifiedService; @Autowired ISysViewService viewService; @Autowired ISysMenuService menuService; @Autowired IMasterAuthorSubscribeService subscribeService; /** * * @description: 通过汇集id执行汇集任务 * @param id 汇集id * @return: 执行结果 * */ @Override public Result run(String id) { XxlJobLogger.log("log info: run" ); if (StringUtils.isEmpty(id)) { return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } SysAssemble assemble = selectById(id); if (assemble == null) { XxlJobLogger.log("assemble not found" ); return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } assemble.setPreTime(new Date()); assemble.setPreStatus(SysAssembleRunStatus.working); assemble.updateById(); if (!assemble.getStatus().equals(SysAssembleStatus.working)) { assemble.setPreStatus(SysAssembleRunStatus.fail); assemble.setPreCnt(0); XxlJobLogger.log("当å‰ä»»åŠ¡ä¸åœ¨å·¥ä½œä¸,状æ€:" + assemble.getStatus() ); assemble.setPreMsg("当å‰ä»»åŠ¡ä¸åœ¨å·¥ä½œä¸,状æ€:" + assemble.getStatus()); assemble.updateById(); return Result.error(new CodeMsg(6009,"当å‰ä»»åŠ¡ä¸åœ¨å·¥ä½œä¸,状æ€:" + assemble.getStatus())); } String flowId = assemble.getFlowId(); if (!StringUtils.isEmpty(flowId)){ Flows flows = flowsService.selectById(flowId); if (flows != null) { ActivitiStatus status = flows.getStatus(); if (!status.equals(ActivitiStatus.close) && !status.equals(ActivitiStatus.open)) { assemble.setPreStatus(SysAssembleRunStatus.fail); assemble.setPreCnt(0); XxlJobLogger.log("当剿œ‰æµç¨‹æ£åœ¨è¿è¡Œ,æš‚æ—¶æ— æ³•æ±‡é›†ä¸‹æ¬¡æ•°æ®:" + status.toString() ); assemble.setPreMsg("当剿œ‰æµç¨‹æ£åœ¨è¿è¡Œ,æš‚æ—¶æ— æ³•æ±‡é›†ä¸‹æ¬¡æ•°æ®:" + status.toString()); assemble.updateById(); return Result.error(new CodeMsg(6009,"当剿œ‰æµç¨‹æ£åœ¨è¿è¡Œ,æš‚æ—¶æ— æ³•æ±‡é›†ä¸‹æ¬¡æ•°æ®:" + status.toString())); } } } Boolean bigData = assemble.getBigdata(); Date scheduleDate = new Date(); XxlJobLogger.log("log info: start assemble"); //1 load from db try{ Result result = runByDb(assemble); Date dbDate = new Date(); XxlJobLogger.log("log info db time:" + (dbDate.getTime() - scheduleDate.getTime()) + "ms"); if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); assemble.updateById(); return result; } //2 load from api result = runByApi(id); Date apiDate = new Date(); XxlJobLogger.log("log info api time:" + (apiDate.getTime() - dbDate.getTime()) + "ms"); if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); assemble.updateById(); return result; } //3check temp table; result = checkTempTable(assemble); Date checkDate = new Date(); XxlJobLogger.log("log info check time:" + (checkDate.getTime() - apiDate.getTime()) + "ms"); if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); XxlJobLogger.log("检测主数æ®ä¸´æ—¶è¡¨å‡ºçŽ°é”™è¯¯"+ result.getMessage()); assemble.setPreMsg("检测主数æ®ä¸´æ—¶è¡¨å‡ºçŽ°é”™è¯¯:" + result.getMessage()); assemble.updateById(); return result; } //4 purge data String purgeSql = assemble.getPurgeSql(); purgeSql = DbUtils.replaceEscape(purgeSql); XxlJobLogger.log("assemble db purgeSql sql:" + purgeSql); result = purgeData(purgeSql, assemble.getId(), bigData); Date purgeDate = new Date(); XxlJobLogger.log("log info purge time:" + (purgeDate.getTime() - checkDate.getTime()) + "ms"); if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); assemble.setPreMsg("è¿è¡Œæ¸…æ´—sql出现错误: "+purgeSql + "," + result.getMessage()); XxlJobLogger.log("è¿è¡Œæ¸…æ´—sql出现错误: "+purgeSql + "," + result.getMessage() ); assemble.updateById(); return result; } //5 check temp data result = checkTempData(assemble); Date checkTempDate = new Date(); XxlJobLogger.log("log info check temp time:" + (checkTempDate.getTime() - purgeDate.getTime()) + "ms"); if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); assemble.setPreMsg("è´¨é‡æ£€éªŒçŽ¯èŠ‚å‡ºçŽ°é”™è¯¯:" + result.getMessage()); XxlJobLogger.log("è´¨é‡æ£€éªŒçŽ¯èŠ‚å‡ºçŽ°é”™è¯¯" + result.getMessage()); assemble.updateById(); return result; } //6 temp 2 record result = needRollBack(assemble, result, checkTempDate); if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); XxlJobLogger.log("临时数æ®é“¾æŽ¥ä¸»æ•°æ®ç³»ç»Ÿé”™è¯¯ï¼š" +result.getMessage()); assemble.setPreMsg("临时数æ®é“¾æŽ¥ä¸»æ•°æ®ç³»ç»Ÿé”™è¯¯ï¼š" +result.getMessage()); }else { assemble.setPreMsg("汇集æˆåŠŸ"); XxlJobLogger.log("汇集æˆåŠŸ"); assemble.setPreStatus(SysAssembleRunStatus.success); } assemble.updateById(); return result; } catch (Exception e) { XxlJobLogger.log("汇集任务出现错误:"+e.getMessage()); assemble.setPreMsg("汇集任务出现错误:"+e.getMessage()); assemble.setPreStatus(SysAssembleRunStatus.fail).updateById(); e.printStackTrace(); return Result.error(new CodeMsg(6009, e.getMessage())); } } @Transactional(rollbackFor = {Exception.class, Error.class}) private Result needRollBack(SysAssemble assemble, Result result, Date checkTempDate) { result = temp2record(assemble); Date temp2RecordDate = new Date(); XxlJobLogger.log("log info temp2Record time:" + (temp2RecordDate.getTime() - checkTempDate.getTime()) + "ms"); if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); XxlJobLogger.log("æ•°æ®æ¬è¿è‡³ä¸»æ•°æ®è®°å½•环节出现错误;" + result.getMessage() ); assemble.setPreMsg("æ•°æ®æ¬è¿è‡³ä¸»æ•°æ®è®°å½•环节出现错误:" + result.getMessage()); assemble.updateById(); return result; } //7 record 链接 版本 result = linkMaintain(assemble); Date linkMaintainDate = new Date(); XxlJobLogger.log("log info link maintain time:" + (linkMaintainDate.getTime() - temp2RecordDate.getTime()) + "ms"); if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); XxlJobLogger.log("è”æŽ¥ç‰ˆæœ¬å‡ºçŽ°é”™è¯¯:" + result.getMessage()); assemble.setPreMsg("è”æŽ¥ç‰ˆæœ¬å‡ºçŽ°é”™è¯¯:" + result.getMessage()); assemble.updateById(); return result; } //8 æ›´æ–° 傿•° result = updateParams(assemble); if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); XxlJobLogger.log("æ›´æ–°å‚æ•°é”™è¯¯:" + result.getMessage()); assemble.setPreMsg("æ›´æ–°å‚æ•°é”™è¯¯:" + result.getMessage()); assemble.updateById(); return result; } return result; } private Result linkMaintain(SysAssemble assemble) { try{ String menuId = assemble.getMenuId(); MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId)); String tableName = menuMapping.getTableName(); SysAssembleUpdateType updateType = assemble.getUpdateType(); boolean isBigVersion = false; if (updateType.equals(SysAssembleUpdateType.All)) { isBigVersion = true; } Maintain maintain = masterDataService.uploadedData(tableName, updateType, assemble.getUserId(),isBigVersion); XxlJobLogger.log("log info create now maintain:" + maintain.getId()); Date before2Record = new Date(); Result result = temp2recordUpdate(assemble, maintain); Date after2Record = new Date(); XxlJobLogger.log("log info temp2Record time:" + (after2Record.getTime() - before2Record.getTime()) + "ms"); if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); assemble.setPreMsg("æ¬è¿æ›´æ–°å—段出现错误" ); assemble.updateById(); return result; } Boolean audit = menuMapping.getAudit(); if (audit == null){ //默认需è¦å®¡æ‰¹ audit = true; } // 处ç†å…³è”人 if (audit) { String chargeId = menuMapping.getChargeId(); TUser user = DbUtils.getUserById(chargeId); if (user == null) { return Result.error(new CodeMsg(6009, "找ä¸åˆ°å¯¹åº”的负责人:" + chargeId)); } activitiService.setUser(user); Flows flows = activitiService.start("process", null, maintain.getId(), ActivitiBusinessType.maintain); assemble.setFlowId(flows.getId()).updateById(); maintain.setFlowId(flows.getId()); maintain.setDesp("å¯åŠ¨æ±‡é›†æµç¨‹"); maintain.updateById(); masterModifiedService.dealAssemble(maintain.getId(), assemble.getUserId(), true); }else { //直接è¿è¡Œ Flows flows = new Flows().setStatus(ActivitiStatus.open).setBusinessId(maintain.getId()).setId(DbUtils.getUUID()).setCreateTime(new Date()); flows.setBusinessType(ActivitiBusinessType.maintain); flows.insert(); maintain.setFlowId(flows.getId()); maintain.updateById(); maintainService.dealFlow(maintain.getId(), flows.getStatus()); log.info("flow-maintainService end"); viewService.dealFlow(maintain.getId(), flows.getStatus()); log.info("flow-viewService end"); subscribeService.dealFlow(maintain.getId(), flows.getStatus()); log.info("flow-subscribeService end"); menuService.dealFlow(maintain.getId(), flows.getStatus(), flows.getUserId()); log.info("flow-menuService end"); masterModifiedService.dealAssemble(maintain.getId(), assemble.getUserId(), false); } return Result.success(null); }catch (Exception e) { e.printStackTrace(); return Result.error(new CodeMsg(6009, e.getMessage())); } } private Result updateParams(SysAssemble assemble) { Connection conn = null; try{ String id = assemble.getId(); Boolean bigdata = assemble.getBigdata(); DataSourceInfo dataSourceInfo; if (bigdata) { dataSourceInfo = bigDataDataSourceInfo; }else { dataSourceInfo = unBigDataDataSourceInfo; } conn = dataSourceInfo.conn(); List<SysAssembleParams> paramsList = paramsService.selectList(new EntityWrapper<SysAssembleParams>().eq("parent_id", id)); for (SysAssembleParams sysAssembleParams : paramsList) { String updateSql = sysAssembleParams.getUpdateSql(); if (StringUtils.isEmpty(updateSql)){ continue; } PreparedStatement preparedStatement = conn.prepareStatement(updateSql, ResultSet.TYPE_SCROLL_INSENSITIVE,ResultSet.CONCUR_READ_ONLY); ResultSet resultSet = preparedStatement.executeQuery(); //é»˜è®¤åªæœ‰ä¸€ä¸ªç»“æžœ if (resultSet.first()) { String val = resultSet.getString(1); sysAssembleParams.setVal(val); sysAssembleParams.updateById(); } } return Result.success(null) ; } catch (Exception e) { return Result.error(new CodeMsg(6006, e.getMessage())) ; } finally { if (conn != null) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); return Result.error(new CodeMsg(6006, e.getMessage())) ; } } } } private Result checkTempTable(SysAssemble assemble) { try { String menuId = assemble.getMenuId(); MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId)); String tableName = menuMapping.getTableName(); String tempTableName = Constant.Temp + tableName; Boolean bigdata = assemble.getBigdata(); DataSourceInfo dataSourceInfo; if (bigdata) { dataSourceInfo = bigDataDataSourceInfo; }else { dataSourceInfo = unBigDataDataSourceInfo; } boolean exists = dataSourceInfo.checkTableExists(tempTableName); if (!exists){ List<String> fieldsFromTable = dataSourceInfo.getFieldsFromTable(tableName); if (fieldsFromTable == null) { return Result.error(new CodeMsg(6003, "查询æ£å¼è¡¨çš„å—æ®µ:" + tableName)); } dataSourceInfo.createTable(tempTableName, fieldsFromTable); } //drop temp data dataSourceInfo.truncateData(tempTableName); return Result.success(null); }catch (Exception e) { e.printStackTrace(); return Result.error(new CodeMsg(6004, e.getMessage())); } } private Result temp2recordUpdate(SysAssemble assemble, Maintain maintain) { Connection conn = null; try { String menuId = assemble.getMenuId(); Boolean bigdata = assemble.getBigdata(); MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId)); String tableName = menuMapping.getTableName(); String tempTableName = Constant.Temp + tableName; String recordTableName = tableName + Constant.RECORD; DataSourceInfo dataSourceInfo; String updateSql = null; if (bigdata) { dataSourceInfo = bigDataDataSourceInfo; }else { dataSourceInfo = unBigDataDataSourceInfo; updateSql = Constant.MYSQLJoinUpdateSql; } conn = dataSourceInfo.conn(); SysAssembleUpdateType updateType = assemble.getUpdateType(); List<String> fieldsFromTable = dataSourceInfo.getFieldsFromTable(tableName); if (fieldsFromTable.contains(Constant.ID)) { fieldsFromTable.remove(Constant.ID); } List<String> fieldFromRecordTable = new ArrayList<>(fieldsFromTable); fieldFromRecordTable.add(Constant.ID); fieldFromRecordTable.add(Constant.STD_ID); fieldFromRecordTable.add(Constant.DEAL); String recordFieldStr = StringUtils.join(fieldFromRecordTable, Constant.COMMA); int cnt = 0; if(updateType.equals(SysAssembleUpdateType.Increment)) { String updateFields = assemble.getUpdateFields(); String[] split = updateFields.split(Constant.SEMICOLON); ArrayList<String> unionCodeFields = new ArrayList<String>(Arrays.asList(split)); String joinStr = getJoinFieldParse(unionCodeFields); List<String> otherFields = fieldsFromTable.stream().filter(s -> !unionCodeFields.contains(s)).filter(s -> !s.equalsIgnoreCase(Constant.ID)).collect(Collectors.toList()); if (otherFields.size() != 0) { String insertFieldStr = fieldsFromTable.stream() .map(s -> MessageFormat.format(Constant.Alias,Constant.T1,s)) .collect(Collectors.joining(Constant.COMMA)); String updatedSql = MessageFormat.format(updateSql, recordTableName, recordFieldStr,insertFieldStr, tempTableName, tableName, joinStr); PreparedStatement updatedPreparedStatement = conn.prepareStatement(updatedSql); int updateCnt = updatedPreparedStatement.executeUpdate(); cnt = updateCnt; tableInfoMapper.insertMatintainDetailFromTemp(DbUtils.quotedStr(maintain.getId()), maintain.getTableName() + Constant.RECORD , DbUtils.quotedStr(Operate.update.toString())); } } //tableInfoMapper.updateStdId( maintain.getTableName() + Constant.RECORD); tableInfoMapper.tempDeal(maintain.getTableName() + Constant.RECORD, DbUtils.quotedStr(maintain.getId())); assemble.setPreCnt(assemble.getPreCnt() + cnt); assemble.updateById(); return Result.success(null); } catch (Exception e) { e.printStackTrace(); return Result.error(new CodeMsg(6005, e.getMessage())); } finally { if (conn != null) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); return Result.error(new CodeMsg(6005, e.getMessage())); } } } } private Result temp2record(SysAssemble assemble) { Connection conn = null; try { String menuId = assemble.getMenuId(); Boolean bigdata = assemble.getBigdata(); MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId)); String tableName = menuMapping.getTableName(); String tempTableName = Constant.Temp + tableName; String recordTableName = tableName + Constant.RECORD; DataSourceInfo dataSourceInfo; String transSql; String updateSql = null; String insertSql = null; if (bigdata) { dataSourceInfo = bigDataDataSourceInfo; transSql = Constant.Temp2RecordHbaseTemplate; }else { dataSourceInfo = unBigDataDataSourceInfo; transSql = Constant.Temp2RecordMySQLTemplate; updateSql = Constant.MYSQLJoinUpdateSql; insertSql = Constant.MYSQLJoinAddSql; } conn = dataSourceInfo.conn(); SysAssembleUpdateType updateType = assemble.getUpdateType(); List<String> fieldsFromTable = dataSourceInfo.getFieldsFromTable(tableName); if (fieldsFromTable.contains(Constant.ID)) { fieldsFromTable.remove(Constant.ID); } List<String> fieldFromRecordTable = new ArrayList<>(fieldsFromTable); fieldFromRecordTable.add(Constant.ID); fieldFromRecordTable.add(Constant.STD_ID); fieldFromRecordTable.add(Constant.DEAL); String recordFieldStr = StringUtils.join(fieldFromRecordTable, Constant.COMMA); String fieldStr = fieldsFromTable.stream() .collect(Collectors.joining(Constant.COMMA)); int cnt = 0; if (updateType.equals(SysAssembleUpdateType.All)) { String sql = MessageFormat.format(transSql, recordTableName, recordFieldStr, fieldStr, tempTableName); XxlJobLogger.log("update sql:" + sql); PreparedStatement preparedStatement = conn.prepareStatement(sql); cnt = preparedStatement.executeUpdate(); }else if(updateType.equals(SysAssembleUpdateType.Increment)) { String updateFields = assemble.getUpdateFields(); String[] split = updateFields.split(Constant.SEMICOLON); ArrayList<String> unionCodeFields = new ArrayList<String>(Arrays.asList(split)); String joinStr = getJoinFieldParse(unionCodeFields); // insert String insertFieldStr = fieldsFromTable.stream() .map(s -> MessageFormat.format(Constant.Alias,Constant.T1,s)) .collect(Collectors.joining(Constant.COMMA)); String insetSql = MessageFormat.format(insertSql, recordTableName, recordFieldStr,insertFieldStr, tempTableName, tableName, joinStr); log.info(insertSql); XxlJobLogger.log("update Increment sql:" + insertSql); PreparedStatement preparedStatement = conn.prepareStatement(insetSql); int insertCnt = preparedStatement.executeUpdate(); cnt += insertCnt; } XxlJobLogger.log("update sql cnt:" + cnt); assemble.setPreCnt(cnt); assemble.updateById(); return Result.success(null); } catch (Exception e) { e.printStackTrace(); return Result.error(new CodeMsg(6005, e.getMessage())); } finally { if (conn != null) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); return Result.error(new CodeMsg(6005, e.getMessage())); } } } } private String getOthetFieldParse(List<String> unionCodeFields) { ContentBuilder builder = new ContentBuilder(Constant.AND); for (String s : unionCodeFields) { builder.append(MessageFormat.format(Constant.MYSQLJoinParse, s)); } return builder.toString(); } private String getJoinFieldParse(ArrayList<String> updateFields) { ContentBuilder builder = new ContentBuilder(Constant.AND); for (String s : updateFields) { builder.append(MessageFormat.format(Constant.MYSQLJoinParse, s)); } return builder.toString(); } private Result checkTempData(SysAssemble assemble) { try { SysAssembleCheckType checkType = assemble.getCheckType(); if (checkType == null) { return Result.error(new CodeMsg(6009, "è§„åˆ™æ ¡éªŒç±»åž‹ä¸ºç©º")); } if (checkType.equals(SysAssembleCheckType.unlimited)) { return Result.success(null); } String menuId = assemble.getMenuId(); MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId)); String tableName = menuMapping.getTableName(); String tempTableName = Constant.Temp + tableName; HashMap<String, Boolean> fieldResultSet = ruleClient.execuImmeForCollect(tempTableName, assemble.getUserId()); switch (checkType) { case successAdd: String unSuccessFields = fieldResultSet.keySet().stream().filter(s -> !fieldResultSet.get(s)).collect(Collectors.joining(Constant.COMMA)); if (fieldResultSet.keySet().contains(false)) { return Result.error(new CodeMsg(6009, "è§„åˆ™æ ¡éªŒä¸é€šè¿‡ å—æ®µ:" + unSuccessFields)); } break; case partSuccessAdd: String checkFields = assemble.getCheckFields(); String[] split = checkFields.split(Constant.SEMICOLON); for (String s : split) { Boolean checked = fieldResultSet.get(s); if (checked == null || !checked) { return Result.error(new CodeMsg(6009, "è§„åˆ™æ ¡éªŒä¸é€šè¿‡ å—æ®µ:" + s)); } } break; } return Result.success(null); } catch (Exception e) { e.printStackTrace(); return Result.error(new CodeMsg(3009, e.getMessage())); } } private Result purgeData(String purgeSql, String assembleId, boolean bigData) { Connection connection = null; Savepoint purge = null; try { DataSourceInfo dataSourceInfo = null; if (bigData) { dataSourceInfo = bigDataDataSourceInfo; }else { dataSourceInfo = unBigDataDataSourceInfo; } connection = dataSourceInfo.conn(); connection.setAutoCommit(false); purge = connection.setSavepoint("purge"); List<SysAssembleParams> paramsList = paramsService.selectList(new EntityWrapper<SysAssembleParams>().eq("parent_id", assembleId)); HashMap<String, String> paramsMap = new HashMap<>(); Set<String> matcher = DbUtils.matcher(purgeSql); for (String s : matcher) { List<SysAssembleParams> collect = paramsList.stream().filter(sysAssembleParams -> sysAssembleParams.getCode().equalsIgnoreCase(s)).collect(Collectors.toList()); if (collect.size() == 0) { return Result.error(new CodeMsg(6006, "未匹é…åˆ°å‚æ•°:" + s)); }else if (collect.size() > 1) { return Result.error(new CodeMsg(6006, "匹é…åˆ°å¤šä¸ªå‚æ•°:" + s)); }else { SysAssembleParams sysAssembleParams = collect.get(0); String val = sysAssembleParams.getVal(); if (StringUtils.isEmpty(val)) { String initSql = sysAssembleParams.getInitSql(); if (StringUtils.isEmpty(initSql)) { return Result.error(new CodeMsg(6006, "åˆå§‹åŒ–傿•°:" + s + "失败")); } PreparedStatement preparedStatement = connection.prepareStatement(initSql); ResultSet resultSet = preparedStatement.executeQuery(); //é»˜è®¤åªæœ‰ä¸€ä¸ªç»“æžœ val = resultSet.getString(1); } paramsMap.put(s, val); } } for (String key : paramsMap.keySet()) { String val = paramsMap.get(key); String preParams = MessageFormat.format(Constant.ParamsShell, key); purgeSql = purgeSql.replace(preParams, val); } XxlJobLogger.log("purgeSql:" +purgeSql); List<String> split = DbUtils.split(purgeSql, Constant.SEMICOLON); XxlJobLogger.log("split size:" + split.size()); PreparedStatement preparedStatement = null; if (!split.isEmpty()) { for (String oneSql : split) { XxlJobLogger.log("one sql:" + oneSql); if (preparedStatement == null) { preparedStatement = connection.prepareStatement(oneSql); preparedStatement.addBatch(oneSql); } else { preparedStatement.addBatch(oneSql); } } } int[] ints = preparedStatement.executeBatch(); for (int i = 0; i < ints.length; i++) { XxlJobLogger.log("sqls count:" + ints[i]); } connection.commit(); return Result.success(null); } catch (SQLException e) { e.printStackTrace(); if (connection != null) { try { if (purge != null) { connection.rollback(purge); } else { connection.rollback(); } } catch (SQLException e1) { e1.printStackTrace(); return Result.error(new CodeMsg(6004, "回滚sql出错," + e.getMessage())); } } return Result.error(new CodeMsg(6004, "è¿è¡Œæ¸…æ´—sql出错," + e.getMessage())); } finally { if (connection != null) { try { connection.close(); } catch (SQLException e) { e.printStackTrace(); return Result.error(new CodeMsg(6003, "本地connection æ— æ³•å…³é—")); } } } } private Result runByApi(String id) { //TODO return Result.success(null); } private Result runByDb(SysAssemble assemble) { try { String id = assemble.getId(); Boolean bigdata = assemble.getBigdata(); List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id).eq(Constant.Active, true)); for (SysAssembleDb sysAssembleDb : dbList) { String dbId = sysAssembleDb.getId(); Result result = loadOneDb(sysAssembleDb, bigdata, id); XxlJobLogger.log("assemble db load one db:" + sysAssembleDb.getDatasourceUrl()); if (!result.getSuccess()) { assemble.setPreMsg("å¯¼å…¥æºæ•°æ®é”™è¯¯:" + sysAssembleDb.getDatabaseName()).updateById(); return result; } } return Result.success(null); } catch (Exception e) { return Result.error(new CodeMsg(3009, e.getMessage())); } } private Result loadOneDb(SysAssembleDb sysAssembleDb, boolean bigData, String assembleId) { String dbId = sysAssembleDb.getId(); Connection conn = dbService.getConnection(dbId); if (conn == null) { return Result.error(new CodeMsg(6002, MessageFormat.format("æœªèƒ½è¿žæŽ¥åˆ°æº id: {0}", dbId))); } Connection dataSourceConn = null; PreparedStatement statement = null; try { List<SysAssembleDbTable> tableList = tableService.selectList(new EntityWrapper<SysAssembleDbTable>().eq(Constant.PARENT_ID, dbId).eq(Constant.Active, true)); for (SysAssembleDbTable dbTable : tableList) { String tableId = dbTable.getId(); String tempTableName = dbTable.getTempTableName(); XxlJobLogger.log("assemble table" + tempTableName); List<SysAssembleDbField> fieldList = fieldService.selectList(new EntityWrapper<SysAssembleDbField>().eq(Constant.PARENT_ID, tableId)); String fields = fieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.joining(Constant.COMMA)); XxlJobLogger.log("assemble fields" + fields); SysAssembleTableType type = dbTable.getType(); String tableName = null; String assembleTempTableName = tempTableName; if (type.equals(SysAssembleTableType.sql)) { tableName = dbTable.getSql(); } else if (type.equals(SysAssembleTableType.table)) { tableName = dbTable.getTableName(); } XxlJobLogger.log("assemble source table:" + tableName); //TODO assembleTempTableName å¯èƒ½ä¼šè¶…é•¿ 64å—节 åŽç»ä¿®æ”¹ DataSourceInfo dataSourceInfo = null; if (!bigData) { dataSourceInfo = unBigDataDataSourceInfo; }else { dataSourceInfo = bigDataDataSourceInfo; } // drop temp data boolean b = dataSourceInfo.truncateData(tempTableName); XxlJobLogger.log("assemble truncateData :" + b); fixAssembleTempTable(dataSourceInfo, assembleTempTableName, fieldList, dbId); XxlJobLogger.log("assemble fixAssembleTempTable :" ); //checkAssembleTempTableExists(assembleTempTableName); String filter = dbTable.getFilter(); XxlJobLogger.log("assemble raw filter:" + filter); if (!StringUtils.isEmpty(filter)) { Set<String> matcher = DbUtils.matcher(filter); for (String code : matcher) { SysAssembleParams sysAssembleParams = paramsService.selectOne(new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, assembleId).eq(Constant.Code, code)); if (sysAssembleParams == null){ return Result.error(new CodeMsg(6009, assembleId + "有å˜é‡æœªåŒ¹é…到:"+ code)); } String val = sysAssembleParams.getVal(); if (StringUtils.isEmpty(val)) { return Result.error(new CodeMsg(6009, assembleId + "有å˜é‡æœªèŽ·å–到值:"+ code)); } val = DbUtils.quotedStr(val); filter = filter.replace(DbUtils.assemblParam(code), val); } }else { filter = Constant.WHERE_DEFAULT; } XxlJobLogger.log("assemble db filter:" + filter); String runSqlTemplate = null; if (type.equals(SysAssembleTableType.table)){ runSqlTemplate = Constant.selectFieldTableTemplate; }else if(type.equals(SysAssembleTableType.sql)){ runSqlTemplate = Constant.selectFieldSqlTemplate; } XxlJobLogger.log("assemble db select sql template :" + runSqlTemplate); String sql = MessageFormat.format(runSqlTemplate, fields, tableName, filter); XxlJobLogger.log("assemble db select sql:" + sql); PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ps.setFetchSize(Integer.MIN_VALUE); ps.setFetchDirection(ResultSet.FETCH_REVERSE); ResultSet resultSet = ps.executeQuery(); ResultSetMetaData metaData = resultSet.getMetaData(); int columnCount = metaData.getColumnCount(); String insertMySqlSql = assembleSql(unBigDataDataSourceInfo.getDbName(), assembleTempTableName, fieldList); XxlJobLogger.log("assemble db insert sql:" + insertMySqlSql); int cnt = 0; dataSourceConn = dataSourceInfo.conn(); dataSourceConn.setAutoCommit(false); statement = dataSourceConn.prepareStatement(insertMySqlSql); int count = 0; while (resultSet.next()) { count++; for (int i = 1; i <= columnCount; i++) { statement.setString(i, resultSet.getString(i)); } statement.addBatch(); cnt++; if (cnt == 5000) { statement.executeBatch(); } } statement.executeBatch(); XxlJobLogger.log("assemble db select sql count:" + count); } dataSourceConn.commit(); return Result.success(null); } catch (Exception e) { e.printStackTrace(); return Result.error(new CodeMsg(6004, e.getMessage())); }finally { if (conn != null) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } if (statement != null) { try { statement.close(); } catch (SQLException e) { e.printStackTrace(); } } if (dataSourceConn != null) { try { dataSourceConn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } private void setOneBatch(boolean bigData, String insertMySqlSql, List<List<String>> branchList) { Connection conn = null; try { DataSourceInfo dataSourceInfo = null; if (bigData) { dataSourceInfo = bigDataDataSourceInfo; }else { dataSourceInfo = unBigDataDataSourceInfo; } conn = dataSourceInfo.conn(); PreparedStatement statement = conn.prepareStatement(insertMySqlSql); int result = 0; if (branchList.isEmpty()) { return; } for (List<String> stringList : branchList) { for (int i = 0; i < stringList.size(); i++) { statement.setString(i + 1, stringList.get(i)); } statement.addBatch(); } statement.executeBatch(); } catch (Exception e) { e.printStackTrace(); } finally { if (conn != null) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } private void runOneBatch(boolean bigData, String insertMySqlSql, StringBuilder builder) throws UnsupportedEncodingException, SQLException { byte[] bytes = builder.toString().getBytes("UTF-8"); if (bytes.length > 0) { InputStream stream = new ByteArrayInputStream(bytes); bulkLoadFromInputStream(insertMySqlSql, stream, bigData); } //清空bulider builder.delete(0, builder.length()); } private void fixAssembleTempTable(DataSourceInfo dataSourceInfo, String assembleTempTableName, List<SysAssembleDbField> dbFieldList, String dbId) { boolean exists = dataSourceInfo.checkTableExists(assembleTempTableName); List<String> fieldList = dbFieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.toList()); if (!exists) { dataSourceInfo.createTable(assembleTempTableName, fieldList); }else { //check å—æ®µæ˜¯å¦ç›¸åŒ boolean checked = dataSourceInfo.checkFieldList(assembleTempTableName, fieldList); if (!checked){ assembleTempTableName = reSetTableName(assembleTempTableName, dbId); dataSourceInfo.createTable(assembleTempTableName, fieldList); } } } private String reSetTableName(String assembleTempTableName, String tableId) { //TODO Db SysAssembleDbTable dbTable = tableService.selectById(tableId); String suffix = null; suffix = DbUtils.getUUID(16); String tempTableName = null; if (dbTable.getType().equals(SysAssembleTableType.table)) { tempTableName = Constant.AssembleTempTable + dbTable.getTableName() + suffix; }else { tempTableName = Constant.AssembleTempSql+ DbUtils.getUUID(5)+ suffix; } dbTable.setTempTableName(tempTableName).updateById(); return tempTableName; } public void builderEnd(StringBuilder builder, Object object) { builder.append(object); builder.append("\n"); } public void builderAppend(StringBuilder builder, Object object) { builder.append(object); builder.append(","); } public String assembleSql(String dataBaseName, String tableName, List<SysAssembleDbField> fieldList) { String fields = fieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.joining(Constant.COMMA)); ContentBuilder builder = new ContentBuilder(Constant.COMMA); for (int i = 0; i < fieldList.size(); i++) { builder.append(Constant.QUESTION); } String sql = "INSERT INTO " + dataBaseName + "." + tableName + "(" + fields + ") values(" + builder.toString() + ")"; return sql; } private int bulkLoadFromInputStream(String sql, InputStream dataStream, boolean bigData) { if (null == dataStream) { log.error("输入æµä¸ºNULL,没有数æ®å¯¼å…¥ã€‚"); return 0; } Connection conn = null; try { DataSourceInfo dataSourceInfo = null; if (bigData) { dataSourceInfo = bigDataDataSourceInfo; }else { dataSourceInfo = unBigDataDataSourceInfo; } conn = dataSourceInfo.conn(); PreparedStatement statement = conn.prepareStatement(sql); int result = 0; if (statement.isWrapperFor(java.sql.Statement.class)) { // com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class); // mysqlStatement.setLocalInfileInputStream(dataStream); // statement. // result = mysqlStatement.executeUpdate(); } return result; } catch (Exception e) { e.printStackTrace(); } finally { if (conn != null) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } return 0; } }