kimi
2020-05-27 c007f0ca1785db093d48f4846cda82fe8e955765
src/main/java/com/highdatas/mdm/controller/SettleController.java
New file
@@ -0,0 +1,772 @@
package com.highdatas.mdm.controller;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.highdatas.mdm.entity.TUser;
import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo;
import com.highdatas.mdm.pojo.kettle.KettleDBTrans;
import com.highdatas.mdm.pojo.kettle.TableColumn;
import org.pentaho.di.core.KettleClientEnvironment;
import org.pentaho.di.core.database.Database;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.logging.LoggingObjectInterface;
import org.pentaho.di.core.logging.LoggingObjectType;
import org.pentaho.di.core.logging.SimpleLoggingObject;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.dummytrans.DummyTransMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;
import org.pentaho.di.trans.steps.tableoutput.TableOutputMeta;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.servlet.http.HttpServletResponse;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@Controller
@RequestMapping("/database")
// Endpoints of this controller are served under /database, e.g. /database/connectionDB
public class SettleController {
   // Connection settings of the system's own database; used as the "Output" side in kettleDBConn.
   @Autowired
   private UnBigDataDataSourceInfo dataSourceConn;
   // Timestamp pattern (yyyyMMddHHmmss) used to build generated / de-duplicated table names.
   // NOTE(review): SimpleDateFormat is not synchronized and this instance is shared by all calls
   // on this controller — confirm whether requests can run concurrently.
   private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
   // Names given to the Kettle steps created in dataExtraction.
   private String STEP_READ_FROM_TABLE = "Read data from table";
   private String STEP_INSERT_UPDATE = "Insert or update";
   private String STEP_DUMMY = "Dummy";
   /**
    * 提取mongodb数据
    * @param response
    * @param transData
    * @return
    * @throws
    */
//   @RequestMapping("/drawMongoDB")
//   @ResponseBody
//   public String drawMongoDB(HttpServletResponse response, @RequestBody JSONObject transData) {
//      String returnStr = "";
//      try {
//          KettleClientEnvironment.init();
//
//          TransMeta transMeta = new TransMeta();
//          transMeta.setName("抽取mongodb数据");
//          //导出数据的数据库连接
//          DatabaseMeta dataBaseOutput = new DatabaseMeta("Output", dataSourceConn.getDbType(), "Native", dataSourceConn.getDbHostName(), dataSourceConn.getDbName(), dataSourceConn.getDbPort(),
//                                              dataSourceConn.getUsername(), dataSourceConn.getPassword());
//          transMeta.addDatabase(dataBaseOutput);
//
//             MongoDbInputMeta mongoDbInputMeta = new MongoDbInputMeta();
//             mongoDbInputMeta.setHostnames(transData.getString("hostName"));
//             mongoDbInputMeta.setAuthenticationDatabaseName(transData.getString("authDBName"));
//             mongoDbInputMeta.setPort(transData.getString("portNum"));
//             mongoDbInputMeta.setAuthenticationUser(transData.getString("userName"));
//             mongoDbInputMeta.setAuthenticationPassword(transData.getString("pwd"));
//             mongoDbInputMeta.setDbName(transData.getString("dbName"));
//             mongoDbInputMeta.setCollection(transData.getString("collection"));
//             //得到抽取字段
//             ArrayList<String> fieldList = new ArrayList<String>(transData.getString("drawfield").split(",").length);
//             Collections.addAll(fieldList, transData.getString("drawfield").split(","));
//             //设置不取JSON字段
//             mongoDbInputMeta.setOutputJson(false);
//          //设置提取字段名称、path、type
//          List<MongoField> normalList = new ArrayList<MongoField>();
//          for (String fieldStr : fieldList) {
//              MongoField newField = new MongoField();
//              newField.m_fieldName = fieldStr;
//              newField.m_fieldPath = "$." + fieldStr;
//              newField.m_kettleType = "String";
//              normalList.add(newField);
//          }
//          //设置mongodb提取字段
//          mongoDbInputMeta.setMongoFields(normalList);
//          //设置mogodb步骤元
//          StepMeta inputMongoDBStep = new StepMeta(STEP_READ_FROM_TABLE, mongoDbInputMeta);
//          inputMongoDBStep.setLocation(50, 50);
//          inputMongoDBStep.setDraw(true);
//          //将mogodb步骤元加入转化中
//          transMeta.addStep(inputMongoDBStep);
//
//          //设置mysql元
//          TableOutputMeta tableOutputMeta = new TableOutputMeta();
//          //设置数据库元
//          tableOutputMeta.setDatabaseMeta(transMeta.findDatabase("Output"));
//          //mongodb中数据库表/集合,就是表名
//          tableOutputMeta.setTableName(transData.getString("collection"));
//          //将mysql元加入步骤元
//          StepMeta insertUpdateStep = new StepMeta(STEP_INSERT_UPDATE, tableOutputMeta);
//          insertUpdateStep.setLocation(150, 50);
//          insertUpdateStep.setDraw(true);
//
//          transMeta.addStep(insertUpdateStep);
//
//          //增加个空元,
//          DummyTransMeta dummyMeta = new DummyTransMeta();
//
//          //将空元加入步骤元
//          StepMeta dummyStep = new StepMeta(STEP_DUMMY,dummyMeta);
//          dummyStep.setLocation(200, 50);
//          dummyStep.setDraw(true);
//
//          transMeta.addStep(dummyStep);
//
//          //设置步骤直接的关系
//          TransHopMeta hop = new TransHopMeta(inputMongoDBStep, insertUpdateStep);
//          transMeta.addTransHop(hop);
//          TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep);
//          transMeta.addTransHop(hop2);
//
//          //开始执行数据抽取
//          //将转化元实例化转换
//          Trans trans = new Trans(transMeta);
//          trans.prepareExecution(null);
//
//          trans.startThreads();
//          trans.waitUntilFinished();
//
//          if (trans.getErrors() > 0) {
//             System.out.println(">>>>>>>>>> ERROR");
//             returnStr = "{result:\"fail\"}";
//          }
//          else {
//             System.out.println(">>>>>>>>>> SUCCESS ");
//             returnStr = "{result:\"success\"}";
//          }
//      } catch(Exception e) {
//          return "{result:\"fail\"}";
//      }
//      return returnStr;
//   }
//
//   /**
//    * Extract HBase data (disabled draft; the body below still builds a MongoDbInputMeta step)
//    * @param response
//    * @param transData
//    * @return
//    * @throws
//    */
//   @RequestMapping("/drawHbase")
//   @ResponseBody
//   public String drawHbase(HttpServletResponse response, @RequestBody JSONObject transData) {
//      String returnStr = "";
//      try {
//         KettleClientEnvironment.init();
//
//         TransMeta transMeta = new TransMeta();
//         transMeta.setName("抽取Hbase数据");
//         //导出数据的数据库连接
//         DatabaseMeta dataBaseOutput = new DatabaseMeta("Output", dataSourceConn.getDbType(), "Native", dataSourceConn.getDbHostName(), dataSourceConn.getDbName(), dataSourceConn.getDbPort(),
//               dataSourceConn.getUsername(), dataSourceConn.getPassword());
//         transMeta.addDatabase(dataBaseOutput);
//         //HBaseInputMeta hBaseInputMeta= new HBaseInputMeta()
//
//         MongoDbInputMeta mongoDbInputMeta = new MongoDbInputMeta();
//         mongoDbInputMeta.setHostnames(transData.getString("hostName"));
//         mongoDbInputMeta.setAuthenticationDatabaseName(transData.getString("authDBName"));
//         mongoDbInputMeta.setPort(transData.getString("portNum"));
//         mongoDbInputMeta.setAuthenticationUser(transData.getString("userName"));
//         mongoDbInputMeta.setAuthenticationPassword(transData.getString("pwd"));
//         mongoDbInputMeta.setDbName(transData.getString("dbName"));
//         mongoDbInputMeta.setCollection(transData.getString("collection"));
//         //得到抽取字段
//         ArrayList<String> fieldList = new ArrayList<String>(transData.getString("drawfield").split(",").length);
//         Collections.addAll(fieldList, transData.getString("drawfield").split(","));
//         //设置不取JSON字段
//         mongoDbInputMeta.setOutputJson(false);
//         //设置提取字段名称、path、type
//         List<MongoField> normalList = new ArrayList<MongoField>();
//         for (String fieldStr : fieldList) {
//            MongoField newField = new MongoField();
//            newField.m_fieldName = fieldStr;
//            newField.m_fieldPath = "$." + fieldStr;
//            newField.m_kettleType = "String";
//            normalList.add(newField);
//         }
//         //设置mongodb提取字段
//         mongoDbInputMeta.setMongoFields(normalList);
//         //设置mogodb步骤元
//         StepMeta inputMongoDBStep = new StepMeta(STEP_READ_FROM_TABLE, mongoDbInputMeta);
//         inputMongoDBStep.setLocation(50, 50);
//         inputMongoDBStep.setDraw(true);
//         //将mogodb步骤元加入转化中
//         transMeta.addStep(inputMongoDBStep);
//
//         //设置mysql元
//         TableOutputMeta tableOutputMeta = new TableOutputMeta();
//         //设置数据库元
//         tableOutputMeta.setDatabaseMeta(transMeta.findDatabase("Output"));
//         //mongodb中数据库表/集合,就是表名
//         tableOutputMeta.setTableName(transData.getString("collection"));
//         //将mysql元加入步骤元
//         StepMeta insertUpdateStep = new StepMeta(STEP_INSERT_UPDATE, tableOutputMeta);
//         insertUpdateStep.setLocation(150, 50);
//         insertUpdateStep.setDraw(true);
//
//         transMeta.addStep(insertUpdateStep);
//
//         //增加个空元,
//         DummyTransMeta dummyMeta = new DummyTransMeta();
//
//         //将空元加入步骤元
//         StepMeta dummyStep = new StepMeta(STEP_DUMMY,dummyMeta);
//         dummyStep.setLocation(200, 50);
//         dummyStep.setDraw(true);
//
//         transMeta.addStep(dummyStep);
//
//         //设置步骤直接的关系
//         TransHopMeta hop = new TransHopMeta(inputMongoDBStep, insertUpdateStep);
//         transMeta.addTransHop(hop);
//         TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep);
//         transMeta.addTransHop(hop2);
//
//         //开始执行数据抽取
//         //将转化元实例化转换
//         Trans trans = new Trans(transMeta);
//         trans.prepareExecution(null);
//
//         trans.startThreads();
//         trans.waitUntilFinished();
//
//         if (trans.getErrors() > 0) {
//            System.out.println(">>>>>>>>>> ERROR");
//            returnStr = "{result:\"fail\"}";
//         }
//         else {
//            System.out.println(">>>>>>>>>> SUCCESS ");
//            returnStr = "{result:\"success\"}";
//         }
//      } catch(Exception e) {
//         return "{result:\"fail\"}";
//      }
//      return returnStr;
//   }
   /**
    * 测试数据库连接
    * @param response
    * @param transData
    * @return
    * @throws Exception
    */
   @RequestMapping("/connectionDB")
   @ResponseBody
   public String connectionDB(HttpServletResponse response, @RequestBody JSONObject transData, TUser users) throws Exception {
      //userService.addUser(users);
      KettleClientEnvironment.init();
      DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", transData.getString("type"), "Native", transData.getString("hostName"), transData.getString("dbName"), transData.getString("portNum"),
                                     transData.getString("userName"), transData.getString("pwd"));
      LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
      Database ds = new Database(loggingObject, dataMeta);
      ds.normalConnect(null);
      Connection conn = ds.getConnection();
/*      PreparedStatement stmt = conn.prepareStatement("select * from usr");
      ResultSet rslt = stmt.executeQuery();
      while (rslt.next()) {
         System.out.println(rslt.getString("username"));
      }*/
      ds.disconnect();
      return "{result:\"success\"}";
   }
   /**
    * 创建SQL中tablename
    * 返回表名
    */
   public String createSQLTable(DruidPooledConnection connSys, ResultSet rslt) throws Exception {
      List<TableColumn> columnNameAndType = new ArrayList<TableColumn>();
      ResultSetMetaData md = rslt.getMetaData();
      for (int i = 1; i <= md.getColumnCount(); i++) {
           TableColumn tableColumn = new TableColumn();
           tableColumn.setColumnName(md.getColumnName(i));
           tableColumn.setColumnType(md.getColumnTypeName(i));
           tableColumn.setColumnLength(String.valueOf(md.getColumnDisplaySize(i)));
           columnNameAndType.add(tableColumn);
      }
       //创建表
      String SQLStr = "", tableName = "";
      if (!columnNameAndType.isEmpty()) {
         tableName = "SQL_" + sdf.format(new Date());
          SQLStr = splicingSQL(tableName, columnNameAndType);
          PreparedStatement stmt = connSys.prepareStatement(SQLStr);
          stmt.execute();
      }
      return tableName;
   }
   /**
    * 通过SQL提取数据
    */
   @RequestMapping("/sqlPick")
   @ResponseBody
   public String sqlPick(@RequestBody JSONObject dbData) {
      try {
          String tableName = "", sqlStr = "";
          Database dsInput = null;
          Database dsOutput = null;
          TableInputMeta tableInputMeta = new TableInputMeta();
          TableOutputMeta talbeOutputMeta = new TableOutputMeta();
          TransMeta transMeta = new TransMeta();
          KettleDBTrans kettleDBTrans = new KettleDBTrans();
          kettleDBTrans.setDsInput(dsInput);
          kettleDBTrans.setDsOutput(dsOutput);
          kettleDBTrans.setTableInputMeta(tableInputMeta);
          kettleDBTrans.setTalbeOutputMeta(talbeOutputMeta);
          kettleDBTrans.setTransMeta(transMeta);
          kettleDBConn(kettleDBTrans);
          //SQL提取数据
          tableName = "SQL_" + sdf.format(new Date());
          sqlStr = dbData.getString("sqlStr");
          dataExtraction(tableName, sqlStr, kettleDBTrans);
          kettleDBTrans.getDsInput().disconnect();
          kettleDBTrans.getDsOutput().disconnect();
      } catch(Exception e) {
          return "{result:\"fail\"}";
      }
      return "{result:\"success\"}";
   }
   /**
    * 抽取SQL数据
    */
   public void  extractSQLData(DruidPooledConnection connSys, ResultSet rslt, String tableName) throws Exception {
      PreparedStatement stmtDes;
      int columnCount;
      String SQLStr = "", valueStr = "";
      columnCount = rslt.getMetaData().getColumnCount();
      SQLStr = " insert into "+tableName+" ( ";
      valueStr = " values (";
      for(int i = 0; i < columnCount; i++) {
         SQLStr = SQLStr + rslt.getMetaData().getColumnName(i+1) + ", ";
         valueStr = valueStr + "?, ";
      }
      SQLStr = SQLStr.substring(0, SQLStr.length() - 2) + ")";
      valueStr = valueStr.substring(0, valueStr.length() - 2) + ")";
      SQLStr = SQLStr + valueStr;
      stmtDes = connSys.prepareStatement(SQLStr);
      while (rslt.next()) {
            for(int i = 0; i < columnCount; i++) {
               stmtDes.setString(i + 1, rslt.getString(i+1));
            }
            stmtDes.addBatch();
      }
      stmtDes.executeBatch();
   }
   /**
    * 测试SQL语句正确性
    */
   @RequestMapping("/sqlTest")
   @ResponseBody
   public String sqlTest(@RequestBody JSONObject dbData) {
      try {
          //连接数据库
          String dbConn = dbData.getString("DBConn");
          JSONObject DBParam = JSONArray.parseObject(dbConn);
          KettleClientEnvironment.init();
          DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", DBParam.getString("type"), "Native", DBParam.getString("hostName"), DBParam.getString("dbName"), DBParam.getString("portNum"),
                                          DBParam.getString("userName"), DBParam.getString("pwd"));
          LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
          Database ds = new Database(loggingObject, dataMeta);
          ds.connect();
          Connection conn = ds.getConnection();
          String sqlStr = dbData.getString("sqlStr");
           PreparedStatement stmt = conn.prepareStatement(sqlStr);
          stmt.executeQuery();
          ds.disconnect();
      } catch(Exception e) {
          return "{result:\"fail\"}";
      }
      return "{result:\"success\"}";
   }
   /**
    * 当选中数据表选项卡,显示数据表及视图名称
    * @param response
    * @param transData
    * @return
    * @throws Exception
    */
   @RequestMapping("/showTableAndView")
   @ResponseBody
   public String showTableAndView(HttpServletResponse response, @RequestBody JSONObject transData) throws Exception {
      KettleClientEnvironment.init();
      DatabaseMeta dataMeta = new DatabaseMeta("KettleDBRep", transData.getString("type"), "Native", transData.getString("hostName"), transData.getString("dbName"), transData.getString("portNum"),
                                     transData.getString("userName"), transData.getString("pwd"));
      LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
      Database ds = new Database(loggingObject, dataMeta);
      //ds.normalConnect(null);
      ds.connect();
      Connection conn = ds.getConnection();
      List<String> UTable = new ArrayList<String>();
      List<String> VTable = new ArrayList<String>();
      //得到数据库中的表
//      PreparedStatement stmt = conn.prepareStatement("select name from sysobjects where xtype='U'");
//      ResultSet rslt = stmt.executeQuery();
//      while (rslt.next()) {
//            UTable.add(rslt.getString("name"));
//      }
//      String[] UTableArray = ds.getTablenames("dbo", true);
//      for (int i = 0; i < UTableArray.length; i++) {
//          UTable.add(UTableArray[i].replace("dbo.", ""));
//      }
      String[] UTableArray = ds.getTablenames(true);
      for (int i = 0; i < UTableArray.length; i++) {
          int lastHao = UTableArray[i].lastIndexOf(".");
          String uTable = UTableArray[i];
          if (lastHao > 0) {
             UTable.add(uTable.substring(lastHao + 1, uTable.length()));
          }
          else {
             UTable.add(UTableArray[i]);
          }
      }
      String tableJson = JSON.toJSONString(UTable);
      //得到数据库中的视图
//      stmt = conn.prepareStatement("select name from sysobjects where xtype='V'");
//      rslt = stmt.executeQuery();
//      while (rslt.next()) {
//            VTable.add(rslt.getString("name"));
//      }
      String[] VTableArray = ds.getViews("dbo", true);
      for (int i = 0; i < VTableArray.length; i++) {
          VTable.add(VTableArray[i].replace("dbo.", ""));
      }
      String viewJson = JSON.toJSONString(VTable);
      ds.disconnect();
      return "{result:\"success\",table:["+tableJson+"],view:["+viewJson+"]}";
   }
   /**
    * 使用kettle创建表
    */
   public void createTable(Database dsOutput, RowMetaInterface rm, String tableName) {
      try {
          System.out.println("开始创建表:" + tableName);
          String dbProName = dsOutput.getConnection().getMetaData().getDatabaseProductName();
          System.out.println("数据库类型:" + dbProName);
          String sql = dsOutput.getDDL(tableName, rm);
          dsOutput.execStatement(sql.replace(";", ""));
          System.out.println("创建表成功");
      } catch(Exception e) {
          System.out.println("创建表失败");
      }
   }
   /**
    * 新版抽取数据
    * dataExtraction(String tableName, String sqlStr, TransMeta transMeta, Database dsInput, Database dsOutput, TableInputMeta tableInputMeta, TableOutputMeta tableOutputMeta) throws Exception {
      tableInputMeta.setSQL(sqlStr);
    */
   public void dataExtraction(String tableName, String sqlStr, KettleDBTrans kettleDBTrans) throws Exception {
      kettleDBTrans.getTableInputMeta().setSQL(sqlStr);
      StepMeta inputStep = new StepMeta(STEP_READ_FROM_TABLE, kettleDBTrans.getTableInputMeta());
      inputStep.setLocation(50, 50);
      inputStep.setDraw(true);
      kettleDBTrans.getTransMeta().addStep(inputStep);
      //判断目标数据库中,是否存在表,如:存在重新命名表名,用新表名创建表,如:不存在,直接创建表
      boolean hasTable = kettleDBTrans.getDsOutput().checkTableExists(tableName);
      if (hasTable) {
         tableName = tableName + "_" + sdf.format(new Date());
      }
      //得到原始表结构
      PreparedStatement stmt = null;
      RowMetaInterface rm = null;
      stmt = kettleDBTrans.getDsInput().getConnection().prepareStatement(sqlStr);
      stmt.executeQuery();
      rm = kettleDBTrans.getDsInput().getQueryFields(sqlStr, false);
      //创建数据表
      createTable(kettleDBTrans.getDsOutput(), rm, tableName);
      kettleDBTrans.getTalbeOutputMeta().setTableName(tableName);
      StepMeta insertUpdateStep = new StepMeta(STEP_INSERT_UPDATE, kettleDBTrans.getTalbeOutputMeta());
      insertUpdateStep.setLocation(150, 50);
      insertUpdateStep.setDraw(true);
      kettleDBTrans.getTransMeta().addStep(insertUpdateStep);
      DummyTransMeta dummyMeta = new DummyTransMeta();
      StepMeta dummyStep = new StepMeta(STEP_DUMMY, dummyMeta);
      dummyStep.setLocation(200, 50);
      dummyStep.setDraw(true);
      kettleDBTrans.getTransMeta().addStep(dummyStep);
      //设置步骤直接的关系
      TransHopMeta hop = new TransHopMeta(inputStep, insertUpdateStep);
      kettleDBTrans.getTransMeta().addTransHop(hop);
      TransHopMeta hop2 = new TransHopMeta(insertUpdateStep, dummyStep);
      kettleDBTrans.getTransMeta().addTransHop(hop2);
      //开始执行
      Trans trans = new Trans(kettleDBTrans.getTransMeta());
      trans.prepareExecution(null);
      trans.startThreads();
      trans.waitUntilFinished();
      if (trans.getErrors() > 0) {
         System.out.println(">>>>>>>>>> ERROR");
      }
      else {
         System.out.println(">>>>>>>>>> SUCCESS ");
      }
   }
   /**
    * 创建集成方法
    */
   public void kettleDBConn(KettleDBTrans kettleDBTrans) throws Exception {
      //连接数据库
//      String dbConn = kettleDBTrans.getAssemble().getString("DBConn");\
      String dbConn= "";
      JSONObject DBParam = JSONArray.parseObject(dbConn);
      //初始化kettle环境
      KettleClientEnvironment.init();
      KettleClientEnvironment.init();
      //导入数据的数据库连接
      DatabaseMeta dataMetaInput = new DatabaseMeta("Input", DBParam.getString("type"), "Native", DBParam.getString("hostName"), DBParam.getString("dbName"), DBParam.getString("portNum"),
                                          DBParam.getString("userName"), DBParam.getString("pwd"));
      //导出数据的数据库连接
      DatabaseMeta dataMetaOutput = new DatabaseMeta("Output", dataSourceConn.getDbType(), "Native", dataSourceConn.getDbHostName(), dataSourceConn.getDbName(), dataSourceConn.getDbPort(),
                                          dataSourceConn.getUsername(), dataSourceConn.getPassword());
      //导出数据库连接
      LoggingObjectInterface loggingObject = new SimpleLoggingObject("Database factory", LoggingObjectType.GENERAL, null );
      kettleDBTrans.setDsInput(new Database(loggingObject, dataMetaInput));
      kettleDBTrans.getDsInput().connect();
      Connection connInput = kettleDBTrans.getDsInput().getConnection();
      //导入数据库连接
      kettleDBTrans.setDsOutput(new Database(loggingObject, dataMetaOutput));
      kettleDBTrans.getDsOutput().connect();
      Connection connOutput = kettleDBTrans.getDsOutput().getConnection();
      kettleDBTrans.getTransMeta().setName("数据抽取");
      kettleDBTrans.getTransMeta().addDatabase(dataMetaInput);
      kettleDBTrans.getTransMeta().addDatabase(dataMetaOutput);
      //导出数据
      kettleDBTrans.getTableInputMeta().setDatabaseMeta(kettleDBTrans.getTransMeta().findDatabase("Input"));
      //导入数据
      kettleDBTrans.getTalbeOutputMeta().setDatabaseMeta(kettleDBTrans.getTransMeta().findDatabase("Output"));
   }
   /**
    * 1.得到要提取数据的表与视图名称
    * 2.在程序数据库中创建要提取数据的表与视图的表的名称
    * 3.将要提取表与视图中的数据存储到系统的实体表中
    * 4.最后,返回结果
    */
   //提取表、视图中的数据
   @RequestMapping("/extractDBData")
   @ResponseBody
   public String extractDBData(HttpServletResponse response, @RequestBody JSONObject dbData) throws Exception {
      //表
      int k = 0;
      String sqlStr = "", tableName = "";
      Database dsInput = null;
      Database dsOutput = null;
      TableInputMeta tableInputMeta = new TableInputMeta();
      TableOutputMeta talbeOutputMeta = new TableOutputMeta();
      TransMeta transMeta = new TransMeta();
      KettleDBTrans kettleDBTrans   = new KettleDBTrans();
//      kettleDBTrans.setDbData(dbData);
      kettleDBTrans.setDsInput(dsInput);
      kettleDBTrans.setDsOutput(dsOutput);
      kettleDBTrans.setTableInputMeta(tableInputMeta);
      kettleDBTrans.setTalbeOutputMeta(talbeOutputMeta);
      kettleDBTrans.setTransMeta(transMeta);
      kettleDBConn(kettleDBTrans);
      JSONArray uTable = dbData.getJSONArray("UTable");
      //循环实体表
      if (!uTable.isEmpty()) {
         for(k = 0; k < uTable.size(); k++) {
            tableName = uTable.getString(k);
            sqlStr = " select * from " + tableName;
            dataExtraction(tableName, sqlStr, kettleDBTrans);
         }
      }
      JSONArray vTable = dbData.getJSONArray("VTable");
      //循环视图
      if (!vTable.isEmpty()) {
         for(k = 0; k < vTable.size(); k++) {
            tableName = vTable.getString(k);
            sqlStr = " select * from " + tableName;
            dataExtraction(tableName, sqlStr, kettleDBTrans);
         }
      }
      //关闭数据库连接
      kettleDBTrans.getDsInput().disconnect();
      kettleDBTrans.getDsOutput().disconnect();
      return "{result: \"success\"}";
   }
   /**
    *    抽取数据
    * extractData(connSys, realTableName, conn, tableName);
    * connSys 系统数据库
    * realTableName 系统数据库表名
    * conn 提取数据库
    * tableName 提取数据库表名
    */
   public void extractData(DruidPooledConnection connSys, String tableNameDes, Connection conn, String tableNameSrc) throws Exception {
      PreparedStatement stmt, stmtDes;
      ResultSet rslt;
      int columnCount;
      String SQLStr = "", valueStr = "";
      SQLStr = " select * from "+tableNameSrc+" ";
      stmt = conn.prepareStatement(SQLStr);
      rslt = stmt.executeQuery();
      columnCount = rslt.getMetaData().getColumnCount();
      SQLStr = " insert into "+tableNameDes+" ( ";
      valueStr = " values (";
      for(int i = 0; i < columnCount; i++) {
         SQLStr = SQLStr + rslt.getMetaData().getColumnName(i+1) + ", ";
         valueStr = valueStr + "?, ";
      }
      SQLStr = SQLStr.substring(0, SQLStr.length() - 2) + ")";
      valueStr = valueStr.substring(0, valueStr.length() - 2) + ")";
      SQLStr = SQLStr + valueStr;
      stmtDes = connSys.prepareStatement(SQLStr);
      while (rslt.next()) {
            for(int i = 0; i < columnCount; i++) {
               stmtDes.setString(i + 1, rslt.getString(i+1));
            }
            stmtDes.addBatch();
      }
      stmtDes.executeBatch();
   }
   /**
    * 根据数据库类型转换对应类型
    *
    */
   public String getColumnStr(TableColumn tableColumn) throws Exception {
      StringBuffer sqlStr = new StringBuffer();
      String returnStr = "";
      if (tableColumn.getColumnType().contains("varchar")) {
         sqlStr.append(tableColumn.getColumnName() + " varchar(" + tableColumn.getColumnLength() + "),");
      }
      else if (tableColumn.getColumnType().contains("int")) {
         sqlStr.append(tableColumn.getColumnName() + " int(" + tableColumn.getColumnLength() + "),");
      }
      else if (tableColumn.getColumnType().contains("date")) {
         sqlStr.append(tableColumn.getColumnName() + " date,");
      }
      else if (tableColumn.getColumnType().contains("nvarchar")) {
         sqlStr.append(tableColumn.getColumnName() + " varchar(" + tableColumn.getColumnLength() + "),");
      }
      else if (tableColumn.getColumnType().contains("nchar")) {
         sqlStr.append(tableColumn.getColumnName() + " varchar(" + tableColumn.getColumnLength() + "),");
      }
      return sqlStr.toString();
   }
   /**
    * 拼接创建新表的语句
    * 1.系统数据库是mysql,所以,需要将其它数据库的类型,转换成mysql可以识别的类型
    */
   public String splicingSQL(String tableName,  List<TableColumn> columnNameAndType) throws Exception {
      String sqlStr = "";
      //第一步,类型 识别类型
      sqlStr = "create table " + tableName + " (";
      StringBuffer columnStr = new StringBuffer();
      columnStr.append(sqlStr);
      for (TableColumn tableColumn : columnNameAndType) {
          columnStr.append(getColumnStr(tableColumn));
      }
      sqlStr = columnStr.toString();
      return sqlStr.substring(0, sqlStr.length() - 1) + ")";
   }
   /**
    * 创建表
    * 1.需要判段,系统数据库中是否已存在表名(1.有的话重新命名,tableName + 年月日时分秒
    */
   public String createTable(DruidPooledConnection connSys, String tableName, List<TableColumn> columnNameAndType) throws Exception {
      String SQLStr = "";
      if (!columnNameAndType.isEmpty()) {
         PreparedStatement stmt;
         ResultSet rslt;
         SQLStr = " select count(table_name) as tableNum "
               + " from information_schema.tables "
               + " where table_schema='ssm' and table_name='"+tableName+"'";
         stmt = connSys.prepareStatement(SQLStr);
         rslt = stmt.executeQuery();
         rslt.next();
         if (rslt.getInt("tableNum") > 0) {
            tableName = tableName + "_" + sdf.format(new Date());
         }
          SQLStr = splicingSQL(tableName, columnNameAndType);
          stmt = connSys.prepareStatement(SQLStr);
          stmt.execute();
      }
      return tableName;
   }
   //得到抽取数据库表的字段名称及字段类型
   public List<TableColumn> getColumnNameAndType(Connection conn, String tableName, String type) throws Exception {
      List<TableColumn> columnNameAndType = new ArrayList<TableColumn>();
       //得到数据库中的表
       String SQLStr = " select c.name As ColumnsName , t.name as ColumnsType, c.length as ColumnsLength "
                    + " from SysObjects As o "
                    + " left join SysColumns As c on o.id=c.id "
                    + " left join SysTypes As t on c.xtype=t.xusertype "
                    + " where o.type = '"+type+"' and o.name='"+tableName+"' ";
       PreparedStatement stmt = conn.prepareStatement(SQLStr);
       ResultSet rslt = stmt.executeQuery();
       while (rslt.next()) {
             TableColumn tableColumn = new TableColumn();
             tableColumn.setColumnName(rslt.getString("ColumnsName"));
             tableColumn.setColumnType(rslt.getString("ColumnsType"));
             tableColumn.setColumnLength(rslt.getString("ColumnsLength"));
             columnNameAndType.add(tableColumn);
       }
       return columnNameAndType;
   }
/*   protected void sendJsonData(HttpServletResponse response) throws Exception{
       response.setContentType("text/javascript;charset=UTF-8");
       PrintWriter out = response.getWriter();
       out.println("alert(\"数据库连接成功!!!\")");
       out.flush();
       out.close();
   }*/
}