package com.highdatas.mdm.controller; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.mapper.EntityWrapper; import com.baomidou.mybatisplus.mapper.Wrapper; import com.baomidou.mybatisplus.plugins.Page; import com.highdatas.mdm.entity.*; import com.highdatas.mdm.job.JobClient; import com.highdatas.mdm.pojo.*; import com.highdatas.mdm.pojo.kettle.BigDataDataSourceInfo; import com.highdatas.mdm.pojo.kettle.DataSourceInfo; import com.highdatas.mdm.pojo.kettle.UnBigDataDataSourceInfo; import com.highdatas.mdm.service.*; import com.highdatas.mdm.util.Constant; import com.highdatas.mdm.util.DbUtils; import com.highdatas.mdm.util.RuleClient; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.support.CronSequenceGenerator; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import java.util.*; import java.util.stream.Collectors; /** * @author kimi * @description * @date 2020-02-17 10:58 */ @Slf4j @RestController @RequestMapping("/assemble") public class AssembleController { @Autowired ISysAssembleService assembleService; @Autowired ISysAssembleDbService dbService; @Autowired ISysAssembleDbFieldService fieldService; @Autowired ISysAssembleDbTableService tableService; @Autowired ISysAssembleApiService apiService; @Autowired ISysAssembleCommonparamsService commonparamsService; @Autowired ISysAssembleParamsService paramsService; @Autowired ISysMenuService menuService; @Autowired IMenuMappingService menuMappingService; @Autowired BigDataDataSourceInfo bigDataDataSourceInfo; @Autowired UnBigDataDataSourceInfo unBigDataDataSourceInfo; @Autowired JobClient client; @Autowired RuleClient ruleClient; /** * * @description: 立即执行汇集流程 * @param id 汇集流程的id * @return: result 是否调起xxljob成功 * */ @RequestMapping(value = "run/{id}", method = RequestMethod.GET) public Result trigger(@PathVariable String id, HttpServletRequest request) { //通过id获取汇集信息 SysAssemble assemble = assembleService.selectById(id); if (assemble == null) { return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } //调起xxljob 执行一次 接口 boolean trigger = client.trigger(assemble); if (trigger) { return Result.success(null); }else { return Result.error(CodeMsg.OPERATR_ERROR); } } /** * * @description: 修改xxljob jobGroup信息 手动获取ip信息 * @return: result 保存是否成功 * */ @RequestMapping(value = "saveJobGroup", method = RequestMethod.GET) public Result saveJobGroup(HttpServletRequest request) { boolean b = client.saveJobGroup(); if (b) { return Result.success(null); }else { return Result.error(CodeMsg.OPERATR_ERROR); } } /** * * @description: 修改xxljob jobGroup信息 自动获取ip信息 * @return: result 保存是否成功 * */ @RequestMapping(value = "saveAutoJobGroup", method = RequestMethod.GET) public Result saveAutoJobGroup(HttpServletRequest request) { boolean b = client.saveAutoJobGroup(); if (b) { return Result.success(null); }else { return Result.error(CodeMsg.OPERATR_ERROR); } } /** * * @description: 删除xxljob jobGroup信息 * @return: result 保存是否成功 * */ @RequestMapping(value = "deleteJobGroup", method = RequestMethod.GET) public Result deleteJobGroup(HttpServletRequest request) { boolean b = client.saveJobGroup(); if (b) { return Result.success(null); }else { return Result.error(CodeMsg.OPERATR_ERROR); } } /** * * @description: 新建汇集记录 * @return: result 新建是否成功 * */ @RequestMapping(value = "new", method = RequestMethod.GET) public Result newAssemble(HttpServletRequest request) { SysAssemble assemble = new SysAssemble(); //获取用户信息 TUser user = DbUtils.getUser(request); //保存初始汇集信息 assemble.setStatus(SysAssembleStatus.edit).setCreateTime(new Date()).setUserId(user.getUserId()).setUpdateType(SysAssembleUpdateType.All); //保存进数据库 boolean insert = assemble.setId(DbUtils.getUUID()).setEmptyData(true).insert(); if (insert) { String assembleId = assemble.getId(); // 创建job的时候将所有 通用参数搬一份过去 通用参数 见 sys_assemble_commonparams List sysAssembleCommonparams = commonparamsService.selectList(null); for (SysAssembleCommonparams commonParam : sysAssembleCommonparams) { SysAssembleParams sysAssembleParams = new SysAssembleParams(); sysAssembleParams .setName(commonParam.getName()) .setCode(commonParam.getCode()) .setInitSql(commonParam.getInitSql()).setUpdateSql(commonParam.getUpdateSql()) .setCreateTime(new Date()) .setParentId(assembleId) .setId(DbUtils.getUUID()).insert(); } return Result.success(assemble); }else { return Result.error(CodeMsg.INSERT_ERROR); } } /** * * @description: 保存汇集的基础信息 * @param menuId : 主题id * @param id 汇集id * @param bigData 保存数据库选择 * @param updateType 更新类型 * @param updateFields 唯一字段 * @return: result 保存是否成功 * */ @RequestMapping(value = "/saveCommon/{id}", method = RequestMethod.GET) public Result saveCommon(@PathVariable String id, @RequestParam String menuId, @RequestParam SysAssembleUpdateType updateType, @RequestParam Boolean bigData, @RequestParam String updateFields, HttpServletRequest request) { //通过id获取汇集记录 SysAssemble assemble = assembleService.selectById(id); if (assemble == null) { return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } //保存请求过来的数据 assemble.setUpdateType(updateType).setUpdateFields(updateFields); assemble.setEmptyData(false); assemble.setBigdata(Boolean.valueOf(bigData)); //请求头里面获取用户信息 TUser user = DbUtils.getUser(request); //更新数据库里的信息 boolean insert = assemble.setUserId(user.getUserId()).setMenuId(menuId).setUpdateTime(new Date()).updateById(); if (insert) { return Result.success(assemble); }else { return Result.error(CodeMsg.INSERT_ERROR); } } /** * * @description: 删除汇集任务 * @param id 汇集id * @return: result 删除是否成功 * */ @Transactional(rollbackFor=Exception.class) @RequestMapping(value = "delete/{id}", method = RequestMethod.GET) public Result delete(@PathVariable String id) throws Exception { //通过id获取汇集记录 SysAssemble assemble = assembleService.selectById(id); if (assemble == null) { return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } //获取汇集数据保存在哪个数据库 Boolean bigdata = assemble.getBigdata(); DataSourceInfo dataSourceInfo; if (bigdata) { dataSourceInfo = bigDataDataSourceInfo; }else { dataSourceInfo = unBigDataDataSourceInfo; } //循环删除 汇集数据源信息 List dbList = dbService.selectList(new EntityWrapper().eq(Constant.PARENT_ID, id)); for (SysAssembleDb sysAssembleDb : dbList) { String dbId = sysAssembleDb.getId(); //循环删除汇集数据源中选中的表信息 Wrapper eq = new EntityWrapper().eq(Constant.PARENT_ID, dbId); List tableList = tableService.selectList(eq); for (SysAssembleDbTable dbTable : tableList) { String tableId = dbTable.getId(); //删除汇集汇集数据源中选中表的选中字段信息,外加删除数据库中已经创建的物理表 String tempTableName = dbTable.getTempTableName(); dataSourceInfo.dropData(tempTableName); Wrapper fieldWrapper = new EntityWrapper().eq(Constant.PARENT_ID, tableId); boolean delete = fieldService.delete(fieldWrapper); if (!delete) { return Result.error(CodeMsg.DELETE_ERROR); } dbTable.deleteById(); } sysAssembleDb.deleteById(); } //删除汇集从api中获取的数据信息 apiService.delete(new EntityWrapper().eq(Constant.PARENT_ID, id)); //删除 汇集任务相关的参数信息 paramsService.delete(new EntityWrapper().eq(Constant.PARENT_ID, id)); boolean b = true; String jobId = assemble.getJobId(); if (!StringUtils.isEmpty(jobId)) { //xxljob中删除 汇集任务 b = client.deleteJob(jobId); } //最后删除汇集任务 boolean deleted = assemble.deleteById(); if (deleted) { return Result.success(assemble); }else { return Result.error(CodeMsg.DELETE_ERROR); } } /** * * @description: 保存汇集质量检验类型 * @param id 汇集id * @param type 汇集质量检验的类型具体信息见SysAssembleType * @return: result 保存是否成功 * */ @RequestMapping(value = "saveCheckType/{id}", method = RequestMethod.GET) public Result saveUpdateType(@RequestParam SysAssembleCheckType type, @PathVariable String id,HttpServletRequest request) { //通过id获取汇集任务 SysAssemble assemble = assembleService.selectById(id); if (assemble == null) { return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } //保存type信息 assemble.setCheckType(type); if (type.equals(SysAssembleCheckType.partSuccessAdd)) { //部分通过需要保存 页面选中的字段信息 String fields = request.getParameter("fields"); if (StringUtils.isEmpty(fields)) { return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } assemble.setCheckFields(fields); } //更新数据库信息 boolean updated = assemble.updateById(); if (updated) { return Result.success(assemble); }else { return Result.error(CodeMsg.UPDATE_ERROR); } } /** * * @description: 更新汇集任务状态 * @param id 汇集id * @param status 任务状态 * @return: result 保存是否成功 * */ @RequestMapping(value = "updateStatus/{id}", method = RequestMethod.GET) public Result updateStatus(@RequestParam SysAssembleStatus status,@RequestParam String id) { //通过id获取汇集任务 SysAssemble assemble = assembleService.selectById(id); if (assemble == null) { return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } //保存状态 assemble.setStatus(status); boolean updated = assemble.updateById(); if (updated) { return Result.success(assemble); }else { return Result.error(CodeMsg.UPDATE_ERROR); } } /** * * @description: 保存汇集清洗sql * @param id 汇集id * @param object 清洗sql * @return: result 保存是否成功 * */ @RequestMapping(value = "loadPurgeSql/{id}", method = RequestMethod.POST) public Result loadPurgeSql(@RequestBody JSONObject object,@PathVariable String id) { String sql = object.getString("sql"); //通过id获取汇集信息 SysAssemble assemble = assembleService.selectById(id); if (assemble == null) { return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } //保存清洗sql assemble.setPurgeSql(sql); boolean updated = assemble.updateById(); if (updated) { return Result.success(assemble); }else { return Result.error(CodeMsg.UPDATE_ERROR); } } /** * * @description: 获取数据源选中的表信息 * @param id 汇集id * @return: result 保存是否成功 * */ @RequestMapping(value = "getTables/{id}", method = RequestMethod.GET) public Result getTables(@PathVariable String id) { //获取汇集任务信息 SysAssemble assemble = assembleService.selectById(id); if (assemble == null) { return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } //TODO ONLY db 缺少 api的 //获取数据源信息 List dbList = dbService.selectList(new EntityWrapper().eq(Constant.PARENT_ID, id)); JSONArray sourceTables = new JSONArray(); //循环从源中获取 源表信息 for (SysAssembleDb sysAssembleDb : dbList) { List tableList = tableService.selectList(new EntityWrapper().eq(Constant.PARENT_ID, sysAssembleDb.getId())); for (SysAssembleDbTable dbTable : tableList) { JSONObject object = new JSONObject(); object.fluentPut(Constant.ID, dbTable.getId()); object.fluentPut(Constant.tableName, dbTable.getTempTableName()); sourceTables.add(object); } } //通过menuId 获取 目标表名 MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper().eq("menu_id", assemble.getMenuId())); String tableName = menuMapping.getTableName(); String tempTableName = Constant.Temp + tableName; //组装返回信息 JSONObject object = new JSONObject(); object.fluentPut(Constant.ID, null); object.fluentPut(Constant.tableName, tempTableName); JSONObject result = new JSONObject(); result.fluentPut("source", sourceTables); result.fluentPut("target", object); return Result.success(result); } /** * * @description: 保存汇集的定时任务信息 * @param cron : 定时任务的corn表达式 * @param id 汇集id * @return: result 保存是否成功 * */ @Transactional(rollbackFor=Exception.class) @RequestMapping(value = "saveCorn/{id}", method = RequestMethod.GET) public Result add(@RequestParam String cron,@PathVariable String id) throws Exception { //获取汇集任务信息 SysAssemble assemble = assembleService.selectById(id); if (assemble == null) { return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } //校验表达式 boolean validExpression = CronSequenceGenerator.isValidExpression(cron); if (!validExpression) { return Result.error(new CodeMsg(6006, "corn 表达式不规范")); } assemble.setCron(cron); //更新cron参数 boolean b = paramsService.updateCornVal(assemble.getId(), cron); if (!b) { return Result.error(new CodeMsg(6007, "corn 表达式更新值错误")); } assemble.setStatus(SysAssembleStatus.working); String jobId = assemble.getJobId(); boolean xxljobStatus = true; //保存汇集任务到xxljob内 if (StringUtils.isEmpty(jobId)) { xxljobStatus = client.addJob(assemble); } else { xxljobStatus = client.updateJob(assemble); } if (!xxljobStatus) { throw new Exception("add or update xxxljob fail"); } //更新汇集信息 boolean updated = assemble.updateById(); if (updated) { return Result.success(assemble); }else { return Result.error(CodeMsg.UPDATE_ERROR); } } /** * * @description: 页面汇集信息 * @param id 汇集id * @return: result 汇集任务信息 * */ @RequestMapping(value = "get/{id}", method = RequestMethod.GET) public Result get(@PathVariable String id) { SysAssemble assemble = assembleService.selectById(id); String menuId = assemble.getMenuId(); JSONObject o = (JSONObject) JSON.toJSON(assemble); if (!StringUtils.isEmpty(menuId)) { //通过menu Id 将 此主题的父级主题找到返回给前端 LinkedHashSet ids = new LinkedHashSet<>(); ids.add(menuId); //获取所有的父级主题 Set byParentId = menuService.getByParentId(ids); ArrayList strings = new ArrayList<>(byParentId); Collections.reverse(strings); o.fluentPut("menupath", strings); } if (assemble == null) { return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED); } return Result.success(o); } /** * * @description: 获取汇集list * @param pageNo 页数 * @return: result 分页后的汇集list * */ @RequestMapping(value = "/{pageNo}", method = RequestMethod.GET) public Result page(@PathVariable Integer pageNo, HttpServletRequest request) { EntityWrapper wrapper = new EntityWrapper<>(); //dbName 筛选---数据库源信息 String dbName = request.getParameter("dbName"); if (!StringUtils.isEmpty(dbName)) { List dbList = dbService.selectList(new EntityWrapper().eq("datasource_name", dbName)); if (dbList != null && dbList.size() != 0) { List collect = dbList.stream().map(sysAssembleDb -> sysAssembleDb.getParentId()).collect(Collectors.toList()); wrapper.in(Constant.ID, collect); } } //添加筛选--主题信息 String menuId = request.getParameter("menuId"); if (!StringUtils.isEmpty(menuId)) { wrapper.eq("menu_id", menuId); } //筛选-- 保存再哪个数据库 String bigdata = request.getParameter("bigdata"); if (!StringUtils.isEmpty(bigdata)) { wrapper.eq("bigdata", Boolean.valueOf(bigdata)); } //筛选-- 当前状态 String status = request.getParameter("status"); if (!StringUtils.isEmpty(status)) { wrapper.eq("status", status); } Page page; //每页数据量 String pageSize = request.getParameter("pageSize"); if (!StringUtils.isEmpty(pageSize)) { page = new Page(pageNo, Integer.valueOf(pageSize)); }else { page = new Page(pageNo, 15); } //通过更新时间,创建时间倒叙 wrapper.orderBy("update_time desc, create_time desc"); Page resultPage = assembleService.selectPage(page,wrapper); List records = resultPage.getRecords(); List array = new ArrayList<>(); for (SysAssemble record : records) { String id = record.getId(); //找到corn参数 获取下次执行时间 组装后返回给前端 Wrapper eq = new EntityWrapper().eq(Constant.PARENT_ID, id).eq(Constant.Code, Constant.Cron); JSONObject o = (JSONObject) JSON.toJSON(record); SysAssembleParams sysAssembleParams = paramsService.selectOne(eq); if(sysAssembleParams != null) { String val = sysAssembleParams.getVal(); o.fluentPut("nextTime", val); } String realMenuId = record.getMenuId(); if (!StringUtils.isEmpty(realMenuId)) { //获取选中主题的父级主题链,组装后返回给前端 LinkedHashSet menuSet = new LinkedHashSet<>(); menuSet.add(realMenuId); LinkedHashSet byParentId = menuService.getByParentId(menuSet); List sysMenus = menuService.selectBatchIds(byParentId); o.fluentPut("menuList", sysMenus); } //本汇集任务所有的源信息 List dbList = dbService.selectList(new EntityWrapper().eq(Constant.PARENT_ID, record.getId())); o.fluentPut("dbList", dbList); array.add(o); } //组装返回给前端的信息 Page jsonObjectPage = new Page<>(); jsonObjectPage.setCurrent(resultPage.getCurrent()); jsonObjectPage.setSize(resultPage.getSize()); jsonObjectPage.setTotal(resultPage.getTotal()); jsonObjectPage.setRecords(array); return Result.success(jsonObjectPage); } }