| | |
| | | import com.highdatas.mdm.service.*;
|
| | | import com.highdatas.mdm.util.Constant;
|
| | | import com.highdatas.mdm.util.DbUtils;
|
| | | import com.highdatas.mdm.util.RuleClient;
|
| | | import lombok.extern.slf4j.Slf4j;
|
| | | import org.apache.commons.lang3.StringUtils;
|
| | | import org.springframework.beans.factory.annotation.Autowired;
|
| | |
| | | UnBigDataDataSourceInfo unBigDataDataSourceInfo;
|
| | | @Autowired
|
| | | JobClient client;
|
| | | @Autowired
|
| | | RuleClient ruleClient;
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: Immediately runs the assemble (data-collection) flow once.
|
| | | * @param id id of the assemble flow
|
| | | * @param request current HTTP request (not read by the visible code)
|
| | | * @return: result whether the xxl-job call was started successfully;
|
| | | * an ERROR_PARAMS_NOT_MATHED error when no assemble record exists for the id
|
| | | *
|
| | | * NOTE(review): the branch taken when client.trigger(...) returns false is not visible in this excerpt.
|
| | | *
|
| | | */
|
| | | @RequestMapping(value = "run/{id}", method = RequestMethod.GET)
|
| | | public Result trigger(@PathVariable String id, HttpServletRequest request) {
|
| | | // Look up the assemble record by id
|
| | | SysAssemble assemble = assembleService.selectById(id);
|
| | | if (assemble == null) {
|
| | | return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
| | | }
|
| | | // Call the xxl-job "run once" interface through the job client
|
| | | boolean trigger = client.trigger(assemble);
|
| | | if (trigger) {
|
| | | return Result.success(null);
|
| | |
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: Updates the xxl-job jobGroup information; per the original description the IP information is obtained manually.
|
| | | * @param request current HTTP request (not read by the visible code)
|
| | | * @return: result whether the save succeeded
|
| | | *
|
| | | * NOTE(review): the statements that turn the boolean from client.saveJobGroup() into a Result are not visible in this excerpt.
|
| | | *
|
| | | */
|
| | | @RequestMapping(value = "saveJobGroup", method = RequestMethod.GET)
|
| | | public Result saveJobGroup(HttpServletRequest request) {
|
| | | boolean b = client.saveJobGroup();
|
| | |
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: 修改xxljob jobGroup信息 自动获取ip信息
|
| | | * @return: result 保存是否成功
|
| | | *
|
| | | */
|
| | | @RequestMapping(value = "saveAutoJobGroup", method = RequestMethod.GET)
|
| | | public Result saveAutoJobGroup(HttpServletRequest request) {
|
| | | boolean b = client.saveAutoJobGroup();
|
| | | if (b) {
|
| | | return Result.success(null);
|
| | | }else {
|
| | | return Result.error(CodeMsg.OPERATR_ERROR);
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: Deletes the xxl-job jobGroup information.
|
| | | * @param request current HTTP request (not read by the visible code)
|
| | | * @return: result whether the operation succeeded
|
| | | *
|
| | | * NOTE(review): although this endpoint is named and mapped as "deleteJobGroup", the visible body calls
|
| | | * client.saveJobGroup() - this looks like a copy-paste slip; confirm which JobClient method is intended.
|
| | | * NOTE(review): the statements that turn the boolean into a Result are not visible in this excerpt.
|
| | | *
|
| | | */
|
| | | @RequestMapping(value = "deleteJobGroup", method = RequestMethod.GET)
|
| | | public Result deleteJobGroup(HttpServletRequest request) {
|
| | | boolean b = client.saveJobGroup();
|
| | |
| | | }
|
| | | }
|
| | |
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: Creates a new assemble record.
|
| | | * @param request current HTTP request; the current user is resolved from it
|
| | | * @return: result whether the creation succeeded
|
| | | *
|
| | | * NOTE(review): this excerpt contains two method signatures (delete / newAssemble) and two declarations of
|
| | | * the local "user"; it looks like an old and a new revision interleaved - keep only one of each.
|
| | | * NOTE(review): the remainder of the loop body and of the method is not visible in this excerpt.
|
| | | *
|
| | | */
|
| | | @RequestMapping(value = "new", method = RequestMethod.GET)
|
| | | public Result delete(HttpServletRequest request) {
|
| | | public Result newAssemble(HttpServletRequest request) {
|
| | | SysAssemble assemble = new SysAssemble();
|
| | | TUser user = (TUser) request.getSession().getAttribute(Constant.USER);
|
| | | // Get the user information
|
| | | TUser user = DbUtils.getUser(request);
|
| | | // Fill in the initial assemble information (edit status, creation time, owner, full update type)
|
| | | assemble.setStatus(SysAssembleStatus.edit).setCreateTime(new Date()).setUserId(user.getUserId()).setUpdateType(SysAssembleUpdateType.All);
|
| | | // Persist it to the database with a freshly generated id, flagged as empty data
|
| | | boolean insert = assemble.setId(DbUtils.getUUID()).setEmptyData(true).insert();
|
| | | if (insert) {
|
| | | String assembleId = assemble.getId();
|
| | | // When creating the job, copy all common parameters over to it
|
| | | // When creating the job, copy all common parameters over to it; for the common parameters see sys_assemble_commonparams
|
| | | List<SysAssembleCommonparams> sysAssembleCommonparams = commonparamsService.selectList(null);
|
| | | for (SysAssembleCommonparams commonParam : sysAssembleCommonparams) {
|
| | | SysAssembleParams sysAssembleParams = new SysAssembleParams();
|
| | |
| | | }
|
| | |
|
| | |
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: Saves the basic information of an assemble record.
|
| | | * @param menuId : topic (menu) id
|
| | | * @param id assemble id
|
| | | * @param bigData which database the data is saved to
|
| | | * @param updateType update type
|
| | | * @param updateFields unique field(s)
|
| | | * @param request current HTTP request; the current user is resolved from it
|
| | | * @return: result whether the save succeeded; an ERROR_PARAMS_NOT_MATHED error when no record exists for the id
|
| | | *
|
| | | * NOTE(review): the local "user" is declared twice in this excerpt (session lookup and DbUtils.getUser);
|
| | | * it looks like an old and a new revision interleaved - keep only one.
|
| | | * NOTE(review): the statement inside "if (insert)" is not visible in this excerpt.
|
| | | *
|
| | | */
|
| | | @RequestMapping(value = "/saveCommon/{id}", method = RequestMethod.GET)
|
| | | public Result saveCommon(@PathVariable String id, @RequestParam String menuId, @RequestParam SysAssembleUpdateType updateType, @RequestParam Boolean bigData, @RequestParam String updateFields, HttpServletRequest request) {
|
| | | // Look up the assemble record by id
|
| | | SysAssemble assemble = assembleService.selectById(id);
|
| | | if (assemble == null) {
|
| | | return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
| | | }
|
| | | // Copy the values from the request onto the record
|
| | | assemble.setUpdateType(updateType).setUpdateFields(updateFields);
|
| | | assemble.setEmptyData(false);
|
| | | assemble.setBigdata(Boolean.valueOf(bigData));
|
| | | TUser user = (TUser) request.getSession().getAttribute(Constant.USER);
|
| | | // Get the user information (per the original comment: from the request headers)
|
| | | TUser user = DbUtils.getUser(request);
|
| | | // Update the record in the database
|
| | | boolean insert = assemble.setUserId(user.getUserId()).setMenuId(menuId).setUpdateTime(new Date()).updateById();
|
| | |
|
| | | if (insert) {
|
| | |
| | | return Result.error(CodeMsg.INSERT_ERROR);
|
| | | }
|
| | | }
|
| | | /**
|
| | | *
|
| | | * @description: Deletes an assemble task together with its data-source, table, API and parameter
|
| | | * records, and removes the corresponding job from xxl-job when a job id is set.
|
| | | * @param id assemble id
|
| | | * @return: result whether the deletion succeeded; an ERROR_PARAMS_NOT_MATHED error when no record exists for the id
|
| | | * @throws Exception when the xxl-job deletion reports failure
|
| | | *
|
| | | * NOTE(review): "bigdata" is a boxed Boolean used directly in "if (bigdata)"; if getBigdata() can
|
| | | * return null this unboxing throws a NullPointerException - confirm the column is never null.
|
| | | * NOTE(review): the big-data branch assignment, the per-table cleanup after dropData(...) and the
|
| | | * "else" in front of the final error return are not visible in this excerpt.
|
| | | *
|
| | | */
|
| | |
|
| | | @Transactional(rollbackFor=Exception.class)
|
| | | @RequestMapping(value = "delete/{id}", method = RequestMethod.GET)
|
| | | public Result delete(@PathVariable String id) throws Exception {
|
| | | // Look up the assemble record by id
|
| | | SysAssemble assemble = assembleService.selectById(id);
|
| | | if (assemble == null) {
|
| | | return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
| | | }
|
| | | // Determine which database the assembled data is stored in
|
| | | Boolean bigdata = assemble.getBigdata();
|
| | | DataSourceInfo dataSourceInfo;
|
| | | if (bigdata) {
|
| | |
| | | }else {
|
| | | dataSourceInfo = unBigDataDataSourceInfo;
|
| | | }
|
| | | //delete db
|
| | | // Loop over and delete the assemble's data-source records
|
| | | List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id));
|
| | | for (SysAssembleDb sysAssembleDb : dbList) {
|
| | | String dbId = sysAssembleDb.getId();
|
| | | //delete table
|
| | | // Loop over and delete the tables selected in this data source
|
| | |
|
| | | Wrapper<SysAssembleDbTable> eq = new EntityWrapper<SysAssembleDbTable>().eq(Constant.PARENT_ID, dbId);
|
| | | List<SysAssembleDbTable> tableList = tableService.selectList(eq);
|
| | | for (SysAssembleDbTable dbTable : tableList) {
|
| | | String tableId = dbTable.getId();
|
| | | //delete field
|
| | | // Delete the selected fields of the selected table, and also drop the physical table already created in the database
|
| | | String tempTableName = dbTable.getTempTableName();
|
| | | dataSourceInfo.dropData(tempTableName);
|
| | |
|
| | |
| | | }
|
| | | sysAssembleDb.deleteById();
|
| | | }
|
| | | //TODO delete api
|
| | | // Delete the records describing data the assemble obtains from APIs
|
| | | apiService.delete(new EntityWrapper<SysAssembleApi>().eq(Constant.PARENT_ID, id));
|
| | | //delete param
|
| | | // Delete the parameters that belong to the assemble task
|
| | | paramsService.delete(new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, id));
|
| | | boolean b = true;
|
| | | String jobId = assemble.getJobId();
|
| | | if (!StringUtils.isEmpty(jobId)) {
|
| | | // Delete the assemble job in xxl-job
|
| | | b = client.deleteJob(jobId);
|
| | | }
|
| | | if (!b) {
|
| | | throw new Exception("xxljob 删除失败");
|
| | | }
|
| | | // Finally delete the assemble task itself
|
| | | boolean deleted = assemble.deleteById();
|
| | | if (deleted) {
|
| | | return Result.success(assemble);
|
| | |
| | | return Result.error(CodeMsg.DELETE_ERROR);
|
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: Saves the quality-check type of an assemble task.
|
| | | * @param id assemble id
|
| | | * @param type quality-check type (declared as SysAssembleCheckType)
|
| | | * @param request current HTTP request; the optional "fields" parameter is read from it
|
| | | * @return: result whether the save succeeded; an ERROR_PARAMS_NOT_MATHED error when no record exists
|
| | | * for the id, or when the type is partSuccessAdd and "fields" is empty
|
| | | *
|
| | | * NOTE(review): the branch taken when updateById() returns false is not visible in this excerpt.
|
| | | *
|
| | | */
|
| | | @RequestMapping(value = "saveCheckType/{id}", method = RequestMethod.GET)
|
| | | public Result saveUpdateType(@RequestParam SysAssembleCheckType type, @PathVariable String id,HttpServletRequest request) {
|
| | | // Look up the assemble task by id
|
| | | SysAssemble assemble = assembleService.selectById(id);
|
| | | if (assemble == null) {
|
| | | return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
| | | }
|
| | | // Store the check type
|
| | | assemble.setCheckType(type);
|
| | |
|
| | | if (type.equals(SysAssembleCheckType.partSuccessAdd)) {
|
| | | // Partial pass additionally requires the fields selected on the page
|
| | | String fields = request.getParameter("fields");
|
| | | if (StringUtils.isEmpty(fields)) {
|
| | | return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
| | | }
|
| | | assemble.setCheckFields(fields);
|
| | | }
|
| | | // Update the record in the database
|
| | | boolean updated = assemble.updateById();
|
| | | if (updated) {
|
| | | return Result.success(assemble);
|
| | |
| | | }
|
| | | }
|
| | |
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: Updates the status of an assemble task.
|
| | | * @param id assemble id
|
| | | * @param status task status
|
| | | * @return: result whether the save succeeded; an ERROR_PARAMS_NOT_MATHED error when no record exists for the id
|
| | | *
|
| | | * NOTE(review): the mapping declares an {id} path segment, but "id" is bound with @RequestParam,
|
| | | * so the path value is not what this parameter receives - confirm whether @PathVariable was intended.
|
| | | * NOTE(review): the return statements after the update are not visible in this excerpt.
|
| | | *
|
| | | */
|
| | | @RequestMapping(value = "updateStatus/{id}", method = RequestMethod.GET)
|
| | | public Result updateStatus(@RequestParam SysAssembleStatus status,@RequestParam String id) {
|
| | | // Look up the assemble task by id
|
| | | SysAssemble assemble = assembleService.selectById(id);
|
| | | if (assemble == null) {
|
| | | return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
| | | }
|
| | | // Store the new status
|
| | | assemble.setStatus(status);
|
| | | boolean updated = assemble.updateById();
|
| | | if (updated) {
|
| | |
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: Saves the cleansing (purge) SQL of an assemble task.
|
| | | * @param id assemble id
|
| | | * @param object request body; the SQL text is read from its "sql" key
|
| | | * @return: result whether the save succeeded; an ERROR_PARAMS_NOT_MATHED error when no record exists for the id
|
| | | *
|
| | | * NOTE(review): the return statements after the update are not visible in this excerpt.
|
| | | *
|
| | | */
|
| | | @RequestMapping(value = "loadPurgeSql/{id}", method = RequestMethod.POST)
|
| | | public Result loadPurgeSql(@RequestBody JSONObject object,@PathVariable String id) {
|
| | | String sql = object.getString("sql");
|
| | | // Look up the assemble record by id
|
| | | SysAssemble assemble = assembleService.selectById(id);
|
| | | if (assemble == null) {
|
| | | return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
| | | }
|
| | | // Store the cleansing SQL
|
| | | assemble.setPurgeSql(sql);
|
| | | boolean updated = assemble.updateById();
|
| | | if (updated) {
|
| | |
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: Gets the tables selected in the assemble's data sources, plus the target temp table name.
|
| | | * @param id assemble id
|
| | | * @return: result the assembled table information; an ERROR_PARAMS_NOT_MATHED error when no record exists for the id
|
| | | *
|
| | | * NOTE(review): "object" is added to sourceTables inside the loop before its visible declaration, and
|
| | | * "result" is returned without a visible declaration - the lines defining them are missing from this excerpt.
|
| | | * NOTE(review): menuMapping is dereferenced without a null check; if selectOne(...) can return null
|
| | | * (e.g. no mapping for the menu id) this throws a NullPointerException - confirm.
|
| | | *
|
| | | */
|
| | |
|
| | | @RequestMapping(value = "getTables/{id}", method = RequestMethod.GET)
|
| | | public Result getTables(@PathVariable String id) {
|
| | | // Load the assemble task
|
| | | SysAssemble assemble = assembleService.selectById(id);
|
| | | if (assemble == null) {
|
| | | return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
| | | }
|
| | | //TODO only db sources are handled; api sources are missing
|
| | | // Load the data-source records
|
| | | List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, id));
|
| | | JSONArray sourceTables = new JSONArray();
|
| | |
|
| | | // Loop over the sources and collect their source-table information
|
| | | for (SysAssembleDb sysAssembleDb : dbList) {
|
| | | List<SysAssembleDbTable> tableList = tableService.selectList(new EntityWrapper<SysAssembleDbTable>().eq(Constant.PARENT_ID, sysAssembleDb.getId()));
|
| | | for (SysAssembleDbTable dbTable : tableList) {
|
| | |
| | | sourceTables.add(object);
|
| | | }
|
| | | }
|
| | | // Resolve the target table name from the menu id
|
| | | MenuMapping menuMapping = menuMappingService.selectOne(new EntityWrapper<MenuMapping>().eq("menu_id", assemble.getMenuId()));
|
| | | String tableName = menuMapping.getTableName();
|
| | | String tempTableName = Constant.Temp + tableName;
|
| | | // Assemble the response payload
|
| | | JSONObject object = new JSONObject();
|
| | | object.fluentPut(Constant.ID, null);
|
| | | object.fluentPut(Constant.tableName, tempTableName);
|
| | |
| | | return Result.success(result);
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: Saves the scheduled-task (cron) information of an assemble task.
|
| | | * @param cron : cron expression of the scheduled task
|
| | | * @param id assemble id
|
| | | * @return: result whether the save succeeded; an error result when the record does not exist, the
|
| | | * expression is invalid (code 6006) or the cron parameter could not be updated (code 6007)
|
| | | * @throws Exception when adding/updating the job in xxl-job reports failure
|
| | | *
|
| | | * NOTE(review): the branch for an already existing jobId and the branch taken when updateById()
|
| | | * returns false are not visible in this excerpt.
|
| | | *
|
| | | */
|
| | |
|
| | | @Transactional(rollbackFor=Exception.class)
|
| | | @RequestMapping(value = "saveCorn/{id}", method = RequestMethod.GET)
|
| | | public Result add(@RequestParam String cron,@PathVariable String id) throws Exception {
|
| | | // Load the assemble task
|
| | | SysAssemble assemble = assembleService.selectById(id);
|
| | | if (assemble == null) {
|
| | | return Result.error(CodeMsg.ERROR_PARAMS_NOT_MATHED);
|
| | | }
|
| | | // Validate the cron expression
|
| | | boolean validExpression = CronSequenceGenerator.isValidExpression(cron);
|
| | | if (!validExpression) {
|
| | | return Result.error(new CodeMsg(6006, "corn 表达式不规范"));
|
| | | }
|
| | | assemble.setCron(cron);
|
| | | // Update the stored cron parameter
|
| | | boolean b = paramsService.updateCornVal(assemble.getId(), cron);
|
| | | if (!b) {
|
| | | return Result.error(new CodeMsg(6007, "corn 表达式更新值错误"));
|
| | | }
|
| | |
|
| | | assemble.setStatus(SysAssembleStatus.working);
|
| | | String jobId = assemble.getJobId();
|
| | | boolean xxljobStatus = true;
|
| | | // Register the assemble task in xxl-job when it has no job id yet
|
| | | if (StringUtils.isEmpty(jobId)) {
|
| | | xxljobStatus = client.addJob(assemble);
|
| | | }
|
| | |
| | | if (!xxljobStatus) {
|
| | | throw new Exception("add or update xxxljob fail");
|
| | | }
|
| | |
|
| | | // Update the assemble record
|
| | | boolean updated = assemble.updateById();
|
| | | if (updated) {
|
| | | return Result.success(assemble);
|
| | |
| | | }
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: Returns the assemble information shown on the page.
|
| | | * @param id assemble id
|
| | | * @return: result the assemble task information as JSON
|
| | | *
|
| | | * NOTE(review): unlike the sibling endpoints, "assemble" is dereferenced without a null check, so an
|
| | | * unknown id leads to a NullPointerException at getMenuId() if selectById can return null - confirm.
|
| | | * NOTE(review): the statements that use the reversed parent-id list and close the "if" are not
|
| | | * visible in this excerpt.
|
| | | *
|
| | | */
|
| | |
|
| | | @RequestMapping(value = "get/{id}", method = RequestMethod.GET)
|
| | | public Result get(@PathVariable String id) {
|
| | | SysAssemble assemble = assembleService.selectById(id);
|
| | | String menuId = assemble.getMenuId();
|
| | | JSONObject o = (JSONObject) JSON.toJSON(assemble);
|
| | | if (!StringUtils.isEmpty(menuId)) {
|
| | | // Using the menu id, find this topic's parent topics so they can be returned to the front end
|
| | | LinkedHashSet<String> ids = new LinkedHashSet<>();
|
| | | ids.add(menuId);
|
| | | // Fetch all parent topics
|
| | | Set<String> byParentId = menuService.getByParentId(ids);
|
| | | ArrayList<String> strings = new ArrayList<>(byParentId);
|
| | | Collections.reverse(strings);
|
| | |
| | | return Result.success(o);
|
| | | }
|
| | |
|
| | | /**
|
| | | *
|
| | | * @description: 获取汇集list
|
| | | * @param pageNo 页数
|
| | | * @return: result 分页后的汇集list
|
| | | *
|
| | | */
|
| | | @RequestMapping(value = "/{pageNo}", method = RequestMethod.GET)
|
| | | public Result page(@PathVariable Integer pageNo, HttpServletRequest request) {
|
| | | EntityWrapper<SysAssemble> wrapper = new EntityWrapper<>();
|
| | | //dbName 筛选---数据库源信息
|
| | | String dbName = request.getParameter("dbName");
|
| | | if (!StringUtils.isEmpty(dbName)) {
|
| | | List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq("datasource_name", dbName));
|
| | |
| | | wrapper.in(Constant.ID, collect);
|
| | | }
|
| | | }
|
| | |
|
| | | //添加筛选--主题信息
|
| | | String menuId = request.getParameter("menuId");
|
| | | if (!StringUtils.isEmpty(menuId)) {
|
| | | wrapper.eq("menu_id", menuId);
|
| | | }
|
| | | //筛选-- 保存再哪个数据库
|
| | | String bigdata = request.getParameter("bigdata");
|
| | | if (!StringUtils.isEmpty(bigdata)) {
|
| | | wrapper.eq("bigdata", Boolean.valueOf(bigdata));
|
| | | }
|
| | | //筛选-- 当前状态
|
| | | String status = request.getParameter("status");
|
| | | if (!StringUtils.isEmpty(status)) {
|
| | | wrapper.eq("status", status);
|
| | | }
|
| | |
|
| | | Page<SysAssemble> page;
|
| | | //每页数据量
|
| | | String pageSize = request.getParameter("pageSize");
|
| | | if (!StringUtils.isEmpty(pageSize)) {
|
| | | page = new Page(pageNo, Integer.valueOf(pageSize));
|
| | | }else {
|
| | | page = new Page(pageNo, 15);
|
| | | }
|
| | |
|
| | | //通过更新时间,创建时间倒叙
|
| | | wrapper.orderBy("update_time desc, create_time desc");
|
| | | Page<SysAssemble> resultPage = assembleService.selectPage(page,wrapper);
|
| | | List<SysAssemble> records = resultPage.getRecords();
|
| | |
|
| | | List<JSONObject> array = new ArrayList<>();
|
| | | for (SysAssemble record : records) {
|
| | | String id = record.getId();
|
| | | //找到corn参数 获取下次执行时间 组装后返回给前端
|
| | | Wrapper<SysAssembleParams> eq = new EntityWrapper<SysAssembleParams>().eq(Constant.PARENT_ID, id).eq(Constant.Code, Constant.Cron);
|
| | | JSONObject o = (JSONObject) JSON.toJSON(record);
|
| | |
|
| | | SysAssembleParams sysAssembleParams = paramsService.selectOne(eq);
|
| | | String val = sysAssembleParams.getVal();
|
| | | o.fluentPut("nextTime", val);
|
| | | if(sysAssembleParams != null) {
|
| | | String val = sysAssembleParams.getVal();
|
| | | o.fluentPut("nextTime", val);
|
| | | }
|
| | |
|
| | | String realMenuId = record.getMenuId();
|
| | | if (!StringUtils.isEmpty(realMenuId)) {
|
| | | //获取选中主题的父级主题链,组装后返回给前端
|
| | | LinkedHashSet<String> menuSet = new LinkedHashSet<>();
|
| | | menuSet.add(realMenuId);
|
| | | LinkedHashSet<String> byParentId = menuService.getByParentId(menuSet);
|
| | | List<SysMenu> sysMenus = menuService.selectBatchIds(byParentId);
|
| | | o.fluentPut("menuList", sysMenus);
|
| | | }
|
| | |
|
| | |
|
| | | //本汇集任务所有的源信息
|
| | | List<SysAssembleDb> dbList = dbService.selectList(new EntityWrapper<SysAssembleDb>().eq(Constant.PARENT_ID, record.getId()));
|
| | | o.fluentPut("dbList", dbList);
|
| | |
|
| | | array.add(o);
|
| | | }
|
| | | //组装返回给前端的信息
|
| | | Page<JSONObject> jsonObjectPage = new Page<>();
|
| | | jsonObjectPage.setCurrent(resultPage.getCurrent());
|
| | | jsonObjectPage.setSize(resultPage.getSize());
|