package com.highdatas.mdm.controller;

import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.highdatas.mdm.entity.TUser;
import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo;
import com.highdatas.mdm.pojo.kettle.KettleDBTrans;
import com.highdatas.mdm.pojo.kettle.TableColumn;
import org.pentaho.di.core.KettleClientEnvironment;
import org.pentaho.di.core.database.Database;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.logging.LoggingObjectInterface;
import org.pentaho.di.core.logging.LoggingObjectType;
import org.pentaho.di.core.logging.SimpleLoggingObject;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.dummytrans.DummyTransMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.servlet.http.HttpServletResponse;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;

/**
 * Endpoints under {@code /database} that test a database connection, list its tables and views,
 * and copy data from a source database into the configured output database through Kettle
 * transformations.
 *
 * <p>Earlier revisions carried commented-out {@code drawMongoDB} / {@code drawHbase} endpoints and
 * a commented-out {@code sendJsonData} helper; that dead code has been removed and can be
 * recovered from version control if it is ever needed.
 */
@Controller
@RequestMapping("/database")
public class SettleController {

    /** Pattern of the timestamp suffix used to generate unique table names. */
    private static final String TIMESTAMP_PATTERN = "yyyyMMddHHmmss";

    private static final String STEP_READ_FROM_TABLE = "Read data from table";
    private static final String STEP_INSERT_UPDATE = "Insert or update";
    private static final String STEP_DUMMY = "Dummy";

    private static final String RESULT_SUCCESS = "{result:\"success\"}";
    private static final String RESULT_FAIL = "{result:\"fail\"}";

    /** Connection settings of the output database that receives extracted data. */
    @Autowired
    private UnBigDataDataSourceInfo dataSourceConn;

    /**
     * Tests whether a database connection can be opened with the supplied settings.
     *
     * @param response  unused
     * @param transData connection settings: {@code type}, {@code hostName}, {@code dbName},
     *                  {@code portNum}, {@code userName}, {@code pwd}
     * @param users     unused
     * @return the success marker when the connection could be opened
     * @throws Exception if Kettle cannot be initialised or the connection fails
     */
    @RequestMapping("/connectionDB")
    @ResponseBody
    public String connectionDB(HttpServletResponse response, @RequestBody JSONObject transData, TUser users)
            throws Exception {
        KettleClientEnvironment.init();
        Database ds = new Database(newLoggingObject(), buildDatabaseMeta("KettleDBRep", transData));
        try {
            ds.normalConnect(null);
        } finally {
            // Release the connection even when connecting only partially succeeded.
            ds.disconnect();
        }
        return RESULT_SUCCESS;
    }

    /**
     * Creates a table named {@code SQL_<timestamp>} whose columns mirror those of the result set.
     *
     * @param connSys connection on which the table is created
     * @param rslt    result set whose metadata describes the columns
     * @return the name of the created table, or an empty string when the result set has no columns
     * @throws Exception if the metadata cannot be read or the DDL fails
     */
    public String createSQLTable(DruidPooledConnection connSys, ResultSet rslt) throws Exception {
        List<TableColumn> columns = new ArrayList<>();
        ResultSetMetaData md = rslt.getMetaData();
        for (int i = 1; i <= md.getColumnCount(); i++) {
            TableColumn tableColumn = new TableColumn();
            tableColumn.setColumnName(md.getColumnName(i));
            tableColumn.setColumnType(md.getColumnTypeName(i));
            tableColumn.setColumnLength(String.valueOf(md.getColumnDisplaySize(i)));
            columns.add(tableColumn);
        }
        if (columns.isEmpty()) {
            return "";
        }
        String tableName = "SQL_" + timestamp();
        try (PreparedStatement stmt = connSys.prepareStatement(splicingSQL(tableName, columns))) {
            stmt.execute();
        }
        return tableName;
    }

