package foundation.action;
|
|
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.Method;
|
import java.util.Set;
|
|
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.Logger;
|
|
import foundation.dao.DataPackage;
|
import foundation.dao.DataReader;
|
import foundation.dao.DataWriter;
|
import foundation.dao.Domain;
|
import foundation.dao.PackageItem;
|
import foundation.dao.bizlogic.DataStates;
|
import foundation.dao.bizlogic.IActionProvider;
|
import foundation.dao.bizlogic.IStateProvider;
|
import foundation.data.object.DataObject;
|
import foundation.handler.DataPool;
|
import foundation.handler.ResultPool;
|
import foundation.handler.SingletonHandler;
|
import foundation.persist.SQLRunner;
|
import foundation.route.Operation;
|
import foundation.token.IOnlineUser;
|
import foundation.util.Util;
|
|
public class WorkflowDispatcher extends SingletonHandler {
|
|
/** Log4j logger shared by this dispatcher; assigned once during class initialization. */
protected static Logger logger = LogManager.getLogger(WorkflowDispatcher.class);

/** The single dispatcher instance; populated by getInstance(). */
private static WorkflowDispatcher instance;

/** Registry that receives registered actions/steps and is handed to the runtime when steps are loaded. */
private WorkflowBuckets workflowBuckets;

/** Object keys (built by WorkStepKey.getObjectKey) of operations that depend on the target state. */
private Set<String> stateOperators;

/** Provider queried for target states; installed through registerStateProvider. */
private IStateProvider stateProvider;

static {
    // Create the singleton while the class is being initialized: the static
    // register*/build methods dereference `instance` directly.
    getInstance();
}
|
|
/**
 * Creates the dispatcher with a new WorkflowBuckets registry.
 * Private: instances are obtained through getInstance().
 */
private WorkflowDispatcher() {
    this.workflowBuckets = new WorkflowBuckets();
}
|
|
public static synchronized WorkflowDispatcher getInstance() {
|
if (instance == null) {
|
instance = new WorkflowDispatcher();
|
}
|
|
return instance;
|
}
|
|
@Override
|
protected void dispatch(Operation operation, DataPool dataPool, ResultPool resultPool) throws Exception {
|
//1. 创建DataReader、DataWriter
|
DataReader dataReader = new DataReader(operation, dataPool);
|
DataWriter dataWriter = new DataWriter(resultPool, dataReader.getDomain());
|
|
//2. 获取method
|
String methodName = operation.getOperator();
|
Method method = methodMap.get(methodName);
|
|
//3. 根据配置,运行 Task Actions
|
if (method == null) {
|
execWorkflow(dataReader, dataWriter, null);
|
return;
|
}
|
|
//4. 根据注册的方法,运行特定方法
|
try {
|
method.invoke(this, dataReader, dataWriter);
|
}
|
catch (InvocationTargetException e) {
|
e.printStackTrace();
|
Throwable throwable = e.getTargetException();
|
|
if (throwable == null) {
|
throw e;
|
}
|
else {
|
throw (Exception) throwable;
|
}
|
}
|
}
|
|
@Override
|
protected void publishMethod() {
|
//1. 根据dataName和operator/state得到相应的task
|
addMethod("execWorkflow");
|
|
//2. 重新加载action配置
|
addMethod("reloadConfig");
|
|
//3. 获取 Workflow
|
addMethod("getWorkflow");
|
}
|
|
public void execWorkflow(DataReader dataReader, DataWriter dataWriter, IWorkStep senderStep, Object... contexts) throws Exception {
|
//1. 检查公司冻结状态
|
IOnlineUser user = IOnlineUser.getInstance();
|
if (user.isOrgFrozen()) {
|
dataWriter.reportOneRejectValidation("该公司已经被冻结");
|
}
|
|
//2. 创建DataPackage
|
Domain domain = dataReader.getDomain();
|
DataPackage dataPackage = (domain != null) ? domain.getDataPackage() : null;
|
|
//3. 检查DataPackage配置是否需要更新,如果需要就更新
|
checkMetasRefresh(dataReader, dataPackage);
|
|
//4. 创建 work flow
|
WorkFlowRuntime workflow = createWorkflow(dataReader, dataWriter, dataPackage, senderStep);
|
workflow.setDataPackage(dataPackage);
|
|
if (contexts != null) {
|
for (Object context : contexts ) {
|
workflow.addContext(context);
|
}
|
}
|
|
//5. 检查 work flow 是否有中止标志
|
if (workflow.isTerminated()) {
|
logger.error("tase exec fail, {}", workflow);
|
return;
|
}
|
|
//6. 运行 work flow
|
processWorkflow(workflow, dataReader, dataWriter);
|
}
|
|
protected void getWorkflow(DataReader dataReader, DataWriter dataWriter) throws Exception {
|
//1. 检查公司冻结状态
|
IOnlineUser user = IOnlineUser.getInstance();
|
if (user.isOrgFrozen()) {
|
dataWriter.reportOneRejectValidation("该公司已经被冻结");
|
}
|
|
//2. 创建DataPackage
|
Domain domain = dataReader.getDomain();
|
DataPackage dataPackage = domain.getDataPackage();
|
|
//3. 检查DataPackage配置是否需要更新,如果需要就更新
|
checkMetasRefresh(dataReader, dataPackage);
|
|
//4. 创建 work flow
|
WorkFlowRuntime workflow = createWorkflow(dataReader, dataWriter, dataPackage, null);
|
workflow.setDataPackage(dataPackage);
|
|
//5. 返回
|
dataWriter.addValue(workflow);
|
}
|
|
public void reloadConfig(DataReader dataReader, DataWriter dataWriter) {
|
|
}
|
|
private void checkMetasRefresh(DataReader dataReader, DataPackage dataPackage) throws Exception {
|
if (dataPackage == null) {
|
return;
|
}
|
|
// if (!ServerStatus.isCheckMetaUpdateOnTime()) {
|
// return;
|
// }
|
|
for (PackageItem item: dataPackage) {
|
DataObject dataObject = item.getDataObject();
|
dataObject.checkFieldMetasReload();
|
}
|
}
|
|
private WorkFlowRuntime createWorkflow(DataReader dataReader, DataWriter dataWriter, DataPackage dataPackage, IWorkStep senderStep) throws Exception {
|
WorkFlowRuntime workflowRuntime = new WorkFlowRuntime(dataReader, dataWriter, senderStep);
|
|
String dataName = (dataPackage != null) ? dataPackage.getName() : null;
|
if (dataName == null) {
|
dataName = (senderStep != null) ? senderStep.getDataName() : null;
|
}
|
|
String operator = dataReader.getString("operator");
|
|
if (Util.isEmpty(operator)) {
|
operator = dataReader.getOperator();
|
}
|
|
Events events = (senderStep != null) ? senderStep.getFireEvents() : null;
|
String event = (events != null) ? events.getFirst() : null;
|
|
//1. 如果当前操作与状态无关,加载主工作流程后返回(不带事件处理的工作流)
|
if (!isDependOnFutureState(dataName, operator)) {
|
workflowRuntime.loadMainSteps(dataName, operator, event, workflowBuckets);
|
return workflowRuntime;
|
}
|
|
DataStates dataStates = stateProvider.queryToStates(workflowRuntime, dataPackage, operator, dataWriter);
|
|
//2. 查询对象的目标状态,如果目标状态没有改变,加载主工作流程后返回(不带事件处理的工作流)
|
String toState = dataStates.getState(dataName);
|
|
if (Util.isEmpty(toState)) {
|
workflowRuntime.loadMainSteps(dataName, operator, event, workflowBuckets);
|
return workflowRuntime;
|
}
|
|
//3. 加载对象的主工作流程(带事件处理的工作流)
|
workflowRuntime.loadMainSteps(dataName, operator, toState, workflowBuckets);
|
|
//4. 加载 Host 的事件工作流程
|
DataPackage host = dataPackage.getHost();
|
|
if (host != null) {
|
String hostName = host.getName();
|
toState = dataStates.getState(hostName);
|
|
if (Util.isEmpty(toState)) {
|
workflowRuntime.loadReferToSteps(host, Moment.Before, toState, workflowBuckets);
|
workflowRuntime.loadReferToSteps(host, Moment.After, toState, workflowBuckets);
|
}
|
}
|
|
//5. 返回结果
|
return workflowRuntime;
|
}
|
|
private void processWorkflow(WorkFlowRuntime workflow, DataReader dataReader, DataWriter dataWriter) throws Exception {
|
if (workflow == null) {
|
return;
|
}
|
|
logger.info("exec workflow: " + workflow.toString());
|
|
if (workflow.nesessaryTransaction()) {
|
boolean transWorking = false;
|
|
try {
|
for (ActionContext context: workflow) {
|
if (!transWorking && context.getTransactionScore() > 0) {
|
SQLRunner.beginTrans();
|
transWorking = true;
|
}
|
|
if (workflow.isTerminated()) {
|
break;
|
}
|
|
if (!context.isApprovedByStateEvent()) {
|
continue;
|
}
|
|
execOneStep(context);
|
}
|
|
if (!workflow.isTerminated()) {
|
SQLRunner.commit();
|
}
|
else {
|
SQLRunner.rollback();
|
logger.error("tase exec fail and trans rollbck, {}", workflow);
|
}
|
}
|
catch(Exception e) {
|
SQLRunner.rollback();
|
e.printStackTrace();
|
}
|
}
|
else {
|
for (ActionContext context: workflow) {
|
if (workflow.isTerminated()) {
|
logger.error("tase exec fail and no trans, {}", workflow);
|
break;
|
}
|
|
if (!context.isApprovedByStateEvent()) {
|
continue;
|
}
|
|
execOneStep(context);
|
}
|
}
|
|
dataWriter.addValue("workflow", workflow);
|
}
|
|
private void execOneStep(ActionContext context) throws Exception {
|
IActionProvider provider = context.getActionProvider();
|
|
if (provider == null) {
|
return;
|
}
|
|
if (provider instanceof ActionProvider) {
|
ActionProvider actionProvider = (ActionProvider)provider;
|
actionProvider.setContext(context);
|
}
|
|
String method = context.getMethodName();
|
provider.exec(context, method);
|
}
|
|
private boolean isDependOnFutureState(String dataName, String operator) {
|
String key = WorkStepKey.getObjectKey(dataName, operator);
|
return stateOperators.contains(key);
|
}
|
|
public static void registerOneAction(ActionMeta action) {
|
instance.workflowBuckets.registerOneAction(action);
|
}
|
|
public static void registerOneStep(WorkStep step) {
|
instance.workflowBuckets.registerOneStep(step);
|
}
|
|
public static void registerStateProvider(IStateProvider stateProvider, Set<String> stateOperators) {
|
instance.stateProvider = stateProvider;
|
instance.stateOperators = stateOperators;
|
}
|
|
public static void build() {
|
instance.workflowBuckets.build();
|
}
|
|
}
|