package com.highdatas.mdm.service.impl;
|
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.baomidou.mybatisplus.mapper.EntityWrapper;
|
import com.highdatas.mdm.entity.*;
|
import com.highdatas.mdm.mapper.TableInfoMapper;
|
import com.highdatas.mdm.pojo.CodeMsg;
|
import com.highdatas.mdm.pojo.DbAccessType;
|
import com.highdatas.mdm.pojo.Result;
|
import com.highdatas.mdm.pojo.SysAssembleLogStatus;
|
import com.highdatas.mdm.pojo.kettle.KettleDBTrans;
|
import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo;
|
import com.highdatas.mdm.service.*;
|
import com.highdatas.mdm.util.Constant;
|
import com.highdatas.mdm.util.DbUtils;
|
import org.apache.commons.lang3.StringUtils;
|
import org.pentaho.di.core.KettleClientEnvironment;
|
import org.pentaho.di.core.database.Database;
|
import org.pentaho.di.core.database.DatabaseMeta;
|
import org.pentaho.di.core.exception.KettleException;
|
import org.pentaho.di.core.logging.LoggingObjectInterface;
|
import org.pentaho.di.core.logging.LoggingObjectType;
|
import org.pentaho.di.core.logging.SimpleLoggingObject;
|
import org.pentaho.di.trans.Trans;
|
import org.pentaho.di.trans.TransHopMeta;
|
import org.pentaho.di.trans.TransMeta;
|
import org.pentaho.di.trans.step.StepMeta;
|
import org.pentaho.di.trans.steps.dummytrans.DummyTransMeta;
|
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
|
import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.stereotype.Service;
|
|
import java.awt.*;
|
import java.sql.Connection;
|
import java.sql.PreparedStatement;
|
import java.sql.ResultSet;
|
import java.sql.SQLException;
|
import java.text.MessageFormat;
|
import java.util.Date;
|
import java.util.List;
|
import java.util.stream.Collectors;
|
|
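/**
 * @author kimi
 * @description Service that runs data-assemble (collection) tasks with Kettle: it connects to a
 *              configured source database, copies the selected fields into a target table, and
 *              records the outcome of every run in a SysAssembleLog row.
 * @date 2020-02-15 14:26
 */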
|
|
@Service
|
public class TaskServixceImpl {
|
@Autowired
|
ISysAssembleService assembleService;
|
@Autowired
|
ISysAssembleDetailService assembleDetailService;
|
@Autowired
|
ISysAssembleLogService assembleLogService;
|
@Autowired
|
ISysDbtypeService dbtypeService;
|
@Autowired
|
IMenuMappingService menuMappingService;
|
@Autowired
|
UnBigDataDataSourceInfo unBigDataDataSourceInfo;
|
@Autowired
|
MasterDataService masterDataService;
|
@Autowired
|
TableInfoMapper infoMapper;
|
|
@Override
|
public Result run(String assembleId) {
|
SysAssembleLog sysAssembleLog = new SysAssembleLog();
|
sysAssembleLog.setId(DbUtils.getUUID());
|
sysAssembleLog.setCreateTime(new Date());
|
sysAssembleLog.setAssembleId(assembleId);
|
sysAssembleLog.setStatus(SysAssembleLogStatus.working);
|
sysAssembleLog.insert();
|
|
|
if (StringUtils.isEmpty(assembleId)) {
|
sysAssembleLog.setMessage("assembleId is null").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById();
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
|
SysAssembleLog runtimeLog = assembleLogService.selectOne(new EntityWrapper<SysAssembleLog>().eq("assemble_id", assembleId).orderBy("create_time desc"));
|
if (runtimeLog != null) {
|
if (runtimeLog.getStatus().equals(SysAssembleLogStatus.working)) {
|
sysAssembleLog.setMessage("当前正在有同一任务在运行").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById();
|
return Result.error(new CodeMsg(3001, "当前正在有同一任务在运行"));
|
}
|
}
|
|
KettleDBTrans kettleDBTrans = null;
|
try {
|
SysAssemble assemble = assembleService.selectById(assembleId);
|
if (assemble == null) {
|
//false
|
sysAssembleLog.setMessage("assemble is not found").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById();
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
String sqlStr = "", tableName = "";
|
|
Database dsInput = null;
|
Database dsOutput = null;
|
TableInputMeta tableInputMeta = new TableInputMeta();
|
TableOutputMeta talbeOutputMeta = new TableOutputMeta();
|
TransMeta transMeta = new TransMeta();
|
|
kettleDBTrans = new KettleDBTrans();
|
kettleDBTrans.setAssemble(assemble);
|
kettleDBTrans.setDsInput(dsInput);
|
kettleDBTrans.setDsOutput(dsOutput);
|
kettleDBTrans.setTableInputMeta(tableInputMeta);
|
kettleDBTrans.setTalbeOutputMeta(talbeOutputMeta);
|
kettleDBTrans.setTransMeta(transMeta);
|
|
boolean connected = kettleDBConn(kettleDBTrans);
|
if (!connected){
|
//false
|
sysAssembleLog.setMessage("connection failed").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById();
|
}
|
String dbType = "";
|
// assemble.getDbType();
|
SysDbtype sysDbtype = dbtypeService.selectOne(new EntityWrapper<SysDbtype>().eq("name", dbType));
|
DbAccessType type = sysDbtype.getType();
|
switch (type) {
|
case unRelational:
|
break;
|
case relational:
|
extractDBData(kettleDBTrans, sysAssembleLog);
|
|
}
|
} catch (KettleException e) {
|
e.printStackTrace();
|
sysAssembleLog.setMessage(e.getMessage()).setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById();
|
}finally {
|
Database dsInput = kettleDBTrans.getDsInput();
|
if (dsInput != null) {
|
dsInput.disconnect();
|
}
|
Database dsOutput = kettleDBTrans.getDsOutput();
|
if (dsOutput != null) {
|
dsOutput.disconnect();
|
}
|
|
}
|
return null;
|
}
|
|
private boolean kettleDBConn(KettleDBTrans kettleDBTrans) throws KettleException {
|
SysAssemble assemble = kettleDBTrans.getAssemble();
|
|
//初始化kettle环境
|
KettleClientEnvironment.init();
|
//导入数据的数据库连接
|
DatabaseMeta dataMetaInput = null;
|
// DatabaseMeta dataMetaInput = new DatabaseMeta("Input", assemble.getDbType(), "Native",assemble.getHost(), assemble.getSchema(),assemble.getPort().toString(),
|
// assemble.getUserName(), assemble.getPassword());
|
|
Boolean bigdata = true;
|
// /assemble.getBigdata();
|
DatabaseMeta dataMetaOutput = null;
|
if (bigdata) {
|
// 导入hbase数据库
|
}else {
|
// 导入mysql数据库
|
//导出数据的数据库连接
|
dataMetaOutput = new DatabaseMeta("Output", unBigDataDataSourceInfo.getDbType(), "Native", unBigDataDataSourceInfo.getDbHostName(), unBigDataDataSourceInfo.getDbName(), unBigDataDataSourceInfo.getDbPort(),
|
unBigDataDataSourceInfo.getUsername(), unBigDataDataSourceInfo.getPassword());
|
}
|
|
if (dataMetaOutput == null) {
|
return false;
|
|
}
|
|
//导出数据库连接
|
LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
|
kettleDBTrans.setDsInput(new Database(loggingObject, dataMetaInput));
|
kettleDBTrans.getDsInput().connect();
|
Connection connInput = kettleDBTrans.getDsInput().getConnection();
|
//导入数据库连接
|
kettleDBTrans.setDsOutput(new Database(loggingObject, dataMetaOutput));
|
kettleDBTrans.getDsOutput().connect();
|
Connection connOutput = kettleDBTrans.getDsOutput().getConnection();
|
|
kettleDBTrans.getTransMeta().setName("数据抽取");
|
|
kettleDBTrans.getTransMeta().addDatabase(dataMetaInput);
|
kettleDBTrans.getTransMeta().addDatabase(dataMetaOutput);
|
|
//导出数据
|
kettleDBTrans.getTableInputMeta().setDatabaseMeta(kettleDBTrans.getTransMeta().findDatabase("Input"));
|
//导入数据
|
kettleDBTrans.getTalbeOutputMeta().setDatabaseMeta(kettleDBTrans.getTransMeta().findDatabase("Output"));
|
return true;
|
}
|
|
private void extractDBData(KettleDBTrans dbTrans, SysAssembleLog sysAssembleLog) {
|
try {
|
SysAssemble assemble = dbTrans.getAssemble();
|
boolean checked = checkField(dbTrans);
|
if (!checked){
|
//false log
|
sysAssembleLog.setMessage("source target field is not matched").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById();
|
return;
|
}
|
|
List<SysAssembleDetail> detailList = assembleDetailService.selectList(new EntityWrapper<SysAssembleDetail>().eq("assemble_id", assemble.getId()));
|
List<String> fieldList = detailList.stream().map(sysAssembleDetail -> sysAssembleDetail.getField()).collect(Collectors.toList());
|
if (fieldList.contains(Constant.ID)) {
|
fieldList.remove(Constant.ID);
|
fieldList.add(MessageFormat.format(Constant.FieldAsAlias, Constant.ID, Constant.STD_ID));
|
if (!assemble.getBigdata()){
|
//MYSQL
|
fieldList.add(MessageFormat.format(Constant.MYSQL_UUID, Constant.ID));
|
}
|
//fieldList = fieldList.stream().map(s -> ( s.equalsIgnoreCase(Constant.ID) ? MessageFormat.format(Constant.FieldAsAlias,Constant.ID, Constant.STD_ID) : s))
|
}else {
|
if (!assemble.getBigdata()) {
|
fieldList.add(MessageFormat.format(Constant.MYSQL_UUID, Constant.STD_ID));
|
fieldList.add(MessageFormat.format(Constant.MYSQL_UUID, Constant.ID));
|
}
|
}
|
|
String fieldStr = fieldList.stream().collect(Collectors.joining(Constant.COMMA));
|
String countSqlStr = MessageFormat.format(Constant.selectFieldSqlTemplate, fieldStr, assemble.getTableName());
|
String selectSqlStr = MessageFormat.format(Constant.selectFieldSqlTemplate, fieldStr, assemble.getTableName());
|
dbTrans.getTableInputMeta().setSQL(selectSqlStr);
|
Database dsInput = dbTrans.getDsInput();
|
Connection inputConnection = dsInput.getConnection();
|
PreparedStatement cntStmt = inputConnection.prepareStatement(countSqlStr);
|
cntStmt.executeQuery();
|
ResultSet cntResultSet = cntStmt.executeQuery();
|
Integer sorceCnt = cntResultSet.getInt(Constant.CNT);
|
|
// StepMeta
|
StepMeta inputStep = new StepMeta(Constant.STEP_READ_FROM_TABLE, dbTrans.getTableInputMeta());
|
inputStep.setLocation(50, 50);
|
inputStep.setDraw(true);
|
dbTrans.getTransMeta().addStep(inputStep);
|
|
String trmpTableName = getTempTableNameFromMenu(assemble.getMenuId());
|
|
dbTrans.getTalbeOutputMeta().setTableName(trmpTableName);
|
StepMeta insertUpdateStep = new StepMeta(Constant.STEP_INSERT_UPDATE, dbTrans.getTalbeOutputMeta());
|
insertUpdateStep.setLocation(150, 50);
|
insertUpdateStep.setDraw(true);
|
dbTrans.getTransMeta().addStep(insertUpdateStep);
|
|
DummyTransMeta dummyMeta = new DummyTransMeta();
|
|
StepMeta dummyStep = new StepMeta(Constant.STEP_DUMMY, dummyMeta);
|
dummyStep.setLocation(200, 50);
|
dummyStep.setDraw(true);
|
|
dbTrans.getTransMeta().addStep(dummyStep);
|
|
//设置步骤直接的关系
|
TransHopMeta hop = new TransHopMeta(inputStep, insertUpdateStep);
|
dbTrans.getTransMeta().addTransHop(hop);
|
TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep);
|
dbTrans.getTransMeta().addTransHop(hop2);
|
|
//开始执行
|
Trans trans = new Trans(dbTrans.getTransMeta());
|
trans.prepareExecution(null);
|
trans.startThreads();
|
trans.waitUntilFinished();
|
|
int errors = trans.getErrors();
|
if (errors == 0) {
|
//搬运相关数据
|
if (!assemble.getBigdata()){
|
masterDataService.uploadedData(assemble.getTableName(), "All", assemble.getUserId());
|
}
|
else {
|
// TODO: 2020/2/16 hbase 搬运数据
|
}
|
sysAssembleLog.setCnt(sorceCnt).setMessage("汇集成功").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.success).updateById();
|
}else {
|
sysAssembleLog.setMessage("汇集搬运过程中发生问题").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById();
|
}
|
boolean hasTable = dbTrans.getDsOutput().checkTableExists(assemble.getTableName());
|
if (hasTable) {
|
//TODO 全量还是增量
|
}
|
else {
|
//创建表相关 目前禁用 log
|
sysAssembleLog.setMessage("当前系统中不存在相关表数据,请先初始化表结构等基础主数据结构信息").setUpdateTime(new Date()).setStatus(SysAssembleLogStatus.defeat).updateById();
|
}
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
}
|
finally {
|
Database dsOutput = dbTrans.getDsOutput();
|
if (dsOutput!= null) {
|
dsOutput.disconnect();
|
}
|
Database dsInput = dbTrans.getDsInput();
|
if (dsInput != null) {
|
dsInput.disconnect();
|
}
|
}
|
}
|
|
private String getTempTableNameFromMenu(String menuId) {
|
MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", menuId));
|
String tableName = menuMapping.getTableName();
|
return tableName + Constant.RECORD;
|
}
|
|
private boolean checkField(KettleDBTrans dbTrans) {
|
SysAssemble assemble = dbTrans.getAssemble();
|
boolean checked = true;
|
Connection conn = null;
|
Database ds = null;
|
try {
|
//1 check 源库中是否有相关字段
|
List<SysAssembleDetail> detailList = assembleDetailService.selectList(new EntityWrapper<SysAssembleDetail>().eq("assemble_id", assemble.getId()));
|
String fieldStr = detailList.stream().map(sysAssembleDetail -> sysAssembleDetail.getField()).collect(Collectors.joining(Constant.COMMA));
|
List<String> fieldList = detailList.stream().map(sysAssembleDetail -> sysAssembleDetail.getField()).collect(Collectors.toList());
|
String checkSqlStr = MessageFormat.format(Constant.checkFieldSqlTemplate, fieldStr, assemble.getTableName());
|
//连接数据库
|
KettleClientEnvironment.init();
|
DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep",assemble.getDbType(), "Native", assemble.getHost(),assemble.getSchema(),assemble.getPort().toString(),assemble.getUserName(),assemble.getPassword());
|
LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
|
ds = new Database(loggingObject, dataMeta);
|
ds.connect();
|
conn = ds.getConnection();
|
PreparedStatement stmt = conn.prepareStatement(checkSqlStr);
|
stmt.executeQuery();
|
ds.disconnect();
|
ds = null;
|
//2 验证 目标库中是否有相关字段
|
Boolean bigdata = assemble.getBigdata();
|
if (bigdata) {
|
//TODO hbase
|
}
|
else {
|
boolean exists = dbTrans.getDsOutput().checkTableExists(assemble.getTableName());
|
if (exists) {
|
List<TableSchemaResult> tableFieldFromDb = infoMapper.getTableFieldFromDb(assemble.getTableName(), assemble.getSchema());
|
List<String> targetFieldList = tableFieldFromDb.stream().map(tableSchemaResult -> tableSchemaResult.getFieldName()).collect(Collectors.toList());
|
long count = fieldList.stream().filter(sourceField -> !targetFieldList.contains(sourceField)).count();
|
if (count > 0) {
|
//TODO 有部分字段未能匹配上 且是mysql数据库 暂处理为不导入 log
|
checked = false;
|
}
|
}
|
|
}
|
checked = true;
|
} catch(Exception e) {
|
e.printStackTrace();
|
checked = false;
|
}finally {
|
if (ds != null) {
|
ds.disconnect();
|
}
|
if (conn != null) {
|
try {
|
conn.close();
|
} catch (SQLException e) {
|
e.printStackTrace();
|
checked = false;
|
}
|
}
|
}
|
return checked;
|
}
|
|
|
@Override
|
public Result updateTask(String assembleId) {
|
return null;
|
}
|
|
private Database getDataBaseByKettle(SysAssemble assemble) throws KettleException {
|
Database ds = null;
|
KettleClientEnvironment.init();
|
DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", assemble.getDbType(), "Native",assemble.getHost(), assemble.getSchema(), assemble.getPort().toString(),
|
assemble.getUserName(), assemble.getPassword());
|
LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
|
ds = new Database(loggingObject, dataMeta);
|
|
return ds;
|
}
|
@Override
|
public boolean connectionDb(SysAssemble assemble) {
|
Connection conn = null;
|
Database ds = null;
|
boolean connected = false;
|
try {
|
ds = getDataBaseByKettle(assemble);
|
ds.normalConnect(null);
|
conn = ds.getConnection();
|
|
if (conn == null) {
|
connected = false;
|
}else {
|
connected = true;
|
}
|
|
ds.disconnect();
|
ds = null;
|
} catch (KettleException e) {
|
e.printStackTrace();
|
connected = false;
|
}
|
finally {
|
if (ds != null) {
|
ds.disconnect();
|
}
|
|
try {
|
if (conn != null) {
|
conn.close();
|
}
|
|
} catch (SQLException e) {
|
e.printStackTrace();
|
}
|
}
|
return connected;
|
}
|
|
@Override
|
public boolean sqlTest(SysAssemble assemble, String sqlStr) {
|
Connection conn = null;
|
Database ds = null;
|
boolean connected = false;
|
try {
|
ds = getDataBaseByKettle(assemble);
|
ds.normalConnect(null);
|
conn = ds.getConnection();
|
PreparedStatement stmt = conn.prepareStatement(sqlStr);
|
stmt.executeQuery();
|
|
ds.disconnect();
|
ds = null;
|
connected = true;
|
} catch (Exception e) {
|
e.printStackTrace();
|
connected = false;
|
}
|
finally {
|
if (ds != null) {
|
ds.disconnect();
|
}
|
|
try {
|
if (conn != null) {
|
conn.close();
|
}
|
|
} catch (SQLException e) {
|
e.printStackTrace();
|
}
|
}
|
return connected;
|
}
|
}
|