New file |
| | |
| | | package com.highdatas.mdm.controller;
|
| | |
|
| | | import com.alibaba.druid.pool.DruidPooledConnection;
|
| | | import com.alibaba.fastjson.JSON;
|
| | | import com.alibaba.fastjson.JSONArray;
|
| | | import com.alibaba.fastjson.JSONObject;
|
| | |
|
| | | import com.highdatas.mdm.entity.TUser;
|
| | | import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo;
|
| | | import com.highdatas.mdm.pojo.kettle.KettleDBTrans;
|
| | | import com.highdatas.mdm.pojo.kettle.TableColumn;
|
| | | import org.pentaho.di.core.KettleClientEnvironment;
|
| | | import org.pentaho.di.core.database.Database;
|
| | | import org.pentaho.di.core.database.DatabaseMeta;
|
| | | import org.pentaho.di.core.logging.LoggingObjectInterface;
|
| | | import org.pentaho.di.core.logging.LoggingObjectType;
|
| | | import org.pentaho.di.core.logging.SimpleLoggingObject;
|
| | | import org.pentaho.di.core.row.RowMetaInterface;
|
| | | import org.pentaho.di.trans.Trans;
|
| | | import org.pentaho.di.trans.TransHopMeta;
|
| | | import org.pentaho.di.trans.TransMeta;
|
| | | import org.pentaho.di.trans.step.StepMeta;
|
| | | import org.pentaho.di.trans.steps.dummytrans.DummyTransMeta;
|
| | |
|
| | | import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
|
| | | import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta;
|
| | |
|
| | | import org.springframework.beans.factory.annotation.Autowired;
|
| | | import org.springframework.stereotype.Controller;
|
| | | import org.springframework.web.bind.annotation.RequestBody;
|
| | | import org.springframework.web.bind.annotation.RequestMapping;
|
| | | import org.springframework.web.bind.annotation.ResponseBody;
|
| | |
|
| | | import javax.servlet.http.HttpServletResponse;
|
| | | import java.sql.Connection;
|
| | | import java.sql.PreparedStatement;
|
| | | import java.sql.ResultSet;
|
| | | import java.sql.ResultSetMetaData;
|
| | | import java.text.SimpleDateFormat;
|
| | | import java.util.ArrayList;
|
| | | import java.util.Date;
|
| | | import java.util.List;
|
| | |
|
| | | @Controller
|
| | | @RequestMapping("/database")
|
| | | //http://localhost:8080/users/addUser
|
| | | public class SettleController {
|
| | |
|
| | | |
| | | @Autowired
|
| | | private UnBigDataDataSourceInfo dataSourceConn;
|
| | | |
| | | private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
|
| | | private String STEP_READ_FROM_TABLE = "Read data from table";
|
| | | private String STEP_INSERT_UPDATE = "Insert or update";
|
| | | private String STEP_DUMMY = "Dummy";
|
| | | |
| | |
|
| | | /**
|
| | | * 提取mongodb数据
|
| | | * @param response
|
| | | * @param transData
|
| | | * @return
|
| | | * @throws
|
| | | */
|
| | | // @RequestMapping("/drawMongoDB")
|
| | | // @ResponseBody
|
| | | // public String drawMongoDB(HttpServletResponse response, @RequestBody JSONObject transData) {
|
| | | // String returnStr = "";
|
| | | // try {
|
| | | // KettleClientEnvironment.init();
|
| | | //
|
| | | // TransMeta transMeta = new TransMeta();
|
| | | // transMeta.setName("抽取mongodb数据");
|
| | | // //导出数据的数据库连接
|
| | | // DatabaseMeta dataBaseOutput = new DatabaseMeta("Output", dataSourceConn.getDbType(), "Native", dataSourceConn.getDbHostName(), dataSourceConn.getDbName(), dataSourceConn.getDbPort(),
|
| | | // dataSourceConn.getUsername(), dataSourceConn.getPassword());
|
| | | // transMeta.addDatabase(dataBaseOutput);
|
| | | //
|
| | | // MongoDbInputMeta mongoDbInputMeta = new MongoDbInputMeta();
|
| | | // mongoDbInputMeta.setHostnames(transData.getString("hostName"));
|
| | | // mongoDbInputMeta.setAuthenticationDatabaseName(transData.getString("authDBName"));
|
| | | // mongoDbInputMeta.setPort(transData.getString("portNum"));
|
| | | // mongoDbInputMeta.setAuthenticationUser(transData.getString("userName"));
|
| | | // mongoDbInputMeta.setAuthenticationPassword(transData.getString("pwd"));
|
| | | // mongoDbInputMeta.setDbName(transData.getString("dbName"));
|
| | | // mongoDbInputMeta.setCollection(transData.getString("collection"));
|
| | | // //得到抽取字段
|
| | | // ArrayList<String> fieldList = new ArrayList<String>(transData.getString("drawfield").split(",").length);
|
| | | // Collections.addAll(fieldList, transData.getString("drawfield").split(","));
|
| | | // //设置不取JSON字段
|
| | | // mongoDbInputMeta.setOutputJson(false);
|
| | | // //设置提取字段名称、path、type
|
| | | // List<MongoField> normalList = new ArrayList<MongoField>();
|
| | | // for (String fieldStr : fieldList) {
|
| | | // MongoField newField = new MongoField();
|
| | | // newField.m_fieldName = fieldStr;
|
| | | // newField.m_fieldPath = "$." + fieldStr;
|
| | | // newField.m_kettleType = "String";
|
| | | // normalList.add(newField);
|
| | | // }
|
| | | // //设置mongodb提取字段
|
| | | // mongoDbInputMeta.setMongoFields(normalList);
|
| | | // //设置mogodb步骤元
|
| | | // StepMeta inputMongoDBStep = new StepMeta(STEP_READ_FROM_TABLE, mongoDbInputMeta);
|
| | | // inputMongoDBStep.setLocation(50, 50);
|
| | | // inputMongoDBStep.setDraw(true);
|
| | | // //将mogodb步骤元加入转化中
|
| | | // transMeta.addStep(inputMongoDBStep);
|
| | | //
|
| | | // //设置mysql元
|
| | | // TableOutputMeta tableOutputMeta = new TableOutputMeta();
|
| | | // //设置数据库元
|
| | | // tableOutputMeta.setDatabaseMeta(transMeta.findDatabase("Output"));
|
| | | // //mongodb中数据库表/集合,就是表名
|
| | | // tableOutputMeta.setTableName(transData.getString("collection"));
|
| | | // //将mysql元加入步骤元
|
| | | // StepMeta insertUpdateStep = new StepMeta(STEP_INSERT_UPDATE, tableOutputMeta);
|
| | | // insertUpdateStep.setLocation(150, 50);
|
| | | // insertUpdateStep.setDraw(true);
|
| | | //
|
| | | // transMeta.addStep(insertUpdateStep);
|
| | | //
|
| | | // //增加个空元,
|
| | | // DummyTransMeta dummyMeta = new DummyTransMeta();
|
| | | //
|
| | | // //将空元加入步骤元
|
| | | // StepMeta dummyStep = new StepMeta(STEP_DUMMY,dummyMeta);
|
| | | // dummyStep.setLocation(200, 50);
|
| | | // dummyStep.setDraw(true);
|
| | | //
|
| | | // transMeta.addStep(dummyStep);
|
| | | //
|
| | | // //设置步骤直接的关系
|
| | | // TransHopMeta hop = new TransHopMeta(inputMongoDBStep, insertUpdateStep);
|
| | | // transMeta.addTransHop(hop);
|
| | | // TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep);
|
| | | // transMeta.addTransHop(hop2);
|
| | | //
|
| | | // //开始执行数据抽取
|
| | | // //将转化元实例化转换
|
| | | // Trans trans = new Trans(transMeta);
|
| | | // trans.prepareExecution(null);
|
| | | //
|
| | | // trans.startThreads();
|
| | | // trans.waitUntilFinished();
|
| | | //
|
| | | // if (trans.getErrors() > 0) {
|
| | | // System.out.println(">>>>>>>>>> ERROR");
|
| | | // returnStr = "{result:\"fail\"}";
|
| | | // }
|
| | | // else {
|
| | | // System.out.println(">>>>>>>>>> SUCCESS ");
|
| | | // returnStr = "{result:\"success\"}";
|
| | | // }
|
| | | // } catch(Exception e) {
|
| | | // return "{result:\"fail\"}";
|
| | | // }
|
| | | // return returnStr;
|
| | | // }
|
| | | //
|
| | | // /**
|
| | | // * 提取mongodb数据
|
| | | // * @param response
|
| | | // * @param transData
|
| | | // * @return
|
| | | // * @throws
|
| | | // */
|
| | | // @RequestMapping("/drawHbase")
|
| | | // @ResponseBody
|
| | | // public String drawHbase(HttpServletResponse response, @RequestBody JSONObject transData) {
|
| | | // String returnStr = "";
|
| | | // try {
|
| | | // KettleClientEnvironment.init();
|
| | | //
|
| | | // TransMeta transMeta = new TransMeta();
|
| | | // transMeta.setName("抽取Hbase数据");
|
| | | // //导出数据的数据库连接
|
| | | // DatabaseMeta dataBaseOutput = new DatabaseMeta("Output", dataSourceConn.getDbType(), "Native", dataSourceConn.getDbHostName(), dataSourceConn.getDbName(), dataSourceConn.getDbPort(),
|
| | | // dataSourceConn.getUsername(), dataSourceConn.getPassword());
|
| | | // transMeta.addDatabase(dataBaseOutput);
|
| | | // //HBaseInputMeta hBaseInputMeta= new HBaseInputMeta()
|
| | | //
|
| | | // MongoDbInputMeta mongoDbInputMeta = new MongoDbInputMeta();
|
| | | // mongoDbInputMeta.setHostnames(transData.getString("hostName"));
|
| | | // mongoDbInputMeta.setAuthenticationDatabaseName(transData.getString("authDBName"));
|
| | | // mongoDbInputMeta.setPort(transData.getString("portNum"));
|
| | | // mongoDbInputMeta.setAuthenticationUser(transData.getString("userName"));
|
| | | // mongoDbInputMeta.setAuthenticationPassword(transData.getString("pwd"));
|
| | | // mongoDbInputMeta.setDbName(transData.getString("dbName"));
|
| | | // mongoDbInputMeta.setCollection(transData.getString("collection"));
|
| | | // //得到抽取字段
|
| | | // ArrayList<String> fieldList = new ArrayList<String>(transData.getString("drawfield").split(",").length);
|
| | | // Collections.addAll(fieldList, transData.getString("drawfield").split(","));
|
| | | // //设置不取JSON字段
|
| | | // mongoDbInputMeta.setOutputJson(false);
|
| | | // //设置提取字段名称、path、type
|
| | | // List<MongoField> normalList = new ArrayList<MongoField>();
|
| | | // for (String fieldStr : fieldList) {
|
| | | // MongoField newField = new MongoField();
|
| | | // newField.m_fieldName = fieldStr;
|
| | | // newField.m_fieldPath = "$." + fieldStr;
|
| | | // newField.m_kettleType = "String";
|
| | | // normalList.add(newField);
|
| | | // }
|
| | | // //设置mongodb提取字段
|
| | | // mongoDbInputMeta.setMongoFields(normalList);
|
| | | // //设置mogodb步骤元
|
| | | // StepMeta inputMongoDBStep = new StepMeta(STEP_READ_FROM_TABLE, mongoDbInputMeta);
|
| | | // inputMongoDBStep.setLocation(50, 50);
|
| | | // inputMongoDBStep.setDraw(true);
|
| | | // //将mogodb步骤元加入转化中
|
| | | // transMeta.addStep(inputMongoDBStep);
|
| | | //
|
| | | // //设置mysql元
|
| | | // TableOutputMeta tableOutputMeta = new TableOutputMeta();
|
| | | // //设置数据库元
|
| | | // tableOutputMeta.setDatabaseMeta(transMeta.findDatabase("Output"));
|
| | | // //mongodb中数据库表/集合,就是表名
|
| | | // tableOutputMeta.setTableName(transData.getString("collection"));
|
| | | // //将mysql元加入步骤元
|
| | | // StepMeta insertUpdateStep = new StepMeta(STEP_INSERT_UPDATE, tableOutputMeta);
|
| | | // insertUpdateStep.setLocation(150, 50);
|
| | | // insertUpdateStep.setDraw(true);
|
| | | //
|
| | | // transMeta.addStep(insertUpdateStep);
|
| | | //
|
| | | // //增加个空元,
|
| | | // DummyTransMeta dummyMeta = new DummyTransMeta();
|
| | | //
|
| | | // //将空元加入步骤元
|
| | | // StepMeta dummyStep = new StepMeta(STEP_DUMMY,dummyMeta);
|
| | | // dummyStep.setLocation(200, 50);
|
| | | // dummyStep.setDraw(true);
|
| | | //
|
| | | // transMeta.addStep(dummyStep);
|
| | | //
|
| | | // //设置步骤直接的关系
|
| | | // TransHopMeta hop = new TransHopMeta(inputMongoDBStep, insertUpdateStep);
|
| | | // transMeta.addTransHop(hop);
|
| | | // TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep);
|
| | | // transMeta.addTransHop(hop2);
|
| | | //
|
| | | // //开始执行数据抽取
|
| | | // //将转化元实例化转换
|
| | | // Trans trans = new Trans(transMeta);
|
| | | // trans.prepareExecution(null);
|
| | | //
|
| | | // trans.startThreads();
|
| | | // trans.waitUntilFinished();
|
| | | //
|
| | | // if (trans.getErrors() > 0) {
|
| | | // System.out.println(">>>>>>>>>> ERROR");
|
| | | // returnStr = "{result:\"fail\"}";
|
| | | // }
|
| | | // else {
|
| | | // System.out.println(">>>>>>>>>> SUCCESS ");
|
| | | // returnStr = "{result:\"success\"}";
|
| | | // }
|
| | | // } catch(Exception e) {
|
| | | // return "{result:\"fail\"}";
|
| | | // }
|
| | | // return returnStr;
|
| | | // }
|
| | |
|
| | | /**
|
| | | * 测试数据库连接
|
| | | * @param response
|
| | | * @param transData
|
| | | * @return
|
| | | * @throws Exception
|
| | | */
|
| | | @RequestMapping("/connectionDB")
|
| | | @ResponseBody
|
| | | public String connectionDB(HttpServletResponse response, @RequestBody JSONObject transData, TUser users) throws Exception {
|
| | | //userService.addUser(users);
|
| | | KettleClientEnvironment.init();
|
| | | DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", transData.getString("type"), "Native", transData.getString("hostName"), transData.getString("dbName"), transData.getString("portNum"),
|
| | | transData.getString("userName"), transData.getString("pwd"));
|
| | | LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
|
| | | Database ds = new Database(loggingObject, dataMeta);
|
| | | ds.normalConnect(null);
|
| | | Connection conn = ds.getConnection(); |
| | | /* PreparedStatement stmt = conn.prepareStatement("select * from usr");
|
| | | ResultSet rslt = stmt.executeQuery();
|
| | | while (rslt.next()) {
|
| | | System.out.println(rslt.getString("username"));
|
| | | }*/
|
| | | ds.disconnect();
|
| | | return "{result:\"success\"}";
|
| | | }
|
| | | |
| | | /**
|
| | | * 创建SQL中tablename
|
| | | * 返回表名
|
| | | */
|
| | | public String createSQLTable(DruidPooledConnection connSys, ResultSet rslt) throws Exception {
|
| | | List<TableColumn> columnNameAndType = new ArrayList<TableColumn>();
|
| | | ResultSetMetaData md = rslt.getMetaData();
|
| | | for (int i = 1; i <= md.getColumnCount(); i++) {
|
| | | TableColumn tableColumn = new TableColumn();
|
| | | tableColumn.setColumnName(md.getColumnName(i));
|
| | | tableColumn.setColumnType(md.getColumnTypeName(i));
|
| | | tableColumn.setColumnLength(String.valueOf(md.getColumnDisplaySize(i)));
|
| | | columnNameAndType.add(tableColumn); |
| | | }
|
| | | //创建表
|
| | | String SQLStr = "", tableName = "";
|
| | | if (!columnNameAndType.isEmpty()) {
|
| | | tableName = "SQL_" + sdf.format(new Date());
|
| | | SQLStr = splicingSQL(tableName, columnNameAndType);
|
| | | PreparedStatement stmt = connSys.prepareStatement(SQLStr);
|
| | | stmt.execute(); |
| | | }
|
| | | return tableName; |
| | | }
|
| | | |
| | | /**
|
| | | * 通过SQL提取数据
|
| | | */
|
| | | @RequestMapping("/sqlPick")
|
| | | @ResponseBody
|
| | | public String sqlPick(@RequestBody JSONObject dbData) {
|
| | | try {
|
| | | String tableName = "", sqlStr = "";
|
| | |
|
| | | Database dsInput = null;
|
| | | Database dsOutput = null;
|
| | | TableInputMeta tableInputMeta = new TableInputMeta();
|
| | | TableOutputMeta talbeOutputMeta = new TableOutputMeta();
|
| | | TransMeta transMeta = new TransMeta();
|
| | | KettleDBTrans kettleDBTrans = new KettleDBTrans();
|
| | | kettleDBTrans.setDsInput(dsInput);
|
| | | kettleDBTrans.setDsOutput(dsOutput);
|
| | | kettleDBTrans.setTableInputMeta(tableInputMeta);
|
| | | kettleDBTrans.setTalbeOutputMeta(talbeOutputMeta);
|
| | | kettleDBTrans.setTransMeta(transMeta);
|
| | | |
| | | kettleDBConn(kettleDBTrans);
|
| | | |
| | | //SQL提取数据
|
| | | tableName = "SQL_" + sdf.format(new Date());
|
| | | sqlStr = dbData.getString("sqlStr");
|
| | | dataExtraction(tableName, sqlStr, kettleDBTrans);
|
| | | kettleDBTrans.getDsInput().disconnect();
|
| | | kettleDBTrans.getDsOutput().disconnect();
|
| | | } catch(Exception e) {
|
| | | return "{result:\"fail\"}"; |
| | | }
|
| | | return "{result:\"success\"}";
|
| | | }
|
| | | /**
|
| | | * 抽取SQL数据
|
| | | */
|
| | | public void extractSQLData(DruidPooledConnection connSys, ResultSet rslt, String tableName) throws Exception {
|
| | | PreparedStatement stmtDes;
|
| | | int columnCount;
|
| | | String SQLStr = "", valueStr = "";
|
| | | columnCount = rslt.getMetaData().getColumnCount();
|
| | | SQLStr = " insert into "+tableName+" ( ";
|
| | | valueStr = " values (";
|
| | | for(int i = 0; i < columnCount; i++) {
|
| | | SQLStr = SQLStr + rslt.getMetaData().getColumnName(i+1) + ", ";
|
| | | valueStr = valueStr + "?, ";
|
| | | }
|
| | | SQLStr = SQLStr.substring(0, SQLStr.length() - 2) + ")";
|
| | | valueStr = valueStr.substring(0, valueStr.length() - 2) + ")";
|
| | | SQLStr = SQLStr + valueStr;
|
| | | stmtDes = connSys.prepareStatement(SQLStr);
|
| | | while (rslt.next()) {
|
| | | for(int i = 0; i < columnCount; i++) {
|
| | | stmtDes.setString(i + 1, rslt.getString(i+1));
|
| | | }
|
| | | stmtDes.addBatch();
|
| | | }
|
| | | stmtDes.executeBatch(); |
| | | }
|
| | | /**
|
| | | * 测试SQL语句正确性
|
| | | */
|
| | | @RequestMapping("/sqlTest")
|
| | | @ResponseBody
|
| | | public String sqlTest(@RequestBody JSONObject dbData) {
|
| | | |
| | | try {
|
| | | //连接数据库
|
| | | String dbConn = dbData.getString("DBConn");
|
| | | JSONObject DBParam = JSONArray.parseObject(dbConn);
|
| | | |
| | | KettleClientEnvironment.init();
|
| | | DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", DBParam.getString("type"), "Native", DBParam.getString("hostName"), DBParam.getString("dbName"), DBParam.getString("portNum"),
|
| | | DBParam.getString("userName"), DBParam.getString("pwd"));
|
| | | LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
|
| | | Database ds = new Database(loggingObject, dataMeta);
|
| | | ds.connect();
|
| | | Connection conn = ds.getConnection();
|
| | | String sqlStr = dbData.getString("sqlStr");
|
| | | PreparedStatement stmt = conn.prepareStatement(sqlStr);
|
| | | stmt.executeQuery(); |
| | | ds.disconnect();
|
| | | } catch(Exception e) {
|
| | | return "{result:\"fail\"}"; |
| | | }
|
| | | return "{result:\"success\"}"; |
| | | }
|
| | | /**
|
| | | * 当选中数据表选项卡,显示数据表及视图名称
|
| | | * @param response
|
| | | * @param transData
|
| | | * @return
|
| | | * @throws Exception
|
| | | */
|
| | | @RequestMapping("/showTableAndView")
|
| | | @ResponseBody
|
| | | public String showTableAndView(HttpServletResponse response, @RequestBody JSONObject transData) throws Exception {
|
| | | KettleClientEnvironment.init();
|
| | | DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", transData.getString("type"), "Native", transData.getString("hostName"), transData.getString("dbName"), transData.getString("portNum"),
|
| | | transData.getString("userName"), transData.getString("pwd"));
|
| | | |
| | | LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
|
| | | Database ds = new Database(loggingObject, dataMeta);
|
| | | //ds.normalConnect(null);
|
| | | ds.connect();
|
| | | Connection conn = ds.getConnection();
|
| | | List<String> UTable = new ArrayList<String>();
|
| | | List<String> VTable = new ArrayList<String>();
|
| | | //得到数据库中的表
|
| | | // PreparedStatement stmt = conn.prepareStatement("select name from sysobjects where xtype='U'");
|
| | | // ResultSet rslt = stmt.executeQuery();
|
| | | // while (rslt.next()) {
|
| | | // UTable.add(rslt.getString("name"));
|
| | | // }
|
| | | // String[] UTableArray = ds.getTablenames("dbo", true);
|
| | | // for (int i = 0; i < UTableArray.length; i++) {
|
| | | // UTable.add(UTableArray[i].replace("dbo.", ""));
|
| | | // }
|
| | | String[] UTableArray = ds.getTablenames(true);
|
| | | for (int i = 0; i < UTableArray.length; i++) {
|
| | | int lastHao = UTableArray[i].lastIndexOf(".");
|
| | | String uTable = UTableArray[i];
|
| | | if (lastHao > 0) {
|
| | | UTable.add(uTable.substring(lastHao + 1, uTable.length()));
|
| | | }
|
| | | else {
|
| | | UTable.add(UTableArray[i]);
|
| | | }
|
| | | |
| | | } |
| | | String tableJson = JSON.toJSONString(UTable);
|
| | | //得到数据库中的视图
|
| | | // stmt = conn.prepareStatement("select name from sysobjects where xtype='V'");
|
| | | // rslt = stmt.executeQuery();
|
| | | // while (rslt.next()) {
|
| | | // VTable.add(rslt.getString("name"));
|
| | | // } |
| | | String[] VTableArray = ds.getViews("dbo", true);
|
| | | for (int i = 0; i < VTableArray.length; i++) {
|
| | | VTable.add(VTableArray[i].replace("dbo.", ""));
|
| | | } |
| | | String viewJson = JSON.toJSONString(VTable);
|
| | | ds.disconnect();
|
| | | return "{result:\"success\",table:["+tableJson+"],view:["+viewJson+"]}";
|
| | | }
|
| | | /**
|
| | | * 使用kettle创建表 |
| | | */
|
| | | public void createTable(Database dsOutput, RowMetaInterface rm, String tableName) {
|
| | | try {
|
| | | System.out.println("开始创建表:" + tableName);
|
| | | String dbProName = dsOutput.getConnection().getMetaData().getDatabaseProductName();
|
| | | System.out.println("数据库类型:" + dbProName);
|
| | | String sql = dsOutput.getDDL(tableName, rm);
|
| | | dsOutput.execStatement(sql.replace(";", ""));
|
| | | System.out.println("创建表成功"); |
| | | } catch(Exception e) {
|
| | | System.out.println("创建表失败");
|
| | | }
|
| | |
|
| | | }
|
| | | /**
|
| | | * 新版抽取数据
|
| | | * dataExtraction(String tableName, String sqlStr, TransMeta transMeta, Database dsInput, Database dsOutput, TableInputMeta tableInputMeta, TableOutputMeta tableOutputMeta) throws Exception {
|
| | | tableInputMeta.setSQL(sqlStr);
|
| | | */
|
| | | public void dataExtraction(String tableName, String sqlStr, KettleDBTrans kettleDBTrans) throws Exception {
|
| | | kettleDBTrans.getTableInputMeta().setSQL(sqlStr);
|
| | | |
| | | StepMeta inputStep = new StepMeta(STEP_READ_FROM_TABLE, kettleDBTrans.getTableInputMeta());
|
| | | inputStep.setLocation(50, 50);
|
| | | inputStep.setDraw(true);
|
| | | kettleDBTrans.getTransMeta().addStep(inputStep);
|
| | | |
| | | //判断目标数据库中,是否存在表,如:存在重新命名表名,用新表名创建表,如:不存在,直接创建表
|
| | | boolean hasTable = kettleDBTrans.getDsOutput().checkTableExists(tableName);
|
| | | if (hasTable) {
|
| | | tableName = tableName + "_" + sdf.format(new Date());
|
| | | }
|
| | | //得到原始表结构
|
| | | PreparedStatement stmt = null;
|
| | | RowMetaInterface rm = null;
|
| | | stmt = kettleDBTrans.getDsInput().getConnection().prepareStatement(sqlStr);
|
| | | stmt.executeQuery();
|
| | | rm = kettleDBTrans.getDsInput().getQueryFields(sqlStr, false);
|
| | | //创建数据表
|
| | | createTable(kettleDBTrans.getDsOutput(), rm, tableName);
|
| | | |
| | | kettleDBTrans.getTalbeOutputMeta().setTableName(tableName);
|
| | | |
| | | StepMeta insertUpdateStep = new StepMeta(STEP_INSERT_UPDATE, kettleDBTrans.getTalbeOutputMeta());
|
| | | insertUpdateStep.setLocation(150, 50);
|
| | | insertUpdateStep.setDraw(true);
|
| | | kettleDBTrans.getTransMeta().addStep(insertUpdateStep);
|
| | | |
| | | DummyTransMeta dummyMeta = new DummyTransMeta();
|
| | | |
| | | StepMeta dummyStep = new StepMeta(STEP_DUMMY, dummyMeta);
|
| | | dummyStep.setLocation(200, 50);
|
| | | dummyStep.setDraw(true);
|
| | | |
| | | kettleDBTrans.getTransMeta().addStep(dummyStep);
|
| | | |
| | | //设置步骤直接的关系
|
| | | TransHopMeta hop = new TransHopMeta(inputStep, insertUpdateStep);
|
| | | kettleDBTrans.getTransMeta().addTransHop(hop);
|
| | | TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep);
|
| | | kettleDBTrans.getTransMeta().addTransHop(hop2);
|
| | | |
| | | //开始执行
|
| | | Trans trans = new Trans(kettleDBTrans.getTransMeta());
|
| | | trans.prepareExecution(null);
|
| | | |
| | | trans.startThreads();
|
| | | |
| | | trans.waitUntilFinished();
|
| | | |
| | | if (trans.getErrors() > 0) {
|
| | | System.out.println(">>>>>>>>>> ERROR");
|
| | | }
|
| | | else {
|
| | | System.out.println(">>>>>>>>>> SUCCESS ");
|
| | | }
|
| | | }
|
| | | /**
|
| | | * 创建集成方法
|
| | | */
|
| | | public void kettleDBConn(KettleDBTrans kettleDBTrans) throws Exception {
|
| | | //连接数据库
|
| | | // String dbConn = kettleDBTrans.getAssemble().getString("DBConn");\
|
| | | String dbConn= "";
|
| | | JSONObject DBParam = JSONArray.parseObject(dbConn);
|
| | | //初始化kettle环境
|
| | | KettleClientEnvironment.init();
|
| | | KettleClientEnvironment.init();
|
| | | //导入数据的数据库连接
|
| | | DatabaseMeta dataMetaInput = new DatabaseMeta("Input", DBParam.getString("type"), "Native", DBParam.getString("hostName"), DBParam.getString("dbName"), DBParam.getString("portNum"),
|
| | | DBParam.getString("userName"), DBParam.getString("pwd"));
|
| | | //导出数据的数据库连接
|
| | | DatabaseMeta dataMetaOutput = new DatabaseMeta("Output", dataSourceConn.getDbType(), "Native", dataSourceConn.getDbHostName(), dataSourceConn.getDbName(), dataSourceConn.getDbPort(),
|
| | | dataSourceConn.getUsername(), dataSourceConn.getPassword());
|
| | | //导出数据库连接
|
| | | LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
|
| | | kettleDBTrans.setDsInput(new Database(loggingObject, dataMetaInput));
|
| | | kettleDBTrans.getDsInput().connect();
|
| | | Connection connInput = kettleDBTrans.getDsInput().getConnection();
|
| | | //导入数据库连接
|
| | | kettleDBTrans.setDsOutput(new Database(loggingObject, dataMetaOutput));
|
| | | kettleDBTrans.getDsOutput().connect();
|
| | | Connection connOutput = kettleDBTrans.getDsOutput().getConnection();
|
| | | |
| | | kettleDBTrans.getTransMeta().setName("数据抽取");
|
| | | |
| | | kettleDBTrans.getTransMeta().addDatabase(dataMetaInput);
|
| | | kettleDBTrans.getTransMeta().addDatabase(dataMetaOutput);
|
| | | |
| | | //导出数据
|
| | | kettleDBTrans.getTableInputMeta().setDatabaseMeta(kettleDBTrans.getTransMeta().findDatabase("Input"));
|
| | | //导入数据
|
| | | kettleDBTrans.getTalbeOutputMeta().setDatabaseMeta(kettleDBTrans.getTransMeta().findDatabase("Output")); |
| | | }
|
| | | /**
|
| | | * 1.得到要提取数据的表与视图名称
|
| | | * 2.在程序数据库中创建要提取数据的表与视图的表的名称
|
| | | * 3.将要提取表与视图中的数据存储到系统的实体表中
|
| | | * 4.最后,返回结果
|
| | | */
|
| | | //提取表、视图中的数据
|
| | | @RequestMapping("/extractDBData")
|
| | | @ResponseBody
|
| | | public String extractDBData(HttpServletResponse response, @RequestBody JSONObject dbData) throws Exception {
|
| | | //表
|
| | | int k = 0;
|
| | | String sqlStr = "", tableName = "";
|
| | | |
| | | Database dsInput = null;
|
| | | Database dsOutput = null;
|
| | | TableInputMeta tableInputMeta = new TableInputMeta();
|
| | | TableOutputMeta talbeOutputMeta = new TableOutputMeta();
|
| | | TransMeta transMeta = new TransMeta();
|
| | | KettleDBTrans kettleDBTrans = new KettleDBTrans();
|
| | | // kettleDBTrans.setDbData(dbData);
|
| | | kettleDBTrans.setDsInput(dsInput);
|
| | | kettleDBTrans.setDsOutput(dsOutput);
|
| | | kettleDBTrans.setTableInputMeta(tableInputMeta);
|
| | | kettleDBTrans.setTalbeOutputMeta(talbeOutputMeta);
|
| | | kettleDBTrans.setTransMeta(transMeta);
|
| | | |
| | | kettleDBConn(kettleDBTrans);
|
| | | |
| | | JSONArray uTable = dbData.getJSONArray("UTable");
|
| | | //循环实体表
|
| | | if (!uTable.isEmpty()) {
|
| | | for(k = 0; k < uTable.size(); k++) {
|
| | | tableName = uTable.getString(k);
|
| | | sqlStr = " select * from " + tableName;
|
| | | dataExtraction(tableName, sqlStr, kettleDBTrans);
|
| | | }
|
| | | }
|
| | | |
| | | JSONArray vTable = dbData.getJSONArray("VTable");
|
| | | //循环视图
|
| | | if (!vTable.isEmpty()) {
|
| | | for(k = 0; k < vTable.size(); k++) {
|
| | | tableName = vTable.getString(k);
|
| | | sqlStr = " select * from " + tableName;
|
| | | dataExtraction(tableName, sqlStr, kettleDBTrans);
|
| | | }
|
| | | } |
| | | //关闭数据库连接
|
| | | kettleDBTrans.getDsInput().disconnect();
|
| | | kettleDBTrans.getDsOutput().disconnect();
|
| | | return "{result: \"success\"}";
|
| | | }
|
| | | |
| | | /**
|
| | | * 抽取数据
|
| | | * extractData(connSys, realTableName, conn, tableName);
|
| | | * connSys 系统数据库
|
| | | * realTableName 系统数据库表名
|
| | | * conn 提取数据库
|
| | | * tableName 提取数据库表名 |
| | | */
|
| | | public void extractData(DruidPooledConnection connSys, String tableNameDes, Connection conn, String tableNameSrc) throws Exception {
|
| | | PreparedStatement stmt, stmtDes;
|
| | | ResultSet rslt;
|
| | | int columnCount;
|
| | | String SQLStr = "", valueStr = "";
|
| | | |
| | | SQLStr = " select * from "+tableNameSrc+" ";
|
| | | stmt = conn.prepareStatement(SQLStr);
|
| | | rslt = stmt.executeQuery();
|
| | | columnCount = rslt.getMetaData().getColumnCount();
|
| | | SQLStr = " insert into "+tableNameDes+" ( ";
|
| | | valueStr = " values (";
|
| | | for(int i = 0; i < columnCount; i++) {
|
| | | SQLStr = SQLStr + rslt.getMetaData().getColumnName(i+1) + ", ";
|
| | | valueStr = valueStr + "?, ";
|
| | | }
|
| | | SQLStr = SQLStr.substring(0, SQLStr.length() - 2) + ")";
|
| | | valueStr = valueStr.substring(0, valueStr.length() - 2) + ")";
|
| | | SQLStr = SQLStr + valueStr;
|
| | | stmtDes = connSys.prepareStatement(SQLStr);
|
| | | while (rslt.next()) {
|
| | | for(int i = 0; i < columnCount; i++) {
|
| | | stmtDes.setString(i + 1, rslt.getString(i+1));
|
| | | }
|
| | | stmtDes.addBatch();
|
| | | }
|
| | | stmtDes.executeBatch();
|
| | | }
|
| | | |
| | | /**
|
| | | * 根据数据库类型转换对应类型
|
| | | * |
| | | */
|
| | | public String getColumnStr(TableColumn tableColumn) throws Exception {
|
| | | StringBuffer sqlStr = new StringBuffer();
|
| | | String returnStr = "";
|
| | | if (tableColumn.getColumnType().contains("varchar")) {
|
| | | sqlStr.append(tableColumn.getColumnName() + " varchar(" + tableColumn.getColumnLength() + "),");
|
| | | }
|
| | | else if (tableColumn.getColumnType().contains("int")) {
|
| | | sqlStr.append(tableColumn.getColumnName() + " int(" + tableColumn.getColumnLength() + "),");
|
| | | }
|
| | | else if (tableColumn.getColumnType().contains("date")) {
|
| | | sqlStr.append(tableColumn.getColumnName() + " date,");
|
| | | }
|
| | | else if (tableColumn.getColumnType().contains("nvarchar")) {
|
| | | sqlStr.append(tableColumn.getColumnName() + " varchar(" + tableColumn.getColumnLength() + "),");
|
| | | }
|
| | | else if (tableColumn.getColumnType().contains("nchar")) {
|
| | | sqlStr.append(tableColumn.getColumnName() + " varchar(" + tableColumn.getColumnLength() + "),");
|
| | | } |
| | | return sqlStr.toString();
|
| | | }
|
| | | |
| | | /**
|
| | | * 拼接创建新表的语句
|
| | | * 1.系统数据库是mysql,所以,需要将其它数据库的类型,转换成mysql可以识别的类型
|
| | | */
|
| | | public String splicingSQL(String tableName, List<TableColumn> columnNameAndType) throws Exception {
|
| | | String sqlStr = "";
|
| | | //第一步,类型 识别类型
|
| | | sqlStr = "create table " + tableName + " (";
|
| | | StringBuffer columnStr = new StringBuffer();
|
| | | columnStr.append(sqlStr);
|
| | | for (TableColumn tableColumn : columnNameAndType) {
|
| | | columnStr.append(getColumnStr(tableColumn));
|
| | | }
|
| | | sqlStr = columnStr.toString();
|
| | | return sqlStr.substring(0, sqlStr.length() - 1) + ")";
|
| | | }
|
| | | /**
|
| | | * 创建表
|
| | | * 1.需要判段,系统数据库中是否已存在表名(1.有的话重新命名,tableName + 年月日时分秒
|
| | | */
|
| | | public String createTable(DruidPooledConnection connSys, String tableName, List<TableColumn> columnNameAndType) throws Exception {
|
| | | String SQLStr = "";
|
| | | if (!columnNameAndType.isEmpty()) {
|
| | | PreparedStatement stmt;
|
| | | ResultSet rslt;
|
| | | |
| | | SQLStr = " select count(table_name) as tableNum "
|
| | | + " from information_schema.tables "
|
| | | + " where table_schema='ssm' and table_name='"+tableName+"'";
|
| | | stmt = connSys.prepareStatement(SQLStr);
|
| | | rslt = stmt.executeQuery();
|
| | | rslt.next();
|
| | | if (rslt.getInt("tableNum") > 0) {
|
| | | tableName = tableName + "_" + sdf.format(new Date());
|
| | | }
|
| | | SQLStr = splicingSQL(tableName, columnNameAndType);
|
| | | stmt = connSys.prepareStatement(SQLStr);
|
| | | stmt.execute(); |
| | | }
|
| | | return tableName;
|
| | | }
|
| | | |
| | | |
| | | //得到抽取数据库表的字段名称及字段类型
|
| | | public List<TableColumn> getColumnNameAndType(Connection conn, String tableName, String type) throws Exception {
|
| | | List<TableColumn> columnNameAndType = new ArrayList<TableColumn>();
|
| | | //得到数据库中的表
|
| | | String SQLStr = " select c.name As ColumnsName , t.name as ColumnsType, c.length as ColumnsLength "
|
| | | + " from SysObjects As o "
|
| | | + " left join SysColumns As c on o.id=c.id "
|
| | | + " left join SysTypes As t on c.xtype=t.xusertype "
|
| | | + " where o.type = '"+type+"' and o.name='"+tableName+"' ";
|
| | | |
| | | PreparedStatement stmt = conn.prepareStatement(SQLStr);
|
| | | ResultSet rslt = stmt.executeQuery();
|
| | | while (rslt.next()) {
|
| | | TableColumn tableColumn = new TableColumn();
|
| | | tableColumn.setColumnName(rslt.getString("ColumnsName"));
|
| | | tableColumn.setColumnType(rslt.getString("ColumnsType"));
|
| | | tableColumn.setColumnLength(rslt.getString("ColumnsLength"));
|
| | | columnNameAndType.add(tableColumn);
|
| | | }
|
| | | |
| | | return columnNameAndType;
|
| | | }
|
| | | |
| | | |
| | |
|
| | | /* protected void sendJsonData(HttpServletResponse response) throws Exception{
|
| | | response.setContentType("text/javascript;charset=UTF-8");
|
| | | PrintWriter out = response.getWriter();
|
| | | out.println("alert(\"数据库连接成功!!!\")");
|
| | | out.flush();
|
| | | out.close();
|
| | | }*/ |
| | | |
| | | }
|