    /**
     * Runs a caller-supplied SQL query against the source database and copies its result into a
     * new {@code SQL_<timestamp>} table of the output database.
     *
     * @param dbData request body; {@code DBConn} holds the source connection settings as JSON and
     *               {@code sqlStr} the query to extract
     * @return the success marker, or the failure marker when any step throws
     */
    @RequestMapping("/sqlPick")
    @ResponseBody
    public String sqlPick(@RequestBody JSONObject dbData) {
        KettleDBTrans kettleDBTrans = newKettleDBTrans();
        try {
            kettleDBConn(kettleDBTrans, dbData.getString("DBConn"));
            String tableName = "SQL_" + timestamp();
            dataExtraction(tableName, dbData.getString("sqlStr"), kettleDBTrans);
        } catch (Exception e) {
            logFailure("sqlPick", e);
            return RESULT_FAIL;
        } finally {
            disconnect(kettleDBTrans);
        }
        return RESULT_SUCCESS;
    }

    /**
     * Inserts every remaining row of the result set into the given table, reading and writing all
     * values as strings.
     *
     * @param connSys   connection of the destination database
     * @param rslt      rows to copy; the caller remains responsible for closing it
     * @param tableName destination table; concatenated into the statement, so it must come from a
     *                  trusted source
     * @throws Exception if reading or inserting fails
     */
    public void extractSQLData(DruidPooledConnection connSys, ResultSet rslt, String tableName) throws Exception {
        ResultSetMetaData md = rslt.getMetaData();
        int columnCount = md.getColumnCount();

        StringBuilder columns = new StringBuilder();
        StringBuilder placeholders = new StringBuilder();
        for (int i = 1; i <= columnCount; i++) {
            if (i > 1) {
                columns.append(", ");
                placeholders.append(", ");
            }
            columns.append(md.getColumnName(i));
            placeholders.append('?');
        }
        String insertSql = " insert into " + tableName + " ( " + columns + ")" + " values (" + placeholders + ")";

        try (PreparedStatement stmtDes = connSys.prepareStatement(insertSql)) {
            while (rslt.next()) {
                for (int i = 1; i <= columnCount; i++) {
                    stmtDes.setString(i, rslt.getString(i));
                }
                stmtDes.addBatch();
            }
            stmtDes.executeBatch();
        }
    }

    /**
     * Checks that a SQL statement can be executed against the given database.
     *
     * @param dbData request body; {@code DBConn} holds the connection settings as JSON and
     *               {@code sqlStr} the statement to try
     * @return the success marker, or the failure marker when connecting or executing throws
     */
    @RequestMapping("/sqlTest")
    @ResponseBody
    public String sqlTest(@RequestBody JSONObject dbData) {
        Database ds = null;
        try {
            JSONObject dbParam = JSONArray.parseObject(dbData.getString("DBConn"));
            KettleClientEnvironment.init();
            ds = new Database(newLoggingObject(), buildDatabaseMeta("KettleDBRep", dbParam));
            ds.connect();
            try (PreparedStatement stmt = ds.getConnection().prepareStatement(dbData.getString("sqlStr"))) {
                // Only success or failure matters here; the result set is closed with the statement.
                stmt.executeQuery();
            }
        } catch (Exception e) {
            logFailure("sqlTest", e);
            return RESULT_FAIL;
        } finally {
            if (ds != null) {
                ds.disconnect();
            }
        }
        return RESULT_SUCCESS;
    }

