kimi
2020-05-27 c007f0ca1785db093d48f4846cda82fe8e955765
src/main/java/com/highdatas/mdm/service/impl/SysAssembleServiceImpl.java
@@ -1,6 +1,7 @@
package com.highdatas.mdm.service.impl;
import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.baomidou.mybatisplus.service.impl.ServiceImpl;
import com.highdatas.mdm.entity.*;
import com.highdatas.mdm.mapper.SysAssembleMapper;
import com.highdatas.mdm.mapper.TableInfoMapper;
@@ -9,22 +10,19 @@
import com.highdatas.mdm.pojo.kettle.DataSourceInfo;
import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo;
import com.highdatas.mdm.service.*;
import com.baomidou.mybatisplus.service.impl.ServiceImpl;
import com.highdatas.mdm.util.Constant;
import com.highdatas.mdm.util.ContentBuilder;
import com.highdatas.mdm.util.DbUtils;
import lombok.Data;
import com.highdatas.mdm.util.RuleClient;
import com.xxl.job.core.log.XxlJobLogger;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.session.SqlSession;
import org.apache.regexp.RE;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;
import javax.servlet.http.HttpSession;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
@@ -78,21 +76,48 @@
    @Autowired
    MasterDataService masterDataService;
    @Autowired
    RuleClient ruleClient;
    @Autowired
    IMasterModifiedService masterModifiedService;
    @Autowired
    ISysViewService viewService;
    @Autowired
    ISysMenuService menuService;
    @Autowired
    IMasterAuthorSubscribeService subscribeService;
    /**
     * Runs the assemble (data collection) task identified by the given assemble id.
     *
     * @param id id of the assemble task to run
     * @return the result of the run
     */
    @Override
    @Transactional(rollbackFor=Exception.class)
    public Result run(String id, HttpSession session) {
    public Result run(String id) {
        XxlJobLogger.log("log info: run" );
        if (StringUtils.isEmpty(id)) {
            return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
        }
        SysAssemble assemble = selectById(id);
        if (assemble == null) {
            XxlJobLogger.log("assemble not found" );
            return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
        }
        assemble.setPreTime(new Date());
        assemble.setPreStatus(SysAssembleRunStatus.working);
        assemble.updateById();
        if (!assemble.getStatus().equals(SysAssembleStatus.working)) {
            assemble.setPreStatus(SysAssembleRunStatus.fail);
            assemble.setPreCnt(0);
            XxlJobLogger.log("当前任务不在工作中,状态:" + assemble.getStatus() );
            assemble.setPreMsg("当前任务不在工作中,状态:" + assemble.getStatus());
            assemble.updateById();
            return  Result.error(new CodeMsg(6009,"当前任务不在工作中,状态:" + assemble.getStatus()));
        }
        String flowId = assemble.getFlowId();
@@ -101,26 +126,36 @@
            if (flows != null) {
                ActivitiStatus status = flows.getStatus();
                if (!status.equals(ActivitiStatus.close) && !status.equals(ActivitiStatus.open)) {
                    assemble.setPreStatus(SysAssembleRunStatus.fail);
                    assemble.setPreCnt(0);
                    XxlJobLogger.log("当前有流程正在运行,暂时无法汇集下次数据:" + status.toString() );
                    assemble.setPreMsg("当前有流程正在运行,暂时无法汇集下次数据:" + status.toString());
                    assemble.updateById();
                    return  Result.error(new CodeMsg(6009,"当前有流程正在运行,暂时无法汇集下次数据:" + status.toString()));
                }
            }
        }
        assemble.setPreTime(new Date());
        assemble.setPreStatus(SysAssembleRunStatus.working);
        assemble.updateById();
        Boolean bigData = assemble.getBigdata();
        Date scheduleDate = new Date();
        XxlJobLogger.log("log info:  start assemble");
        //1 load  from db
        try{
            Result result = runByDb(assemble);
            Date dbDate = new Date();
            XxlJobLogger.log("log info db time:" + (dbDate.getTime() - scheduleDate.getTime()) + "ms");
            if (!result.getSuccess()) {
               assemble.setPreStatus(SysAssembleRunStatus.fail);
               assemble.updateById();
                return result;
            }
            //2 load  from api
            result = runByApi(id);
            Date apiDate = new Date();
            XxlJobLogger.log("log info api time:" + (apiDate.getTime() - dbDate.getTime()) + "ms");
            if (!result.getSuccess()) {
                assemble.setPreStatus(SysAssembleRunStatus.fail);
                assemble.updateById();
@@ -128,9 +163,14 @@
            }
            //3check temp table;
            result = checkTempTable(assemble);
            Date checkDate = new Date();
            XxlJobLogger.log("log info check time:" + (checkDate.getTime() - apiDate.getTime()) + "ms");
            if (!result.getSuccess()) {
                assemble.setPreStatus(SysAssembleRunStatus.fail);
                assemble.setPreMsg("检测主数据临时表出现错误");
                XxlJobLogger.log("检测主数据临时表出现错误"+ result.getMessage());
                assemble.setPreMsg("检测主数据临时表出现错误:" + result.getMessage());
                assemble.updateById();
                return result;
            }
@@ -138,57 +178,49 @@
            //4 purge data
            String purgeSql = assemble.getPurgeSql();
            purgeSql = DbUtils.replaceEscape(purgeSql);
            XxlJobLogger.log("assemble db purgeSql sql:" + purgeSql);
            result = purgeData(purgeSql, assemble.getId(), bigData);
            Date purgeDate = new Date();
            XxlJobLogger.log("log info purge time:" + (purgeDate.getTime() - checkDate.getTime()) + "ms");
            if (!result.getSuccess()) {
                assemble.setPreStatus(SysAssembleRunStatus.fail);
                assemble.setPreMsg("运行清洗sql出现错误: "+purgeSql );
                assemble.setPreMsg("运行清洗sql出现错误: "+purgeSql + "," + result.getMessage());
                XxlJobLogger.log("运行清洗sql出现错误: "+purgeSql + "," + result.getMessage() );
                assemble.updateById();
                return result;
            }
            //5 check temp data
            result = checkTempData(id, assemble.getCheckType(), assemble.getCheckFields());
            result = checkTempData(assemble);
            Date checkTempDate = new Date();
            XxlJobLogger.log("log info check temp time:" + (checkTempDate.getTime() - purgeDate.getTime()) + "ms");
            if (!result.getSuccess()) {
                assemble.setPreStatus(SysAssembleRunStatus.fail);
                assemble.setPreMsg("质量检验环节出现错误" );
                assemble.setPreMsg("质量检验环节出现错误:" + result.getMessage());
                XxlJobLogger.log("质量检验环节出现错误" + result.getMessage());
                assemble.updateById();
                return result;
            }
            //6 temp 2 record
            result = needRollBack(assemble, result, checkTempDate);
            result = temp2record(assemble);
            if (!result.getSuccess()) {
                assemble.setPreStatus(SysAssembleRunStatus.fail);
                assemble.setPreMsg("数据搬运至主数据记录环节出现错误" );
                assemble.updateById();
                return result;
            }
            //7 record 链接 版本
            result = linkMaintain(assemble, session);
            if (!result.getSuccess()) {
                assemble.setPreStatus(SysAssembleRunStatus.fail);
                assemble.setPreMsg("联接版本出现错误" );
                assemble.updateById();
                return result;
            }
            //todo 添加事务  log记录
            //8 更新 参数
            result = updateParams(assemble);
            if (!result.getSuccess()) {
                assemble.setPreStatus(SysAssembleRunStatus.fail);
                assemble.setPreMsg("更新变量环节出现错误" );
                XxlJobLogger.log("临时数据链接主数据系统错误:" +result.getMessage());
                assemble.setPreMsg("临时数据链接主数据系统错误:" +result.getMessage());
            }else {
                assemble.setPreMsg("汇集成功");
                XxlJobLogger.log("汇集成功");
                assemble.setPreStatus(SysAssembleRunStatus.success);
            }
            assemble.updateById();
            return result;
        }
        catch (Exception e) {
            assemble.setPreMsg("汇集任务出现错误");
            XxlJobLogger.log("汇集任务出现错误:"+e.getMessage());
            assemble.setPreMsg("汇集任务出现错误:"+e.getMessage());
            assemble.setPreStatus(SysAssembleRunStatus.fail).updateById();
            e.printStackTrace();
@@ -196,14 +228,68 @@
        }
    }
    private Result linkMaintain(SysAssemble assemble, HttpSession session) {
    @Transactional(rollbackFor = {Exception.class, Error.class})
    private Result needRollBack(SysAssemble assemble, Result result, Date checkTempDate) {
        result = temp2record(assemble);
        Date temp2RecordDate = new Date();
        XxlJobLogger.log("log info  temp2Record time:" + (temp2RecordDate.getTime() - checkTempDate.getTime()) + "ms");
        if (!result.getSuccess()) {
            assemble.setPreStatus(SysAssembleRunStatus.fail);
            XxlJobLogger.log("数据搬运至主数据记录环节出现错误;" + result.getMessage() );
            assemble.setPreMsg("数据搬运至主数据记录环节出现错误:" + result.getMessage());
            assemble.updateById();
            return result;
        }
        //7 record 链接 版本
        result = linkMaintain(assemble);
        Date linkMaintainDate = new Date();
        XxlJobLogger.log("log info  link maintain time:" + (linkMaintainDate.getTime() - temp2RecordDate.getTime()) + "ms");
        if (!result.getSuccess()) {
            assemble.setPreStatus(SysAssembleRunStatus.fail);
            XxlJobLogger.log("联接版本出现错误:" + result.getMessage());
            assemble.setPreMsg("联接版本出现错误:" + result.getMessage());
            assemble.updateById();
            return result;
        }
        //8 更新 参数
        result = updateParams(assemble);
        if (!result.getSuccess()) {
            assemble.setPreStatus(SysAssembleRunStatus.fail);
            XxlJobLogger.log("更新参数错误:" + result.getMessage());
            assemble.setPreMsg("更新参数错误:" + result.getMessage());
            assemble.updateById();
            return result;
        }
        return result;
    }
    private Result linkMaintain(SysAssemble assemble) {
        try{
            String menuId = assemble.getMenuId();
            MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId));
            String tableName = menuMapping.getTableName();
            Maintain maintain = masterDataService.uploadedData(tableName, assemble.getUpdateType(), assemble.getUserId());
            SysAssembleUpdateType updateType = assemble.getUpdateType();
            boolean isBigVersion = false;
            if (updateType.equals(SysAssembleUpdateType.All)) {
                isBigVersion = true;
            }
            Maintain maintain = masterDataService.uploadedData(tableName, updateType, assemble.getUserId(),isBigVersion);
            XxlJobLogger.log("log info  create now maintain:" + maintain.getId());
            Date before2Record = new Date();
            Result result = temp2recordUpdate(assemble, maintain);
            Date after2Record = new Date();
            XxlJobLogger.log("log info  temp2Record time:" + (after2Record.getTime() - before2Record.getTime()) + "ms");
            if (!result.getSuccess()) {
                assemble.setPreStatus(SysAssembleRunStatus.fail);
                assemble.setPreMsg("搬运更新字段出现错误" );
@@ -215,19 +301,38 @@
                //默认需要审批
                audit = true;
            }
            // 处理关联人
            if (audit) {
                Flows flows = activitiService.start("process", session, maintain.getId(),  ActivitiBusinessType.maintain);
                String chargeId = menuMapping.getChargeId();
                TUser user = DbUtils.getUserById(chargeId);
                if (user == null) {
                    return Result.error(new CodeMsg(6009, "找不到对应的负责人:" + chargeId));
                }
                activitiService.setUser(user);
                Flows flows = activitiService.start("process", null, maintain.getId(),  ActivitiBusinessType.maintain);
                assemble.setFlowId(flows.getId()).updateById();
                maintain.setFlowId(flows.getId());
                maintain.setDesp("启动汇集流程");
                maintain.updateById();
                masterModifiedService.dealAssemble(maintain.getId(), assemble.getUserId(), true);
            }else  {
                //直接运行
                Flows flows = new Flows().setStatus(ActivitiStatus.open).setBusinessId(maintain.getId()).setId(DbUtils.getUUID()).setCreateTime(new Date());
                flows.setBusinessType(ActivitiBusinessType.maintain);
                flows.insert();
                maintain.setFlowId(flows.getId());
                maintain.updateById();
                maintainService.dealFlow(maintain.getId(), ActivitiStatus.open);
                maintainService.dealFlow(maintain.getId(), flows.getStatus());
                log.info("flow-maintainService end");
                viewService.dealFlow(maintain.getId(), flows.getStatus());
                log.info("flow-viewService end");
                subscribeService.dealFlow(maintain.getId(), flows.getStatus());
                log.info("flow-subscribeService end");
                menuService.dealFlow(maintain.getId(), flows.getStatus(), flows.getUserId());
                log.info("flow-menuService end");
                masterModifiedService.dealAssemble(maintain.getId(), assemble.getUserId(), false);
            }
            return Result.success(null);
        }catch (Exception e) {
@@ -362,10 +467,9 @@
                    PreparedStatement updatedPreparedStatement = conn.prepareStatement(updatedSql);
                    int updateCnt = updatedPreparedStatement.executeUpdate();
                    cnt = updateCnt;
                    tableInfoMapper.insertMatintainDetailFromTemp(DbUtils.quotedStr(maintain.getId()), maintain.getTableName() + Constant.RECORD , DbUtils.quotedStr(Operate.update.toString()));
                }
            }
            tableInfoMapper.insertMatintainDetailFromTemp(DbUtils.quotedStr(maintain.getId()), maintain.getTableName() + Constant.RECORD , DbUtils.quotedStr(Operate.update.toString()));
            //tableInfoMapper.updateStdId( maintain.getTableName() + Constant.RECORD);
