From c007f0ca1785db093d48f4846cda82fe8e955765 Mon Sep 17 00:00:00 2001
From: kimi <kimi42345@gmail.com>
Date: 星期三, 27 五月 2020 09:59:29 +0800
Subject: [PATCH] merage

---
 src/main/java/com/highdatas/mdm/service/impl/SysAssembleServiceImpl.java |  423 ++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 311 insertions(+), 112 deletions(-)

diff --git a/src/main/java/com/highdatas/mdm/service/impl/SysAssembleServiceImpl.java b/src/main/java/com/highdatas/mdm/service/impl/SysAssembleServiceImpl.java
index 19170e4..d88c785 100644
--- a/src/main/java/com/highdatas/mdm/service/impl/SysAssembleServiceImpl.java
+++ b/src/main/java/com/highdatas/mdm/service/impl/SysAssembleServiceImpl.java
@@ -14,11 +14,13 @@
 import com.highdatas.mdm.util.ContentBuilder;
 import com.highdatas.mdm.util.DbUtils;
 import com.highdatas.mdm.util.RuleClient;
+import com.xxl.job.core.log.XxlJobLogger;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.mybatis.spring.SqlSessionTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.Resource;
 import java.io.ByteArrayInputStream;
@@ -78,17 +80,32 @@
     RuleClient ruleClient;
     @Autowired
     IMasterModifiedService masterModifiedService;
