package foundation.action; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.Set; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import foundation.dao.DataPackage; import foundation.dao.DataReader; import foundation.dao.DataWriter; import foundation.dao.Domain; import foundation.dao.PackageItem; import foundation.dao.bizlogic.DataStates; import foundation.dao.bizlogic.IActionProvider; import foundation.dao.bizlogic.IStateProvider; import foundation.data.object.DataObject; import foundation.handler.DataPool; import foundation.handler.ResultPool; import foundation.handler.SingletonHandler; import foundation.persist.SQLRunner; import foundation.route.Operation; import foundation.token.IOnlineUser; import foundation.util.Util; public class WorkflowDispatcher extends SingletonHandler { protected static Logger logger; private static WorkflowDispatcher instance; private WorkflowBuckets workflowBuckets; private Set stateOperators; private IStateProvider stateProvider; static { logger = LogManager.getLogger(WorkflowDispatcher.class); getInstance(); } private WorkflowDispatcher() { workflowBuckets = new WorkflowBuckets(); } public static synchronized WorkflowDispatcher getInstance() { if (instance == null) { instance = new WorkflowDispatcher(); } return instance; } @Override protected void dispatch(Operation operation, DataPool dataPool, ResultPool resultPool) throws Exception { //1. 创建DataReader、DataWriter DataReader dataReader = new DataReader(operation, dataPool); DataWriter dataWriter = new DataWriter(resultPool, dataReader.getDomain()); //2. 获取method String methodName = operation.getOperator(); Method method = methodMap.get(methodName); //3. 根据配置,运行 Task Actions if (method == null) { execWorkflow(dataReader, dataWriter, null); return; } //4. 根据注册的方法,运行特定方法 try { method.invoke(this, dataReader, dataWriter); } catch (InvocationTargetException e) { e.printStackTrace(); Throwable throwable = e.getTargetException(); if (throwable == null) { throw e; } else { throw (Exception) throwable; } } } @Override protected void publishMethod() { //1. 根据dataName和operator/state得到相应的task addMethod("execWorkflow"); //2. 重新加载action配置 addMethod("reloadConfig"); //3. 获取 Workflow addMethod("getWorkflow"); } public void execWorkflow(DataReader dataReader, DataWriter dataWriter, IWorkStep senderStep, Object... contexts) throws Exception { //1. 检查公司冻结状态 IOnlineUser user = IOnlineUser.getInstance(); if (user.isOrgFrozen()) { dataWriter.reportOneRejectValidation("该公司已经被冻结"); } //2. 创建DataPackage Domain domain = dataReader.getDomain(); DataPackage dataPackage = (domain != null) ? domain.getDataPackage() : null; //3. 检查DataPackage配置是否需要更新,如果需要就更新 checkMetasRefresh(dataReader, dataPackage); //4. 创建 work flow WorkFlowRuntime workflow = createWorkflow(dataReader, dataWriter, dataPackage, senderStep); workflow.setDataPackage(dataPackage); if (contexts != null) { for (Object context : contexts ) { workflow.addContext(context); } } //5. 检查 work flow 是否有中止标志 if (workflow.isTerminated()) { logger.error("tase exec fail, {}", workflow); return; } //6. 运行 work flow processWorkflow(workflow, dataReader, dataWriter); } protected void getWorkflow(DataReader dataReader, DataWriter dataWriter) throws Exception { //1. 检查公司冻结状态 IOnlineUser user = IOnlineUser.getInstance(); if (user.isOrgFrozen()) { dataWriter.reportOneRejectValidation("该公司已经被冻结"); } //2. 创建DataPackage Domain domain = dataReader.getDomain(); DataPackage dataPackage = domain.getDataPackage(); //3. 检查DataPackage配置是否需要更新,如果需要就更新 checkMetasRefresh(dataReader, dataPackage); //4. 创建 work flow WorkFlowRuntime workflow = createWorkflow(dataReader, dataWriter, dataPackage, null); workflow.setDataPackage(dataPackage); //5. 返回 dataWriter.addValue(workflow); } public void reloadConfig(DataReader dataReader, DataWriter dataWriter) { } private void checkMetasRefresh(DataReader dataReader, DataPackage dataPackage) throws Exception { if (dataPackage == null) { return; } // if (!ServerStatus.isCheckMetaUpdateOnTime()) { // return; // } for (PackageItem item: dataPackage) { DataObject dataObject = item.getDataObject(); dataObject.checkFieldMetasReload(); } } private WorkFlowRuntime createWorkflow(DataReader dataReader, DataWriter dataWriter, DataPackage dataPackage, IWorkStep senderStep) throws Exception { WorkFlowRuntime workflowRuntime = new WorkFlowRuntime(dataReader, dataWriter, senderStep); String dataName = (dataPackage != null) ? dataPackage.getName() : null; if (dataName == null) { dataName = (senderStep != null) ? senderStep.getDataName() : null; } String operator = dataReader.getString("operator"); if (Util.isEmpty(operator)) { operator = dataReader.getOperator(); } Events events = (senderStep != null) ? senderStep.getFireEvents() : null; String event = (events != null) ? events.getFirst() : null; //1. 如果当前操作与状态无关,加载主工作流程后返回(不带事件处理的工作流) if (!isDependOnFutureState(dataName, operator)) { workflowRuntime.loadMainSteps(dataName, operator, event, workflowBuckets); return workflowRuntime; } DataStates dataStates = stateProvider.queryToStates(workflowRuntime, dataPackage, operator, dataWriter); //2. 查询对象的目标状态,如果目标状态没有改变,加载主工作流程后返回(不带事件处理的工作流) String toState = dataStates.getState(dataName); if (Util.isEmpty(toState)) { workflowRuntime.loadMainSteps(dataName, operator, event, workflowBuckets); return workflowRuntime; } //3. 加载对象的主工作流程(带事件处理的工作流) workflowRuntime.loadMainSteps(dataName, operator, toState, workflowBuckets); //4. 加载 Host 的事件工作流程 DataPackage host = dataPackage.getHost(); if (host != null) { String hostName = host.getName(); toState = dataStates.getState(hostName); if (Util.isEmpty(toState)) { workflowRuntime.loadReferToSteps(host, Moment.Before, toState, workflowBuckets); workflowRuntime.loadReferToSteps(host, Moment.After, toState, workflowBuckets); } } //5. 返回结果 return workflowRuntime; } private void processWorkflow(WorkFlowRuntime workflow, DataReader dataReader, DataWriter dataWriter) throws Exception { if (workflow == null) { return; } logger.info("exec workflow: " + workflow.toString()); if (workflow.nesessaryTransaction()) { boolean transWorking = false; try { for (ActionContext context: workflow) { if (!transWorking && context.getTransactionScore() > 0) { SQLRunner.beginTrans(); transWorking = true; } if (workflow.isTerminated()) { break; } if (!context.isApprovedByStateEvent()) { continue; } execOneStep(context); } if (!workflow.isTerminated()) { SQLRunner.commit(); } else { SQLRunner.rollback(); logger.error("tase exec fail and trans rollbck, {}", workflow); } } catch(Exception e) { SQLRunner.rollback(); e.printStackTrace(); } } else { for (ActionContext context: workflow) { if (workflow.isTerminated()) { logger.error("tase exec fail and no trans, {}", workflow); break; } if (!context.isApprovedByStateEvent()) { continue; } execOneStep(context); } } dataWriter.addValue("workflow", workflow); } private void execOneStep(ActionContext context) throws Exception { IActionProvider provider = context.getActionProvider(); if (provider == null) { return; } if (provider instanceof ActionProvider) { ActionProvider actionProvider = (ActionProvider)provider; actionProvider.setContext(context); } String method = context.getMethodName(); provider.exec(context, method); } private boolean isDependOnFutureState(String dataName, String operator) { String key = WorkStepKey.getObjectKey(dataName, operator); return stateOperators.contains(key); } public static void registerOneAction(ActionMeta action) { instance.workflowBuckets.registerOneAction(action); } public static void registerOneStep(WorkStep step) { instance.workflowBuckets.registerOneStep(step); } public static void registerStateProvider(IStateProvider stateProvider, Set stateOperators) { instance.stateProvider = stateProvider; instance.stateOperators = stateOperators; } public static void build() { instance.workflowBuckets.build(); } }