@@ -390,6 +494,7 @@
            }
        }
    }
    private Result temp2record(SysAssemble assemble) {
        Connection conn = null;
        try {
@@ -430,8 +535,10 @@
            int cnt = 0;
            if (updateType.equals(SysAssembleUpdateType.All)) {
                String sql = MessageFormat.format(transSql, recordTableName, recordFieldStr, fieldStr, tempTableName);
                XxlJobLogger.log("update sql:" + sql);
                PreparedStatement preparedStatement = conn.prepareStatement(sql);
                cnt = preparedStatement.executeUpdate();
            }else if(updateType.equals(SysAssembleUpdateType.Increment)) {
                String updateFields = assemble.getUpdateFields();
                String[] split = updateFields.split(Constant.SEMICOLON);
@@ -439,22 +546,26 @@
                String joinStr = getJoinFieldParse(unionCodeFields);
                // insert
                String insertFieldStr = fieldsFromTable.stream()
                        .map(s -> MessageFormat.format(Constant.Alias,Constant.T1,s))
                        .collect(Collectors.joining(Constant.COMMA));
                String insetSql = MessageFormat.format(insertSql, recordTableName, recordFieldStr,insertFieldStr, tempTableName, tableName, joinStr);
                log.info(insertSql);
                XxlJobLogger.log("update Increment  sql:" + insertSql);
                PreparedStatement preparedStatement = conn.prepareStatement(insetSql);
                int insertCnt = preparedStatement.executeUpdate();
                cnt += insertCnt;
            }
            XxlJobLogger.log("update sql cnt:" + cnt);
            assemble.setPreCnt(cnt);
            assemble.updateById();
            return Result.success(null);
        }
        catch (Exception e) {
            e.printStackTrace();
            return Result.error(new CodeMsg(6005, e.getMessage()));
        }
        finally {
@@ -486,13 +597,54 @@
        return builder.toString();
    }
    private Result checkTempData(String s,SysAssembleCheckType checkType, String id) {
        //TODO
        return Result.success(null);
    private Result checkTempData(SysAssemble assemble) {
        try {
            SysAssembleCheckType checkType = assemble.getCheckType();
            if (checkType == null) {
                return Result.error(new CodeMsg(6009, "规则校验类型为空"));
            }
            if (checkType.equals(SysAssembleCheckType.unlimited)) {
                return Result.success(null);
            }
            String menuId = assemble.getMenuId();
            MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId));
            String tableName = menuMapping.getTableName();
            String tempTableName = Constant.Temp + tableName;
            HashMap<String, Boolean> fieldResultSet = ruleClient.execuImmeForCollect(tempTableName, assemble.getUserId());
            switch (checkType) {
                case successAdd:
                    String unSuccessFields = fieldResultSet.keySet().stream().filter(s -> !fieldResultSet.get(s)).collect(Collectors.joining(Constant.COMMA));
                    if (fieldResultSet.keySet().contains(false)) {
                        return Result.error(new CodeMsg(6009, "规则校验不通过 字段:" + unSuccessFields));
                    }
                    break;
                case partSuccessAdd:
                    String checkFields = assemble.getCheckFields();
                    String[] split = checkFields.split(Constant.SEMICOLON);
                    for (String s : split) {
                        Boolean checked = fieldResultSet.get(s);
                        if (checked == null || !checked) {
                            return Result.error(new CodeMsg(6009, "规则校验不通过 字段:" + s));
                        }
                    }
                    break;
            }
            return Result.success(null);
        } catch (Exception e) {
            e.printStackTrace();
            return Result.error(new CodeMsg(3009, e.getMessage()));
        }
    }
    private Result purgeData(String purgeSql, String assembleId, boolean bigData) {
        Connection connection = null;
        Savepoint purge = null;
        try {
            DataSourceInfo dataSourceInfo = null;
            if (bigData) {
@@ -501,7 +653,8 @@
                dataSourceInfo = unBigDataDataSourceInfo;
            }
            connection = dataSourceInfo.conn();
            connection.setAutoCommit(false);
            purge = connection.setSavepoint("purge");
            List<SysAssembleParams> paramsList = paramsService.selectList(new EntityWrapper<SysAssembleParams>().eq("parent_id", assembleId));
            HashMap<String, String> paramsMap = new HashMap<>();
            Set<String> matcher = DbUtils.matcher(purgeSql);
@@ -532,14 +685,46 @@
                String preParams = MessageFormat.format(Constant.ParamsShell, key);
                purgeSql = purgeSql.replace(preParams, val);
            }
            XxlJobLogger.log("purgeSql:" +purgeSql);
            List<String> split = DbUtils.split(purgeSql, Constant.SEMICOLON);
            XxlJobLogger.log("split size:" + split.size());
            PreparedStatement preparedStatement = null;
            if (!split.isEmpty()) {
                for (String oneSql : split) {
                    XxlJobLogger.log("one sql:" + oneSql);
                    if (preparedStatement == null) {
                        preparedStatement = connection.prepareStatement(oneSql);
                        preparedStatement.addBatch(oneSql);
                    } else {
                        preparedStatement.addBatch(oneSql);
                    }
                }
            }
            int[] ints = preparedStatement.executeBatch();
            for (int i = 0; i < ints.length; i++) {
                XxlJobLogger.log("sqls count:" + ints[i]);
            }
            PreparedStatement preparedStatement = connection.prepareStatement(purgeSql);
            preparedStatement.execute();
            connection.commit();
            return Result.success(null);
        } catch (SQLException e) {
            e.printStackTrace();
            return Result.error(new CodeMsg(6004, "运行清洗sql出错," + e.getSQLState()));
            if (connection != null) {
                try {
                    if (purge != null) {
                        connection.rollback(purge);
                    } else {
                        connection.rollback();
                    }
                } catch (SQLException e1) {
                    e1.printStackTrace();
                    return Result.error(new CodeMsg(6004, "回滚sql出错," + e.getMessage()));
                }
            }
            return Result.error(new CodeMsg(6004, "运行清洗sql出错," + e.getMessage()));
        } finally {
            if (connection != null) {
                try {
@@ -558,19 +743,24 @@
    }
    private Result runByDb(SysAssemble assemble) {
        String id = assemble.getId();
        Boolean bigdata = assemble.getBigdata();
        try {
            String id = assemble.getId();
            Boolean bigdata = assemble.getBigdata();
        List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id).eq(Constant.Active, true));
        for (SysAssembleDb sysAssembleDb : dbList) {
            String dbId = sysAssembleDb.getId();
            Result result = loadOneDb(sysAssembleDb, bigdata, id);
            if (!result.getSuccess()) {
                assemble.setPreMsg("导入源数据错误:" + sysAssembleDb.getDatabaseName()).updateById();
                return result;
            List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id).eq(Constant.Active, true));
            for (SysAssembleDb sysAssembleDb : dbList) {
                String dbId = sysAssembleDb.getId();
                Result result = loadOneDb(sysAssembleDb, bigdata, id);
                XxlJobLogger.log("assemble db load one db:" + sysAssembleDb.getDatasourceUrl());
                if (!result.getSuccess()) {
                    assemble.setPreMsg("导入源数据错误:" + sysAssembleDb.getDatabaseName()).updateById();
                    return result;
                }
            }
            return Result.success(null);
        } catch (Exception e) {
            return Result.error(new CodeMsg(3009, e.getMessage()));
        }
        return Result.success(null);
    }
    private Result loadOneDb(SysAssembleDb sysAssembleDb, boolean bigData, String assembleId) {
@@ -579,14 +769,17 @@
        if (conn == null) {
            return Result.error(new CodeMsg(6002, MessageFormat.format("未能连接到源 id: {0}", dbId)));
        }
        Connection dataSourceConn = null;
        PreparedStatement statement = null;
        try {
            List<SysAssembleDbTable> tableList = tableService.selectList(new EntityWrapper<SysAssembleDbTable>().eq(Constant.PARENT_ID, dbId).eq(Constant.Active, true));
            for (SysAssembleDbTable dbTable : tableList) {
                String tableId = dbTable.getId();
                String tempTableName = dbTable.getTempTableName();
                XxlJobLogger.log("assemble table" + tempTableName);
                List<SysAssembleDbField> fieldList = fieldService.selectList(new EntityWrapper<SysAssembleDbField>().eq(Constant.PARENT_ID, tableId));
                String fields = fieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.joining(Constant.COMMA));
                XxlJobLogger.log("assemble fields" + fields);
                SysAssembleTableType type = dbTable.getType();
                String tableName = null;
                String assembleTempTableName = tempTableName;
@@ -595,6 +788,7 @@
                } else if (type.equals(SysAssembleTableType.table)) {
                    tableName = dbTable.getTableName();
                }
                XxlJobLogger.log("assemble source table:" + tableName);
                //TODO assembleTempTableName 可能会超长 64字节 后续修改
                DataSourceInfo dataSourceInfo = null;
                if (!bigData) {
@@ -604,36 +798,41 @@
                }
                // drop temp data
                dataSourceInfo.truncateData(tempTableName);
                boolean b = dataSourceInfo.truncateData(tempTableName);
                XxlJobLogger.log("assemble  truncateData :" + b);
                fixAssembleTempTable(dataSourceInfo, assembleTempTableName, fieldList, dbId);
                XxlJobLogger.log("assemble  fixAssembleTempTable :" );
                //checkAssembleTempTableExists(assembleTempTableName);
                //TODO 未分页
                String filter = dbTable.getFilter();
                Set<String> matcher = DbUtils.matcher(filter);
                for (String code : matcher) {
                    SysAssembleParams sysAssembleParams = paramsService.selectOne(new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, assembleId).eq(Constant.Code, code));
                    if (sysAssembleParams == null){
                        return Result.error(new CodeMsg(6009, assembleId + "有变量未匹配到:"+ code));
                XxlJobLogger.log("assemble  raw  filter:" +  filter);
                if (!StringUtils.isEmpty(filter)) {
                    Set<String> matcher = DbUtils.matcher(filter);
                    for (String code : matcher) {
                        SysAssembleParams sysAssembleParams = paramsService.selectOne(new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, assembleId).eq(Constant.Code, code));
                        if (sysAssembleParams == null){
                            return Result.error(new CodeMsg(6009, assembleId + "有变量未匹配到:"+ code));
                        }
                        String val = sysAssembleParams.getVal();
                        if (StringUtils.isEmpty(val)) {
                            return Result.error(new CodeMsg(6009, assembleId + "有变量未获取到值:"+ code));
                        }
                        val = DbUtils.quotedStr(val);
                        filter = filter.replace(DbUtils.assemblParam(code), val);
                    }
                    String val = sysAssembleParams.getVal();
                    if (StringUtils.isEmpty(val)) {
                        return Result.error(new CodeMsg(6009, assembleId + "有变量未获取到值:"+ code));
                    }
                    val = DbUtils.quotedStr(val);
                    filter = filter.replace(DbUtils.assemblParam(code), val);
                }
                if (StringUtils.isEmpty(filter)) {
                }else {
                    filter = Constant.WHERE_DEFAULT;
                }
                XxlJobLogger.log("assemble db filter:" + filter);
                String runSqlTemplate = null;
                if (type.equals(SysAssembleTableType.table)){
                    runSqlTemplate = Constant.selectFieldTableTemplate;
                }else if(type.equals(SysAssembleTableType.sql)){
                    runSqlTemplate = Constant.selectFieldSqlTemplate;
                }
                XxlJobLogger.log("assemble db  select sql template :" + runSqlTemplate);
                String sql = MessageFormat.format(runSqlTemplate, fields, tableName, filter);
                XxlJobLogger.log("assemble db select sql:" + sql);
                PreparedStatement ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
                ps.setFetchSize(Integer.MIN_VALUE);
                ps.setFetchDirection(ResultSet.FETCH_REVERSE);
@@ -641,28 +840,29 @@
                ResultSetMetaData metaData = resultSet.getMetaData();
                int columnCount = metaData.getColumnCount();
                String insertMySqlSql = assembleSql(unBigDataDataSourceInfo.getDbName(), assembleTempTableName, fields);
                String insertMySqlSql = assembleSql(unBigDataDataSourceInfo.getDbName(), assembleTempTableName, fieldList);
                XxlJobLogger.log("assemble db insert sql:" + insertMySqlSql);
                int cnt = 0;
                StringBuilder builder = new StringBuilder();
                dataSourceConn = dataSourceInfo.conn();
                dataSourceConn.setAutoCommit(false);
                statement = dataSourceConn.prepareStatement(insertMySqlSql);
                int count = 0;
                while (resultSet.next()) {
                    count++;
                    for (int i = 1; i <= columnCount; i++) {
                        if (i == columnCount) {
                            builderEnd(builder, resultSet.getObject(i));
                        }else {
                            builderAppend(builder, resultSet.getObject(i));
                        }
                        statement.setString(i, resultSet.getString(i));
                    }
                    statement.addBatch();
                    cnt++;
                    if (cnt == 5000) {
                        runOneBatch(bigData, insertMySqlSql, builder);
                        statement.executeBatch();
                    }
                }
                if (builder.length() > 0) {
                    runOneBatch(bigData, insertMySqlSql, builder);
                }
                statement.executeBatch();
                XxlJobLogger.log("assemble db select sql count:" + count);
            }
            dataSourceConn.commit();
            return Result.success(null);
        }
        catch (Exception e) {
@@ -676,8 +876,60 @@
                    e.printStackTrace();
                }
            }
            if (statement != null) {
                try {
                    statement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (dataSourceConn != null) {
                try {
                    dataSourceConn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    private void setOneBatch(boolean bigData, String insertMySqlSql, List<List<String>> branchList) {
        Connection conn = null;
        try {
            DataSourceInfo dataSourceInfo = null;
            if (bigData) {
                dataSourceInfo = bigDataDataSourceInfo;
            }else {
                dataSourceInfo = unBigDataDataSourceInfo;
            }
            conn = dataSourceInfo.conn();
            PreparedStatement statement = conn.prepareStatement(insertMySqlSql);
            int result = 0;
            if (branchList.isEmpty()) {
                return;
            }
            for (List<String> stringList : branchList) {
                for (int i = 0; i < stringList.size(); i++) {
                    statement.setString(i + 1, stringList.get(i));
                }
                statement.addBatch();
            }
            statement.executeBatch();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
        finally {
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    private void runOneBatch(boolean bigData, String insertMySqlSql, StringBuilder builder) throws UnsupportedEncodingException, SQLException {
@@ -730,12 +982,17 @@
    public void builderAppend(StringBuilder builder, Object object) {
        builder.append(object);
        builder.append("\t");
        builder.append(",");
    }
    public String assembleSql(String dataBaseName, String tableName, String fields) {
        String sql = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE INTO TABLE " + dataBaseName + "." + tableName + "(" + fields + ")";
    public String assembleSql(String dataBaseName, String tableName, List<SysAssembleDbField> fieldList) {
        String fields = fieldList.stream().map(SysAssembleDbField::getField).collect(Collectors.joining(Constant.COMMA));
        ContentBuilder builder = new ContentBuilder(Constant.COMMA);
        for (int i = 0; i < fieldList.size(); i++) {
            builder.append(Constant.QUESTION);
        }
        String sql = "INSERT INTO " + dataBaseName + "." + tableName + "(" + fields + ") values(" + builder.toString() + ")";
        return sql;
    }
@@ -756,10 +1013,11 @@
            conn = dataSourceInfo.conn();
            PreparedStatement statement = conn.prepareStatement(sql);
            int result = 0;
            if (statement.isWrapperFor(com.mysql.jdbc.Statement.class)) {
                com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class);
                mysqlStatement.setLocalInfileInputStream(dataStream);
                result = mysqlStatement.executeUpdate();
            if (statement.isWrapperFor(java.sql.Statement.class)) {
//                com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class);
//                mysqlStatement.setLocalInfileInputStream(dataStream);
//               statement.
//                result = mysqlStatement.executeUpdate();
            }
            return result;
        }