package com.highdatas.mdm.controller;
|
|
import com.alibaba.druid.pool.DruidPooledConnection;
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
|
import com.highdatas.mdm.entity.TUser;
|
import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo;
|
import com.highdatas.mdm.pojo.kettle.KettleDBTrans;
|
import com.highdatas.mdm.pojo.kettle.TableColumn;
|
import org.pentaho.di.core.KettleClientEnvironment;
|
import org.pentaho.di.core.database.Database;
|
import org.pentaho.di.core.database.DatabaseMeta;
|
import org.pentaho.di.core.logging.LoggingObjectInterface;
|
import org.pentaho.di.core.logging.LoggingObjectType;
|
import org.pentaho.di.core.logging.SimpleLoggingObject;
|
import org.pentaho.di.core.row.RowMetaInterface;
|
import org.pentaho.di.trans.Trans;
|
import org.pentaho.di.trans.TransHopMeta;
|
import org.pentaho.di.trans.TransMeta;
|
import org.pentaho.di.trans.step.StepMeta;
|
import org.pentaho.di.trans.steps.dummytrans.DummyTransMeta;
|
|
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
|
import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Controller;
|
import org.springframework.web.bind.annotation.RequestBody;
|
import org.springframework.web.bind.annotation.RequestMapping;
|
import org.springframework.web.bind.annotation.ResponseBody;
|
|
import javax.servlet.http.HttpServletResponse;
|
import java.sql.Connection;
|
import java.sql.PreparedStatement;
|
import java.sql.ResultSet;
|
import java.sql.ResultSetMetaData;
|
import java.text.SimpleDateFormat;
|
import java.util.ArrayList;
|
import java.util.Date;
|
import java.util.List;
|
|
@Controller
|
@RequestMapping("/database")
|
//http://localhost:8080/users/addUser
|
public class SettleController {
|
|
|
// Connection settings of the output database; kettleDBConn reads them to build the "Output" DatabaseMeta.
@Autowired
|
private UnBigDataDataSourceInfo dataSourceConn;
|
|
// Timestamp pattern used to build table names and table-name suffixes (second resolution).
// NOTE(review): SimpleDateFormat is not thread-safe and this single instance is used by every
// request handled by this controller - confirm whether concurrent requests are possible.
private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
|
// Name of the Kettle table-input step created in dataExtraction.
private String STEP_READ_FROM_TABLE = "Read data from table";
|
// Name of the Kettle table-output step created in dataExtraction.
private String STEP_INSERT_UPDATE = "Insert or update";
|
// Name of the trailing Kettle dummy step created in dataExtraction.
private String STEP_DUMMY = "Dummy";
|
|
|
/**
|
* 提取mongodb数据
|
* @param response
|
* @param transData
|
* @return
|
* @throws
|
*/
|
// @RequestMapping("/drawMongoDB")
|
// @ResponseBody
|
// public String drawMongoDB(HttpServletResponse response, @RequestBody JSONObject transData) {
|
// String returnStr = "";
|
// try {
|
// KettleClientEnvironment.init();
|
//
|
// TransMeta transMeta = new TransMeta();
|
// transMeta.setName("抽取mongodb数据");
|
// //导出数据的数据库连接
|
// DatabaseMeta dataBaseOutput = new DatabaseMeta("Output", dataSourceConn.getDbType(), "Native", dataSourceConn.getDbHostName(), dataSourceConn.getDbName(), dataSourceConn.getDbPort(),
|
// dataSourceConn.getUsername(), dataSourceConn.getPassword());
|
// transMeta.addDatabase(dataBaseOutput);
|
//
|
// MongoDbInputMeta mongoDbInputMeta = new MongoDbInputMeta();
|
// mongoDbInputMeta.setHostnames(transData.getString("hostName"));
|
// mongoDbInputMeta.setAuthenticationDatabaseName(transData.getString("authDBName"));
|
// mongoDbInputMeta.setPort(transData.getString("portNum"));
|
// mongoDbInputMeta.setAuthenticationUser(transData.getString("userName"));
|
// mongoDbInputMeta.setAuthenticationPassword(transData.getString("pwd"));
|
// mongoDbInputMeta.setDbName(transData.getString("dbName"));
|
// mongoDbInputMeta.setCollection(transData.getString("collection"));
|
// //得到抽取字段
|
// ArrayList<String> fieldList = new ArrayList<String>(transData.getString("drawfield").split(",").length);
|
// Collections.addAll(fieldList, transData.getString("drawfield").split(","));
|
// //设置不取JSON字段
|
// mongoDbInputMeta.setOutputJson(false);
|
// //设置提取字段名称、path、type
|
// List<MongoField> normalList = new ArrayList<MongoField>();
|
// for (String fieldStr : fieldList) {
|
// MongoField newField = new MongoField();
|
// newField.m_fieldName = fieldStr;
|
// newField.m_fieldPath = "$." + fieldStr;
|
// newField.m_kettleType = "String";
|
// normalList.add(newField);
|
// }
|
// //设置mongodb提取字段
|
// mongoDbInputMeta.setMongoFields(normalList);
|
// //设置mogodb步骤元
|
// StepMeta inputMongoDBStep = new StepMeta(STEP_READ_FROM_TABLE, mongoDbInputMeta);
|
// inputMongoDBStep.setLocation(50, 50);
|
// inputMongoDBStep.setDraw(true);
|
// //将mogodb步骤元加入转化中
|
// transMeta.addStep(inputMongoDBStep);
|
//
|
// //设置mysql元
|
// TableOutputMeta tableOutputMeta = new TableOutputMeta();
|
// //设置数据库元
|
// tableOutputMeta.setDatabaseMeta(transMeta.findDatabase("Output"));
|
// //mongodb中数据库表/集合,就是表名
|
// tableOutputMeta.setTableName(transData.getString("collection"));
|
// //将mysql元加入步骤元
|
// StepMeta insertUpdateStep = new StepMeta(STEP_INSERT_UPDATE, tableOutputMeta);
|
// insertUpdateStep.setLocation(150, 50);
|
// insertUpdateStep.setDraw(true);
|
//
|
// transMeta.addStep(insertUpdateStep);
|
//
|
// //增加个空元,
|
// DummyTransMeta dummyMeta = new DummyTransMeta();
|
//
|
// //将空元加入步骤元
|
// StepMeta dummyStep = new StepMeta(STEP_DUMMY,dummyMeta);
|
// dummyStep.setLocation(200, 50);
|
// dummyStep.setDraw(true);
|
//
|
// transMeta.addStep(dummyStep);
|
//
|
// //设置步骤直接的关系
|
// TransHopMeta hop = new TransHopMeta(inputMongoDBStep, insertUpdateStep);
|
// transMeta.addTransHop(hop);
|
// TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep);
|
// transMeta.addTransHop(hop2);
|
//
|
// //开始执行数据抽取
|
// //将转化元实例化转换
|
// Trans trans = new Trans(transMeta);
|
// trans.prepareExecution(null);
|
//
|
// trans.startThreads();
|
// trans.waitUntilFinished();
|
//
|
// if (trans.getErrors() > 0) {
|
// System.out.println(">>>>>>>>>> ERROR");
|
// returnStr = "{result:\"fail\"}";
|
// }
|
// else {
|
// System.out.println(">>>>>>>>>> SUCCESS ");
|
// returnStr = "{result:\"success\"}";
|
// }
|
// } catch(Exception e) {
|
// return "{result:\"fail\"}";
|
// }
|
// return returnStr;
|
// }
|
//
|
// /**
|
// * Extract HBase data (NOTE: this disabled draft was copied from drawMongoDB and still builds a MongoDbInputMeta)
|
// * @param response
|
// * @param transData
|
// * @return
|
// * @throws
|
// */
|
// @RequestMapping("/drawHbase")
|
// @ResponseBody
|
// public String drawHbase(HttpServletResponse response, @RequestBody JSONObject transData) {
|
// String returnStr = "";
|
// try {
|
// KettleClientEnvironment.init();
|
//
|
// TransMeta transMeta = new TransMeta();
|
// transMeta.setName("抽取Hbase数据");
|
// //导出数据的数据库连接
|
// DatabaseMeta dataBaseOutput = new DatabaseMeta("Output", dataSourceConn.getDbType(), "Native", dataSourceConn.getDbHostName(), dataSourceConn.getDbName(), dataSourceConn.getDbPort(),
|
// dataSourceConn.getUsername(), dataSourceConn.getPassword());
|
// transMeta.addDatabase(dataBaseOutput);
|
// //HBaseInputMeta hBaseInputMeta= new HBaseInputMeta()
|
//
|
// MongoDbInputMeta mongoDbInputMeta = new MongoDbInputMeta();
|
// mongoDbInputMeta.setHostnames(transData.getString("hostName"));
|
// mongoDbInputMeta.setAuthenticationDatabaseName(transData.getString("authDBName"));
|
// mongoDbInputMeta.setPort(transData.getString("portNum"));
|
// mongoDbInputMeta.setAuthenticationUser(transData.getString("userName"));
|
// mongoDbInputMeta.setAuthenticationPassword(transData.getString("pwd"));
|
// mongoDbInputMeta.setDbName(transData.getString("dbName"));
|
// mongoDbInputMeta.setCollection(transData.getString("collection"));
|
// //得到抽取字段
|
// ArrayList<String> fieldList = new ArrayList<String>(transData.getString("drawfield").split(",").length);
|
// Collections.addAll(fieldList, transData.getString("drawfield").split(","));
|
// //设置不取JSON字段
|
// mongoDbInputMeta.setOutputJson(false);
|
// //设置提取字段名称、path、type
|
// List<MongoField> normalList = new ArrayList<MongoField>();
|
// for (String fieldStr : fieldList) {
|
// MongoField newField = new MongoField();
|
// newField.m_fieldName = fieldStr;
|
// newField.m_fieldPath = "$." + fieldStr;
|
// newField.m_kettleType = "String";
|
// normalList.add(newField);
|
// }
|
// //设置mongodb提取字段
|
// mongoDbInputMeta.setMongoFields(normalList);
|
// //设置mogodb步骤元
|
// StepMeta inputMongoDBStep = new StepMeta(STEP_READ_FROM_TABLE, mongoDbInputMeta);
|
// inputMongoDBStep.setLocation(50, 50);
|
// inputMongoDBStep.setDraw(true);
|
// //将mogodb步骤元加入转化中
|
// transMeta.addStep(inputMongoDBStep);
|
//
|
// //设置mysql元
|
// TableOutputMeta tableOutputMeta = new TableOutputMeta();
|
// //设置数据库元
|
// tableOutputMeta.setDatabaseMeta(transMeta.findDatabase("Output"));
|
// //mongodb中数据库表/集合,就是表名
|
// tableOutputMeta.setTableName(transData.getString("collection"));
|
// //将mysql元加入步骤元
|
// StepMeta insertUpdateStep = new StepMeta(STEP_INSERT_UPDATE, tableOutputMeta);
|
// insertUpdateStep.setLocation(150, 50);
|
// insertUpdateStep.setDraw(true);
|
//
|
// transMeta.addStep(insertUpdateStep);
|
//
|
// //增加个空元,
|
// DummyTransMeta dummyMeta = new DummyTransMeta();
|
//
|
// //将空元加入步骤元
|
// StepMeta dummyStep = new StepMeta(STEP_DUMMY,dummyMeta);
|
// dummyStep.setLocation(200, 50);
|
// dummyStep.setDraw(true);
|
//
|
// transMeta.addStep(dummyStep);
|
//
|
// //设置步骤直接的关系
|
// TransHopMeta hop = new TransHopMeta(inputMongoDBStep, insertUpdateStep);
|
// transMeta.addTransHop(hop);
|
// TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep);
|
// transMeta.addTransHop(hop2);
|
//
|
// //开始执行数据抽取
|
// //将转化元实例化转换
|
// Trans trans = new Trans(transMeta);
|
// trans.prepareExecution(null);
|
//
|
// trans.startThreads();
|
// trans.waitUntilFinished();
|
//
|
// if (trans.getErrors() > 0) {
|
// System.out.println(">>>>>>>>>> ERROR");
|
// returnStr = "{result:\"fail\"}";
|
// }
|
// else {
|
// System.out.println(">>>>>>>>>> SUCCESS ");
|
// returnStr = "{result:\"success\"}";
|
// }
|
// } catch(Exception e) {
|
// return "{result:\"fail\"}";
|
// }
|
// return returnStr;
|
// }
|
|
/**
|
* 测试数据库连接
|
* @param response
|
* @param transData
|
* @return
|
* @throws Exception
|
*/
|
@RequestMapping("/connectionDB")
|
@ResponseBody
|
public String connectionDB(HttpServletResponse response, @RequestBody JSONObject transData, TUser users) throws Exception {
|
//userService.addUser(users);
|
KettleClientEnvironment.init();
|
DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", transData.getString("type"), "Native", transData.getString("hostName"), transData.getString("dbName"), transData.getString("portNum"),
|
transData.getString("userName"), transData.getString("pwd"));
|
LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
|
Database ds = new Database(loggingObject, dataMeta);
|
ds.normalConnect(null);
|
Connection conn = ds.getConnection();
|
/* PreparedStatement stmt = conn.prepareStatement("select * from usr");
|
ResultSet rslt = stmt.executeQuery();
|
while (rslt.next()) {
|
System.out.println(rslt.getString("username"));
|
}*/
|
ds.disconnect();
|
return "{result:\"success\"}";
|
}
|
|
/**
|
* 创建SQL中tablename
|
* 返回表名
|
*/
|
public String createSQLTable(DruidPooledConnection connSys, ResultSet rslt) throws Exception {
|
List<TableColumn> columnNameAndType = new ArrayList<TableColumn>();
|
ResultSetMetaData md = rslt.getMetaData();
|
for (int i = 1; i <= md.getColumnCount(); i++) {
|
TableColumn tableColumn = new TableColumn();
|
tableColumn.setColumnName(md.getColumnName(i));
|
tableColumn.setColumnType(md.getColumnTypeName(i));
|
tableColumn.setColumnLength(String.valueOf(md.getColumnDisplaySize(i)));
|
columnNameAndType.add(tableColumn);
|
}
|
//创建表
|
String SQLStr = "", tableName = "";
|
if (!columnNameAndType.isEmpty()) {
|
tableName = "SQL_" + sdf.format(new Date());
|
SQLStr = splicingSQL(tableName, columnNameAndType);
|
PreparedStatement stmt = connSys.prepareStatement(SQLStr);
|
stmt.execute();
|
}
|
return tableName;
|
}
|
|
/**
|
* 通过SQL提取数据
|
*/
|
@RequestMapping("/sqlPick")
|
@ResponseBody
|
public String sqlPick(@RequestBody JSONObject dbData) {
|
try {
|
String tableName = "", sqlStr = "";
|
|
Database dsInput = null;
|
Database dsOutput = null;
|
TableInputMeta tableInputMeta = new TableInputMeta();
|
TableOutputMeta talbeOutputMeta = new TableOutputMeta();
|
TransMeta transMeta = new TransMeta();
|
KettleDBTrans kettleDBTrans = new KettleDBTrans();
|
kettleDBTrans.setDsInput(dsInput);
|
kettleDBTrans.setDsOutput(dsOutput);
|
kettleDBTrans.setTableInputMeta(tableInputMeta);
|
kettleDBTrans.setTalbeOutputMeta(talbeOutputMeta);
|
kettleDBTrans.setTransMeta(transMeta);
|
|
kettleDBConn(kettleDBTrans);
|
|
//SQL提取数据
|
tableName = "SQL_" + sdf.format(new Date());
|
sqlStr = dbData.getString("sqlStr");
|
dataExtraction(tableName, sqlStr, kettleDBTrans);
|
kettleDBTrans.getDsInput().disconnect();
|
kettleDBTrans.getDsOutput().disconnect();
|
} catch(Exception e) {
|
return "{result:\"fail\"}";
|
}
|
return "{result:\"success\"}";
|
}
|
/**
|
* 抽取SQL数据
|
*/
|
public void extractSQLData(DruidPooledConnection connSys, ResultSet rslt, String tableName) throws Exception {
|
PreparedStatement stmtDes;
|
int columnCount;
|
String SQLStr = "", valueStr = "";
|
columnCount = rslt.getMetaData().getColumnCount();
|
SQLStr = " insert into "+tableName+" ( ";
|
valueStr = " values (";
|
for(int i = 0; i < columnCount; i++) {
|
SQLStr = SQLStr + rslt.getMetaData().getColumnName(i+1) + ", ";
|
valueStr = valueStr + "?, ";
|
}
|
SQLStr = SQLStr.substring(0, SQLStr.length() - 2) + ")";
|
valueStr = valueStr.substring(0, valueStr.length() - 2) + ")";
|
SQLStr = SQLStr + valueStr;
|
stmtDes = connSys.prepareStatement(SQLStr);
|
while (rslt.next()) {
|
for(int i = 0; i < columnCount; i++) {
|
stmtDes.setString(i + 1, rslt.getString(i+1));
|
}
|
stmtDes.addBatch();
|
}
|
stmtDes.executeBatch();
|
}
|
/**
|
* 测试SQL语句正确性
|
*/
|
@RequestMapping("/sqlTest")
|
@ResponseBody
|
public String sqlTest(@RequestBody JSONObject dbData) {
|
|
try {
|
//连接数据库
|
String dbConn = dbData.getString("DBConn");
|
JSONObject DBParam = JSONArray.parseObject(dbConn);
|
|
KettleClientEnvironment.init();
|
DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", DBParam.getString("type"), "Native", DBParam.getString("hostName"), DBParam.getString("dbName"), DBParam.getString("portNum"),
|
DBParam.getString("userName"), DBParam.getString("pwd"));
|
LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
|
Database ds = new Database(loggingObject, dataMeta);
|
ds.connect();
|
Connection conn = ds.getConnection();
|
String sqlStr = dbData.getString("sqlStr");
|
PreparedStatement stmt = conn.prepareStatement(sqlStr);
|
stmt.executeQuery();
|
ds.disconnect();
|
} catch(Exception e) {
|
return "{result:\"fail\"}";
|
}
|
return "{result:\"success\"}";
|
}
|
/**
|
* 当选中数据表选项卡,显示数据表及视图名称
|
* @param response
|
* @param transData
|
* @return
|
* @throws Exception
|
*/
|
@RequestMapping("/showTableAndView")
|
@ResponseBody
|
public String showTableAndView(HttpServletResponse response, @RequestBody JSONObject transData) throws Exception {
|
KettleClientEnvironment.init();
|
DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", transData.getString("type"), "Native", transData.getString("hostName"), transData.getString("dbName"), transData.getString("portNum"),
|
transData.getString("userName"), transData.getString("pwd"));
|
|
LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
|
Database ds = new Database(loggingObject, dataMeta);
|
//ds.normalConnect(null);
|
ds.connect();
|
Connection conn = ds.getConnection();
|
List<String> UTable = new ArrayList<String>();
|
List<String> VTable = new ArrayList<String>();
|
//得到数据库中的表
|
// PreparedStatement stmt = conn.prepareStatement("select name from sysobjects where xtype='U'");
|
// ResultSet rslt = stmt.executeQuery();
|
// while (rslt.next()) {
|
// UTable.add(rslt.getString("name"));
|
// }
|
// String[] UTableArray = ds.getTablenames("dbo", true);
|
// for (int i = 0; i < UTableArray.length; i++) {
|
// UTable.add(UTableArray[i].replace("dbo.", ""));
|
// }
|
String[] UTableArray = ds.getTablenames(true);
|
for (int i = 0; i < UTableArray.length; i++) {
|
int lastHao = UTableArray[i].lastIndexOf(".");
|
String uTable = UTableArray[i];
|
if (lastHao > 0) {
|
UTable.add(uTable.substring(lastHao + 1, uTable.length()));
|
}
|
else {
|
UTable.add(UTableArray[i]);
|
}
|
|
}
|
String tableJson = JSON.toJSONString(UTable);
|
//得到数据库中的视图
|
// stmt = conn.prepareStatement("select name from sysobjects where xtype='V'");
|
// rslt = stmt.executeQuery();
|
// while (rslt.next()) {
|
// VTable.add(rslt.getString("name"));
|
// }
|
String[] VTableArray = ds.getViews("dbo", true);
|
for (int i = 0; i < VTableArray.length; i++) {
|
VTable.add(VTableArray[i].replace("dbo.", ""));
|
}
|
String viewJson = JSON.toJSONString(VTable);
|
ds.disconnect();
|
return "{result:\"success\",table:["+tableJson+"],view:["+viewJson+"]}";
|
}
|
/**
|
* 使用kettle创建表
|
*/
|
public void createTable(Database dsOutput, RowMetaInterface rm, String tableName) {
|
try {
|
System.out.println("开始创建表:" + tableName);
|
String dbProName = dsOutput.getConnection().getMetaData().getDatabaseProductName();
|
System.out.println("数据库类型:" + dbProName);
|
String sql = dsOutput.getDDL(tableName, rm);
|
dsOutput.execStatement(sql.replace(";", ""));
|
System.out.println("创建表成功");
|
} catch(Exception e) {
|
System.out.println("创建表失败");
|
}
|
|
}
|
/**
|
* 新版抽取数据
|
* dataExtraction(String tableName, String sqlStr, TransMeta transMeta, Database dsInput, Database dsOutput, TableInputMeta tableInputMeta, TableOutputMeta tableOutputMeta) throws Exception {
|
tableInputMeta.setSQL(sqlStr);
|
*/
|
public void dataExtraction(String tableName, String sqlStr, KettleDBTrans kettleDBTrans) throws Exception {
|
kettleDBTrans.getTableInputMeta().setSQL(sqlStr);
|
|
StepMeta inputStep = new StepMeta(STEP_READ_FROM_TABLE, kettleDBTrans.getTableInputMeta());
|
inputStep.setLocation(50, 50);
|
inputStep.setDraw(true);
|
kettleDBTrans.getTransMeta().addStep(inputStep);
|
|
//判断目标数据库中,是否存在表,如:存在重新命名表名,用新表名创建表,如:不存在,直接创建表
|
boolean hasTable = kettleDBTrans.getDsOutput().checkTableExists(tableName);
|
if (hasTable) {
|
tableName = tableName + "_" + sdf.format(new Date());
|
}
|
//得到原始表结构
|
PreparedStatement stmt = null;
|
RowMetaInterface rm = null;
|
stmt = kettleDBTrans.getDsInput().getConnection().prepareStatement(sqlStr);
|
stmt.executeQuery();
|
rm = kettleDBTrans.getDsInput().getQueryFields(sqlStr, false);
|
//创建数据表
|
createTable(kettleDBTrans.getDsOutput(), rm, tableName);
|
|
kettleDBTrans.getTalbeOutputMeta().setTableName(tableName);
|
|
StepMeta insertUpdateStep = new StepMeta(STEP_INSERT_UPDATE, kettleDBTrans.getTalbeOutputMeta());
|
insertUpdateStep.setLocation(150, 50);
|
insertUpdateStep.setDraw(true);
|
kettleDBTrans.getTransMeta().addStep(insertUpdateStep);
|
|
DummyTransMeta dummyMeta = new DummyTransMeta();
|
|
StepMeta dummyStep = new StepMeta(STEP_DUMMY, dummyMeta);
|
dummyStep.setLocation(200, 50);
|
dummyStep.setDraw(true);
|
|
kettleDBTrans.getTransMeta().addStep(dummyStep);
|
|
//设置步骤直接的关系
|
TransHopMeta hop = new TransHopMeta(inputStep, insertUpdateStep);
|
kettleDBTrans.getTransMeta().addTransHop(hop);
|
TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep);
|
kettleDBTrans.getTransMeta().addTransHop(hop2);
|
|
//开始执行
|
Trans trans = new Trans(kettleDBTrans.getTransMeta());
|
trans.prepareExecution(null);
|
|
trans.startThreads();
|
|
trans.waitUntilFinished();
|
|
if (trans.getErrors() > 0) {
|
System.out.println(">>>>>>>>>> ERROR");
|
}
|
else {
|
System.out.println(">>>>>>>>>> SUCCESS ");
|
}
|
}
|
/**
|
* 创建集成方法
|
*/
|
public void kettleDBConn(KettleDBTrans kettleDBTrans) throws Exception {
|
//连接数据库
|
// String dbConn = kettleDBTrans.getAssemble().getString("DBConn");\
|
String dbConn= "";
|
JSONObject DBParam = JSONArray.parseObject(dbConn);
|
//初始化kettle环境
|
KettleClientEnvironment.init();
|
KettleClientEnvironment.init();
|
//导入数据的数据库连接
|
DatabaseMeta dataMetaInput = new DatabaseMeta("Input", DBParam.getString("type"), "Native", DBParam.getString("hostName"), DBParam.getString("dbName"), DBParam.getString("portNum"),
|
DBParam.getString("userName"), DBParam.getString("pwd"));
|
//导出数据的数据库连接
|
DatabaseMeta dataMetaOutput = new DatabaseMeta("Output", dataSourceConn.getDbType(), "Native", dataSourceConn.getDbHostName(), dataSourceConn.getDbName(), dataSourceConn.getDbPort(),
|
dataSourceConn.getUsername(), dataSourceConn.getPassword());
|
//导出数据库连接
|
LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
|
kettleDBTrans.setDsInput(new Database(loggingObject, dataMetaInput));
|
kettleDBTrans.getDsInput().connect();
|
Connection connInput = kettleDBTrans.getDsInput().getConnection();
|
//导入数据库连接
|
kettleDBTrans.setDsOutput(new Database(loggingObject, dataMetaOutput));
|
kettleDBTrans.getDsOutput().connect();
|
Connection connOutput = kettleDBTrans.getDsOutput().getConnection();
|
|
kettleDBTrans.getTransMeta().setName("数据抽取");
|
|
kettleDBTrans.getTransMeta().addDatabase(dataMetaInput);
|
kettleDBTrans.getTransMeta().addDatabase(dataMetaOutput);
|
|
//导出数据
|
kettleDBTrans.getTableInputMeta().setDatabaseMeta(kettleDBTrans.getTransMeta().findDatabase("Input"));
|
//导入数据
|
kettleDBTrans.getTalbeOutputMeta().setDatabaseMeta(kettleDBTrans.getTransMeta().findDatabase("Output"));
|
}
|
/**
|
* 1.得到要提取数据的表与视图名称
|
* 2.在程序数据库中创建要提取数据的表与视图的表的名称
|
* 3.将要提取表与视图中的数据存储到系统的实体表中
|
* 4.最后,返回结果
|
*/
|
//提取表、视图中的数据
|
@RequestMapping("/extractDBData")
|
@ResponseBody
|
public String extractDBData(HttpServletResponse response, @RequestBody JSONObject dbData) throws Exception {
|
//表
|
int k = 0;
|
String sqlStr = "", tableName = "";
|
|
Database dsInput = null;
|
Database dsOutput = null;
|
TableInputMeta tableInputMeta = new TableInputMeta();
|
TableOutputMeta talbeOutputMeta = new TableOutputMeta();
|
TransMeta transMeta = new TransMeta();
|
KettleDBTrans kettleDBTrans = new KettleDBTrans();
|
// kettleDBTrans.setDbData(dbData);
|
kettleDBTrans.setDsInput(dsInput);
|
kettleDBTrans.setDsOutput(dsOutput);
|
kettleDBTrans.setTableInputMeta(tableInputMeta);
|
kettleDBTrans.setTalbeOutputMeta(talbeOutputMeta);
|
kettleDBTrans.setTransMeta(transMeta);
|
|
kettleDBConn(kettleDBTrans);
|
|
JSONArray uTable = dbData.getJSONArray("UTable");
|
//循环实体表
|
if (!uTable.isEmpty()) {
|
for(k = 0; k < uTable.size(); k++) {
|
tableName = uTable.getString(k);
|
sqlStr = " select * from " + tableName;
|
dataExtraction(tableName, sqlStr, kettleDBTrans);
|
}
|
}
|
|
JSONArray vTable = dbData.getJSONArray("VTable");
|
//循环视图
|
if (!vTable.isEmpty()) {
|
for(k = 0; k < vTable.size(); k++) {
|
tableName = vTable.getString(k);
|
sqlStr = " select * from " + tableName;
|
dataExtraction(tableName, sqlStr, kettleDBTrans);
|
}
|
}
|
//关闭数据库连接
|
kettleDBTrans.getDsInput().disconnect();
|
kettleDBTrans.getDsOutput().disconnect();
|
return "{result: \"success\"}";
|
}
|
|
/**
|
* 抽取数据
|
* extractData(connSys, realTableName, conn, tableName);
|
* connSys 系统数据库
|
* realTableName 系统数据库表名
|
* conn 提取数据库
|
* tableName 提取数据库表名
|
*/
|
public void extractData(DruidPooledConnection connSys, String tableNameDes, Connection conn, String tableNameSrc) throws Exception {
|
PreparedStatement stmt, stmtDes;
|
ResultSet rslt;
|
int columnCount;
|
String SQLStr = "", valueStr = "";
|
|
SQLStr = " select * from "+tableNameSrc+" ";
|
stmt = conn.prepareStatement(SQLStr);
|
rslt = stmt.executeQuery();
|
columnCount = rslt.getMetaData().getColumnCount();
|
SQLStr = " insert into "+tableNameDes+" ( ";
|
valueStr = " values (";
|
for(int i = 0; i < columnCount; i++) {
|
SQLStr = SQLStr + rslt.getMetaData().getColumnName(i+1) + ", ";
|
valueStr = valueStr + "?, ";
|
}
|
SQLStr = SQLStr.substring(0, SQLStr.length() - 2) + ")";
|
valueStr = valueStr.substring(0, valueStr.length() - 2) + ")";
|
SQLStr = SQLStr + valueStr;
|
stmtDes = connSys.prepareStatement(SQLStr);
|
while (rslt.next()) {
|
for(int i = 0; i < columnCount; i++) {
|
stmtDes.setString(i + 1, rslt.getString(i+1));
|
}
|
stmtDes.addBatch();
|
}
|
stmtDes.executeBatch();
|
}
|
|
/**
|
* 根据数据库类型转换对应类型
|
*
|
*/
|
public String getColumnStr(TableColumn tableColumn) throws Exception {
|
StringBuffer sqlStr = new StringBuffer();
|
String returnStr = "";
|
if (tableColumn.getColumnType().contains("varchar")) {
|
sqlStr.append(tableColumn.getColumnName() + " varchar(" + tableColumn.getColumnLength() + "),");
|
}
|
else if (tableColumn.getColumnType().contains("int")) {
|
sqlStr.append(tableColumn.getColumnName() + " int(" + tableColumn.getColumnLength() + "),");
|
}
|
else if (tableColumn.getColumnType().contains("date")) {
|
sqlStr.append(tableColumn.getColumnName() + " date,");
|
}
|
else if (tableColumn.getColumnType().contains("nvarchar")) {
|
sqlStr.append(tableColumn.getColumnName() + " varchar(" + tableColumn.getColumnLength() + "),");
|
}
|
else if (tableColumn.getColumnType().contains("nchar")) {
|
sqlStr.append(tableColumn.getColumnName() + " varchar(" + tableColumn.getColumnLength() + "),");
|
}
|
return sqlStr.toString();
|
}
|
|
/**
|
* 拼接创建新表的语句
|
* 1.系统数据库是mysql,所以,需要将其它数据库的类型,转换成mysql可以识别的类型
|
*/
|
public String splicingSQL(String tableName, List<TableColumn> columnNameAndType) throws Exception {
|
String sqlStr = "";
|
//第一步,类型 识别类型
|
sqlStr = "create table " + tableName + " (";
|
StringBuffer columnStr = new StringBuffer();
|
columnStr.append(sqlStr);
|
for (TableColumn tableColumn : columnNameAndType) {
|
columnStr.append(getColumnStr(tableColumn));
|
}
|
sqlStr = columnStr.toString();
|
return sqlStr.substring(0, sqlStr.length() - 1) + ")";
|
}
|
/**
|
* 创建表
|
* 1.需要判段,系统数据库中是否已存在表名(1.有的话重新命名,tableName + 年月日时分秒
|
*/
|
public String createTable(DruidPooledConnection connSys, String tableName, List<TableColumn> columnNameAndType) throws Exception {
|
String SQLStr = "";
|
if (!columnNameAndType.isEmpty()) {
|
PreparedStatement stmt;
|
ResultSet rslt;
|
|
SQLStr = " select count(table_name) as tableNum "
|
+ " from information_schema.tables "
|
+ " where table_schema='ssm' and table_name='"+tableName+"'";
|
stmt = connSys.prepareStatement(SQLStr);
|
rslt = stmt.executeQuery();
|
rslt.next();
|
if (rslt.getInt("tableNum") > 0) {
|
tableName = tableName + "_" + sdf.format(new Date());
|
}
|
SQLStr = splicingSQL(tableName, columnNameAndType);
|
stmt = connSys.prepareStatement(SQLStr);
|
stmt.execute();
|
}
|
return tableName;
|
}
|
|
|
//得到抽取数据库表的字段名称及字段类型
|
public List<TableColumn> getColumnNameAndType(Connection conn, String tableName, String type) throws Exception {
|
List<TableColumn> columnNameAndType = new ArrayList<TableColumn>();
|
//得到数据库中的表
|
String SQLStr = " select c.name As ColumnsName , t.name as ColumnsType, c.length as ColumnsLength "
|
+ " from SysObjects As o "
|
+ " left join SysColumns As c on o.id=c.id "
|
+ " left join SysTypes As t on c.xtype=t.xusertype "
|
+ " where o.type = '"+type+"' and o.name='"+tableName+"' ";
|
|
PreparedStatement stmt = conn.prepareStatement(SQLStr);
|
ResultSet rslt = stmt.executeQuery();
|
while (rslt.next()) {
|
TableColumn tableColumn = new TableColumn();
|
tableColumn.setColumnName(rslt.getString("ColumnsName"));
|
tableColumn.setColumnType(rslt.getString("ColumnsType"));
|
tableColumn.setColumnLength(rslt.getString("ColumnsLength"));
|
columnNameAndType.add(tableColumn);
|
}
|
|
return columnNameAndType;
|
}
|
|
|
|
/* protected void sendJsonData(HttpServletResponse response) throws Exception{
|
response.setContentType("text/javascript;charset=UTF-8");
|
PrintWriter out = response.getWriter();
|
out.println("alert(\"数据库连接成功!!!\")");
|
out.flush();
|
out.close();
|
}*/
|
|
}
|