From c007f0ca1785db093d48f4846cda82fe8e955765 Mon Sep 17 00:00:00 2001 From: kimi <kimi42345@gmail.com> Date: 星期三, 27 五月 2020 09:59:29 +0800 Subject: [PATCH] merage --- src/main/java/com/highdatas/mdm/controller/SettleController.java | 772 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 772 insertions(+), 0 deletions(-) diff --git a/src/main/java/com/highdatas/mdm/controller/SettleController.java b/src/main/java/com/highdatas/mdm/controller/SettleController.java new file mode 100644 index 0000000..e1cdf60 --- /dev/null +++ b/src/main/java/com/highdatas/mdm/controller/SettleController.java @@ -0,0 +1,772 @@ +package com.highdatas.mdm.controller; + +import com.alibaba.druid.pool.DruidPooledConnection; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; + +import com.highdatas.mdm.entity.TUser; +import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo; +import com.highdatas.mdm.pojo.kettle.KettleDBTrans; +import com.highdatas.mdm.pojo.kettle.TableColumn; +import org.pentaho.di.core.KettleClientEnvironment; +import org.pentaho.di.core.database.Database; +import org.pentaho.di.core.database.DatabaseMeta; +import org.pentaho.di.core.logging.LoggingObjectInterface; +import org.pentaho.di.core.logging.LoggingObjectType; +import org.pentaho.di.core.logging.SimpleLoggingObject; +import org.pentaho.di.core.row.RowMetaInterface; +import org.pentaho.di.trans.Trans; +import org.pentaho.di.trans.TransHopMeta; +import org.pentaho.di.trans.TransMeta; +import org.pentaho.di.trans.step.StepMeta; +import org.pentaho.di.trans.steps.dummytrans.DummyTransMeta; + +import org.pentaho.di.trans.steps.tableinput.TableInputMeta; +import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; + +import javax.servlet.http.HttpServletResponse; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +@Controller +@RequestMapping("/database") +//http://localhost:8080/users/addUser +public class SettleController { + + + @Autowired + private UnBigDataDataSourceInfo dataSourceConn; + + private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); + private String STEP_READ_FROM_TABLE = "Read data from table"; + private String STEP_INSERT_UPDATE = "Insert or update"; + private String STEP_DUMMY = "Dummy"; + + + /** + * 鎻愬彇mongodb鏁版嵁 + * @param response + * @param transData + * @return + * @throws + */ +// @RequestMapping("/drawMongoDB") +// @ResponseBody +// public String drawMongoDB(HttpServletResponse response, @RequestBody JSONObject transData) { +// String returnStr = ""; +// try { +// KettleClientEnvironment.init(); +// +// TransMeta transMeta = new TransMeta(); +// transMeta.setName("鎶藉彇mongodb鏁版嵁"); +// //瀵煎嚭鏁版嵁鐨勬暟鎹簱杩炴帴 +// DatabaseMeta dataBaseOutput = new DatabaseMeta("Output", dataSourceConn.getDbType(), "Native", dataSourceConn.getDbHostName(), dataSourceConn.getDbName(), dataSourceConn.getDbPort(), +// dataSourceConn.getUsername(), dataSourceConn.getPassword()); +// transMeta.addDatabase(dataBaseOutput); +// +// MongoDbInputMeta mongoDbInputMeta = new MongoDbInputMeta(); +// mongoDbInputMeta.setHostnames(transData.getString("hostName")); +// mongoDbInputMeta.setAuthenticationDatabaseName(transData.getString("authDBName")); +// mongoDbInputMeta.setPort(transData.getString("portNum")); +// mongoDbInputMeta.setAuthenticationUser(transData.getString("userName")); +// mongoDbInputMeta.setAuthenticationPassword(transData.getString("pwd")); +// mongoDbInputMeta.setDbName(transData.getString("dbName")); +// mongoDbInputMeta.setCollection(transData.getString("collection")); +// //寰楀埌鎶藉彇瀛楁 +// ArrayList<String> fieldList = new ArrayList<String>(transData.getString("drawfield").split(",").length); +// Collections.addAll(fieldList, transData.getString("drawfield").split(",")); +// //璁剧疆涓嶅彇JSON瀛楁 +// mongoDbInputMeta.setOutputJson(false); +// //璁剧疆鎻愬彇瀛楁鍚嶇О銆乸ath銆乼ype +// List<MongoField> normalList = new ArrayList<MongoField>(); +// for (String fieldStr : fieldList) { +// MongoField newField = new MongoField(); +// newField.m_fieldName = fieldStr; +// newField.m_fieldPath = "$." + fieldStr; +// newField.m_kettleType = "String"; +// normalList.add(newField); +// } +// //璁剧疆mongodb鎻愬彇瀛楁 +// mongoDbInputMeta.setMongoFields(normalList); +// //璁剧疆mogodb姝ラ鍏� +// StepMeta inputMongoDBStep = new StepMeta(STEP_READ_FROM_TABLE, mongoDbInputMeta); +// inputMongoDBStep.setLocation(50, 50); +// inputMongoDBStep.setDraw(true); +// //灏唌ogodb姝ラ鍏冨姞鍏ヨ浆鍖栦腑 +// transMeta.addStep(inputMongoDBStep); +// +// //璁剧疆mysql鍏� +// TableOutputMeta tableOutputMeta = new TableOutputMeta(); +// //璁剧疆鏁版嵁搴撳厓 +// tableOutputMeta.setDatabaseMeta(transMeta.findDatabase("Output")); +// //mongodb涓暟鎹簱琛�/闆嗗悎,灏辨槸琛ㄥ悕 +// tableOutputMeta.setTableName(transData.getString("collection")); +// //灏唌ysql鍏冨姞鍏ユ楠ゅ厓 +// StepMeta insertUpdateStep = new StepMeta(STEP_INSERT_UPDATE, tableOutputMeta); +// insertUpdateStep.setLocation(150, 50); +// insertUpdateStep.setDraw(true); +// +// transMeta.addStep(insertUpdateStep); +// +// //澧炲姞涓┖鍏冿紝 +// DummyTransMeta dummyMeta = new DummyTransMeta(); +// +// //灏嗙┖鍏冨姞鍏ユ楠ゅ厓 +// StepMeta dummyStep = new StepMeta(STEP_DUMMY,dummyMeta); +// dummyStep.setLocation(200, 50); +// dummyStep.setDraw(true); +// +// transMeta.addStep(dummyStep); +// +// //璁剧疆姝ラ鐩存帴鐨勫叧绯� +// TransHopMeta hop = new TransHopMeta(inputMongoDBStep, insertUpdateStep); +// transMeta.addTransHop(hop); +// TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep); +// transMeta.addTransHop(hop2); +// +// //寮�濮嬫墽琛屾暟鎹娊鍙� +// //灏嗚浆鍖栧厓瀹炰緥鍖栬浆鎹� +// Trans trans = new Trans(transMeta); +// trans.prepareExecution(null); +// +// trans.startThreads(); +// trans.waitUntilFinished(); +// +// if (trans.getErrors() > 0) { +// System.out.println(">>>>>>>>>> ERROR"); +// returnStr = "{result:\"fail\"}"; +// } +// else { +// System.out.println(">>>>>>>>>> SUCCESS "); +// returnStr = "{result:\"success\"}"; +// } +// } catch(Exception e) { +// return "{result:\"fail\"}"; +// } +// return returnStr; +// } +// +// /** +// * 鎻愬彇mongodb鏁版嵁 +// * @param response +// * @param transData +// * @return +// * @throws +// */ +// @RequestMapping("/drawHbase") +// @ResponseBody +// public String drawHbase(HttpServletResponse response, @RequestBody JSONObject transData) { +// String returnStr = ""; +// try { +// KettleClientEnvironment.init(); +// +// TransMeta transMeta = new TransMeta(); +// transMeta.setName("鎶藉彇Hbase鏁版嵁"); +// //瀵煎嚭鏁版嵁鐨勬暟鎹簱杩炴帴 +// DatabaseMeta dataBaseOutput = new DatabaseMeta("Output", dataSourceConn.getDbType(), "Native", dataSourceConn.getDbHostName(), dataSourceConn.getDbName(), dataSourceConn.getDbPort(), +// dataSourceConn.getUsername(), dataSourceConn.getPassword()); +// transMeta.addDatabase(dataBaseOutput); +// //HBaseInputMeta hBaseInputMeta= new HBaseInputMeta() +// +// MongoDbInputMeta mongoDbInputMeta = new MongoDbInputMeta(); +// mongoDbInputMeta.setHostnames(transData.getString("hostName")); +// mongoDbInputMeta.setAuthenticationDatabaseName(transData.getString("authDBName")); +// mongoDbInputMeta.setPort(transData.getString("portNum")); +// mongoDbInputMeta.setAuthenticationUser(transData.getString("userName")); +// mongoDbInputMeta.setAuthenticationPassword(transData.getString("pwd")); +// mongoDbInputMeta.setDbName(transData.getString("dbName")); +// mongoDbInputMeta.setCollection(transData.getString("collection")); +// //寰楀埌鎶藉彇瀛楁 +// ArrayList<String> fieldList = new ArrayList<String>(transData.getString("drawfield").split(",").length); +// Collections.addAll(fieldList, transData.getString("drawfield").split(",")); +// //璁剧疆涓嶅彇JSON瀛楁 +// mongoDbInputMeta.setOutputJson(false); +// //璁剧疆鎻愬彇瀛楁鍚嶇О銆乸ath銆乼ype +// List<MongoField> normalList = new ArrayList<MongoField>(); +// for (String fieldStr : fieldList) { +// MongoField newField = new MongoField(); +// newField.m_fieldName = fieldStr; +// newField.m_fieldPath = "$." + fieldStr; +// newField.m_kettleType = "String"; +// normalList.add(newField); +// } +// //璁剧疆mongodb鎻愬彇瀛楁 +// mongoDbInputMeta.setMongoFields(normalList); +// //璁剧疆mogodb姝ラ鍏� +// StepMeta inputMongoDBStep = new StepMeta(STEP_READ_FROM_TABLE, mongoDbInputMeta); +// inputMongoDBStep.setLocation(50, 50); +// inputMongoDBStep.setDraw(true); +// //灏唌ogodb姝ラ鍏冨姞鍏ヨ浆鍖栦腑 +// transMeta.addStep(inputMongoDBStep); +// +// //璁剧疆mysql鍏� +// TableOutputMeta tableOutputMeta = new TableOutputMeta(); +// //璁剧疆鏁版嵁搴撳厓 +// tableOutputMeta.setDatabaseMeta(transMeta.findDatabase("Output")); +// //mongodb涓暟鎹簱琛�/闆嗗悎,灏辨槸琛ㄥ悕 +// tableOutputMeta.setTableName(transData.getString("collection")); +// //灏唌ysql鍏冨姞鍏ユ楠ゅ厓 +// StepMeta insertUpdateStep = new StepMeta(STEP_INSERT_UPDATE, tableOutputMeta); +// insertUpdateStep.setLocation(150, 50); +// insertUpdateStep.setDraw(true); +// +// transMeta.addStep(insertUpdateStep); +// +// //澧炲姞涓┖鍏冿紝 +// DummyTransMeta dummyMeta = new DummyTransMeta(); +// +// //灏嗙┖鍏冨姞鍏ユ楠ゅ厓 +// StepMeta dummyStep = new StepMeta(STEP_DUMMY,dummyMeta); +// dummyStep.setLocation(200, 50); +// dummyStep.setDraw(true); +// +// transMeta.addStep(dummyStep); +// +// //璁剧疆姝ラ鐩存帴鐨勫叧绯� +// TransHopMeta hop = new TransHopMeta(inputMongoDBStep, insertUpdateStep); +// transMeta.addTransHop(hop); +// TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep); +// transMeta.addTransHop(hop2); +// +// //寮�濮嬫墽琛屾暟鎹娊鍙� +// //灏嗚浆鍖栧厓瀹炰緥鍖栬浆鎹� +// Trans trans = new Trans(transMeta); +// trans.prepareExecution(null); +// +// trans.startThreads(); +// trans.waitUntilFinished(); +// +// if (trans.getErrors() > 0) { +// System.out.println(">>>>>>>>>> ERROR"); +// returnStr = "{result:\"fail\"}"; +// } +// else { +// System.out.println(">>>>>>>>>> SUCCESS "); +// returnStr = "{result:\"success\"}"; +// } +// } catch(Exception e) { +// return "{result:\"fail\"}"; +// } +// return returnStr; +// } + + /** + * 娴嬭瘯鏁版嵁搴撹繛鎺� + * @param response + * @param transData + * @return + * @throws Exception + */ + @RequestMapping("/connectionDB") + @ResponseBody + public String connectionDB(HttpServletResponse response, @RequestBody JSONObject transData, TUser users) throws Exception { + //userService.addUser(users); + KettleClientEnvironment.init(); + DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", transData.getString("type"), "Native", transData.getString("hostName"), transData.getString("dbName"), transData.getString("portNum"), + transData.getString("userName"), transData.getString("pwd")); + LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null ); + Database ds = new Database(loggingObject, dataMeta); + ds.normalConnect(null); + Connection conn = ds.getConnection(); +/* PreparedStatement stmt = conn.prepareStatement("select * from usr"); + ResultSet rslt = stmt.executeQuery(); + while (rslt.next()) { + System.out.println(rslt.getString("username")); + }*/ + ds.disconnect(); + return "{result:\"success\"}"; + } + + /** + * 鍒涘缓SQL涓璽ablename + * 杩斿洖琛ㄥ悕 + */ + public String createSQLTable(DruidPooledConnection connSys, ResultSet rslt) throws Exception { + List<TableColumn> columnNameAndType = new ArrayList<TableColumn>(); + ResultSetMetaData md = rslt.getMetaData(); + for (int i = 1; i <= md.getColumnCount(); i++) { + TableColumn tableColumn = new TableColumn(); + tableColumn.setColumnName(md.getColumnName(i)); + tableColumn.setColumnType(md.getColumnTypeName(i)); + tableColumn.setColumnLength(String.valueOf(md.getColumnDisplaySize(i))); + columnNameAndType.add(tableColumn); + } + //鍒涘缓琛� + String SQLStr = "", tableName = ""; + if (!columnNameAndType.isEmpty()) { + tableName = "SQL_" + sdf.format(new Date()); + SQLStr = splicingSQL(tableName, columnNameAndType); + PreparedStatement stmt = connSys.prepareStatement(SQLStr); + stmt.execute(); + } + return tableName; + } + + /** + * 閫氳繃SQL鎻愬彇鏁版嵁 + */ + @RequestMapping("/sqlPick") + @ResponseBody + public String sqlPick(@RequestBody JSONObject dbData) { + try { + String tableName = "", sqlStr = ""; + + Database dsInput = null; + Database dsOutput = null; + TableInputMeta tableInputMeta = new TableInputMeta(); + TableOutputMeta talbeOutputMeta = new TableOutputMeta(); + TransMeta transMeta = new TransMeta(); + KettleDBTrans kettleDBTrans = new KettleDBTrans(); + kettleDBTrans.setDsInput(dsInput); + kettleDBTrans.setDsOutput(dsOutput); + kettleDBTrans.setTableInputMeta(tableInputMeta); + kettleDBTrans.setTalbeOutputMeta(talbeOutputMeta); + kettleDBTrans.setTransMeta(transMeta); + + kettleDBConn(kettleDBTrans); + + //SQL鎻愬彇鏁版嵁 + tableName = "SQL_" + sdf.format(new Date()); + sqlStr = dbData.getString("sqlStr"); + dataExtraction(tableName, sqlStr, kettleDBTrans); + kettleDBTrans.getDsInput().disconnect(); + kettleDBTrans.getDsOutput().disconnect(); + } catch(Exception e) { + return "{result:\"fail\"}"; + } + return "{result:\"success\"}"; + } + /** + * 鎶藉彇SQL鏁版嵁 + */ + public void extractSQLData(DruidPooledConnection connSys, ResultSet rslt, String tableName) throws Exception { + PreparedStatement stmtDes; + int columnCount; + String SQLStr = "", valueStr = ""; + columnCount = rslt.getMetaData().getColumnCount(); + SQLStr = " insert into "+tableName+" ( "; + valueStr = " values ("; + for(int i = 0; i < columnCount; i++) { + SQLStr = SQLStr + rslt.getMetaData().getColumnName(i+1) + ", "; + valueStr = valueStr + "?, "; + } + SQLStr = SQLStr.substring(0, SQLStr.length() - 2) + ")"; + valueStr = valueStr.substring(0, valueStr.length() - 2) + ")"; + SQLStr = SQLStr + valueStr; + stmtDes = connSys.prepareStatement(SQLStr); + while (rslt.next()) { + for(int i = 0; i < columnCount; i++) { + stmtDes.setString(i + 1, rslt.getString(i+1)); + } + stmtDes.addBatch(); + } + stmtDes.executeBatch(); + } + /** + * 娴嬭瘯SQL璇彞姝g‘鎬� + */ + @RequestMapping("/sqlTest") + @ResponseBody + public String sqlTest(@RequestBody JSONObject dbData) { + + try { + //杩炴帴鏁版嵁搴� + String dbConn = dbData.getString("DBConn"); + JSONObject DBParam = JSONArray.parseObject(dbConn); + + KettleClientEnvironment.init(); + DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", DBParam.getString("type"), "Native", DBParam.getString("hostName"), DBParam.getString("dbName"), DBParam.getString("portNum"), + DBParam.getString("userName"), DBParam.getString("pwd")); + LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null ); + Database ds = new Database(loggingObject, dataMeta); + ds.connect(); + Connection conn = ds.getConnection(); + String sqlStr = dbData.getString("sqlStr"); + PreparedStatement stmt = conn.prepareStatement(sqlStr); + stmt.executeQuery(); + ds.disconnect(); + } catch(Exception e) { + return "{result:\"fail\"}"; + } + return "{result:\"success\"}"; + } + /** + * 褰撻�変腑鏁版嵁琛ㄩ�夐」鍗★紝鏄剧ず鏁版嵁琛ㄥ強瑙嗗浘鍚嶇О + * @param response + * @param transData + * @return + * @throws Exception + */ + @RequestMapping("/showTableAndView") + @ResponseBody + public String showTableAndView(HttpServletResponse response, @RequestBody JSONObject transData) throws Exception { + KettleClientEnvironment.init(); + DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", transData.getString("type"), "Native", transData.getString("hostName"), transData.getString("dbName"), transData.getString("portNum"), + transData.getString("userName"), transData.getString("pwd")); + + LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null ); + Database ds = new Database(loggingObject, dataMeta); + //ds.normalConnect(null); + ds.connect(); + Connection conn = ds.getConnection(); + List<String> UTable = new ArrayList<String>(); + List<String> VTable = new ArrayList<String>(); + //寰楀埌鏁版嵁搴撲腑鐨勮〃 +// PreparedStatement stmt = conn.prepareStatement("select name from sysobjects where xtype='U'"); +// ResultSet rslt = stmt.executeQuery(); +// while (rslt.next()) { +// UTable.add(rslt.getString("name")); +// } +// String[] UTableArray = ds.getTablenames("dbo", true); +// for (int i = 0; i < UTableArray.length; i++) { +// UTable.add(UTableArray[i].replace("dbo.", "")); +// } + String[] UTableArray = ds.getTablenames(true); + for (int i = 0; i < UTableArray.length; i++) { + int lastHao = UTableArray[i].lastIndexOf("."); + String uTable = UTableArray[i]; + if (lastHao > 0) { + UTable.add(uTable.substring(lastHao + 1, uTable.length())); + } + else { + UTable.add(UTableArray[i]); + } + + } + String tableJson = JSON.toJSONString(UTable); + //寰楀埌鏁版嵁搴撲腑鐨勮鍥� +// stmt = conn.prepareStatement("select name from sysobjects where xtype='V'"); +// rslt = stmt.executeQuery(); +// while (rslt.next()) { +// VTable.add(rslt.getString("name")); +// } + String[] VTableArray = ds.getViews("dbo", true); + for (int i = 0; i < VTableArray.length; i++) { + VTable.add(VTableArray[i].replace("dbo.", "")); + } + String viewJson = JSON.toJSONString(VTable); + ds.disconnect(); + return "{result:\"success\",table:["+tableJson+"],view:["+viewJson+"]}"; + } + /** + * 浣跨敤kettle鍒涘缓琛� + */ + public void createTable(Database dsOutput, RowMetaInterface rm, String tableName) { + try { + System.out.println("寮�濮嬪垱寤鸿〃:" + tableName); + String dbProName = dsOutput.getConnection().getMetaData().getDatabaseProductName(); + System.out.println("鏁版嵁搴撶被鍨�:" + dbProName); + String sql = dsOutput.getDDL(tableName, rm); + dsOutput.execStatement(sql.replace(";", "")); + System.out.println("鍒涘缓琛ㄦ垚鍔�"); + } catch(Exception e) { + System.out.println("鍒涘缓琛ㄥけ璐�"); + } + + } + /** + * 鏂扮増鎶藉彇鏁版嵁 + * dataExtraction(String tableName, String sqlStr, TransMeta transMeta, Database dsInput, Database dsOutput, TableInputMeta tableInputMeta, TableOutputMeta tableOutputMeta) throws Exception { + tableInputMeta.setSQL(sqlStr); + */ + public void dataExtraction(String tableName, String sqlStr, KettleDBTrans kettleDBTrans) throws Exception { + kettleDBTrans.getTableInputMeta().setSQL(sqlStr); + + StepMeta inputStep = new StepMeta(STEP_READ_FROM_TABLE, kettleDBTrans.getTableInputMeta()); + inputStep.setLocation(50, 50); + inputStep.setDraw(true); + kettleDBTrans.getTransMeta().addStep(inputStep); + + //鍒ゆ柇鐩爣鏁版嵁搴撲腑锛屾槸鍚﹀瓨鍦ㄨ〃锛屽:瀛樺湪閲嶆柊鍛藉悕琛ㄥ悕锛岀敤鏂拌〃鍚嶅垱寤鸿〃锛屽:涓嶅瓨鍦�,鐩存帴鍒涘缓琛� + boolean hasTable = kettleDBTrans.getDsOutput().checkTableExists(tableName); + if (hasTable) { + tableName = tableName + "_" + sdf.format(new Date()); + } + //寰楀埌鍘熷琛ㄧ粨鏋� + PreparedStatement stmt = null; + RowMetaInterface rm = null; + stmt = kettleDBTrans.getDsInput().getConnection().prepareStatement(sqlStr); + stmt.executeQuery(); + rm = kettleDBTrans.getDsInput().getQueryFields(sqlStr, false); + //鍒涘缓鏁版嵁琛� + createTable(kettleDBTrans.getDsOutput(), rm, tableName); + + kettleDBTrans.getTalbeOutputMeta().setTableName(tableName); + + StepMeta insertUpdateStep = new StepMeta(STEP_INSERT_UPDATE, kettleDBTrans.getTalbeOutputMeta()); + insertUpdateStep.setLocation(150, 50); + insertUpdateStep.setDraw(true); + kettleDBTrans.getTransMeta().addStep(insertUpdateStep); + + DummyTransMeta dummyMeta = new DummyTransMeta(); + + StepMeta dummyStep = new StepMeta(STEP_DUMMY, dummyMeta); + dummyStep.setLocation(200, 50); + dummyStep.setDraw(true); + + kettleDBTrans.getTransMeta().addStep(dummyStep); + + //璁剧疆姝ラ鐩存帴鐨勫叧绯� + TransHopMeta hop = new TransHopMeta(inputStep, insertUpdateStep); + kettleDBTrans.getTransMeta().addTransHop(hop); + TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep); + kettleDBTrans.getTransMeta().addTransHop(hop2); + + //寮�濮嬫墽琛� + Trans trans = new Trans(kettleDBTrans.getTransMeta()); + trans.prepareExecution(null); + + trans.startThreads(); + + trans.waitUntilFinished(); + + if (trans.getErrors() > 0) { + System.out.println(">>>>>>>>>> ERROR"); + } + else { + System.out.println(">>>>>>>>>> SUCCESS "); + } + } + /** + * 鍒涘缓闆嗘垚鏂规硶 + */ + public void kettleDBConn(KettleDBTrans kettleDBTrans) throws Exception { + //杩炴帴鏁版嵁搴� +// String dbConn = kettleDBTrans.getAssemble().getString("DBConn");\ + String dbConn= ""; + JSONObject DBParam = JSONArray.parseObject(dbConn); + //鍒濆鍖杒ettle鐜 + KettleClientEnvironment.init(); + KettleClientEnvironment.init(); + //瀵煎叆鏁版嵁鐨勬暟鎹簱杩炴帴 + DatabaseMeta dataMetaInput = new DatabaseMeta("Input", DBParam.getString("type"), "Native", DBParam.getString("hostName"), DBParam.getString("dbName"), DBParam.getString("portNum"), + DBParam.getString("userName"), DBParam.getString("pwd")); + //瀵煎嚭鏁版嵁鐨勬暟鎹簱杩炴帴 + DatabaseMeta dataMetaOutput = new DatabaseMeta("Output", dataSourceConn.getDbType(), "Native", dataSourceConn.getDbHostName(), dataSourceConn.getDbName(), dataSourceConn.getDbPort(), + dataSourceConn.getUsername(), dataSourceConn.getPassword()); + //瀵煎嚭鏁版嵁搴撹繛鎺� + LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null ); + kettleDBTrans.setDsInput(new Database(loggingObject, dataMetaInput)); + kettleDBTrans.getDsInput().connect(); + Connection connInput = kettleDBTrans.getDsInput().getConnection(); + //瀵煎叆鏁版嵁搴撹繛鎺� + kettleDBTrans.setDsOutput(new Database(loggingObject, dataMetaOutput)); + kettleDBTrans.getDsOutput().connect(); + Connection connOutput = kettleDBTrans.getDsOutput().getConnection(); + + kettleDBTrans.getTransMeta().setName("鏁版嵁鎶藉彇"); + + kettleDBTrans.getTransMeta().addDatabase(dataMetaInput); + kettleDBTrans.getTransMeta().addDatabase(dataMetaOutput); + + //瀵煎嚭鏁版嵁 + kettleDBTrans.getTableInputMeta().setDatabaseMeta(kettleDBTrans.getTransMeta().findDatabase("Input")); + //瀵煎叆鏁版嵁 + kettleDBTrans.getTalbeOutputMeta().setDatabaseMeta(kettleDBTrans.getTransMeta().findDatabase("Output")); + } + /** + * 1.寰楀埌瑕佹彁鍙栨暟鎹殑琛ㄤ笌瑙嗗浘鍚嶇О + * 2.鍦ㄧ▼搴忔暟鎹簱涓垱寤鸿鎻愬彇鏁版嵁鐨勮〃涓庤鍥剧殑琛ㄧ殑鍚嶇О + * 3.灏嗚鎻愬彇琛ㄤ笌瑙嗗浘涓殑鏁版嵁瀛樺偍鍒扮郴缁熺殑瀹炰綋琛ㄤ腑 + * 4.鏈�鍚庯紝杩斿洖缁撴灉 + */ + //鎻愬彇琛ㄣ�佽鍥句腑鐨勬暟鎹� + @RequestMapping("/extractDBData") + @ResponseBody + public String extractDBData(HttpServletResponse response, @RequestBody JSONObject dbData) throws Exception { + //琛� + int k = 0; + String sqlStr = "", tableName = ""; + + Database dsInput = null; + Database dsOutput = null; + TableInputMeta tableInputMeta = new TableInputMeta(); + TableOutputMeta talbeOutputMeta = new TableOutputMeta(); + TransMeta transMeta = new TransMeta(); + KettleDBTrans kettleDBTrans = new KettleDBTrans(); +// kettleDBTrans.setDbData(dbData); + kettleDBTrans.setDsInput(dsInput); + kettleDBTrans.setDsOutput(dsOutput); + kettleDBTrans.setTableInputMeta(tableInputMeta); + kettleDBTrans.setTalbeOutputMeta(talbeOutputMeta); + kettleDBTrans.setTransMeta(transMeta); + + kettleDBConn(kettleDBTrans); + + JSONArray uTable = dbData.getJSONArray("UTable"); + //寰幆瀹炰綋琛� + if (!uTable.isEmpty()) { + for(k = 0; k < uTable.size(); k++) { + tableName = uTable.getString(k); + sqlStr = " select * from " + tableName; + dataExtraction(tableName, sqlStr, kettleDBTrans); + } + } + + JSONArray vTable = dbData.getJSONArray("VTable"); + //寰幆瑙嗗浘 + if (!vTable.isEmpty()) { + for(k = 0; k < vTable.size(); k++) { + tableName = vTable.getString(k); + sqlStr = " select * from " + tableName; + dataExtraction(tableName, sqlStr, kettleDBTrans); + } + } + //鍏抽棴鏁版嵁搴撹繛鎺� + kettleDBTrans.getDsInput().disconnect(); + kettleDBTrans.getDsOutput().disconnect(); + return "{result: \"success\"}"; + } + + /** + * 鎶藉彇鏁版嵁 + * extractData(connSys, realTableName, conn, tableName); + * connSys 绯荤粺鏁版嵁搴� + * realTableName 绯荤粺鏁版嵁搴撹〃鍚� + * conn 鎻愬彇鏁版嵁搴� + * tableName 鎻愬彇鏁版嵁搴撹〃鍚� + */ + public void extractData(DruidPooledConnection connSys, String tableNameDes, Connection conn, String tableNameSrc) throws Exception { + PreparedStatement stmt, stmtDes; + ResultSet rslt; + int columnCount; + String SQLStr = "", valueStr = ""; + + SQLStr = " select * from "+tableNameSrc+" "; + stmt = conn.prepareStatement(SQLStr); + rslt = stmt.executeQuery(); + columnCount = rslt.getMetaData().getColumnCount(); + SQLStr = " insert into "+tableNameDes+" ( "; + valueStr = " values ("; + for(int i = 0; i < columnCount; i++) { + SQLStr = SQLStr + rslt.getMetaData().getColumnName(i+1) + ", "; + valueStr = valueStr + "?, "; + } + SQLStr = SQLStr.substring(0, SQLStr.length() - 2) + ")"; + valueStr = valueStr.substring(0, valueStr.length() - 2) + ")"; + SQLStr = SQLStr + valueStr; + stmtDes = connSys.prepareStatement(SQLStr); + while (rslt.next()) { + for(int i = 0; i < columnCount; i++) { + stmtDes.setString(i + 1, rslt.getString(i+1)); + } + stmtDes.addBatch(); + } + stmtDes.executeBatch(); + } + + /** + * 鏍规嵁鏁版嵁搴撶被鍨嬭浆鎹㈠搴旂被鍨� + * + */ + public String getColumnStr(TableColumn tableColumn) throws Exception { + StringBuffer sqlStr = new StringBuffer(); + String returnStr = ""; + if (tableColumn.getColumnType().contains("varchar")) { + sqlStr.append(tableColumn.getColumnName() + " varchar(" + tableColumn.getColumnLength() + "),"); + } + else if (tableColumn.getColumnType().contains("int")) { + sqlStr.append(tableColumn.getColumnName() + " int(" + tableColumn.getColumnLength() + "),"); + } + else if (tableColumn.getColumnType().contains("date")) { + sqlStr.append(tableColumn.getColumnName() + " date,"); + } + else if (tableColumn.getColumnType().contains("nvarchar")) { + sqlStr.append(tableColumn.getColumnName() + " varchar(" + tableColumn.getColumnLength() + "),"); + } + else if (tableColumn.getColumnType().contains("nchar")) { + sqlStr.append(tableColumn.getColumnName() + " varchar(" + tableColumn.getColumnLength() + "),"); + } + return sqlStr.toString(); + } + + /** + * 鎷兼帴鍒涘缓鏂拌〃鐨勮鍙� + * 1.绯荤粺鏁版嵁搴撴槸mysql,鎵�浠ワ紝闇�瑕佸皢鍏跺畠鏁版嵁搴撶殑绫诲瀷锛岃浆鎹㈡垚mysql鍙互璇嗗埆鐨勭被鍨� + */ + public String splicingSQL(String tableName, List<TableColumn> columnNameAndType) throws Exception { + String sqlStr = ""; + //绗竴姝�,绫诲瀷 璇嗗埆绫诲瀷 + sqlStr = "create table " + tableName + " ("; + StringBuffer columnStr = new StringBuffer(); + columnStr.append(sqlStr); + for (TableColumn tableColumn : columnNameAndType) { + columnStr.append(getColumnStr(tableColumn)); + } + sqlStr = columnStr.toString(); + return sqlStr.substring(0, sqlStr.length() - 1) + ")"; + } + /** + * 鍒涘缓琛� + * 1.闇�瑕佸垽娈�,绯荤粺鏁版嵁搴撲腑鏄惁宸插瓨鍦ㄨ〃鍚�(1.鏈夌殑璇濋噸鏂板懡鍚�,tableName + 骞存湀鏃ユ椂鍒嗙 + */ + public String createTable(DruidPooledConnection connSys, String tableName, List<TableColumn> columnNameAndType) throws Exception { + String SQLStr = ""; + if (!columnNameAndType.isEmpty()) { + PreparedStatement stmt; + ResultSet rslt; + + SQLStr = " select count(table_name) as tableNum " + + " from information_schema.tables " + + " where table_schema='ssm' and table_name='"+tableName+"'"; + stmt = connSys.prepareStatement(SQLStr); + rslt = stmt.executeQuery(); + rslt.next(); + if (rslt.getInt("tableNum") > 0) { + tableName = tableName + "_" + sdf.format(new Date()); + } + SQLStr = splicingSQL(tableName, columnNameAndType); + stmt = connSys.prepareStatement(SQLStr); + stmt.execute(); + } + return tableName; + } + + + //寰楀埌鎶藉彇鏁版嵁搴撹〃鐨勫瓧娈靛悕绉板強瀛楁绫诲瀷 + public List<TableColumn> getColumnNameAndType(Connection conn, String tableName, String type) throws Exception { + List<TableColumn> columnNameAndType = new ArrayList<TableColumn>(); + //寰楀埌鏁版嵁搴撲腑鐨勮〃 + String SQLStr = " select c.name As ColumnsName , t.name as ColumnsType, c.length as ColumnsLength " + + " from SysObjects As o " + + " left join SysColumns As c on o.id=c.id " + + " left join SysTypes As t on c.xtype=t.xusertype " + + " where o.type = '"+type+"' and o.name='"+tableName+"' "; + + PreparedStatement stmt = conn.prepareStatement(SQLStr); + ResultSet rslt = stmt.executeQuery(); + while (rslt.next()) { + TableColumn tableColumn = new TableColumn(); + tableColumn.setColumnName(rslt.getString("ColumnsName")); + tableColumn.setColumnType(rslt.getString("ColumnsType")); + tableColumn.setColumnLength(rslt.getString("ColumnsLength")); + columnNameAndType.add(tableColumn); + } + + return columnNameAndType; + } + + + +/* protected void sendJsonData(HttpServletResponse response) throws Exception{ + response.setContentType("text/javascript;charset=UTF-8"); + PrintWriter out = response.getWriter(); + out.println("alert(\"鏁版嵁搴撹繛鎺ユ垚鍔�!!!\")"); + out.flush(); + out.close(); + }*/ + +} -- Gitblit v1.8.0