-
+    @Autowired
+    ISysViewService viewService;
+    @Autowired
+    ISysMenuService menuService;
+    @Autowired
+    IMasterAuthorSubscribeService subscribeService;
+    /**
+     *
+     * @description:  閫氳繃姹囬泦id鎵ц姹囬泦浠诲姟
+     * @param id 姹囬泦id
+     * @return: 鎵ц缁撴灉
+     *
+     */
     @Override
     public Result run(String id) {
+        XxlJobLogger.log("log info: run" );
         if (StringUtils.isEmpty(id)) {
             return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
         }
 
         SysAssemble assemble = selectById(id);
         if (assemble == null) {
+            XxlJobLogger.log("assemble not found" );
             return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
         }
+
         assemble.setPreTime(new Date());
         assemble.setPreStatus(SysAssembleRunStatus.working);
         assemble.updateById();
@@ -96,6 +113,7 @@
         if (!assemble.getStatus().equals(SysAssembleStatus.working)) {
             assemble.setPreStatus(SysAssembleRunStatus.fail);
             assemble.setPreCnt(0);
+            XxlJobLogger.log("褰撳墠浠诲姟涓嶅湪宸ヤ綔涓�,鐘舵��:" + assemble.getStatus() );
             assemble.setPreMsg("褰撳墠浠诲姟涓嶅湪宸ヤ綔涓�,鐘舵��:" + assemble.getStatus());
             assemble.updateById();
             return  Result.error(new CodeMsg(6009,"褰撳墠浠诲姟涓嶅湪宸ヤ綔涓�,鐘舵��:" + assemble.getStatus()));
@@ -110,6 +128,7 @@
                 if (!status.equals(ActivitiStatus.close) && !status.equals(ActivitiStatus.open)) {
                     assemble.setPreStatus(SysAssembleRunStatus.fail);
                     assemble.setPreCnt(0);
+                    XxlJobLogger.log("褰撳墠鏈夋祦绋嬫鍦ㄨ繍琛�,鏆傛椂鏃犳硶姹囬泦涓嬫鏁版嵁:" + status.toString() );
                     assemble.setPreMsg("褰撳墠鏈夋祦绋嬫鍦ㄨ繍琛�,鏆傛椂鏃犳硶姹囬泦涓嬫鏁版嵁:" + status.toString());
                     assemble.updateById();
                     return  Result.error(new CodeMsg(6009,"褰撳墠鏈夋祦绋嬫鍦ㄨ繍琛�,鏆傛椂鏃犳硶姹囬泦涓嬫鏁版嵁:" + status.toString()));
@@ -119,17 +138,24 @@
         }
 
         Boolean bigData = assemble.getBigdata();
-
+        Date scheduleDate = new Date();
+        XxlJobLogger.log("log info:  start assemble");
         //1 load  from db
         try{
             Result result = runByDb(assemble);
+            Date dbDate = new Date();
+            XxlJobLogger.log("log info db time:" + (dbDate.getTime() - scheduleDate.getTime()) + "ms");
             if (!result.getSuccess()) {
                assemble.setPreStatus(SysAssembleRunStatus.fail);
                assemble.updateById();
                 return result;
             }
+
             //2 load  from api
             result = runByApi(id);
+            Date apiDate = new Date();
+            XxlJobLogger.log("log info api time:" + (apiDate.getTime() - dbDate.getTime()) + "ms");
+
             if (!result.getSuccess()) {
                 assemble.setPreStatus(SysAssembleRunStatus.fail);
                 assemble.updateById();
@@ -137,9 +163,14 @@
             }
             //3check temp table;
             result = checkTempTable(assemble);
+            Date checkDate = new Date();
+            XxlJobLogger.log("log info check time:" + (checkDate.getTime() - apiDate.getTime()) + "ms");
+
             if (!result.getSuccess()) {
                 assemble.setPreStatus(SysAssembleRunStatus.fail);
-                assemble.setPreMsg("妫�娴嬩富鏁版嵁涓存椂琛ㄥ嚭鐜伴敊璇�");
+                XxlJobLogger.log("妫�娴嬩富鏁版嵁涓存椂琛ㄥ嚭鐜伴敊璇�"+ result.getMessage());
+
+                assemble.setPreMsg("妫�娴嬩富鏁版嵁涓存椂琛ㄥ嚭鐜伴敊璇�:" + result.getMessage());
                 assemble.updateById();
                 return result;
             }
@@ -147,63 +178,99 @@
             //4 purge data
             String purgeSql = assemble.getPurgeSql();
             purgeSql = DbUtils.replaceEscape(purgeSql);
+            XxlJobLogger.log("assemble db purgeSql sql:" + purgeSql);
             result = purgeData(purgeSql, assemble.getId(), bigData);
+            Date purgeDate = new Date();
+            XxlJobLogger.log("log info purge time:" + (purgeDate.getTime() - checkDate.getTime()) + "ms");
+
             if (!result.getSuccess()) {
                 assemble.setPreStatus(SysAssembleRunStatus.fail);
-                assemble.setPreMsg("杩愯娓呮礂sql鍑虹幇閿欒: "+purgeSql );
+                assemble.setPreMsg("杩愯娓呮礂sql鍑虹幇閿欒: "+purgeSql + "," + result.getMessage());
+                XxlJobLogger.log("杩愯娓呮礂sql鍑虹幇閿欒: "+purgeSql + "," + result.getMessage() );
                 assemble.updateById();
                 return result;
             }
 
             //5 check temp data
             result = checkTempData(assemble);
+            Date checkTempDate = new Date();
+            XxlJobLogger.log("log info check temp time:" + (checkTempDate.getTime() - purgeDate.getTime()) + "ms");
 
             if (!result.getSuccess()) {
                 assemble.setPreStatus(SysAssembleRunStatus.fail);
-                assemble.setPreMsg("璐ㄩ噺妫�楠岀幆鑺傚嚭鐜伴敊璇�" );
+                assemble.setPreMsg("璐ㄩ噺妫�楠岀幆鑺傚嚭鐜伴敊璇�:" + result.getMessage());
+                XxlJobLogger.log("璐ㄩ噺妫�楠岀幆鑺傚嚭鐜伴敊璇�" + result.getMessage());
                 assemble.updateById();
                 return result;
             }
             //6 temp 2 record
+            result = needRollBack(assemble, result, checkTempDate);
 
-            result = temp2record(assemble);
             if (!result.getSuccess()) {
                 assemble.setPreStatus(SysAssembleRunStatus.fail);
-                assemble.setPreMsg("鏁版嵁鎼繍鑷充富鏁版嵁璁板綍鐜妭鍑虹幇閿欒" );
-                assemble.updateById();
-                return result;
-            }
-
-            //7 record 閾炬帴 鐗堟湰
-            result = linkMaintain(assemble);
-            if (!result.getSuccess()) {
-                assemble.setPreStatus(SysAssembleRunStatus.fail);
-                assemble.setPreMsg("鑱旀帴鐗堟湰鍑虹幇閿欒" );
-                assemble.updateById();
-                return result;
-            }
-
-            //todo 娣诲姞浜嬪姟  log璁板綍
-            //8 鏇存柊 鍙傛暟
-            result = updateParams(assemble);
-            if (!result.getSuccess()) {
-                assemble.setPreStatus(SysAssembleRunStatus.fail);
-                assemble.setPreMsg("鏇存柊鍙橀噺鐜妭鍑虹幇閿欒" );
+                XxlJobLogger.log("涓存椂鏁版嵁閾炬帴涓绘暟鎹郴缁熼敊璇細" +result.getMessage());
+                assemble.setPreMsg("涓存椂鏁版嵁閾炬帴涓绘暟鎹郴缁熼敊璇細" +result.getMessage());
             }else {
                 assemble.setPreMsg("姹囬泦鎴愬姛");
+                XxlJobLogger.log("姹囬泦鎴愬姛");
                 assemble.setPreStatus(SysAssembleRunStatus.success);
             }
             assemble.updateById();
             return result;
         }
         catch (Exception e) {
-            assemble.setPreMsg("姹囬泦浠诲姟鍑虹幇閿欒");
+            XxlJobLogger.log("姹囬泦浠诲姟鍑虹幇閿欒锛�"+e.getMessage());
+            assemble.setPreMsg("姹囬泦浠诲姟鍑虹幇閿欒锛�"+e.getMessage());
             assemble.setPreStatus(SysAssembleRunStatus.fail).updateById();
             e.printStackTrace();
 
             return Result.error(new CodeMsg(6009, e.getMessage()));
         }
     }
+
+    @Transactional(rollbackFor = {Exception.class, Error.class})
+    private Result needRollBack(SysAssemble assemble, Result result, Date checkTempDate) {
+        result = temp2record(assemble);
+        Date temp2RecordDate = new Date();
+        XxlJobLogger.log("log info  temp2Record time:" + (temp2RecordDate.getTime() - checkTempDate.getTime()) + "ms");
+
+        if (!result.getSuccess()) {
+            assemble.setPreStatus(SysAssembleRunStatus.fail);
+            XxlJobLogger.log("鏁版嵁鎼繍鑷充富鏁版嵁璁板綍鐜妭鍑虹幇閿欒;" + result.getMessage() );
+            assemble.setPreMsg("鏁版嵁鎼繍鑷充富鏁版嵁璁板綍鐜妭鍑虹幇閿欒:" + result.getMessage());
+            assemble.updateById();
+            return result;
+        }
+
+
+        //7 record 閾炬帴 鐗堟湰
+
+        result = linkMaintain(assemble);
+        Date linkMaintainDate = new Date();
+        XxlJobLogger.log("log info  link maintain time:" + (linkMaintainDate.getTime() - temp2RecordDate.getTime()) + "ms");
+
+        if (!result.getSuccess()) {
+            assemble.setPreStatus(SysAssembleRunStatus.fail);
+            XxlJobLogger.log("鑱旀帴鐗堟湰鍑虹幇閿欒:" + result.getMessage());
+
+            assemble.setPreMsg("鑱旀帴鐗堟湰鍑虹幇閿欒:" + result.getMessage());
+            assemble.updateById();
+            return result;
+        }
+
+        //8 鏇存柊 鍙傛暟
+        result = updateParams(assemble);
+        if (!result.getSuccess()) {
+            assemble.setPreStatus(SysAssembleRunStatus.fail);
+            XxlJobLogger.log("鏇存柊鍙傛暟閿欒:" + result.getMessage());
+
+            assemble.setPreMsg("鏇存柊鍙傛暟閿欒:" + result.getMessage());
+            assemble.updateById();
+            return result;
+        }
+        return result;
+    }
+
 
     private Result linkMaintain(SysAssemble assemble) {
         try{
@@ -216,8 +283,13 @@
                 isBigVersion = true;
             }
             Maintain maintain = masterDataService.uploadedData(tableName, updateType, assemble.getUserId(),isBigVersion);
+            XxlJobLogger.log("log info  create now maintain:" + maintain.getId());
+            Date before2Record = new Date();
 
             Result result = temp2recordUpdate(assemble, maintain);
+            Date after2Record = new Date();
+            XxlJobLogger.log("log info  temp2Record time:" + (after2Record.getTime() - before2Record.getTime()) + "ms");
+
             if (!result.getSuccess()) {
                 assemble.setPreStatus(SysAssembleRunStatus.fail);
                 assemble.setPreMsg("鎼繍鏇存柊瀛楁鍑虹幇閿欒" );
@@ -248,10 +320,18 @@
             }else  {
                 //鐩存帴杩愯
                 Flows flows = new Flows().setStatus(ActivitiStatus.open).setBusinessId(maintain.getId()).setId(DbUtils.getUUID()).setCreateTime(new Date());
+                flows.setBusinessType(ActivitiBusinessType.maintain);
                 flows.insert();
                 maintain.setFlowId(flows.getId());
                 maintain.updateById();
-                maintainService.dealFlow(maintain.getId(), ActivitiStatus.open);
+                maintainService.dealFlow(maintain.getId(), flows.getStatus());
+                log.info("flow-maintainService end");
+                viewService.dealFlow(maintain.getId(), flows.getStatus());
+                log.info("flow-viewService end");
+                subscribeService.dealFlow(maintain.getId(), flows.getStatus());
+                log.info("flow-subscribeService end");
+                menuService.dealFlow(maintain.getId(), flows.getStatus(), flows.getUserId());
+                log.info("flow-menuService end");
                 masterModifiedService.dealAssemble(maintain.getId(), assemble.getUserId(), false);
             }
             return Result.success(null);
@@ -455,8 +535,10 @@
             int cnt = 0;
             if (updateType.equals(SysAssembleUpdateType.All)) {
                 String sql = MessageFormat.format(transSql, recordTableName, recordFieldStr, fieldStr, tempTableName);
+                XxlJobLogger.log("update sql:" + sql);
                 PreparedStatement preparedStatement = conn.prepareStatement(sql);
                 cnt = preparedStatement.executeUpdate();
+
             }else if(updateType.equals(SysAssembleUpdateType.Increment)) {
                 String updateFields = assemble.getUpdateFields();
                 String[] split = updateFields.split(Constant.SEMICOLON);
@@ -469,16 +551,21 @@
                         .collect(Collectors.joining(Constant.COMMA));
 
                 String insetSql = MessageFormat.format(insertSql, recordTableName, recordFieldStr,insertFieldStr, tempTableName, tableName, joinStr);
+                log.info(insertSql);
+                XxlJobLogger.log("update Increment  sql:" + insertSql);
                 PreparedStatement preparedStatement = conn.prepareStatement(insetSql);
                 int insertCnt = preparedStatement.executeUpdate();
                 cnt += insertCnt;
+
             }
+            XxlJobLogger.log("update sql cnt:" + cnt);
             assemble.setPreCnt(cnt);
             assemble.updateById();
             return Result.success(null);
         }
         catch (Exception e) {
             e.printStackTrace();
+
             return Result.error(new CodeMsg(6005, e.getMessage()));
         }
         finally {
@@ -511,47 +598,53 @@
     }
 
     private Result checkTempData(SysAssemble assemble) {
-        //TODO
+        try {
 
-        SysAssembleCheckType checkType = assemble.getCheckType();
-        if (checkType == null) {
-            return Result.error(new CodeMsg(6009,"瑙勫垯鏍¢獙绫诲瀷涓虹┖"));
-        }
-        if (checkType.equals(SysAssembleCheckType.unlimited)) {
-            return Result.success(null);
-        }
 
-        String menuId = assemble.getMenuId();
-        MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId));
-        String tableName = menuMapping.getTableName();
-        String tempTableName = Constant.Temp + tableName;
-        HashMap<String,Boolean> fieldResultSet =  ruleClient.execuImmeForCollect(tempTableName, assemble.getUserId());
-        switch (checkType){
-            case successAdd:
-                String unSuccessFields = fieldResultSet.keySet().stream().filter(s -> !fieldResultSet.get(s)).collect(Collectors.joining(Constant.COMMA));
-                if (fieldResultSet.keySet().contains(false)) {
-                    return Result.error(new CodeMsg(6009,"瑙勫垯鏍¢獙涓嶉�氳繃 瀛楁:" + unSuccessFields));
-                }
-                break;
-            case partSuccessAdd:
-                String checkFields = assemble.getCheckFields();
-                String[] split = checkFields.split(Constant.SEMICOLON);
-                for (String s : split) {
-                    Boolean checked = fieldResultSet.get(s);
-                    if (checked == null || !checked) {
-                        return Result.error(new CodeMsg(6009,"瑙勫垯鏍¢獙涓嶉�氳繃 瀛楁:" + s));
+            SysAssembleCheckType checkType = assemble.getCheckType();
+            if (checkType == null) {
+                return Result.error(new CodeMsg(6009, "瑙勫垯鏍¢獙绫诲瀷涓虹┖"));
+            }
+            if (checkType.equals(SysAssembleCheckType.unlimited)) {
+                return Result.success(null);
+            }
+
+            String menuId = assemble.getMenuId();
+            MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId));
+            String tableName = menuMapping.getTableName();
+            String tempTableName = Constant.Temp + tableName;
+            HashMap<String, Boolean> fieldResultSet = ruleClient.execuImmeForCollect(tempTableName, assemble.getUserId());
+            switch (checkType) {
+                case successAdd:
+                    String unSuccessFields = fieldResultSet.keySet().stream().filter(s -> !fieldResultSet.get(s)).collect(Collectors.joining(Constant.COMMA));
+                    if (fieldResultSet.keySet().contains(false)) {
+                        return Result.error(new CodeMsg(6009, "瑙勫垯鏍¢獙涓嶉�氳繃 瀛楁:" + unSuccessFields));
                     }
-                }
-                break;
+                    break;
+                case partSuccessAdd:
+                    String checkFields = assemble.getCheckFields();
+                    String[] split = checkFields.split(Constant.SEMICOLON);
+                    for (String s : split) {
+                        Boolean checked = fieldResultSet.get(s);
+                        if (checked == null || !checked) {
+                            return Result.error(new CodeMsg(6009, "瑙勫垯鏍¢獙涓嶉�氳繃 瀛楁:" + s));
+                        }
+                    }
+                    break;
+            }
+
+
+            return Result.success(null);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return Result.error(new CodeMsg(3009, e.getMessage()));
         }