    /**
     * Lists the table and view names of a database.
     *
     * @param response  unused
     * @param transData connection settings: {@code type}, {@code hostName}, {@code dbName},
     *                  {@code portNum}, {@code userName}, {@code pwd}
     * @return a JSON-like string carrying the table names and the view names
     * @throws Exception if Kettle cannot be initialised, the connection fails or the metadata
     *                   cannot be read
     */
    @RequestMapping("/showTableAndView")
    @ResponseBody
    public String showTableAndView(HttpServletResponse response, @RequestBody JSONObject transData)
            throws Exception {
        KettleClientEnvironment.init();
        Database ds = new Database(newLoggingObject(), buildDatabaseMeta("KettleDBRep", transData));
        try {
            ds.connect();

            // Table names may come back schema-qualified; keep only the part after the last dot.
            List<String> tables = new ArrayList<>();
            for (String qualifiedName : ds.getTablenames(true)) {
                int lastDot = qualifiedName.lastIndexOf(".");
                tables.add(lastDot > 0 ? qualifiedName.substring(lastDot + 1) : qualifiedName);
            }
            String tableJson = JSON.toJSONString(tables);

            // Views are read from the "dbo" schema only, and that prefix is stripped.
            List<String> views = new ArrayList<>();
            for (String qualifiedName : ds.getViews("dbo", true)) {
                views.add(qualifiedName.replace("dbo.", ""));
            }
            String viewJson = JSON.toJSONString(views);

            return "{result:\"success\",table:[" + tableJson + "],view:[" + viewJson + "]}";
        } finally {
            ds.disconnect();
        }
    }

    /**
     * Creates a table in the output database from a Kettle row layout. Failures are reported on
     * the console and not propagated, so callers continue even when the table was not created.
     *
     * @param dsOutput  output database
     * @param rm        row layout describing the columns
     * @param tableName name of the table to create
     */
    public void createTable(Database dsOutput, RowMetaInterface rm, String tableName) {
        try {
            System.out.println("开始创建表:" + tableName);
            String dbProName = dsOutput.getConnection().getMetaData().getDatabaseProductName();
            System.out.println("数据库类型:" + dbProName);
            String sql = dsOutput.getDDL(tableName, rm);
            dsOutput.execStatement(sql.replace(";", ""));
            System.out.println("创建表成功");
        } catch (Exception e) {
            System.out.println("创建表失败");
            logFailure("createTable " + tableName, e);
        }
    }

    /**
     * Builds and runs a transformation (table input, table output, dummy) that copies the result
     * of {@code sqlStr} from the input database into a table of the output database. When the
     * target table already exists, a timestamp suffix is appended to the name and a new table is
     * created instead.
     *
     * @param tableName     preferred name of the target table
     * @param sqlStr        query that selects the rows to copy
     * @param kettleDBTrans holder of both connected databases, the step metadata and the
     *                      transformation metadata
     * @throws Exception if the query is invalid or the transformation cannot be prepared
     */
    public void dataExtraction(String tableName, String sqlStr, KettleDBTrans kettleDBTrans) throws Exception {
        TransMeta transMeta = kettleDBTrans.getTransMeta();

        // NOTE(review): steps and hops are added to the shared TransMeta on every call, so calling
        // this repeatedly with the same holder accumulates steps - confirm this is intended.
        kettleDBTrans.getTableInputMeta().setSQL(sqlStr);
        StepMeta inputStep = new StepMeta(STEP_READ_FROM_TABLE, kettleDBTrans.getTableInputMeta());
        inputStep.setLocation(50, 50);
        inputStep.setDraw(true);
        transMeta.addStep(inputStep);

        if (kettleDBTrans.getDsOutput().checkTableExists(tableName)) {
            tableName = tableName + "_" + timestamp();
        }

        // Execute the query once so that an invalid statement fails before any table is created.
        try (PreparedStatement stmt = kettleDBTrans.getDsInput().getConnection().prepareStatement(sqlStr)) {
            stmt.executeQuery();
        }
        RowMetaInterface rm = kettleDBTrans.getDsInput().getQueryFields(sqlStr, false);
        createTable(kettleDBTrans.getDsOutput(), rm, tableName);

        kettleDBTrans.getTalbeOutputMeta().setTableName(tableName);
        StepMeta insertUpdateStep = new StepMeta(STEP_INSERT_UPDATE, kettleDBTrans.getTalbeOutputMeta());
        insertUpdateStep.setLocation(150, 50);
        insertUpdateStep.setDraw(true);
        transMeta.addStep(insertUpdateStep);

        StepMeta dummyStep = new StepMeta(STEP_DUMMY, new DummyTransMeta());
        dummyStep.setLocation(200, 50);
        dummyStep.setDraw(true);
        transMeta.addStep(dummyStep);

        // Wire the steps: input -> output -> dummy.
        transMeta.addTransHop(new TransHopMeta(inputStep, insertUpdateStep));
        transMeta.addTransHop(new TransHopMeta(insertUpdateStep, dummyStep));

        Trans trans = new Trans(transMeta);
        trans.prepareExecution(null);
        trans.startThreads();
        trans.waitUntilFinished();
        // Errors are only reported on the console; they are not propagated to the caller.
        if (trans.getErrors() > 0) {
            System.out.println(">>>>>>>>>> ERROR");
        } else {
            System.out.println(">>>>>>>>>> SUCCESS ");
        }
    }

