| | |
| | | package com.highdatas.mdm.service.impl; |
| | | |
| | | import com.baomidou.mybatisplus.mapper.EntityWrapper; |
| | | import com.baomidou.mybatisplus.service.impl.ServiceImpl; |
| | | import com.highdatas.mdm.entity.*; |
| | | import com.highdatas.mdm.mapper.SysAssembleMapper; |
| | | import com.highdatas.mdm.mapper.TableInfoMapper; |
| | |
| | | import com.highdatas.mdm.pojo.kettle.DataSourceInfo; |
| | | import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo; |
| | | import com.highdatas.mdm.service.*; |
| | | import com.baomidou.mybatisplus.service.impl.ServiceImpl; |
| | | import com.highdatas.mdm.util.Constant; |
| | | import com.highdatas.mdm.util.ContentBuilder; |
| | | import com.highdatas.mdm.util.DbUtils; |
| | | import com.highdatas.mdm.util.RuleClient; |
| | | import lombok.Data; |
| | | import com.xxl.job.core.log.XxlJobLogger; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.commons.lang3.StringUtils; |
| | | import org.apache.ibatis.session.SqlSession; |
| | | import org.apache.regexp.RE; |
| | | import org.jcodings.util.Hash; |
| | | import org.mybatis.spring.SqlSessionTemplate; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | import org.springframework.transaction.annotation.Transactional; |
| | | |
| | | import javax.annotation.Resource; |
| | | import javax.servlet.http.HttpSession; |
| | | import java.io.ByteArrayInputStream; |
| | | import java.io.InputStream; |
| | | import java.io.UnsupportedEncodingException; |
| | |
| | | BigDataDataSourceInfo bigDataDataSourceInfo; |
| | | @Autowired |
| | | MasterDataService masterDataService; |
| | | @Autowired |
| | | ITUserService userService; |
| | | |
| | | @Autowired |
| | | RuleClient ruleClient; |
| | | @Autowired |
| | | IMasterModifiedService masterModifiedService; |
| | | |
| | | @Autowired |
| | | ISysViewService viewService; |
| | | @Autowired |
| | | ISysMenuService menuService; |
| | | @Autowired |
| | | IMasterAuthorSubscribeService subscribeService; |
| | | /** |
| | | * |
| | | * Runs the assemble (data collection) task identified by the given assemble id. |
| | | * @param id assemble id |
| | | * @return the execution result |
| | | * |
| | | */ |
| | | @Override |
| | | public Result run(String id) { |
| | | XxlJobLogger.log("log info: run" ); |
| | | if (StringUtils.isEmpty(id)) { |
| | | return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); |
| | | } |
| | | |
| | | SysAssemble assemble = selectById(id); |
| | | if (assemble == null) { |
| | | XxlJobLogger.log("assemble not found" ); |
| | | return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); |
| | | } |
| | | |
| | | assemble.setPreTime(new Date()); |
| | | assemble.setPreStatus(SysAssembleRunStatus.working); |
| | | assemble.updateById(); |
| | |
| | | if (!assemble.getStatus().equals(SysAssembleStatus.working)) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | assemble.setPreCnt(0); |
| | | XxlJobLogger.log("当前任务不在工作中,状态:" + assemble.getStatus() ); |
| | | assemble.setPreMsg("当前任务不在工作中,状态:" + assemble.getStatus()); |
| | | assemble.updateById(); |
| | | return Result.error(new CodeMsg(6009,"当前任务不在工作中,状态:" + assemble.getStatus())); |
| | |
| | | if (!status.equals(ActivitiStatus.close) && !status.equals(ActivitiStatus.open)) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | assemble.setPreCnt(0); |
| | | XxlJobLogger.log("当前有流程正在运行,暂时无法汇集下次数据:" + status.toString() ); |
| | | assemble.setPreMsg("当前有流程正在运行,暂时无法汇集下次数据:" + status.toString()); |
| | | assemble.updateById(); |
| | | return Result.error(new CodeMsg(6009,"当前有流程正在运行,暂时无法汇集下次数据:" + status.toString())); |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | Boolean bigData = assemble.getBigdata(); |
| | | |
| | | Date scheduleDate = new Date(); |
| | | XxlJobLogger.log("log info: start assemble"); |
| | | //1 load from db |
| | | try{ |
| | | Result result = runByDb(assemble); |
| | | Date dbDate = new Date(); |
| | | XxlJobLogger.log("log info db time:" + (dbDate.getTime() - scheduleDate.getTime()) + "ms"); |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | assemble.updateById(); |
| | | return result; |
| | | } |
| | | |
| | | //2 load from api |
| | | result = runByApi(id); |
| | | Date apiDate = new Date(); |
| | | XxlJobLogger.log("log info api time:" + (apiDate.getTime() - dbDate.getTime()) + "ms"); |
| | | |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | assemble.updateById(); |
| | |
| | | } |
| | | //3check temp table; |
| | | result = checkTempTable(assemble); |
| | | Date checkDate = new Date(); |
| | | XxlJobLogger.log("log info check time:" + (checkDate.getTime() - apiDate.getTime()) + "ms"); |
| | | |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | assemble.setPreMsg("检测主数据临时表出现错误"); |
| | | XxlJobLogger.log("检测主数据临时表出现错误"+ result.getMessage()); |
| | | |
| | | assemble.setPreMsg("检测主数据临时表出现错误:" + result.getMessage()); |
| | | assemble.updateById(); |
| | | return result; |
| | | } |
| | |
| | | //4 purge data |
| | | String purgeSql = assemble.getPurgeSql(); |
| | | purgeSql = DbUtils.replaceEscape(purgeSql); |
| | | XxlJobLogger.log("assemble db purgeSql sql:" + purgeSql); |
| | | result = purgeData(purgeSql, assemble.getId(), bigData); |
| | | Date purgeDate = new Date(); |
| | | XxlJobLogger.log("log info purge time:" + (purgeDate.getTime() - checkDate.getTime()) + "ms"); |
| | | |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | assemble.setPreMsg("运行清洗sql出现错误: "+purgeSql ); |
| | | assemble.setPreMsg("运行清洗sql出现错误: "+purgeSql + "," + result.getMessage()); |
| | | XxlJobLogger.log("运行清洗sql出现错误: "+purgeSql + "," + result.getMessage() ); |
| | | assemble.updateById(); |
| | | return result; |
| | | } |
| | | |
| | | //5 check temp data |
| | | result = checkTempData(assemble); |
| | | Date checkTempDate = new Date(); |
| | | XxlJobLogger.log("log info check temp time:" + (checkTempDate.getTime() - purgeDate.getTime()) + "ms"); |
| | | |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | assemble.setPreMsg("质量检验环节出现错误" ); |
| | | assemble.setPreMsg("质量检验环节出现错误:" + result.getMessage()); |
| | | XxlJobLogger.log("质量检验环节出现错误" + result.getMessage()); |
| | | assemble.updateById(); |
| | | return result; |
| | | } |
| | | //6 temp 2 record |
| | | result = needRollBack(assemble, result, checkTempDate); |
| | | |
| | | result = temp2record(assemble); |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | assemble.setPreMsg("数据搬运至主数据记录环节出现错误" ); |
| | | assemble.updateById(); |
| | | return result; |
| | | } |
| | | |
| | | //7 record 链接 版本 |
| | | result = linkMaintain(assemble); |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | assemble.setPreMsg("联接版本出现错误" ); |
| | | assemble.updateById(); |
| | | return result; |
| | | } |
| | | |
| | | //todo 添加事务 log记录 |
| | | //8 更新 参数 |
| | | result = updateParams(assemble); |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | assemble.setPreMsg("更新变量环节出现错误" ); |
| | | XxlJobLogger.log("临时数据链接主数据系统错误:" +result.getMessage()); |
| | | assemble.setPreMsg("临时数据链接主数据系统错误:" +result.getMessage()); |
| | | }else { |
| | | assemble.setPreMsg("汇集成功"); |
| | | XxlJobLogger.log("汇集成功"); |
| | | assemble.setPreStatus(SysAssembleRunStatus.success); |
| | | } |
| | | assemble.updateById(); |
| | | return result; |
| | | } |
| | | catch (Exception e) { |
| | | assemble.setPreMsg("汇集任务出现错误"); |
| | | XxlJobLogger.log("汇集任务出现错误:"+e.getMessage()); |
| | | assemble.setPreMsg("汇集任务出现错误:"+e.getMessage()); |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail).updateById(); |
| | | e.printStackTrace(); |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | @Transactional(rollbackFor = {Exception.class, Error.class}) |
| | | private Result needRollBack(SysAssemble assemble, Result result, Date checkTempDate) { |
| | | result = temp2record(assemble); |
| | | Date temp2RecordDate = new Date(); |
| | | XxlJobLogger.log("log info temp2Record time:" + (temp2RecordDate.getTime() - checkTempDate.getTime()) + "ms"); |
| | | |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | XxlJobLogger.log("数据搬运至主数据记录环节出现错误;" + result.getMessage() ); |
| | | assemble.setPreMsg("数据搬运至主数据记录环节出现错误:" + result.getMessage()); |
| | | assemble.updateById(); |
| | | return result; |
| | | } |
| | | |
| | | |
| | | //7 record 链接 版本 |
| | | |
| | | result = linkMaintain(assemble); |
| | | Date linkMaintainDate = new Date(); |
| | | XxlJobLogger.log("log info link maintain time:" + (linkMaintainDate.getTime() - temp2RecordDate.getTime()) + "ms"); |
| | | |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | XxlJobLogger.log("联接版本出现错误:" + result.getMessage()); |
| | | |
| | | assemble.setPreMsg("联接版本出现错误:" + result.getMessage()); |
| | | assemble.updateById(); |
| | | return result; |
| | | } |
| | | |
| | | //8 更新 参数 |
| | | result = updateParams(assemble); |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | XxlJobLogger.log("更新参数错误:" + result.getMessage()); |
| | | |
| | | assemble.setPreMsg("更新参数错误:" + result.getMessage()); |
| | | assemble.updateById(); |
| | | return result; |
| | | } |
| | | return result; |
| | | } |
| | | |
| | | |
| | | private Result linkMaintain(SysAssemble assemble) { |
| | | try{ |
| | | String menuId = assemble.getMenuId(); |
| | | MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId)); |
| | | String tableName = menuMapping.getTableName(); |
| | | Maintain maintain = masterDataService.uploadedData(tableName, assemble.getUpdateType(), assemble.getUserId()); |
| | | SysAssembleUpdateType updateType = assemble.getUpdateType(); |
| | | boolean isBigVersion = false; |
| | | if (updateType.equals(SysAssembleUpdateType.All)) { |
| | | isBigVersion = true; |
| | | } |
| | | Maintain maintain = masterDataService.uploadedData(tableName, updateType, assemble.getUserId(),isBigVersion); |
| | | XxlJobLogger.log("log info create now maintain:" + maintain.getId()); |
| | | Date before2Record = new Date(); |
| | | |
| | | Result result = temp2recordUpdate(assemble, maintain); |
| | | Date after2Record = new Date(); |
| | | XxlJobLogger.log("log info temp2Record time:" + (after2Record.getTime() - before2Record.getTime()) + "ms"); |
| | | |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreStatus(SysAssembleRunStatus.fail); |
| | | assemble.setPreMsg("搬运更新字段出现错误" ); |
| | |
| | | |
| | | if (audit) { |
| | | String chargeId = menuMapping.getChargeId(); |
| | | TUser user = userService.selectById(chargeId); |
| | | TUser user = DbUtils.getUserById(chargeId); |
| | | if (user == null) { |
| | | return Result.error(new CodeMsg(6009, "找不到对应的负责人:" + chargeId)); |
| | | } |
| | |
| | | }else { |
| | | //直接运行 |
| | | Flows flows = new Flows().setStatus(ActivitiStatus.open).setBusinessId(maintain.getId()).setId(DbUtils.getUUID()).setCreateTime(new Date()); |
| | | flows.setBusinessType(ActivitiBusinessType.maintain); |
| | | flows.insert(); |
| | | maintain.setFlowId(flows.getId()); |
| | | maintain.updateById(); |
| | | maintainService.dealFlow(maintain.getId(), ActivitiStatus.open); |
| | | maintainService.dealFlow(maintain.getId(), flows.getStatus()); |
| | | log.info("flow-maintainService end"); |
| | | viewService.dealFlow(maintain.getId(), flows.getStatus()); |
| | | log.info("flow-viewService end"); |
| | | subscribeService.dealFlow(maintain.getId(), flows.getStatus()); |
| | | log.info("flow-subscribeService end"); |
| | | menuService.dealFlow(maintain.getId(), flows.getStatus(), flows.getUserId()); |
| | | log.info("flow-menuService end"); |
| | | masterModifiedService.dealAssemble(maintain.getId(), assemble.getUserId(), false); |
| | | } |
| | | return Result.success(null); |
| | |
| | | int cnt = 0; |
| | | if (updateType.equals(SysAssembleUpdateType.All)) { |
| | | String sql = MessageFormat.format(transSql, recordTableName, recordFieldStr, fieldStr, tempTableName); |
| | | XxlJobLogger.log("update sql:" + sql); |
| | | PreparedStatement preparedStatement = conn.prepareStatement(sql); |
| | | cnt = preparedStatement.executeUpdate(); |
| | | |
| | | }else if(updateType.equals(SysAssembleUpdateType.Increment)) { |
| | | String updateFields = assemble.getUpdateFields(); |
| | | String[] split = updateFields.split(Constant.SEMICOLON); |
| | |
| | | .collect(Collectors.joining(Constant.COMMA)); |
| | | |
| | | String insetSql = MessageFormat.format(insertSql, recordTableName, recordFieldStr,insertFieldStr, tempTableName, tableName, joinStr); |
| | | log.info(insertSql); |
| | | XxlJobLogger.log("update Increment sql:" + insertSql); |
| | | PreparedStatement preparedStatement = conn.prepareStatement(insetSql); |
| | | int insertCnt = preparedStatement.executeUpdate(); |
| | | cnt += insertCnt; |
| | | |
| | | } |
| | | XxlJobLogger.log("update sql cnt:" + cnt); |
| | | assemble.setPreCnt(cnt); |
| | | assemble.updateById(); |
| | | return Result.success(null); |
| | | } |
| | | catch (Exception e) { |
| | | e.printStackTrace(); |
| | | |
| | | return Result.error(new CodeMsg(6005, e.getMessage())); |
| | | } |
| | | finally { |
| | |
| | | } |
| | | |
| | | private Result checkTempData(SysAssemble assemble) { |
| | | //TODO |
| | | try { |
| | | |
| | | SysAssembleCheckType checkType = assemble.getCheckType(); |
| | | if (checkType == null) { |
| | | return Result.error(new CodeMsg(6009,"规则校验类型为空")); |
| | | } |
| | | if (checkType.equals(SysAssembleCheckType.unlimited)) { |
| | | return Result.success(null); |
| | | } |
| | | |
| | | String menuId = assemble.getMenuId(); |
| | | MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId)); |
| | | String tableName = menuMapping.getTableName(); |
| | | String tempTableName = Constant.Temp + tableName; |
| | | HashMap<String,Boolean> fieldResultSet = ruleClient.execuImmeForCollect(tempTableName, assemble.getUserId()); |
| | | switch (checkType){ |
| | | case successAdd: |
| | | String unSuccessFields = fieldResultSet.keySet().stream().filter(s -> !fieldResultSet.get(s)).collect(Collectors.joining(Constant.COMMA)); |
| | | if (fieldResultSet.keySet().contains(false)) { |
| | | return Result.error(new CodeMsg(6009,"规则校验不通过 字段:" + unSuccessFields)); |
| | | } |
| | | break; |
| | | case partSuccessAdd: |
| | | String checkFields = assemble.getCheckFields(); |
| | | String[] split = checkFields.split(Constant.SEMICOLON); |
| | | for (String s : split) { |
| | | Boolean checked = fieldResultSet.get(s); |
| | | if (checked == null || !checked) { |
| | | return Result.error(new CodeMsg(6009,"规则校验不通过 字段:" + s)); |
| | | SysAssembleCheckType checkType = assemble.getCheckType(); |
| | | if (checkType == null) { |
| | | return Result.error(new CodeMsg(6009, "规则校验类型为空")); |
| | | } |
| | | if (checkType.equals(SysAssembleCheckType.unlimited)) { |
| | | return Result.success(null); |
| | | } |
| | | |
| | | String menuId = assemble.getMenuId(); |
| | | MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId)); |
| | | String tableName = menuMapping.getTableName(); |
| | | String tempTableName = Constant.Temp + tableName; |
| | | HashMap<String, Boolean> fieldResultSet = ruleClient.execuImmeForCollect(tempTableName, assemble.getUserId()); |
| | | switch (checkType) { |
| | | case successAdd: |
| | | String unSuccessFields = fieldResultSet.keySet().stream().filter(s -> !fieldResultSet.get(s)).collect(Collectors.joining(Constant.COMMA)); |
| | | if (fieldResultSet.keySet().contains(false)) { |
| | | return Result.error(new CodeMsg(6009, "规则校验不通过 字段:" + unSuccessFields)); |
| | | } |
| | | } |
| | | break; |
| | | break; |
| | | case partSuccessAdd: |
| | | String checkFields = assemble.getCheckFields(); |
| | | String[] split = checkFields.split(Constant.SEMICOLON); |
| | | for (String s : split) { |
| | | Boolean checked = fieldResultSet.get(s); |
| | | if (checked == null || !checked) { |
| | | return Result.error(new CodeMsg(6009, "规则校验不通过 字段:" + s)); |
| | | } |
| | | } |
| | | break; |
| | | } |
| | | |
| | | |
| | | return Result.success(null); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | return Result.error(new CodeMsg(3009, e.getMessage())); |
| | | } |
| | | |
| | | |
| | | |
| | | return Result.success(null); |
| | | } |
| | | |
| | | |
| | | private Result purgeData(String purgeSql, String assembleId, boolean bigData) { |
| | | Connection connection = null; |
| | | Savepoint purge = null; |
| | | try { |
| | | DataSourceInfo dataSourceInfo = null; |
| | | if (bigData) { |
| | |
| | | dataSourceInfo = unBigDataDataSourceInfo; |
| | | } |
| | | connection = dataSourceInfo.conn(); |
| | | |
| | | connection.setAutoCommit(false); |
| | | purge = connection.setSavepoint("purge"); |
| | | List<SysAssembleParams> paramsList = paramsService.selectList(new EntityWrapper<SysAssembleParams>().eq("parent_id", assembleId)); |
| | | HashMap<String, String> paramsMap = new HashMap<>(); |
| | | Set<String> matcher = DbUtils.matcher(purgeSql); |
| | |
| | | String preParams = MessageFormat.format(Constant.ParamsShell, key); |
| | | purgeSql = purgeSql.replace(preParams, val); |
| | | } |
| | | XxlJobLogger.log("purgeSql:" +purgeSql); |
| | | List<String> split = DbUtils.split(purgeSql, Constant.SEMICOLON); |
| | | XxlJobLogger.log("split size:" + split.size()); |
| | | PreparedStatement preparedStatement = null; |
| | | if (!split.isEmpty()) { |
| | | for (String oneSql : split) { |
| | | XxlJobLogger.log("one sql:" + oneSql); |
| | | if (preparedStatement == null) { |
| | | preparedStatement = connection.prepareStatement(oneSql); |
| | | preparedStatement.addBatch(oneSql); |
| | | } else { |
| | | preparedStatement.addBatch(oneSql); |
| | | } |
| | | } |
| | | } |
| | | int[] ints = preparedStatement.executeBatch(); |
| | | for (int i = 0; i < ints.length; i++) { |
| | | XxlJobLogger.log("sqls count:" + ints[i]); |
| | | } |
| | | |
| | | PreparedStatement preparedStatement = connection.prepareStatement(purgeSql); |
| | | preparedStatement.execute(); |
| | | connection.commit(); |
| | | |
| | | return Result.success(null); |
| | | |
| | | } catch (SQLException e) { |
| | | e.printStackTrace(); |
| | | return Result.error(new CodeMsg(6004, "运行清洗sql出错," + e.getSQLState())); |
| | | if (connection != null) { |
| | | try { |
| | | if (purge != null) { |
| | | connection.rollback(purge); |
| | | } else { |
| | | connection.rollback(); |
| | | } |
| | | |
| | | } catch (SQLException e1) { |
| | | e1.printStackTrace(); |
| | | return Result.error(new CodeMsg(6004, "回滚sql出错," + e.getMessage())); |
| | | } |
| | | } |
| | | return Result.error(new CodeMsg(6004, "运行清洗sql出错," + e.getMessage())); |
| | | } finally { |
| | | if (connection != null) { |
| | | try { |
| | |
| | | } |
| | | |
| | | private Result runByDb(SysAssemble assemble) { |
| | | String id = assemble.getId(); |
| | | Boolean bigdata = assemble.getBigdata(); |
| | | try { |
| | | String id = assemble.getId(); |
| | | Boolean bigdata = assemble.getBigdata(); |
| | | |
| | | List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id).eq(Constant.Active, true)); |
| | | for (SysAssembleDb sysAssembleDb : dbList) { |
| | | String dbId = sysAssembleDb.getId(); |
| | | Result result = loadOneDb(sysAssembleDb, bigdata, id); |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreMsg("导入源数据错误:" + sysAssembleDb.getDatabaseName()).updateById(); |
| | | return result; |
| | | List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id).eq(Constant.Active, true)); |
| | | for (SysAssembleDb sysAssembleDb : dbList) { |
| | | String dbId = sysAssembleDb.getId(); |
| | | Result result = loadOneDb(sysAssembleDb, bigdata, id); |
| | | XxlJobLogger.log("assemble db load one db:" + sysAssembleDb.getDatasourceUrl()); |
| | | if (!result.getSuccess()) { |
| | | assemble.setPreMsg("导入源数据错误:" + sysAssembleDb.getDatabaseName()).updateById(); |
| | | return result; |
| | | } |
| | | } |
| | | return Result.success(null); |
| | | } catch (Exception e) { |
| | | return Result.error(new CodeMsg(3009, e.getMessage())); |
| | | } |
| | | return Result.success(null); |
| | | } |
| | | |
| | | private Result loadOneDb(SysAssembleDb sysAssembleDb, boolean bigData, String assembleId) { |
| | |
| | | if (conn == null) { |
| | | return Result.error(new CodeMsg(6002, MessageFormat.format("未能连接到源 id: {0}", dbId))); |
| | | } |
| | | Connection dataSourceConn = null; |
| | | PreparedStatement statement = null; |
| | | try { |
| | | List<SysAssembleDbTable> tableList = tableService.selectList(new EntityWrapper<SysAssembleDbTable>().eq(Constant.PARENT_ID, dbId).eq(Constant.Active, true)); |
| | | for (SysAssembleDbTable dbTable : tableList) { |
| | | String tableId = dbTable.getId(); |
| | | String tempTableName = dbTable.getTempTableName(); |
| | | |
| | | XxlJobLogger.log("assemble table" + tempTableName); |
| | | List<SysAssembleDbField> fieldList = fieldService.selectList(new EntityWrapper<SysAssembleDbField>().eq(Constant.PARENT_ID, tableId)); |
| | | String fields = fieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.joining(Constant.COMMA)); |
| | | XxlJobLogger.log("assemble fields" + fields); |
| | | SysAssembleTableType type = dbTable.getType(); |
| | | String tableName = null; |
| | | String assembleTempTableName = tempTableName; |
| | |
| | | } else if (type.equals(SysAssembleTableType.table)) { |
| | | tableName = dbTable.getTableName(); |
| | | } |
| | | XxlJobLogger.log("assemble source table:" + tableName); |
| | | //TODO assembleTempTableName 可能会超长 64字节 后续修改 |
| | | DataSourceInfo dataSourceInfo = null; |
| | | if (!bigData) { |
| | |
| | | } |
| | | |
| | | // drop temp data |
| | | dataSourceInfo.truncateData(tempTableName); |
| | | |
| | | boolean b = dataSourceInfo.truncateData(tempTableName); |
| | | XxlJobLogger.log("assemble truncateData :" + b); |
| | | fixAssembleTempTable(dataSourceInfo, assembleTempTableName, fieldList, dbId); |
| | | |
| | | XxlJobLogger.log("assemble fixAssembleTempTable :" ); |
| | | //checkAssembleTempTableExists(assembleTempTableName); |
| | | //TODO 未分页 |
| | | String filter = dbTable.getFilter(); |
| | | Set<String> matcher = DbUtils.matcher(filter); |
| | | for (String code : matcher) { |
| | | SysAssembleParams sysAssembleParams = paramsService.selectOne(new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, assembleId).eq(Constant.Code, code)); |
| | | if (sysAssembleParams == null){ |
| | | return Result.error(new CodeMsg(6009, assembleId + "有变量未匹配到:"+ code)); |
| | | XxlJobLogger.log("assemble raw filter:" + filter); |
| | | if (!StringUtils.isEmpty(filter)) { |
| | | Set<String> matcher = DbUtils.matcher(filter); |
| | | for (String code : matcher) { |
| | | SysAssembleParams sysAssembleParams = paramsService.selectOne(new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, assembleId).eq(Constant.Code, code)); |
| | | if (sysAssembleParams == null){ |
| | | return Result.error(new CodeMsg(6009, assembleId + "有变量未匹配到:"+ code)); |
| | | } |
| | | String val = sysAssembleParams.getVal(); |
| | | if (StringUtils.isEmpty(val)) { |
| | | return Result.error(new CodeMsg(6009, assembleId + "有变量未获取到值:"+ code)); |
| | | } |
| | | val = DbUtils.quotedStr(val); |
| | | filter = filter.replace(DbUtils.assemblParam(code), val); |
| | | } |
| | | String val = sysAssembleParams.getVal(); |
| | | if (StringUtils.isEmpty(val)) { |
| | | return Result.error(new CodeMsg(6009, assembleId + "有变量未获取到值:"+ code)); |
| | | } |
| | | val = DbUtils.quotedStr(val); |
| | | filter = filter.replace(DbUtils.assemblParam(code), val); |
| | | } |
| | | if (StringUtils.isEmpty(filter)) { |
| | | }else { |
| | | filter = Constant.WHERE_DEFAULT; |
| | | } |
| | | |
| | | XxlJobLogger.log("assemble db filter:" + filter); |
| | | String runSqlTemplate = null; |
| | | if (type.equals(SysAssembleTableType.table)){ |
| | | runSqlTemplate = Constant.selectFieldTableTemplate; |
| | | }else if(type.equals(SysAssembleTableType.sql)){ |
| | | runSqlTemplate = Constant.selectFieldSqlTemplate; |
| | | } |
| | | XxlJobLogger.log("assemble db select sql template :" + runSqlTemplate); |
| | | String sql = MessageFormat.format(runSqlTemplate, fields, tableName, filter); |
| | | XxlJobLogger.log("assemble db select sql:" + sql); |
| | | PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); |
| | | ps.setFetchSize(Integer.MIN_VALUE); |
| | | ps.setFetchDirection(ResultSet.FETCH_REVERSE); |
| | |
| | | ResultSetMetaData metaData = resultSet.getMetaData(); |
| | | int columnCount = metaData.getColumnCount(); |
| | | |
| | | String insertMySqlSql = assembleSql(unBigDataDataSourceInfo.getDbName(), assembleTempTableName, fields); |
| | | String insertMySqlSql = assembleSql(unBigDataDataSourceInfo.getDbName(), assembleTempTableName, fieldList); |
| | | XxlJobLogger.log("assemble db insert sql:" + insertMySqlSql); |
| | | int cnt = 0; |
| | | StringBuilder builder = new StringBuilder(); |
| | | dataSourceConn = dataSourceInfo.conn(); |
| | | dataSourceConn.setAutoCommit(false); |
| | | statement = dataSourceConn.prepareStatement(insertMySqlSql); |
| | | int count = 0; |
| | | while (resultSet.next()) { |
| | | |
| | | count++; |
| | | for (int i = 1; i <= columnCount; i++) { |
| | | if (i == columnCount) { |
| | | builderEnd(builder, resultSet.getObject(i)); |
| | | }else { |
| | | builderAppend(builder, resultSet.getObject(i)); |
| | | } |
| | | |
| | | statement.setString(i, resultSet.getString(i)); |
| | | } |
| | | statement.addBatch(); |
| | | cnt++; |
| | | if (cnt == 5000) { |
| | | runOneBatch(bigData, insertMySqlSql, builder); |
| | | statement.executeBatch(); |
| | | } |
| | | } |
| | | if (builder.length() > 0) { |
| | | runOneBatch(bigData, insertMySqlSql, builder); |
| | | } |
| | | statement.executeBatch(); |
| | | XxlJobLogger.log("assemble db select sql count:" + count); |
| | | } |
| | | |
| | | dataSourceConn.commit(); |
| | | return Result.success(null); |
| | | } |
| | | catch (Exception e) { |
| | |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | if (statement != null) { |
| | | try { |
| | | statement.close(); |
| | | } catch (SQLException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | if (dataSourceConn != null) { |
| | | try { |
| | | dataSourceConn.close(); |
| | | } catch (SQLException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | private void setOneBatch(boolean bigData, String insertMySqlSql, List<List<String>> branchList) { |
| | | Connection conn = null; |
| | | try { |
| | | DataSourceInfo dataSourceInfo = null; |
| | | if (bigData) { |
| | | dataSourceInfo = bigDataDataSourceInfo; |
| | | }else { |
| | | dataSourceInfo = unBigDataDataSourceInfo; |
| | | } |
| | | |
| | | conn = dataSourceInfo.conn(); |
| | | PreparedStatement statement = conn.prepareStatement(insertMySqlSql); |
| | | int result = 0; |
| | | if (branchList.isEmpty()) { |
| | | return; |
| | | } |
| | | for (List<String> stringList : branchList) { |
| | | for (int i = 0; i < stringList.size(); i++) { |
| | | statement.setString(i + 1, stringList.get(i)); |
| | | } |
| | | statement.addBatch(); |
| | | } |
| | | statement.executeBatch(); |
| | | } |
| | | catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | finally { |
| | | if (conn != null) { |
| | | try { |
| | | conn.close(); |
| | | } catch (SQLException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void runOneBatch(boolean bigData, String insertMySqlSql, StringBuilder builder) throws UnsupportedEncodingException, SQLException { |
| | |
| | | |
| | | public void builderAppend(StringBuilder builder, Object object) { |
| | | builder.append(object); |
| | | builder.append("\t"); |
| | | builder.append(","); |
| | | } |
| | | |
| | | |
| | | public String assembleSql(String dataBaseName, String tableName, String fields) { |
| | | String sql = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE INTO TABLE " + dataBaseName + "." + tableName + "(" + fields + ")"; |
| | | public String assembleSql(String dataBaseName, String tableName, List<SysAssembleDbField> fieldList) { |
| | | String fields = fieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.joining(Constant.COMMA)); |
| | | ContentBuilder builder = new ContentBuilder(Constant.COMMA); |
| | | for (int i = 0; i < fieldList.size(); i++) { |
| | | builder.append(Constant.QUESTION); |
| | | } |
| | | String sql = "INSERT INTO " + dataBaseName + "." + tableName + "(" + fields + ") values(" + builder.toString() + ")"; |
| | | return sql; |
| | | } |
| | | |
| | |
| | | conn = dataSourceInfo.conn(); |
| | | PreparedStatement statement = conn.prepareStatement(sql); |
| | | int result = 0; |
| | | if (statement.isWrapperFor(com.mysql.jdbc.Statement.class)) { |
| | | com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class); |
| | | mysqlStatement.setLocalInfileInputStream(dataStream); |
| | | result = mysqlStatement.executeUpdate(); |
| | | if (statement.isWrapperFor(java.sql.Statement.class)) { |
| | | // com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class); |
| | | // mysqlStatement.setLocalInfileInputStream(dataStream); |
| | | // statement. |
| | | // result = mysqlStatement.executeUpdate(); |
| | | } |
| | | return result; |
| | | } |