-
-
-
-        return Result.success(null);
     }
+
 
     private Result purgeData(String purgeSql, String assembleId, boolean bigData) {
         Connection connection = null;
+        Savepoint purge = null;
         try {
             DataSourceInfo dataSourceInfo = null;
             if (bigData) {
@@ -560,7 +653,8 @@
                 dataSourceInfo = unBigDataDataSourceInfo;
             }
             connection = dataSourceInfo.conn();
-
+            connection.setAutoCommit(false);
+            purge = connection.setSavepoint("purge");
             List<SysAssembleParams> paramsList = paramsService.selectList(new EntityWrapper<SysAssembleParams>().eq("parent_id", assembleId));
             HashMap<String, String> paramsMap = new HashMap<>();
             Set<String> matcher = DbUtils.matcher(purgeSql);
@@ -591,14 +685,46 @@
                 String preParams = MessageFormat.format(Constant.ParamsShell, key);
                 purgeSql = purgeSql.replace(preParams, val);
             }
+            XxlJobLogger.log("purgeSql:" +purgeSql);
+            List<String> split = DbUtils.split(purgeSql, Constant.SEMICOLON);
+            XxlJobLogger.log("split size:" + split.size());
+            PreparedStatement preparedStatement = null;
+            if (!split.isEmpty()) {
+                for (String oneSql : split) {
+                    XxlJobLogger.log("one sql:" + oneSql);
+                    if (preparedStatement == null) {
+                        preparedStatement = connection.prepareStatement(oneSql);
+                        preparedStatement.addBatch(oneSql);
+                    } else {
+                        preparedStatement.addBatch(oneSql);
+                    }
+                }
+            }
+            int[] ints = preparedStatement.executeBatch();
+            for (int i = 0; i < ints.length; i++) {
+                XxlJobLogger.log("sqls count:" + ints[i]);
+            }
 
-            PreparedStatement preparedStatement = connection.prepareStatement(purgeSql);
-            preparedStatement.execute();
+            connection.commit();
+
             return Result.success(null);
 
         } catch (SQLException e) {
             e.printStackTrace();
-            return Result.error(new CodeMsg(6004, "杩愯娓呮礂sql鍑洪敊," + e.getSQLState()));
+            if (connection != null) {
+                try {
+                    if (purge != null) {
+                        connection.rollback(purge);
+                    } else {
+                        connection.rollback();
+                    }
+
+                } catch (SQLException e1) {
+                    e1.printStackTrace();
+                    return Result.error(new CodeMsg(6004, "鍥炴粴sql鍑洪敊," + e.getMessage()));
+                }
+            }
+            return Result.error(new CodeMsg(6004, "杩愯娓呮礂sql鍑洪敊," + e.getMessage()));
         } finally {
             if (connection != null) {
                 try {
@@ -617,19 +743,24 @@
     }
 
     private Result runByDb(SysAssemble assemble) {
-        String id = assemble.getId();
-        Boolean bigdata = assemble.getBigdata();
+        try {
+            String id = assemble.getId();
+            Boolean bigdata = assemble.getBigdata();
 
-        List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id).eq(Constant.Active, true));
-        for (SysAssembleDb sysAssembleDb : dbList) {
-            String dbId = sysAssembleDb.getId();
-            Result result = loadOneDb(sysAssembleDb, bigdata, id);
-            if (!result.getSuccess()) {
-                assemble.setPreMsg("瀵煎叆婧愭暟鎹敊璇�:" + sysAssembleDb.getDatabaseName()).updateById();
-                return result;
+            List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id).eq(Constant.Active, true));
+            for (SysAssembleDb sysAssembleDb : dbList) {
+                String dbId = sysAssembleDb.getId();
+                Result result = loadOneDb(sysAssembleDb, bigdata, id);
+                XxlJobLogger.log("assemble db load one db:" + sysAssembleDb.getDatasourceUrl());
+                if (!result.getSuccess()) {
+                    assemble.setPreMsg("瀵煎叆婧愭暟鎹敊璇�:" + sysAssembleDb.getDatabaseName()).updateById();
+                    return result;
+                }
             }
+            return Result.success(null);
+        } catch (Exception e) {
+            return Result.error(new CodeMsg(3009, e.getMessage()));
         }
-        return Result.success(null);
     }
 
     private Result loadOneDb(SysAssembleDb sysAssembleDb, boolean bigData, String assembleId) {
@@ -638,14 +769,17 @@
         if (conn == null) {
             return Result.error(new CodeMsg(6002, MessageFormat.format("鏈兘杩炴帴鍒版簮 id: {0}", dbId)));
         }
+        Connection dataSourceConn = null;
+        PreparedStatement statement = null;
         try {
             List<SysAssembleDbTable> tableList = tableService.selectList(new EntityWrapper<SysAssembleDbTable>().eq(Constant.PARENT_ID, dbId).eq(Constant.Active, true));
             for (SysAssembleDbTable dbTable : tableList) {
                 String tableId = dbTable.getId();
                 String tempTableName = dbTable.getTempTableName();
-
+                XxlJobLogger.log("assemble table" + tempTableName);
                 List<SysAssembleDbField> fieldList = fieldService.selectList(new EntityWrapper<SysAssembleDbField>().eq(Constant.PARENT_ID, tableId));
                 String fields = fieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.joining(Constant.COMMA));
+                XxlJobLogger.log("assemble fields" + fields);
                 SysAssembleTableType type = dbTable.getType();
                 String tableName = null;
                 String assembleTempTableName = tempTableName;
@@ -654,6 +788,7 @@
                 } else if (type.equals(SysAssembleTableType.table)) {
                     tableName = dbTable.getTableName();
                 }
+                XxlJobLogger.log("assemble source table:" + tableName);
                 //TODO assembleTempTableName 鍙兘浼氳秴闀� 64瀛楄妭 鍚庣画淇敼
                 DataSourceInfo dataSourceInfo = null;
                 if (!bigData) {
@@ -663,36 +798,41 @@
                 }
 
                 // drop temp data
-                dataSourceInfo.truncateData(tempTableName);
-
+                boolean b = dataSourceInfo.truncateData(tempTableName);
+                XxlJobLogger.log("assemble  truncateData :" + b);
                 fixAssembleTempTable(dataSourceInfo, assembleTempTableName, fieldList, dbId);
-
+                XxlJobLogger.log("assemble  fixAssembleTempTable :" );
                 //checkAssembleTempTableExists(assembleTempTableName);
-                //TODO 鏈垎椤�
                 String filter = dbTable.getFilter();
-                Set<String> matcher = DbUtils.matcher(filter);
-                for (String code : matcher) {
-                    SysAssembleParams sysAssembleParams = paramsService.selectOne(new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, assembleId).eq(Constant.Code, code));
-                    if (sysAssembleParams == null){
-                        return Result.error(new CodeMsg(6009, assembleId + "鏈夊彉閲忔湭鍖归厤鍒�:"+ code));
+                XxlJobLogger.log("assemble  raw  filter:" +  filter);
+                if (!StringUtils.isEmpty(filter)) {
+                    Set<String> matcher = DbUtils.matcher(filter);
+                    for (String code : matcher) {
+                        SysAssembleParams sysAssembleParams = paramsService.selectOne(new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, assembleId).eq(Constant.Code, code));
+                        if (sysAssembleParams == null){
+                            return Result.error(new CodeMsg(6009, assembleId + "鏈夊彉閲忔湭鍖归厤鍒�:"+ code));
+                        }
+                        String val = sysAssembleParams.getVal();
+                        if (StringUtils.isEmpty(val)) {
+                            return Result.error(new CodeMsg(6009, assembleId + "鏈夊彉閲忔湭鑾峰彇鍒板��:"+ code));
+                        }
+                        val = DbUtils.quotedStr(val);
+                        filter = filter.replace(DbUtils.assemblParam(code), val);
                     }
-                    String val = sysAssembleParams.getVal();
-                    if (StringUtils.isEmpty(val)) {
-                        return Result.error(new CodeMsg(6009, assembleId + "鏈夊彉閲忔湭鑾峰彇鍒板��:"+ code));
-                    }
-                    val = DbUtils.quotedStr(val);
-                    filter = filter.replace(DbUtils.assemblParam(code), val);
-                }
-                if (StringUtils.isEmpty(filter)) {
+                }else {
                     filter = Constant.WHERE_DEFAULT;
                 }
+
+                XxlJobLogger.log("assemble db filter:" + filter);
                 String runSqlTemplate = null;
                 if (type.equals(SysAssembleTableType.table)){
                     runSqlTemplate = Constant.selectFieldTableTemplate;
                 }else if(type.equals(SysAssembleTableType.sql)){
                     runSqlTemplate = Constant.selectFieldSqlTemplate;
                 }
+                XxlJobLogger.log("assemble db  select sql template :" + runSqlTemplate);
                 String sql = MessageFormat.format(runSqlTemplate, fields, tableName, filter);
+                XxlJobLogger.log("assemble db select sql:" + sql);
                 PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
                 ps.setFetchSize(Integer.MIN_VALUE);
                 ps.setFetchDirection(ResultSet.FETCH_REVERSE);
@@ -700,28 +840,29 @@
                 ResultSetMetaData metaData = resultSet.getMetaData();
                 int columnCount = metaData.getColumnCount();
 
-                String insertMySqlSql = assembleSql(unBigDataDataSourceInfo.getDbName(), assembleTempTableName, fields);
+                String insertMySqlSql = assembleSql(unBigDataDataSourceInfo.getDbName(), assembleTempTableName, fieldList);
+                XxlJobLogger.log("assemble db insert sql:" + insertMySqlSql);
                 int cnt = 0;
-                StringBuilder builder = new StringBuilder();
+                dataSourceConn = dataSourceInfo.conn();
+                dataSourceConn.setAutoCommit(false);
+                statement = dataSourceConn.prepareStatement(insertMySqlSql);
+                int count = 0;
                 while (resultSet.next()) {
-
+                    count++;
                     for (int i = 1; i <= columnCount; i++) {
-                        if (i == columnCount) {
-                            builderEnd(builder, resultSet.getObject(i));
-                        }else {
-                            builderAppend(builder, resultSet.getObject(i));
-                        }
-
+                        statement.setString(i, resultSet.getString(i));
                     }
+                    statement.addBatch();
                     cnt++;
                     if (cnt == 5000) {
-                        runOneBatch(bigData, insertMySqlSql, builder);
+                        statement.executeBatch();
                     }
                 }
-                if (builder.length() > 0) {
-                    runOneBatch(bigData, insertMySqlSql, builder);
-                }
+                statement.executeBatch();
+                XxlJobLogger.log("assemble db select sql count:" + count);
             }
