From c007f0ca1785db093d48f4846cda82fe8e955765 Mon Sep 17 00:00:00 2001 From: kimi <kimi42345@gmail.com> Date: 星期三, 27 五月 2020 09:59:29 +0800 Subject: [PATCH] merage --- src/main/java/com/highdatas/mdm/service/impl/SysAssembleServiceImpl.java | 423 ++++++++++++++++++++++++++++++++++++++-------------- 1 files changed, 311 insertions(+), 112 deletions(-) diff --git a/src/main/java/com/highdatas/mdm/service/impl/SysAssembleServiceImpl.java b/src/main/java/com/highdatas/mdm/service/impl/SysAssembleServiceImpl.java index 19170e4..d88c785 100644 --- a/src/main/java/com/highdatas/mdm/service/impl/SysAssembleServiceImpl.java +++ b/src/main/java/com/highdatas/mdm/service/impl/SysAssembleServiceImpl.java @@ -14,11 +14,13 @@ import com.highdatas.mdm.util.ContentBuilder; import com.highdatas.mdm.util.DbUtils; import com.highdatas.mdm.util.RuleClient; +import com.xxl.job.core.log.XxlJobLogger; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.mybatis.spring.SqlSessionTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import javax.annotation.Resource; import java.io.ByteArrayInputStream; @@ -78,17 +80,32 @@ RuleClient ruleClient; @Autowired IMasterModifiedService masterModifiedService; - + @Autowired + ISysViewService viewService; + @Autowired + ISysMenuService menuService; + @Autowired + IMasterAuthorSubscribeService subscribeService; + /** + * + * @description: 閫氳繃姹囬泦id鎵ц姹囬泦浠诲姟 + * @param id 姹囬泦id + * @return: 鎵ц缁撴灉 + * + */ @Override public Result run(String id) { + XxlJobLogger.log("log info: run" ); if (StringUtils.isEmpty(id)) { return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } SysAssemble assemble = selectById(id); if (assemble == null) { + XxlJobLogger.log("assemble not found" ); return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } + assemble.setPreTime(new Date()); assemble.setPreStatus(SysAssembleRunStatus.working); assemble.updateById(); @@ -96,6 +113,7 @@ if (!assemble.getStatus().equals(SysAssembleStatus.working)) { assemble.setPreStatus(SysAssembleRunStatus.fail); assemble.setPreCnt(0); + XxlJobLogger.log("褰撳墠浠诲姟涓嶅湪宸ヤ綔涓�,鐘舵��:" + assemble.getStatus() ); assemble.setPreMsg("褰撳墠浠诲姟涓嶅湪宸ヤ綔涓�,鐘舵��:" + assemble.getStatus()); assemble.updateById(); return Result.error(new CodeMsg(6009,"褰撳墠浠诲姟涓嶅湪宸ヤ綔涓�,鐘舵��:" + assemble.getStatus())); @@ -110,6 +128,7 @@ if (!status.equals(ActivitiStatus.close) && !status.equals(ActivitiStatus.open)) { assemble.setPreStatus(SysAssembleRunStatus.fail); assemble.setPreCnt(0); + XxlJobLogger.log("褰撳墠鏈夋祦绋嬫鍦ㄨ繍琛�,鏆傛椂鏃犳硶姹囬泦涓嬫鏁版嵁:" + status.toString() ); assemble.setPreMsg("褰撳墠鏈夋祦绋嬫鍦ㄨ繍琛�,鏆傛椂鏃犳硶姹囬泦涓嬫鏁版嵁:" + status.toString()); assemble.updateById(); return Result.error(new CodeMsg(6009,"褰撳墠鏈夋祦绋嬫鍦ㄨ繍琛�,鏆傛椂鏃犳硶姹囬泦涓嬫鏁版嵁:" + status.toString())); @@ -119,17 +138,24 @@ } Boolean bigData = assemble.getBigdata(); - + Date scheduleDate = new Date(); + XxlJobLogger.log("log info: start assemble"); //1 load from db try{ Result result = runByDb(assemble); + Date dbDate = new Date(); + XxlJobLogger.log("log info db time:" + (dbDate.getTime() - scheduleDate.getTime()) + "ms"); if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); assemble.updateById(); return result; } + //2 load from api result = runByApi(id); + Date apiDate = new Date(); + XxlJobLogger.log("log info api time:" + (apiDate.getTime() - dbDate.getTime()) + "ms"); + if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); assemble.updateById(); @@ -137,9 +163,14 @@ } //3check temp table; result = checkTempTable(assemble); + Date checkDate = new Date(); + XxlJobLogger.log("log info check time:" + (checkDate.getTime() - apiDate.getTime()) + "ms"); + if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); - assemble.setPreMsg("妫�娴嬩富鏁版嵁涓存椂琛ㄥ嚭鐜伴敊璇�"); + XxlJobLogger.log("妫�娴嬩富鏁版嵁涓存椂琛ㄥ嚭鐜伴敊璇�"+ result.getMessage()); + + assemble.setPreMsg("妫�娴嬩富鏁版嵁涓存椂琛ㄥ嚭鐜伴敊璇�:" + result.getMessage()); assemble.updateById(); return result; } @@ -147,63 +178,99 @@ //4 purge data String purgeSql = assemble.getPurgeSql(); purgeSql = DbUtils.replaceEscape(purgeSql); + XxlJobLogger.log("assemble db purgeSql sql:" + purgeSql); result = purgeData(purgeSql, assemble.getId(), bigData); + Date purgeDate = new Date(); + XxlJobLogger.log("log info purge time:" + (purgeDate.getTime() - checkDate.getTime()) + "ms"); + if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); - assemble.setPreMsg("杩愯娓呮礂sql鍑虹幇閿欒: "+purgeSql ); + assemble.setPreMsg("杩愯娓呮礂sql鍑虹幇閿欒: "+purgeSql + "," + result.getMessage()); + XxlJobLogger.log("杩愯娓呮礂sql鍑虹幇閿欒: "+purgeSql + "," + result.getMessage() ); assemble.updateById(); return result; } //5 check temp data result = checkTempData(assemble); + Date checkTempDate = new Date(); + XxlJobLogger.log("log info check temp time:" + (checkTempDate.getTime() - purgeDate.getTime()) + "ms"); if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); - assemble.setPreMsg("璐ㄩ噺妫�楠岀幆鑺傚嚭鐜伴敊璇�" ); + assemble.setPreMsg("璐ㄩ噺妫�楠岀幆鑺傚嚭鐜伴敊璇�:" + result.getMessage()); + XxlJobLogger.log("璐ㄩ噺妫�楠岀幆鑺傚嚭鐜伴敊璇�" + result.getMessage()); assemble.updateById(); return result; } //6 temp 2 record + result = needRollBack(assemble, result, checkTempDate); - result = temp2record(assemble); if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); - assemble.setPreMsg("鏁版嵁鎼繍鑷充富鏁版嵁璁板綍鐜妭鍑虹幇閿欒" ); - assemble.updateById(); - return result; - } - - //7 record 閾炬帴 鐗堟湰 - result = linkMaintain(assemble); - if (!result.getSuccess()) { - assemble.setPreStatus(SysAssembleRunStatus.fail); - assemble.setPreMsg("鑱旀帴鐗堟湰鍑虹幇閿欒" ); - assemble.updateById(); - return result; - } - - //todo 娣诲姞浜嬪姟 log璁板綍 - //8 鏇存柊 鍙傛暟 - result = updateParams(assemble); - if (!result.getSuccess()) { - assemble.setPreStatus(SysAssembleRunStatus.fail); - assemble.setPreMsg("鏇存柊鍙橀噺鐜妭鍑虹幇閿欒" ); + XxlJobLogger.log("涓存椂鏁版嵁閾炬帴涓绘暟鎹郴缁熼敊璇細" +result.getMessage()); + assemble.setPreMsg("涓存椂鏁版嵁閾炬帴涓绘暟鎹郴缁熼敊璇細" +result.getMessage()); }else { assemble.setPreMsg("姹囬泦鎴愬姛"); + XxlJobLogger.log("姹囬泦鎴愬姛"); assemble.setPreStatus(SysAssembleRunStatus.success); } assemble.updateById(); return result; } catch (Exception e) { - assemble.setPreMsg("姹囬泦浠诲姟鍑虹幇閿欒"); + XxlJobLogger.log("姹囬泦浠诲姟鍑虹幇閿欒锛�"+e.getMessage()); + assemble.setPreMsg("姹囬泦浠诲姟鍑虹幇閿欒锛�"+e.getMessage()); assemble.setPreStatus(SysAssembleRunStatus.fail).updateById(); e.printStackTrace(); return Result.error(new CodeMsg(6009, e.getMessage())); } } + + @Transactional(rollbackFor = {Exception.class, Error.class}) + private Result needRollBack(SysAssemble assemble, Result result, Date checkTempDate) { + result = temp2record(assemble); + Date temp2RecordDate = new Date(); + XxlJobLogger.log("log info temp2Record time:" + (temp2RecordDate.getTime() - checkTempDate.getTime()) + "ms"); + + if (!result.getSuccess()) { + assemble.setPreStatus(SysAssembleRunStatus.fail); + XxlJobLogger.log("鏁版嵁鎼繍鑷充富鏁版嵁璁板綍鐜妭鍑虹幇閿欒;" + result.getMessage() ); + assemble.setPreMsg("鏁版嵁鎼繍鑷充富鏁版嵁璁板綍鐜妭鍑虹幇閿欒:" + result.getMessage()); + assemble.updateById(); + return result; + } + + + //7 record 閾炬帴 鐗堟湰 + + result = linkMaintain(assemble); + Date linkMaintainDate = new Date(); + XxlJobLogger.log("log info link maintain time:" + (linkMaintainDate.getTime() - temp2RecordDate.getTime()) + "ms"); + + if (!result.getSuccess()) { + assemble.setPreStatus(SysAssembleRunStatus.fail); + XxlJobLogger.log("鑱旀帴鐗堟湰鍑虹幇閿欒:" + result.getMessage()); + + assemble.setPreMsg("鑱旀帴鐗堟湰鍑虹幇閿欒:" + result.getMessage()); + assemble.updateById(); + return result; + } + + //8 鏇存柊 鍙傛暟 + result = updateParams(assemble); + if (!result.getSuccess()) { + assemble.setPreStatus(SysAssembleRunStatus.fail); + XxlJobLogger.log("鏇存柊鍙傛暟閿欒:" + result.getMessage()); + + assemble.setPreMsg("鏇存柊鍙傛暟閿欒:" + result.getMessage()); + assemble.updateById(); + return result; + } + return result; + } + private Result linkMaintain(SysAssemble assemble) { try{ @@ -216,8 +283,13 @@ isBigVersion = true; } Maintain maintain = masterDataService.uploadedData(tableName, updateType, assemble.getUserId(),isBigVersion); + XxlJobLogger.log("log info create now maintain:" + maintain.getId()); + Date before2Record = new Date(); Result result = temp2recordUpdate(assemble, maintain); + Date after2Record = new Date(); + XxlJobLogger.log("log info temp2Record time:" + (after2Record.getTime() - before2Record.getTime()) + "ms"); + if (!result.getSuccess()) { assemble.setPreStatus(SysAssembleRunStatus.fail); assemble.setPreMsg("鎼繍鏇存柊瀛楁鍑虹幇閿欒" ); @@ -248,10 +320,18 @@ }else { //鐩存帴杩愯 Flows flows = new Flows().setStatus(ActivitiStatus.open).setBusinessId(maintain.getId()).setId(DbUtils.getUUID()).setCreateTime(new Date()); + flows.setBusinessType(ActivitiBusinessType.maintain); flows.insert(); maintain.setFlowId(flows.getId()); maintain.updateById(); - maintainService.dealFlow(maintain.getId(), ActivitiStatus.open); + maintainService.dealFlow(maintain.getId(), flows.getStatus()); + log.info("flow-maintainService end"); + viewService.dealFlow(maintain.getId(), flows.getStatus()); + log.info("flow-viewService end"); + subscribeService.dealFlow(maintain.getId(), flows.getStatus()); + log.info("flow-subscribeService end"); + menuService.dealFlow(maintain.getId(), flows.getStatus(), flows.getUserId()); + log.info("flow-menuService end"); masterModifiedService.dealAssemble(maintain.getId(), assemble.getUserId(), false); } return Result.success(null); @@ -455,8 +535,10 @@ int cnt = 0; if (updateType.equals(SysAssembleUpdateType.All)) { String sql = MessageFormat.format(transSql, recordTableName, recordFieldStr, fieldStr, tempTableName); + XxlJobLogger.log("update sql:" + sql); PreparedStatement preparedStatement = conn.prepareStatement(sql); cnt = preparedStatement.executeUpdate(); + }else if(updateType.equals(SysAssembleUpdateType.Increment)) { String updateFields = assemble.getUpdateFields(); String[] split = updateFields.split(Constant.SEMICOLON); @@ -469,16 +551,21 @@ .collect(Collectors.joining(Constant.COMMA)); String insetSql = MessageFormat.format(insertSql, recordTableName, recordFieldStr,insertFieldStr, tempTableName, tableName, joinStr); + log.info(insertSql); + XxlJobLogger.log("update Increment sql:" + insertSql); PreparedStatement preparedStatement = conn.prepareStatement(insetSql); int insertCnt = preparedStatement.executeUpdate(); cnt += insertCnt; + } + XxlJobLogger.log("update sql cnt:" + cnt); assemble.setPreCnt(cnt); assemble.updateById(); return Result.success(null); } catch (Exception e) { e.printStackTrace(); + return Result.error(new CodeMsg(6005, e.getMessage())); } finally { @@ -511,47 +598,53 @@ } private Result checkTempData(SysAssemble assemble) { - //TODO + try { - SysAssembleCheckType checkType = assemble.getCheckType(); - if (checkType == null) { - return Result.error(new CodeMsg(6009,"瑙勫垯鏍¢獙绫诲瀷涓虹┖")); - } - if (checkType.equals(SysAssembleCheckType.unlimited)) { - return Result.success(null); - } - String menuId = assemble.getMenuId(); - MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId)); - String tableName = menuMapping.getTableName(); - String tempTableName = Constant.Temp + tableName; - HashMap<String,Boolean> fieldResultSet = ruleClient.execuImmeForCollect(tempTableName, assemble.getUserId()); - switch (checkType){ - case successAdd: - String unSuccessFields = fieldResultSet.keySet().stream().filter(s -> !fieldResultSet.get(s)).collect(Collectors.joining(Constant.COMMA)); - if (fieldResultSet.keySet().contains(false)) { - return Result.error(new CodeMsg(6009,"瑙勫垯鏍¢獙涓嶉�氳繃 瀛楁:" + unSuccessFields)); - } - break; - case partSuccessAdd: - String checkFields = assemble.getCheckFields(); - String[] split = checkFields.split(Constant.SEMICOLON); - for (String s : split) { - Boolean checked = fieldResultSet.get(s); - if (checked == null || !checked) { - return Result.error(new CodeMsg(6009,"瑙勫垯鏍¢獙涓嶉�氳繃 瀛楁:" + s)); + SysAssembleCheckType checkType = assemble.getCheckType(); + if (checkType == null) { + return Result.error(new CodeMsg(6009, "瑙勫垯鏍¢獙绫诲瀷涓虹┖")); + } + if (checkType.equals(SysAssembleCheckType.unlimited)) { + return Result.success(null); + } + + String menuId = assemble.getMenuId(); + MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId)); + String tableName = menuMapping.getTableName(); + String tempTableName = Constant.Temp + tableName; + HashMap<String, Boolean> fieldResultSet = ruleClient.execuImmeForCollect(tempTableName, assemble.getUserId()); + switch (checkType) { + case successAdd: + String unSuccessFields = fieldResultSet.keySet().stream().filter(s -> !fieldResultSet.get(s)).collect(Collectors.joining(Constant.COMMA)); + if (fieldResultSet.keySet().contains(false)) { + return Result.error(new CodeMsg(6009, "瑙勫垯鏍¢獙涓嶉�氳繃 瀛楁:" + unSuccessFields)); } - } - break; + break; + case partSuccessAdd: + String checkFields = assemble.getCheckFields(); + String[] split = checkFields.split(Constant.SEMICOLON); + for (String s : split) { + Boolean checked = fieldResultSet.get(s); + if (checked == null || !checked) { + return Result.error(new CodeMsg(6009, "瑙勫垯鏍¢獙涓嶉�氳繃 瀛楁:" + s)); + } + } + break; + } + + + return Result.success(null); + } catch (Exception e) { + e.printStackTrace(); + return Result.error(new CodeMsg(3009, e.getMessage())); } - - - - return Result.success(null); } + private Result purgeData(String purgeSql, String assembleId, boolean bigData) { Connection connection = null; + Savepoint purge = null; try { DataSourceInfo dataSourceInfo = null; if (bigData) { @@ -560,7 +653,8 @@ dataSourceInfo = unBigDataDataSourceInfo; } connection = dataSourceInfo.conn(); - + connection.setAutoCommit(false); + purge = connection.setSavepoint("purge"); List<SysAssembleParams> paramsList = paramsService.selectList(new EntityWrapper<SysAssembleParams>().eq("parent_id", assembleId)); HashMap<String, String> paramsMap = new HashMap<>(); Set<String> matcher = DbUtils.matcher(purgeSql); @@ -591,14 +685,46 @@ String preParams = MessageFormat.format(Constant.ParamsShell, key); purgeSql = purgeSql.replace(preParams, val); } + XxlJobLogger.log("purgeSql:" +purgeSql); + List<String> split = DbUtils.split(purgeSql, Constant.SEMICOLON); + XxlJobLogger.log("split size:" + split.size()); + PreparedStatement preparedStatement = null; + if (!split.isEmpty()) { + for (String oneSql : split) { + XxlJobLogger.log("one sql:" + oneSql); + if (preparedStatement == null) { + preparedStatement = connection.prepareStatement(oneSql); + preparedStatement.addBatch(oneSql); + } else { + preparedStatement.addBatch(oneSql); + } + } + } + int[] ints = preparedStatement.executeBatch(); + for (int i = 0; i < ints.length; i++) { + XxlJobLogger.log("sqls count:" + ints[i]); + } - PreparedStatement preparedStatement = connection.prepareStatement(purgeSql); - preparedStatement.execute(); + connection.commit(); + return Result.success(null); } catch (SQLException e) { e.printStackTrace(); - return Result.error(new CodeMsg(6004, "杩愯娓呮礂sql鍑洪敊," + e.getSQLState())); + if (connection != null) { + try { + if (purge != null) { + connection.rollback(purge); + } else { + connection.rollback(); + } + + } catch (SQLException e1) { + e1.printStackTrace(); + return Result.error(new CodeMsg(6004, "鍥炴粴sql鍑洪敊," + e.getMessage())); + } + } + return Result.error(new CodeMsg(6004, "杩愯娓呮礂sql鍑洪敊," + e.getMessage())); } finally { if (connection != null) { try { @@ -617,19 +743,24 @@ } private Result runByDb(SysAssemble assemble) { - String id = assemble.getId(); - Boolean bigdata = assemble.getBigdata(); + try { + String id = assemble.getId(); + Boolean bigdata = assemble.getBigdata(); - List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id).eq(Constant.Active, true)); - for (SysAssembleDb sysAssembleDb : dbList) { - String dbId = sysAssembleDb.getId(); - Result result = loadOneDb(sysAssembleDb, bigdata, id); - if (!result.getSuccess()) { - assemble.setPreMsg("瀵煎叆婧愭暟鎹敊璇�:" + sysAssembleDb.getDatabaseName()).updateById(); - return result; + List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id).eq(Constant.Active, true)); + for (SysAssembleDb sysAssembleDb : dbList) { + String dbId = sysAssembleDb.getId(); + Result result = loadOneDb(sysAssembleDb, bigdata, id); + XxlJobLogger.log("assemble db load one db:" + sysAssembleDb.getDatasourceUrl()); + if (!result.getSuccess()) { + assemble.setPreMsg("瀵煎叆婧愭暟鎹敊璇�:" + sysAssembleDb.getDatabaseName()).updateById(); + return result; + } } + return Result.success(null); + } catch (Exception e) { + return Result.error(new CodeMsg(3009, e.getMessage())); } - return Result.success(null); } private Result loadOneDb(SysAssembleDb sysAssembleDb, boolean bigData, String assembleId) { @@ -638,14 +769,17 @@ if (conn == null) { return Result.error(new CodeMsg(6002, MessageFormat.format("鏈兘杩炴帴鍒版簮 id: {0}", dbId))); } + Connection dataSourceConn = null; + PreparedStatement statement = null; try { List<SysAssembleDbTable> tableList = tableService.selectList(new EntityWrapper<SysAssembleDbTable>().eq(Constant.PARENT_ID, dbId).eq(Constant.Active, true)); for (SysAssembleDbTable dbTable : tableList) { String tableId = dbTable.getId(); String tempTableName = dbTable.getTempTableName(); - + XxlJobLogger.log("assemble table" + tempTableName); List<SysAssembleDbField> fieldList = fieldService.selectList(new EntityWrapper<SysAssembleDbField>().eq(Constant.PARENT_ID, tableId)); String fields = fieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.joining(Constant.COMMA)); + XxlJobLogger.log("assemble fields" + fields); SysAssembleTableType type = dbTable.getType(); String tableName = null; String assembleTempTableName = tempTableName; @@ -654,6 +788,7 @@ } else if (type.equals(SysAssembleTableType.table)) { tableName = dbTable.getTableName(); } + XxlJobLogger.log("assemble source table:" + tableName); //TODO assembleTempTableName 鍙兘浼氳秴闀� 64瀛楄妭 鍚庣画淇敼 DataSourceInfo dataSourceInfo = null; if (!bigData) { @@ -663,36 +798,41 @@ } // drop temp data - dataSourceInfo.truncateData(tempTableName); - + boolean b = dataSourceInfo.truncateData(tempTableName); + XxlJobLogger.log("assemble truncateData :" + b); fixAssembleTempTable(dataSourceInfo, assembleTempTableName, fieldList, dbId); - + XxlJobLogger.log("assemble fixAssembleTempTable :" ); //checkAssembleTempTableExists(assembleTempTableName); - //TODO 鏈垎椤� String filter = dbTable.getFilter(); - Set<String> matcher = DbUtils.matcher(filter); - for (String code : matcher) { - SysAssembleParams sysAssembleParams = paramsService.selectOne(new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, assembleId).eq(Constant.Code, code)); - if (sysAssembleParams == null){ - return Result.error(new CodeMsg(6009, assembleId + "鏈夊彉閲忔湭鍖归厤鍒�:"+ code)); + XxlJobLogger.log("assemble raw filter:" + filter); + if (!StringUtils.isEmpty(filter)) { + Set<String> matcher = DbUtils.matcher(filter); + for (String code : matcher) { + SysAssembleParams sysAssembleParams = paramsService.selectOne(new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, assembleId).eq(Constant.Code, code)); + if (sysAssembleParams == null){ + return Result.error(new CodeMsg(6009, assembleId + "鏈夊彉閲忔湭鍖归厤鍒�:"+ code)); + } + String val = sysAssembleParams.getVal(); + if (StringUtils.isEmpty(val)) { + return Result.error(new CodeMsg(6009, assembleId + "鏈夊彉閲忔湭鑾峰彇鍒板��:"+ code)); + } + val = DbUtils.quotedStr(val); + filter = filter.replace(DbUtils.assemblParam(code), val); } - String val = sysAssembleParams.getVal(); - if (StringUtils.isEmpty(val)) { - return Result.error(new CodeMsg(6009, assembleId + "鏈夊彉閲忔湭鑾峰彇鍒板��:"+ code)); - } - val = DbUtils.quotedStr(val); - filter = filter.replace(DbUtils.assemblParam(code), val); - } - if (StringUtils.isEmpty(filter)) { + }else { filter = Constant.WHERE_DEFAULT; } + + XxlJobLogger.log("assemble db filter:" + filter); String runSqlTemplate = null; if (type.equals(SysAssembleTableType.table)){ runSqlTemplate = Constant.selectFieldTableTemplate; }else if(type.equals(SysAssembleTableType.sql)){ runSqlTemplate = Constant.selectFieldSqlTemplate; } + XxlJobLogger.log("assemble db select sql template :" + runSqlTemplate); String sql = MessageFormat.format(runSqlTemplate, fields, tableName, filter); + XxlJobLogger.log("assemble db select sql:" + sql); PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ps.setFetchSize(Integer.MIN_VALUE); ps.setFetchDirection(ResultSet.FETCH_REVERSE); @@ -700,28 +840,29 @@ ResultSetMetaData metaData = resultSet.getMetaData(); int columnCount = metaData.getColumnCount(); - String insertMySqlSql = assembleSql(unBigDataDataSourceInfo.getDbName(), assembleTempTableName, fields); + String insertMySqlSql = assembleSql(unBigDataDataSourceInfo.getDbName(), assembleTempTableName, fieldList); + XxlJobLogger.log("assemble db insert sql:" + insertMySqlSql); int cnt = 0; - StringBuilder builder = new StringBuilder(); + dataSourceConn = dataSourceInfo.conn(); + dataSourceConn.setAutoCommit(false); + statement = dataSourceConn.prepareStatement(insertMySqlSql); + int count = 0; while (resultSet.next()) { - + count++; for (int i = 1; i <= columnCount; i++) { - if (i == columnCount) { - builderEnd(builder, resultSet.getObject(i)); - }else { - builderAppend(builder, resultSet.getObject(i)); - } - + statement.setString(i, resultSet.getString(i)); } + statement.addBatch(); cnt++; if (cnt == 5000) { - runOneBatch(bigData, insertMySqlSql, builder); + statement.executeBatch(); } } - if (builder.length() > 0) { - runOneBatch(bigData, insertMySqlSql, builder); - } + statement.executeBatch(); + XxlJobLogger.log("assemble db select sql count:" + count); } + + dataSourceConn.commit(); return Result.success(null); } catch (Exception e) { @@ -735,8 +876,60 @@ e.printStackTrace(); } } + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + if (dataSourceConn != null) { + try { + dataSourceConn.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } } + } + + private void setOneBatch(boolean bigData, String insertMySqlSql, List<List<String>> branchList) { + Connection conn = null; + try { + DataSourceInfo dataSourceInfo = null; + if (bigData) { + dataSourceInfo = bigDataDataSourceInfo; + }else { + dataSourceInfo = unBigDataDataSourceInfo; + } + + conn = dataSourceInfo.conn(); + PreparedStatement statement = conn.prepareStatement(insertMySqlSql); + int result = 0; + if (branchList.isEmpty()) { + return; + } + for (List<String> stringList : branchList) { + for (int i = 0; i < stringList.size(); i++) { + statement.setString(i + 1, stringList.get(i)); + } + statement.addBatch(); + } + statement.executeBatch(); + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + if (conn != null) { + try { + conn.close(); + } catch (SQLException e) { + e.printStackTrace(); + } + } + } } private void runOneBatch(boolean bigData, String insertMySqlSql, StringBuilder builder) throws UnsupportedEncodingException, SQLException { @@ -789,12 +982,17 @@ public void builderAppend(StringBuilder builder, Object object) { builder.append(object); - builder.append("\t"); + builder.append(","); } - public String assembleSql(String dataBaseName, String tableName, String fields) { - String sql = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE INTO TABLE " + dataBaseName + "." + tableName + "(" + fields + ")"; + public String assembleSql(String dataBaseName, String tableName, List<SysAssembleDbField> fieldList) { + String fields = fieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.joining(Constant.COMMA)); + ContentBuilder builder = new ContentBuilder(Constant.COMMA); + for (int i = 0; i < fieldList.size(); i++) { + builder.append(Constant.QUESTION); + } + String sql = "INSERT INTO " + dataBaseName + "." + tableName + "(" + fields + ") values(" + builder.toString() + ")"; return sql; } @@ -816,9 +1014,10 @@ PreparedStatement statement = conn.prepareStatement(sql); int result = 0; if (statement.isWrapperFor(java.sql.Statement.class)) { - com.mysql.cj.jdbc.PreparedStatement mysqlStatement = statement.unwrap( com.mysql.cj.jdbc.PreparedStatement.class); - mysqlStatement.setLocalInfileInputStream(dataStream); - result = mysqlStatement.executeUpdate(); +// com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class); +// mysqlStatement.setLocalInfileInputStream(dataStream); +// statement. +// result = mysqlStatement.executeUpdate(); } return result; } -- Gitblit v1.8.0