package com.highdatas.mdm.controller;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.baomidou.mybatisplus.mapper.EntityWrapper;
|
import com.baomidou.mybatisplus.mapper.Wrapper;
|
import com.baomidou.mybatisplus.plugins.Page;
|
import com.highdatas.mdm.entity.*;
|
import com.highdatas.mdm.job.JobClient;
|
import com.highdatas.mdm.pojo.*;
|
import com.highdatas.mdm.pojo.kettle.BigDataDataSourceInfo;
|
import com.highdatas.mdm.pojo.kettle.DataSourceInfo;
|
import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo;
|
import com.highdatas.mdm.service.*;
|
import com.highdatas.mdm.util.Constant;
|
import com.highdatas.mdm.util.DbUtils;
|
import com.highdatas.mdm.util.RuleClient;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.scheduling.support.CronSequenceGenerator;
|
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.web.bind.annotation.*;
|
|
import javax.servlet.http.HttpServletRequest;
|
import java.util.*;
|
import java.util.stream.Collectors;
|
|
/**
 * REST controller mapped under {@code /assemble}.
 *
 * It exposes endpoints to create, configure, delete, schedule, trigger and
 * list {@code SysAssemble} ("assemble"/data-collection) tasks. Scheduling is
 * delegated to {@link JobClient}; the original author's comments describe
 * that client as talking to xxl-job.
 *
 * @author kimi
 * @date 2020-02-17 10:58
 */