+
+            dataSourceConn.commit();
             return Result.success(null);
         }
         catch (Exception e) {
@@ -735,8 +876,60 @@
                     e.printStackTrace();
                 }
             }
+            if (statement != null) {
+                try {
+                    statement.close();
+                } catch (SQLException e) {
+                    e.printStackTrace();
+                }
+            }
+            if (dataSourceConn != null) {
+                try {
+                    dataSourceConn.close();
+                } catch (SQLException e) {
+                    e.printStackTrace();
+                }
+            }
         }
 
+    }
+
+    private void setOneBatch(boolean bigData, String insertMySqlSql, List<List<String>> branchList) {
+        Connection conn = null;
+        try {
+            DataSourceInfo dataSourceInfo = null;
+            if (bigData) {
+                dataSourceInfo = bigDataDataSourceInfo;
+            }else {
+                dataSourceInfo = unBigDataDataSourceInfo;
+            }
+
+            conn = dataSourceInfo.conn();
+            PreparedStatement statement = conn.prepareStatement(insertMySqlSql);
+            int result = 0;
+            if (branchList.isEmpty()) {
+                return;
+            }
+            for (List<String> stringList : branchList) {
+                for (int i = 0; i < stringList.size(); i++) {
+                    statement.setString(i + 1, stringList.get(i));
+                }
+                statement.addBatch();
+            }
+            statement.executeBatch();
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+        }
+        finally {
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (SQLException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
     }
 
     private void runOneBatch(boolean bigData, String insertMySqlSql, StringBuilder builder) throws UnsupportedEncodingException, SQLException {
@@ -789,12 +982,17 @@
 
     public void builderAppend(StringBuilder builder, Object object) {
         builder.append(object);
-        builder.append("\t");
+        builder.append(",");
     }
 
 
-    public String assembleSql(String dataBaseName, String tableName, String fields) {
-        String sql = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE INTO TABLE " + dataBaseName + "." + tableName + "(" + fields + ")";
+    public String assembleSql(String dataBaseName, String tableName, List<SysAssembleDbField> fieldList) {
+        String fields = fieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.joining(Constant.COMMA));
+        ContentBuilder builder = new ContentBuilder(Constant.COMMA);
+        for (int i = 0; i < fieldList.size(); i++) {
+            builder.append(Constant.QUESTION);
+        }
+        String sql = "INSERT INTO " + dataBaseName + "." + tableName + "(" + fields + ") values(" + builder.toString() + ")";
         return sql;
     }
 
@@ -816,9 +1014,10 @@
             PreparedStatement statement = conn.prepareStatement(sql);
             int result = 0;
             if (statement.isWrapperFor(java.sql.Statement.class)) {
-                com.mysql.cj.jdbc.PreparedStatement mysqlStatement = statement.unwrap( com.mysql.cj.jdbc.PreparedStatement.class);
-                mysqlStatement.setLocalInfileInputStream(dataStream);
-                result = mysqlStatement.executeUpdate();
+//                com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class);
+//                mysqlStatement.setLocalInfileInputStream(dataStream);
+//               statement.
+//                result = mysqlStatement.executeUpdate();
             }
             return result;
         }

--
Gitblit v1.8.0