package com.highdatas.mdm.service.impl; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.mapper.EntityWrapper; import com.highdatas.mdm.entity.*; import com.highdatas.mdm.mapper.TableInfoMapper; import com.highdatas.mdm.pojo.CodeMsg; import com.highdatas.mdm.pojo.DbAccessType; import com.highdatas.mdm.pojo.Result; import com.highdatas.mdm.pojo.SysAssembleLogStatus; import com.highdatas.mdm.pojo.kettle.KettleDBTrans; import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo; import com.highdatas.mdm.service.*; import com.highdatas.mdm.util.Constant; import com.highdatas.mdm.util.DbUtils; import org.apache.commons.lang3.StringUtils; import org.pentaho.di.core.KettleClientEnvironment; import org.pentaho.di.core.database.Database; import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.logging.LoggingObjectInterface; import org.pentaho.di.core.logging.LoggingObjectType; import org.pentaho.di.core.logging.SimpleLoggingObject; import org.pentaho.di.trans.Trans; import org.pentaho.di.trans.TransHopMeta; import org.pentaho.di.trans.TransMeta; import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.trans.steps.dummytrans.DummyTransMeta; import org.pentaho.di.trans.steps.tableinput.TableInputMeta; import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import java.awt.*; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.text.MessageFormat; import java.util.Date; import java.util.List; import java.util.stream.Collectors; /** * @author kimi * @description * @date 2020-02-15 14:26 */ @Service public class TaskServixceImpl { @Autowired ISysAssembleService assembleService; @Autowired ISysAssembleDetailService assembleDetailService; @Autowired ISysAssembleLogService assembleLogService; @Autowired ISysDbtypeService dbtypeService; @Autowired IMenuMappingService menuMappingService; @Autowired UnBigDataDataSourceInfo unBigDataDataSourceInfo; @Autowired MasterDataService masterDataService; @Autowired TableInfoMapper infoMapper; @Override public Result run(String assembleId) { SysAssembleLog sysAssembleLog = new SysAssembleLog(); sysAssembleLog.setId(DbUtils.getUUID()); sysAssembleLog.setCreateTime(new Date()); sysAssembleLog.setAssembleId(assembleId); sysAssembleLog.setStatus(SysAssembleLogStatus.working); sysAssembleLog.insert(); if (StringUtils.isEmpty(assembleId)) { sysAssembleLog.setMessage("assembleId is null").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById(); return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } SysAssembleLog runtimeLog = assembleLogService.selectOne(new EntityWrapper().eq("assemble_id", assembleId).orderBy("create_time desc")); if (runtimeLog != null) { if (runtimeLog.getStatus().equals(SysAssembleLogStatus.working)) { sysAssembleLog.setMessage("当前正在有同一任务在运行").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById(); return Result.error(new CodeMsg(3001, "当前正在有同一任务在运行")); } } KettleDBTrans kettleDBTrans = null; try { SysAssemble assemble = assembleService.selectById(assembleId); if (assemble == null) { //false sysAssembleLog.setMessage("assemble is not found").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById(); return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } String sqlStr = "", tableName = ""; Database dsInput = null; Database dsOutput = null; TableInputMeta tableInputMeta = new TableInputMeta(); TableOutputMeta talbeOutputMeta = new TableOutputMeta(); TransMeta transMeta = new TransMeta(); kettleDBTrans = new KettleDBTrans(); kettleDBTrans.setAssemble(assemble); kettleDBTrans.setDsInput(dsInput); kettleDBTrans.setDsOutput(dsOutput); kettleDBTrans.setTableInputMeta(tableInputMeta); kettleDBTrans.setTalbeOutputMeta(talbeOutputMeta); kettleDBTrans.setTransMeta(transMeta); boolean connected = kettleDBConn(kettleDBTrans); if (!connected){ //false sysAssembleLog.setMessage("connection failed").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById(); } String dbType = ""; // assemble.getDbType(); SysDbtype sysDbtype = dbtypeService.selectOne(new EntityWrapper().eq("name", dbType)); DbAccessType type = sysDbtype.getType(); switch (type) { case unRelational: break; case relational: extractDBData(kettleDBTrans, sysAssembleLog); } } catch (KettleException e) { e.printStackTrace(); sysAssembleLog.setMessage(e.getMessage()).setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById(); }finally { Database dsInput = kettleDBTrans.getDsInput(); if (dsInput != null) { dsInput.disconnect(); } Database dsOutput = kettleDBTrans.getDsOutput(); if (dsOutput != null) { dsOutput.disconnect(); } } return null; } private boolean kettleDBConn(KettleDBTrans kettleDBTrans) throws KettleException { SysAssemble assemble = kettleDBTrans.getAssemble(); //初始化kettle环境 KettleClientEnvironment.init(); //导入数据的数据库连接 DatabaseMeta dataMetaInput = null; // DatabaseMeta dataMetaInput = new DatabaseMeta("Input", assemble.getDbType(), "Native",assemble.getHost(), assemble.getSchema(),assemble.getPort().toString(), // assemble.getUserName(), assemble.getPassword()); Boolean bigdata = true; // /assemble.getBigdata(); DatabaseMeta dataMetaOutput = null; if (bigdata) { // 导入hbase数据库 }else { // 导入mysql数据库 //导出数据的数据库连接 dataMetaOutput = new DatabaseMeta("Output", unBigDataDataSourceInfo.getDbType(), "Native", unBigDataDataSourceInfo.getDbHostName(), unBigDataDataSourceInfo.getDbName(), unBigDataDataSourceInfo.getDbPort(), unBigDataDataSourceInfo.getUsername(), unBigDataDataSourceInfo.getPassword()); } if (dataMetaOutput == null) { return false; } //导出数据库连接 LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null ); kettleDBTrans.setDsInput(new Database(loggingObject, dataMetaInput)); kettleDBTrans.getDsInput().connect(); Connection connInput = kettleDBTrans.getDsInput().getConnection(); //导入数据库连接 kettleDBTrans.setDsOutput(new Database(loggingObject, dataMetaOutput)); kettleDBTrans.getDsOutput().connect(); Connection connOutput = kettleDBTrans.getDsOutput().getConnection(); kettleDBTrans.getTransMeta().setName("数据抽取"); kettleDBTrans.getTransMeta().addDatabase(dataMetaInput); kettleDBTrans.getTransMeta().addDatabase(dataMetaOutput); //导出数据 kettleDBTrans.getTableInputMeta().setDatabaseMeta(kettleDBTrans.getTransMeta().findDatabase("Input")); //导入数据 kettleDBTrans.getTalbeOutputMeta().setDatabaseMeta(kettleDBTrans.getTransMeta().findDatabase("Output")); return true; } private void extractDBData(KettleDBTrans dbTrans, SysAssembleLog sysAssembleLog) { try { SysAssemble assemble = dbTrans.getAssemble(); boolean checked = checkField(dbTrans); if (!checked){ //false log sysAssembleLog.setMessage("source target field is not matched").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById(); return; } List detailList = assembleDetailService.selectList(new EntityWrapper().eq("assemble_id", assemble.getId())); List fieldList = detailList.stream().map(sysAssembleDetail -> sysAssembleDetail.getField()).collect(Collectors.toList()); if (fieldList.contains(Constant.ID)) { fieldList.remove(Constant.ID); fieldList.add(MessageFormat.format(Constant.FieldAsAlias, Constant.ID, Constant.STD_ID)); if (!assemble.getBigdata()){ //MYSQL fieldList.add(MessageFormat.format(Constant.MYSQL_UUID, Constant.ID)); } //fieldList = fieldList.stream().map(s -> ( s.equalsIgnoreCase(Constant.ID) ? MessageFormat.format(Constant.FieldAsAlias,Constant.ID, Constant.STD_ID) : s)) }else { if (!assemble.getBigdata()) { fieldList.add(MessageFormat.format(Constant.MYSQL_UUID, Constant.STD_ID)); fieldList.add(MessageFormat.format(Constant.MYSQL_UUID, Constant.ID)); } } String fieldStr = fieldList.stream().collect(Collectors.joining(Constant.COMMA)); String countSqlStr = MessageFormat.format(Constant.selectFieldSqlTemplate, fieldStr, assemble.getTableName()); String selectSqlStr = MessageFormat.format(Constant.selectFieldSqlTemplate, fieldStr, assemble.getTableName()); dbTrans.getTableInputMeta().setSQL(selectSqlStr); Database dsInput = dbTrans.getDsInput(); Connection inputConnection = dsInput.getConnection(); PreparedStatement cntStmt = inputConnection.prepareStatement(countSqlStr); cntStmt.executeQuery(); ResultSet cntResultSet = cntStmt.executeQuery(); Integer sorceCnt = cntResultSet.getInt(Constant.CNT); // StepMeta StepMeta inputStep = new StepMeta(Constant.STEP_READ_FROM_TABLE, dbTrans.getTableInputMeta()); inputStep.setLocation(50, 50); inputStep.setDraw(true); dbTrans.getTransMeta().addStep(inputStep); String trmpTableName = getTempTableNameFromMenu(assemble.getMenuId()); dbTrans.getTalbeOutputMeta().setTableName(trmpTableName); StepMeta insertUpdateStep = new StepMeta(Constant.STEP_INSERT_UPDATE, dbTrans.getTalbeOutputMeta()); insertUpdateStep.setLocation(150, 50); insertUpdateStep.setDraw(true); dbTrans.getTransMeta().addStep(insertUpdateStep); DummyTransMeta dummyMeta = new DummyTransMeta(); StepMeta dummyStep = new StepMeta(Constant.STEP_DUMMY, dummyMeta); dummyStep.setLocation(200, 50); dummyStep.setDraw(true); dbTrans.getTransMeta().addStep(dummyStep); //设置步骤直接的关系 TransHopMeta hop = new TransHopMeta(inputStep, insertUpdateStep); dbTrans.getTransMeta().addTransHop(hop); TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep); dbTrans.getTransMeta().addTransHop(hop2); //开始执行 Trans trans = new Trans(dbTrans.getTransMeta()); trans.prepareExecution(null); trans.startThreads(); trans.waitUntilFinished(); int errors = trans.getErrors(); if (errors == 0) { //搬运相关数据 if (!assemble.getBigdata()){ masterDataService.uploadedData(assemble.getTableName(), "All", assemble.getUserId()); } else { // TODO: 2020/2/16 hbase 搬运数据 } sysAssembleLog.setCnt(sorceCnt).setMessage("汇集成功").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.success).updateById(); }else { sysAssembleLog.setMessage("汇集搬运过程中发生问题").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById(); } boolean hasTable = dbTrans.getDsOutput().checkTableExists(assemble.getTableName()); if (hasTable) { //TODO 全量还是增量 } else { //创建表相关 目前禁用 log sysAssembleLog.setMessage("当前系统中不存在相关表数据,请先初始化表结构等基础主数据结构信息").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById(); } } catch (Exception e) { e.printStackTrace(); } finally { Database dsOutput = dbTrans.getDsOutput(); if (dsOutput!= null) { dsOutput.disconnect(); } Database dsInput = dbTrans.getDsInput(); if (dsInput != null) { dsInput.disconnect(); } } } private String getTempTableNameFromMenu(String menuId) { MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper().eq("menu_id", menuId)); String tableName = menuMapping.getTableName(); return tableName + Constant.RECORD; } private boolean checkField(KettleDBTrans dbTrans) { SysAssemble assemble = dbTrans.getAssemble(); boolean checked = true; Connection conn = null; Database ds = null; try { //1 check 源库中是否有相关字段 List detailList = assembleDetailService.selectList(new EntityWrapper().eq("assemble_id", assemble.getId())); String fieldStr = detailList.stream().map(sysAssembleDetail -> sysAssembleDetail.getField()).collect(Collectors.joining(Constant.COMMA)); List fieldList = detailList.stream().map(sysAssembleDetail -> sysAssembleDetail.getField()).collect(Collectors.toList()); String checkSqlStr = MessageFormat.format(Constant.checkFieldSqlTemplate, fieldStr, assemble.getTableName()); //连接数据库 KettleClientEnvironment.init(); DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep",assemble.getDbType(), "Native", assemble.getHost(),assemble.getSchema(),assemble.getPort().toString(),assemble.getUserName(),assemble.getPassword()); LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null ); ds = new Database(loggingObject, dataMeta); ds.connect(); conn = ds.getConnection(); PreparedStatement stmt = conn.prepareStatement(checkSqlStr); stmt.executeQuery(); ds.disconnect(); ds = null; //2 验证 目标库中是否有相关字段 Boolean bigdata = assemble.getBigdata(); if (bigdata) { //TODO hbase } else { boolean exists = dbTrans.getDsOutput().checkTableExists(assemble.getTableName()); if (exists) { List tableFieldFromDb = infoMapper.getTableFieldFromDb(assemble.getTableName(), assemble.getSchema()); List targetFieldList = tableFieldFromDb.stream().map(tableSchemaResult -> tableSchemaResult.getFieldName()).collect(Collectors.toList()); long count = fieldList.stream().filter(sourceField -> !targetFieldList.contains(sourceField)).count(); if (count > 0) { //TODO 有部分字段未能匹配上 且是mysql数据库 暂处理为不导入 log checked = false; } } } checked = true; } catch(Exception e) { e.printStackTrace(); checked = false; }finally { if (ds != null) { ds.disconnect(); } if (conn != null) { try { conn.close(); } catch (SQLException e) { e.printStackTrace(); checked = false; } } } return checked; } @Override public Result updateTask(String assembleId) { return null; } private Database getDataBaseByKettle(SysAssemble assemble) throws KettleException { Database ds = null; KettleClientEnvironment.init(); DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", assemble.getDbType(), "Native",assemble.getHost(), assemble.getSchema(), assemble.getPort().toString(), assemble.getUserName(), assemble.getPassword()); LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null ); ds = new Database(loggingObject, dataMeta); return ds; } @Override public boolean connectionDb(SysAssemble assemble) { Connection conn = null; Database ds = null; boolean connected = false; try { ds = getDataBaseByKettle(assemble); ds.normalConnect(null); conn = ds.getConnection(); if (conn == null) { connected = false; }else { connected = true; } ds.disconnect(); ds = null; } catch (KettleException e) { e.printStackTrace(); connected = false; } finally { if (ds != null) { ds.disconnect(); } try { if (conn != null) { conn.close(); } } catch (SQLException e) { e.printStackTrace(); } } return connected; } @Override public boolean sqlTest(SysAssemble assemble, String sqlStr) { Connection conn = null; Database ds = null; boolean connected = false; try { ds = getDataBaseByKettle(assemble); ds.normalConnect(null); conn = ds.getConnection(); PreparedStatement stmt = conn.prepareStatement(sqlStr); stmt.executeQuery(); ds.disconnect(); ds = null; connected = true; } catch (Exception e) { e.printStackTrace(); connected = false; } finally { if (ds != null) { ds.disconnect(); } try { if (conn != null) { conn.close(); } } catch (SQLException e) { e.printStackTrace(); } } return connected; } }