    /**
     * Retained for source compatibility only. No source connection settings are available through
     * this signature, so the call always fails.
     *
     * @param kettleDBTrans holder to populate
     * @throws Exception always, because the source connection settings are missing
     * @deprecated use {@link #kettleDBConn(KettleDBTrans, String)}
     */
    @Deprecated
    public void kettleDBConn(KettleDBTrans kettleDBTrans) throws Exception {
        kettleDBConn(kettleDBTrans, null);
    }

    /**
     * Opens the input (source) and output (system) database connections and registers both with
     * the transformation and step metadata held by {@code kettleDBTrans}.
     *
     * @param kettleDBTrans holder that receives the two connected databases; its transformation,
     *                      table-input and table-output metadata must already be set
     * @param dbConn        JSON text with the source settings: {@code type}, {@code hostName},
     *                      {@code dbName}, {@code portNum}, {@code userName}, {@code pwd}
     * @throws IllegalArgumentException if {@code dbConn} is missing or does not parse to an object
     * @throws Exception if Kettle cannot be initialised or a connection cannot be opened
     */
    public void kettleDBConn(KettleDBTrans kettleDBTrans, String dbConn) throws Exception {
        JSONObject dbParam = dbConn == null ? null : JSONArray.parseObject(dbConn);
        if (dbParam == null) {
            throw new IllegalArgumentException("Missing source database connection settings (DBConn)");
        }

        KettleClientEnvironment.init();

        DatabaseMeta dataMetaInput = buildDatabaseMeta("Input", dbParam);
        DatabaseMeta dataMetaOutput = new DatabaseMeta("Output", dataSourceConn.getDbType(), "Native",
                dataSourceConn.getDbHostName(), dataSourceConn.getDbName(), dataSourceConn.getDbPort(),
                dataSourceConn.getUsername(), dataSourceConn.getPassword());

        LoggingObjectInterface loggingObject = newLoggingObject();
        kettleDBTrans.setDsInput(new Database(loggingObject, dataMetaInput));
        kettleDBTrans.getDsInput().connect();
        kettleDBTrans.setDsOutput(new Database(loggingObject, dataMetaOutput));
        kettleDBTrans.getDsOutput().connect();

        TransMeta transMeta = kettleDBTrans.getTransMeta();
        transMeta.setName("数据抽取");
        transMeta.addDatabase(dataMetaInput);
        transMeta.addDatabase(dataMetaOutput);

        kettleDBTrans.getTableInputMeta().setDatabaseMeta(transMeta.findDatabase("Input"));
        kettleDBTrans.getTalbeOutputMeta().setDatabaseMeta(transMeta.findDatabase("Output"));
    }

    /**
     * Copies the selected tables and views of the source database into the output database.
     *
     * @param response unused
     * @param dbData   request body; {@code DBConn} holds the source connection settings as JSON,
     *                 {@code UTable} the table names and {@code VTable} the view names
     * @return the success marker
     * @throws Exception if connecting or extracting fails
     */
    @RequestMapping("/extractDBData")
    @ResponseBody
    public String extractDBData(HttpServletResponse response, @RequestBody JSONObject dbData) throws Exception {
        KettleDBTrans kettleDBTrans = newKettleDBTrans();
        try {
            kettleDBConn(kettleDBTrans, dbData.getString("DBConn"));
            extractAll(dbData.getJSONArray("UTable"), kettleDBTrans);
            extractAll(dbData.getJSONArray("VTable"), kettleDBTrans);
        } finally {
            disconnect(kettleDBTrans);
        }
        return "{result: \"success\"}";
    }