|
|
@Slf4j
|
@RestController
|
@RequestMapping("/assemble")
|
public class AssembleController {
|
// Service for SysAssemble records (selectById / selectPage in this controller).
@Autowired
|
ISysAssembleService assembleService;
|
// Service for SysAssembleDb rows, queried here by the parent assemble id.
@Autowired
|
ISysAssembleDbService dbService;
|
// Service for SysAssembleDbField rows, deleted by parent table id in delete().
@Autowired
|
ISysAssembleDbFieldService fieldService;
|
// Service for SysAssembleDbTable rows, queried here by the parent data-source id.
@Autowired
|
ISysAssembleDbTableService tableService;
|
// Service for SysAssembleApi rows; only used by delete() in this file.
@Autowired
|
ISysAssembleApiService apiService;
|
// Service for SysAssembleCommonparams rows, copied onto every new task in newAssemble().
@Autowired
|
ISysAssembleCommonparamsService commonparamsService;
|
// Service for SysAssembleParams rows (per-task parameters, including the cron one).
@Autowired
|
ISysAssembleParamsService paramsService;
|
// Menu service; used here to resolve menu id chains via getByParentId().
@Autowired
|
ISysMenuService menuService;
|
// Menu-to-table mapping service; getTables() reads the table name for a menu id from it.
@Autowired
|
IMenuMappingService menuMappingService;
|
// Data source picked in delete() when the task's bigdata flag is true.
@Autowired
|
BigDataDataSourceInfo bigDataDataSourceInfo;
|
// Data source picked in delete() when the task's bigdata flag is not true.
@Autowired
|
UnBigDataDataSourceInfo unBigDataDataSourceInfo;
|
// Job scheduler client (xxl-job according to the original comments).
@Autowired
|
JobClient client;
|
// NOTE(review): not referenced by any method visible in this file.
@Autowired
|
RuleClient ruleClient;
|
|
/**
|
*
|
* @description: 立即执行汇集流程
|
* @param id 汇集流程的id
|
* @return: result 是否调起xxljob成功
|
*
|
*/
|
@RequestMapping(value = "run/{id}", method = RequestMethod.GET)
|
public Result trigger(@PathVariable String id, HttpServletRequest request) {
|
//通过id获取汇集信息
|
SysAssemble assemble = assembleService.selectById(id);
|
if (assemble == null) {
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
//调起xxljob 执行一次 接口
|
boolean trigger = client.trigger(assemble);
|
if (trigger) {
|
return Result.success(null);
|
}else {
|
return Result.error(CodeMsg.OPERATR_ERROR);
|
}
|
}
|
|
/**
|
*
|
* @description: 修改xxljob jobGroup信息 手动获取ip信息
|
* @return: result 保存是否成功
|
*
|
*/
|
@RequestMapping(value = "saveJobGroup", method = RequestMethod.GET)
|
public Result saveJobGroup(HttpServletRequest request) {
|
boolean b = client.saveJobGroup();
|
if (b) {
|
return Result.success(null);
|
}else {
|
return Result.error(CodeMsg.OPERATR_ERROR);
|
}
|
}
|
|
/**
|
*
|
* @description: 修改xxljob jobGroup信息 自动获取ip信息
|
* @return: result 保存是否成功
|
*
|
*/
|
@RequestMapping(value = "saveAutoJobGroup", method = RequestMethod.GET)
|
public Result saveAutoJobGroup(HttpServletRequest request) {
|
boolean b = client.saveAutoJobGroup();
|
if (b) {
|
return Result.success(null);
|
}else {
|
return Result.error(CodeMsg.OPERATR_ERROR);
|
}
|
}
|
|
/**
|
*
|
* @description: 删除xxljob jobGroup信息
|
* @return: result 保存是否成功
|
*
|
*/
|
@RequestMapping(value = "deleteJobGroup", method = RequestMethod.GET)
|
public Result deleteJobGroup(HttpServletRequest request) {
|
boolean b = client.saveJobGroup();
|
if (b) {
|
return Result.success(null);
|
}else {
|
return Result.error(CodeMsg.OPERATR_ERROR);
|
}
|
}
|
|
/**
|
*
|
* @description: 新建汇集记录
|
* @return: result 新建是否成功
|
*
|
*/
|
@RequestMapping(value = "new", method = RequestMethod.GET)
|
public Result newAssemble(HttpServletRequest request) {
|
SysAssemble assemble = new SysAssemble();
|
//获取用户信息
|
TUser user = DbUtils.getUser(request);
|
//保存初始汇集信息
|
assemble.setStatus(SysAssembleStatus.edit).setCreateTime(new Date()).setUserId(user.getUserId()).setUpdateType(SysAssembleUpdateType.All);
|
//保存进数据库
|
boolean insert = assemble.setId(DbUtils.getUUID()).setEmptyData(true).insert();
|
if (insert) {
|
String assembleId = assemble.getId();
|
// 创建job的时候将所有 通用参数搬一份过去 通用参数 见 sys_assemble_commonparams
|
List<SysAssembleCommonparams> sysAssembleCommonparams = commonparamsService.selectList(null);
|
for (SysAssembleCommonparams commonParam : sysAssembleCommonparams) {
|
SysAssembleParams sysAssembleParams = new SysAssembleParams();
|
sysAssembleParams
|
.setName(commonParam.getName())
|
.setCode(commonParam.getCode())
|
.setInitSql(commonParam.getInitSql()).setUpdateSql(commonParam.getUpdateSql())
|
.setCreateTime(new Date())
|
.setParentId(assembleId)
|
.setId(DbUtils.getUUID()).insert();
|
}
|
return Result.success(assemble);
|
}else {
|
return Result.error(CodeMsg.INSERT_ERROR);
|
}
|
}
|
|
|
/**
|
*
|
* @description: 保存汇集的基础信息
|
* @param menuId : 主题id
|
* @param id 汇集id
|
* @param bigData 保存数据库选择
|
* @param updateType 更新类型
|
* @param updateFields 唯一字段
|
* @return: result 保存是否成功
|
*
|
*/
|
@RequestMapping(value = "/saveCommon/{id}", method = RequestMethod.GET)
|
public Result saveCommon(@PathVariable String id, @RequestParam String menuId, @RequestParam SysAssembleUpdateType updateType, @RequestParam Boolean bigData, @RequestParam String updateFields, HttpServletRequest request) {
|
//通过id获取汇集记录
|
SysAssemble assemble = assembleService.selectById(id);
|
if (assemble == null) {
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
//保存请求过来的数据
|
assemble.setUpdateType(updateType).setUpdateFields(updateFields);
|
assemble.setEmptyData(false);
|
assemble.setBigdata(Boolean.valueOf(bigData));
|
//请求头里面获取用户信息
|
TUser user = DbUtils.getUser(request);
|
//更新数据库里的信息
|
boolean insert = assemble.setUserId(user.getUserId()).setMenuId(menuId).setUpdateTime(new Date()).updateById();
|
|
if (insert) {
|
return Result.success(assemble);
|
}else {
|
return Result.error(CodeMsg.INSERT_ERROR);
|
}
|
}
|
/**
|
*
|
* @description: 删除汇集任务
|
* @param id 汇集id
|
* @return: result 删除是否成功
|
*
|
*/
|
|
@Transactional(rollbackFor=Exception.class)
|
@RequestMapping(value = "delete/{id}", method = RequestMethod.GET)
|
public Result delete(@PathVariable String id) throws Exception {
|
//通过id获取汇集记录
|
SysAssemble assemble = assembleService.selectById(id);
|
if (assemble == null) {
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
//获取汇集数据保存在哪个数据库
|
Boolean bigdata = assemble.getBigdata();
|
DataSourceInfo dataSourceInfo;
|
if (bigdata) {
|
dataSourceInfo = bigDataDataSourceInfo;
|
}else {
|
dataSourceInfo = unBigDataDataSourceInfo;
|
}
|
//循环删除 汇集数据源信息
|
List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id));
|
for (SysAssembleDb sysAssembleDb : dbList) {
|
String dbId = sysAssembleDb.getId();
|
//循环删除汇集数据源中选中的表信息
|
|
Wrapper<SysAssembleDbTable> eq = new EntityWrapper<SysAssembleDbTable>().eq(Constant.PARENT_ID, dbId);
|
List<SysAssembleDbTable> tableList = tableService.selectList(eq);
|
for (SysAssembleDbTable dbTable : tableList) {
|
String tableId = dbTable.getId();
|
//删除汇集汇集数据源中选中表的选中字段信息,外加删除数据库中已经创建的物理表
|
String tempTableName = dbTable.getTempTableName();
|
dataSourceInfo.dropData(tempTableName);
|
|
Wrapper<SysAssembleDbField> fieldWrapper = new EntityWrapper<SysAssembleDbField>().eq(Constant.PARENT_ID, tableId);
|
boolean delete = fieldService.delete(fieldWrapper);
|
if (!delete) {
|
return Result.error(CodeMsg.DELETE_ERROR);
|
}
|
dbTable.deleteById();
|
}
|
sysAssembleDb.deleteById();
|
}
|
//删除汇集从api中获取的数据信息
|
apiService.delete(new EntityWrapper<SysAssembleApi>().eq(Constant.PARENT_ID, id));
|
//删除 汇集任务相关的参数信息
|
paramsService.delete(new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, id));
|
boolean b = true;
|
String jobId = assemble.getJobId();
|
if (!StringUtils.isEmpty(jobId)) {
|
//xxljob中删除 汇集任务
|
b = client.deleteJob(jobId);
|
}
|
//最后删除汇集任务
|
boolean deleted = assemble.deleteById();
|
if (deleted) {
|
return Result.success(assemble);
|
}else {
|
return Result.error(CodeMsg.DELETE_ERROR);
|
}
|
}
|
/**
|
*
|
* @description: 保存汇集质量检验类型
|
* @param id 汇集id
|
* @param type 汇集质量检验的类型具体信息见SysAssembleType
|
* @return: result 保存是否成功
|
*
|
*/
|
@RequestMapping(value = "saveCheckType/{id}", method = RequestMethod.GET)
|
public Result saveUpdateType(@RequestParam SysAssembleCheckType type, @PathVariable String id,HttpServletRequest request) {
|
//通过id获取汇集任务
|
SysAssemble assemble = assembleService.selectById(id);
|
if (assemble == null) {
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
//保存type信息
|
assemble.setCheckType(type);
|
|
if (type.equals(SysAssembleCheckType.partSuccessAdd)) {
|
//部分通过需要保存 页面选中的字段信息
|
String fields = request.getParameter("fields");
|
if (StringUtils.isEmpty(fields)) {
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
assemble.setCheckFields(fields);
|
}
|
//更新数据库信息
|
boolean updated = assemble.updateById();
|
if (updated) {
|
return Result.success(assemble);
|
}else {
|
return Result.error(CodeMsg.UPDATE_ERROR);
|
}
|
}
|
|
/**
|
*
|
* @description: 更新汇集任务状态
|
* @param id 汇集id
|
* @param status 任务状态
|
* @return: result 保存是否成功
|
*
|
*/
|
@RequestMapping(value = "updateStatus/{id}", method = RequestMethod.GET)
|
public Result updateStatus(@RequestParam SysAssembleStatus status,@RequestParam String id) {
|
//通过id获取汇集任务
|
SysAssemble assemble = assembleService.selectById(id);
|
if (assemble == null) {
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
//保存状态
|
assemble.setStatus(status);
|
boolean updated = assemble.updateById();
|
if (updated) {
|
return Result.success(assemble);
|
}else {
|
return Result.error(CodeMsg.UPDATE_ERROR);
|
}
|
}
|
|
/**
|
*
|
* @description: 保存汇集清洗sql
|
* @param id 汇集id
|
* @param object 清洗sql
|
* @return: result 保存是否成功
|
*
|
*/
|
@RequestMapping(value = "loadPurgeSql/{id}", method = RequestMethod.POST)
|
public Result loadPurgeSql(@RequestBody JSONObject object,@PathVariable String id) {
|
String sql = object.getString("sql");
|
//通过id获取汇集信息
|
SysAssemble assemble = assembleService.selectById(id);
|
if (assemble == null) {
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
//保存清洗sql
|
assemble.setPurgeSql(sql);
|
boolean updated = assemble.updateById();
|
if (updated) {
|
return Result.success(assemble);
|
}else {
|
return Result.error(CodeMsg.UPDATE_ERROR);
|
}
|
}
|
|
/**
|
*
|
* @description: 获取数据源选中的表信息
|
* @param id 汇集id
|
* @return: result 保存是否成功
|
*
|
*/
|
|
@RequestMapping(value = "getTables/{id}", method = RequestMethod.GET)
|
public Result getTables(@PathVariable String id) {
|
//获取汇集任务信息
|
SysAssemble assemble = assembleService.selectById(id);
|
if (assemble == null) {
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
//TODO ONLY db 缺少 api的
|
//获取数据源信息
|
List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id));
|
JSONArray sourceTables = new JSONArray();
|
//循环从源中获取 源表信息
|
for (SysAssembleDb sysAssembleDb : dbList) {
|
List<SysAssembleDbTable> tableList = tableService.selectList(new EntityWrapper<SysAssembleDbTable>().eq(Constant.PARENT_ID, sysAssembleDb.getId()));
|
for (SysAssembleDbTable dbTable : tableList) {
|
JSONObject object = new JSONObject();
|
object.fluentPut(Constant.ID, dbTable.getId());
|
object.fluentPut(Constant.tableName, dbTable.getTempTableName());
|
sourceTables.add(object);
|
}
|
}
|
//通过menuId 获取 目标表名
|
MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", assemble.getMenuId()));
|
String tableName = menuMapping.getTableName();
|
String tempTableName = Constant.Temp + tableName;
|
//组装返回信息
|
JSONObject object = new JSONObject();
|
object.fluentPut(Constant.ID, null);
|
object.fluentPut(Constant.tableName, tempTableName);
|
JSONObject result = new JSONObject();
|
result.fluentPut("source", sourceTables);
|
result.fluentPut("target", object);
|
return Result.success(result);
|
}
|
|
/**
|
*
|
* @description: 保存汇集的定时任务信息
|
* @param cron : 定时任务的corn表达式
|
* @param id 汇集id
|
* @return: result 保存是否成功
|
*
|
*/
|
|
@Transactional(rollbackFor=Exception.class)
|
@RequestMapping(value = "saveCorn/{id}", method = RequestMethod.GET)
|
public Result add(@RequestParam String cron,@PathVariable String id) throws Exception {
|
//获取汇集任务信息
|
SysAssemble assemble = assembleService.selectById(id);
|
if (assemble == null) {
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
//校验表达式
|
boolean validExpression = CronSequenceGenerator.isValidExpression(cron);
|
if (!validExpression) {
|
return Result.error(new CodeMsg(6006, "corn 表达式不规范"));
|
}
|
assemble.setCron(cron);
|
//更新cron参数
|
boolean b = paramsService.updateCornVal(assemble.getId(), cron);
|
if (!b) {
|
return Result.error(new CodeMsg(6007, "corn 表达式更新值错误"));
|
}
|
|
assemble.setStatus(SysAssembleStatus.working);
|
String jobId = assemble.getJobId();
|
boolean xxljobStatus = true;
|
//保存汇集任务到xxljob内
|
if (StringUtils.isEmpty(jobId)) {
|
xxljobStatus = client.addJob(assemble);
|
}
|
else {
|
xxljobStatus = client.updateJob(assemble);
|
}
|
if (!xxljobStatus) {
|
throw new Exception("add or update xxxljob fail");
|
}
|
//更新汇集信息
|
boolean updated = assemble.updateById();
|
if (updated) {
|
return Result.success(assemble);
|
}else {
|
return Result.error(CodeMsg.UPDATE_ERROR);
|
}
|
}
|
|
/**
|
*
|
* @description: 页面汇集信息
|
* @param id 汇集id
|
* @return: result 汇集任务信息
|
*
|
*/
|
|
@RequestMapping(value = "get/{id}", method = RequestMethod.GET)
|
public Result get(@PathVariable String id) {
|
SysAssemble assemble = assembleService.selectById(id);
|
String menuId = assemble.getMenuId();
|
JSONObject o = (JSONObject) JSON.toJSON(assemble);
|
if (!StringUtils.isEmpty(menuId)) {
|
//通过menu Id 将 此主题的父级主题找到返回给前端
|
LinkedHashSet<String> ids = new LinkedHashSet<>();
|
ids.add(menuId);
|
//获取所有的父级主题
|
Set<String> byParentId = menuService.getByParentId(ids);
|
ArrayList<String> strings = new ArrayList<>(byParentId);
|
Collections.reverse(strings);
|
|
o.fluentPut("menupath", strings);
|
}
|
|
if (assemble == null) {
|
return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
}
|
return Result.success(o);
|
}
|
|
/**
|
*
|
* @description: 获取汇集list
|
* @param pageNo 页数
|
* @return: result 分页后的汇集list
|
*
|
*/
|
@RequestMapping(value = "/{pageNo}", method = RequestMethod.GET)
|
public Result page(@PathVariable Integer pageNo, HttpServletRequest request) {
|
EntityWrapper<SysAssemble> wrapper = new EntityWrapper<>();
|
//dbName 筛选---数据库源信息
|
String dbName = request.getParameter("dbName");
|
if (!StringUtils.isEmpty(dbName)) {
|
List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq("datasource_name", dbName));
|
if (dbList != null && dbList.size() != 0) {
|
List<String> collect = dbList.stream().map(sysAssembleDb -> sysAssembleDb.getParentId()).collect(Collectors.toList());
|
wrapper.in(Constant.ID, collect);
|
}
|
}
|
//添加筛选--主题信息
|
String menuId = request.getParameter("menuId");
|
if (!StringUtils.isEmpty(menuId)) {
|
wrapper.eq("menu_id", menuId);
|
}
|
//筛选-- 保存再哪个数据库
|
String bigdata = request.getParameter("bigdata");
|
if (!StringUtils.isEmpty(bigdata)) {
|
wrapper.eq("bigdata", Boolean.valueOf(bigdata));
|
}
|
//筛选-- 当前状态
|
String status = request.getParameter("status");
|
if (!StringUtils.isEmpty(status)) {
|
wrapper.eq("status", status);
|
}
|
|
Page<SysAssemble> page;
|
//每页数据量
|
String pageSize = request.getParameter("pageSize");
|
if (!StringUtils.isEmpty(pageSize)) {
|
page = new Page(pageNo, Integer.valueOf(pageSize));
|
}else {
|
page = new Page(pageNo, 15);
|
}
|
//通过更新时间,创建时间倒叙
|
wrapper.orderBy("update_time desc, create_time desc");
|
Page<SysAssemble> resultPage = assembleService.selectPage(page,wrapper);
|
List<SysAssemble> records = resultPage.getRecords();
|
|
List<JSONObject> array = new ArrayList<>();
|
for (SysAssemble record : records) {
|
String id = record.getId();
|
//找到corn参数 获取下次执行时间 组装后返回给前端
|
Wrapper<SysAssembleParams> eq = new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, id).eq(Constant.Code, Constant.Cron);
|
JSONObject o = (JSONObject) JSON.toJSON(record);
|
|
SysAssembleParams sysAssembleParams = paramsService.selectOne(eq);
|
if(sysAssembleParams != null) {
|
String val = sysAssembleParams.getVal();
|
o.fluentPut("nextTime", val);
|
}
|
|
String realMenuId = record.getMenuId();
|
if (!StringUtils.isEmpty(realMenuId)) {
|
//获取选中主题的父级主题链,组装后返回给前端
|
LinkedHashSet<String> menuSet = new LinkedHashSet<>();
|
menuSet.add(realMenuId);
|
LinkedHashSet<String> byParentId = menuService.getByParentId(menuSet);
|
List<SysMenu> sysMenus = menuService.selectBatchIds(byParentId);
|
o.fluentPut("menuList", sysMenus);
|
}
|
//本汇集任务所有的源信息
|
List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, record.getId()));
|
o.fluentPut("dbList", dbList);
|
|
array.add(o);
|
}
|
//组装返回给前端的信息
|
Page<JSONObject> jsonObjectPage = new Page<>();
|
jsonObjectPage.setCurrent(resultPage.getCurrent());
|
jsonObjectPage.setSize(resultPage.getSize());
|
jsonObjectPage.setTotal(resultPage.getTotal());
|
jsonObjectPage.setRecords(array);
|
return Result.success(jsonObjectPage);
|
}
|
}
|