    /**
     * Copies all rows of a source table into a destination table.
     *
     * @param connSys      connection of the destination database
     * @param tableNameDes destination table
     * @param conn         connection of the source database
     * @param tableNameSrc source table; concatenated into the query, so it must come from a
     *                     trusted source
     * @throws Exception if reading or inserting fails
     */
    public void extractData(DruidPooledConnection connSys, String tableNameDes, Connection conn, String tableNameSrc)
            throws Exception {
        String selectSql = " select * from " + tableNameSrc + " ";
        try (PreparedStatement stmt = conn.prepareStatement(selectSql);
             ResultSet rslt = stmt.executeQuery()) {
            extractSQLData(connSys, rslt, tableNameDes);
        }
    }

    /**
     * Translates a source column into a column definition fragment ending with a comma. The type
     * name is matched case-insensitively by substring.
     *
     * @param tableColumn source column description
     * @return the fragment, or an empty string when the type is not recognised
     * @throws Exception declared for compatibility with existing callers
     */
    public String getColumnStr(TableColumn tableColumn) throws Exception {
        String type = tableColumn.getColumnType().toLowerCase(Locale.ROOT);
        String name = tableColumn.getColumnName();
        // "varchar" also covers "nvarchar", which maps to the same definition.
        if (type.contains("varchar")) {
            return name + " varchar(" + tableColumn.getColumnLength() + "),";
        }
        if (type.contains("int")) {
            return name + " int(" + tableColumn.getColumnLength() + "),";
        }
        if (type.contains("date")) {
            return name + " date,";
        }
        if (type.contains("nchar")) {
            return name + " varchar(" + tableColumn.getColumnLength() + "),";
        }
        return "";
    }

    /**
     * Builds the {@code create table} statement for the given columns, translating source column
     * types with {@link #getColumnStr(TableColumn)}. Columns of unrecognised type are omitted.
     *
     * @param tableName         name of the table to create
     * @param columnNameAndType columns of the table
     * @return the DDL statement
     * @throws Exception if a column cannot be translated
     */
    public String splicingSQL(String tableName, List<TableColumn> columnNameAndType) throws Exception {
        StringBuilder ddl = new StringBuilder("create table " + tableName + " (");
        for (TableColumn tableColumn : columnNameAndType) {
            ddl.append(getColumnStr(tableColumn));
        }
        // Replace the trailing comma of the last column fragment with the closing parenthesis.
        ddl.setLength(ddl.length() - 1);
        return ddl.append(")").toString();
    }

    /**
     * Creates a table in the system database. When a table of that name is already listed in
     * {@code information_schema.tables} for schema {@code ssm}, a timestamp suffix is appended to
     * the name first.
     *
     * @param connSys           connection of the system database
     * @param tableName         preferred table name
     * @param columnNameAndType columns of the table; nothing is created when empty
     * @return the name actually used, or {@code tableName} unchanged when there are no columns
     * @throws Exception if the lookup or the DDL fails
     */
    public String createTable(DruidPooledConnection connSys, String tableName, List<TableColumn> columnNameAndType)
            throws Exception {
        if (columnNameAndType.isEmpty()) {
            return tableName;
        }

        String existsSql = " select count(table_name) as tableNum "
                + " from information_schema.tables "
                + " where table_schema='ssm' and table_name=?";
        boolean exists;
        try (PreparedStatement stmt = connSys.prepareStatement(existsSql)) {
            stmt.setString(1, tableName);
            try (ResultSet rslt = stmt.executeQuery()) {
                exists = rslt.next() && rslt.getInt("tableNum") > 0;
            }
        }
        if (exists) {
            tableName = tableName + "_" + timestamp();
        }

        try (PreparedStatement stmt = connSys.prepareStatement(splicingSQL(tableName, columnNameAndType))) {
            stmt.execute();
        }
        return tableName;
    }

    /**
     * Reads the column names, types and lengths of a table or view from the {@code SysObjects},
     * {@code SysColumns} and {@code SysTypes} catalog tables.
     *
     * @param conn      connection of the source database
     * @param tableName name of the table or view
     * @param type      value compared with {@code SysObjects.type}
     * @return the columns found; empty when the object does not exist
     * @throws Exception if the catalog query fails
     */
    public List<TableColumn> getColumnNameAndType(Connection conn, String tableName, String type) throws Exception {
        String catalogSql = " select c.name As ColumnsName , t.name as ColumnsType, c.length as ColumnsLength "
                + " from SysObjects As o "
                + " left join SysColumns As c on o.id=c.id "
                + " left join SysTypes As t on c.xtype=t.xusertype "
                + " where o.type = ? and o.name = ? ";
        List<TableColumn> columns = new ArrayList<>();
        try (PreparedStatement stmt = conn.prepareStatement(catalogSql)) {
            stmt.setString(1, type);
            stmt.setString(2, tableName);
            try (ResultSet rslt = stmt.executeQuery()) {
                while (rslt.next()) {
                    TableColumn tableColumn = new TableColumn();
                    tableColumn.setColumnName(rslt.getString("ColumnsName"));
                    tableColumn.setColumnType(rslt.getString("ColumnsType"));
                    tableColumn.setColumnLength(rslt.getString("ColumnsLength"));
                    columns.add(tableColumn);
                }
            }
        }
        return columns;
    }

    /** Runs {@link #dataExtraction} with {@code select *} for every name; a missing list is treated as empty. */
    private void extractAll(JSONArray names, KettleDBTrans kettleDBTrans) throws Exception {
        if (names == null) {
            return;
        }
        for (int k = 0; k < names.size(); k++) {
            String tableName = names.getString(k);
            dataExtraction(tableName, " select * from " + tableName, kettleDBTrans);
        }
    }

    /** Creates a holder with fresh step and transformation metadata and no database connections. */
    private static KettleDBTrans newKettleDBTrans() {
        KettleDBTrans kettleDBTrans = new KettleDBTrans();
        kettleDBTrans.setDsInput(null);
        kettleDBTrans.setDsOutput(null);
        kettleDBTrans.setTableInputMeta(new TableInputMeta());
        kettleDBTrans.setTalbeOutputMeta(new TableOutputMeta());
        kettleDBTrans.setTransMeta(new TransMeta());
        return kettleDBTrans;
    }

    /** Disconnects whichever of the two databases of the holder has been set. */
    private static void disconnect(KettleDBTrans kettleDBTrans) {
        try {
            if (kettleDBTrans.getDsInput() != null) {
                kettleDBTrans.getDsInput().disconnect();
            }
        } finally {
            if (kettleDBTrans.getDsOutput() != null) {
                kettleDBTrans.getDsOutput().disconnect();
            }
        }
    }

    /** Builds "Native" access metadata from the connection settings of a request. */
    private static DatabaseMeta buildDatabaseMeta(String name, JSONObject params) {
        return new DatabaseMeta(name, params.getString("type"), "Native", params.getString("hostName"),
                params.getString("dbName"), params.getString("portNum"), params.getString("userName"),
                params.getString("pwd"));
    }

    /** Creates the logging parent that Kettle requires for a {@link Database}. */
    private static LoggingObjectInterface newLoggingObject() {
        return new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null);
    }

    /** Formats the current time; a new formatter per call because SimpleDateFormat is not thread-safe. */
    private static String timestamp() {
        return new SimpleDateFormat(TIMESTAMP_PATTERN).format(new Date());
    }

    /** Reports a failure that is otherwise only visible to the client as a generic result. */
    private static void logFailure(String action, Exception e) {
        System.err.println(action + " failed: " + e);
    }
}