package foundation.workflow;
|
|
import java.lang.reflect.InvocationTargetException;
|
import java.lang.reflect.Method;
|
import java.util.List;
|
|
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.Logger;
|
|
import foundation.dao.DataPackage;
|
import foundation.dao.DataReader;
|
import foundation.dao.DataSource;
|
import foundation.dao.DataWriter;
|
import foundation.dao.Domain;
|
import foundation.dao.OperatorCode;
|
import foundation.dao.PackageItem;
|
import foundation.dao.bizlogic.IActionProvider;
|
import foundation.data.entity.Entity;
|
import foundation.data.object.DataObject;
|
import foundation.handler.DataPool;
|
import foundation.handler.ResultPool;
|
import foundation.handler.SingletonHandler;
|
import foundation.handler.SystemMethodBucket;
|
import foundation.persist.SQLRunner;
|
import foundation.route.Operation;
|
|
public class WorkflowDispatcher extends SingletonHandler {
|
|
/** Log4j logger for this dispatcher; assigned once, when the class is initialized. */
protected static Logger logger = LogManager.getLogger(WorkflowDispatcher.class);

/** The single dispatcher instance, created by {@link #getInstance()}. */
private static WorkflowDispatcher instance;

/** Bucket registry passed to workflows when they load their steps, and the target of action/step registration. */
private WorkflowBuckets workflowBuckets;

// Create the singleton while the class is being initialized. The logger
// initializer above runs first, so it is already set when the constructor runs.
static {
    getInstance();
}
|
|
/**
 * Private so that instances are only created through {@link #getInstance()}.
 * Gives the dispatcher a fresh {@link WorkflowBuckets}.
 */
private WorkflowDispatcher() {
    super();
    this.workflowBuckets = new WorkflowBuckets();
}
|
|
public static synchronized WorkflowDispatcher getInstance() {
|
if (instance == null) {
|
instance = new WorkflowDispatcher();
|
}
|
|
return instance;
|
}
|
|
@Override
|
protected void dispatch(Operation operation, DataPool dataPool, ResultPool resultPool) throws Exception {
|
//1. 创建DataReader、DataWriter
|
DataReader dataReader = new DataReader(operation, dataPool);
|
DataWriter dataWriter = new DataWriter(resultPool, dataReader.getDomain());
|
|
//2. 获取method
|
String methodName = operation.getOperator();
|
Method method = methodMap.get(methodName);
|
|
//3. 根据配置,运行 Task Actions
|
if (method == null) {
|
execWorkflow(dataReader, dataWriter, null);
|
return;
|
}
|
|
//4. 根据注册的方法,运行特定方法
|
try {
|
method.invoke(this, dataReader, dataWriter);
|
}
|
catch (InvocationTargetException e) {
|
e.printStackTrace();
|
Throwable throwable = e.getTargetException();
|
|
if (throwable == null) {
|
throw e;
|
}
|
else {
|
throw (Exception) throwable;
|
}
|
}
|
}
|
|
@Override
|
protected void publishMethod() {
|
//1. 根据dataName和operator/state得到相应的task
|
addMethod("execWorkflow");
|
|
//2. 获取 Workflow
|
addMethod("simulateWorkflow");
|
|
//3. 重新加载action配置
|
addMethod("reloadOneAction");
|
|
//4. 重新加载workflow
|
addMethod("reloadOneStep");
|
|
//5. 获取系统的action列表
|
addMethod("getActions");
|
|
//6. 获取系统的操作列表
|
addMethod("getOperators");
|
}
|
|
public void execWorkflow(DataReader dataReader, DataWriter dataWriter, IWorkStep senderStep, Object... steps) throws Exception {
|
//1. 创建DataPackage
|
Domain domain = dataReader.getDomain();
|
DataPackage dataPackage = (domain != null) ? domain.getDataPackage() : null;
|
|
//2. 检查DataPackage配置是否需要更新,如果需要就更新
|
checkMetasRefresh(dataReader, dataPackage);
|
|
//3. 创建 work flow
|
WorkFlowRuntime workflow = createWorkflow(dataReader, dataWriter, dataPackage);
|
workflow.setDataPackage(dataPackage);
|
|
//4. 运行 work flow
|
processWorkflow(workflow, dataReader, dataWriter);
|
|
//5. 返回
|
dataWriter.addValue("workflow", workflow);
|
}
|
|
protected void simulateWorkflow(DataReader dataReader, DataWriter dataWriter) throws Exception {
|
//1. 创建DataPackage
|
Domain domain = dataReader.getDomain();
|
DataPackage dataPackage = (domain != null) ? domain.getDataPackage() : null;
|
|
//2. 检查DataPackage配置是否需要更新,如果需要就更新
|
checkMetasRefresh(dataReader, dataPackage);
|
|
//3. 创建 work flow
|
WorkFlowRuntime workflow = createWorkflow(dataReader, dataWriter, dataPackage);
|
workflow.setDataPackage(dataPackage);
|
workflow.setSimulate(true);
|
|
//4. 拟执行Workflow
|
processWorkflow(workflow, dataReader, dataWriter);
|
|
//5. 返回
|
dataWriter.addValue("workflow", workflow);
|
}
|
|
public void reloadOneAction(DataReader dataReader, DataWriter dataWriter) throws Exception {
|
Domain domain = dataReader.getDomain();
|
DataPackage dataPackage = (domain != null) ? domain.getDataPackage() : null;
|
Entity master = dataPackage.getMasterEntity();
|
|
//1. 加载action
|
ActionMeta meta = new ActionMeta();
|
meta.load(master);
|
if (!meta.checkValid()) {
|
return ;
|
}
|
|
registerOneAction(meta);
|
|
//2. 保存进数据库
|
dataPackage.saveOneDataToDB(DataSource.Request);
|
|
//3. 更新workflow
|
WorkflowLoader.reloadAction(meta.getName());
|
}
|
|
public void reloadOneStep(DataReader dataReader, DataWriter dataWriter) throws Exception {
|
Domain domain = dataReader.getDomain();
|
DataPackage dataPackage = (domain != null) ? domain.getDataPackage() : null;
|
dataPackage.saveOneDataToDB(DataSource.Request);
|
Entity master = dataPackage.getMasterEntity();
|
|
WorkStepMeta step = new WorkStepMeta();
|
step.load(master);
|
registerOneStep(step);
|
}
|
|
public void getActions(DataReader dataReader, DataWriter dataWriter) throws Exception {
|
SystemMethodBucket bucket = SystemMethodBucket.getInstance();
|
dataWriter.addValue(bucket);
|
}
|
|
public void getOperators(DataReader dataReader, DataWriter dataWriter) throws Exception {
|
dataWriter.addValue("operators", OperatorCode.getList());
|
}
|
|
private void checkMetasRefresh(DataReader dataReader, DataPackage dataPackage) throws Exception {
|
if (dataPackage == null) {
|
return;
|
}
|
|
// if (!ServerStatus.isCheckMetaUpdateOnTime()) {
|
// return;
|
// }
|
|
for (PackageItem item: dataPackage) {
|
DataObject dataObject = item.getDataObject();
|
dataObject.checkFieldMetasReload();
|
}
|
}
|
|
private WorkFlowRuntime createWorkflow(DataReader dataReader, DataWriter dataWriter, DataPackage dataPackage) throws Exception {
|
WorkFlowRuntime workflowRuntime = new WorkFlowRuntime(dataReader, dataWriter);
|
|
String dataName = (dataPackage != null) ? dataPackage.getName() : null;
|
String operator = dataReader.getOperator();
|
|
workflowRuntime.loadSteps(dataName, operator, workflowBuckets);
|
|
return workflowRuntime;
|
}
|
|
private void processWorkflow(WorkFlowRuntime workflow, DataReader dataReader, DataWriter dataWriter) throws Exception{
|
if (workflow == null) {
|
return;
|
}
|
|
logger.info("exec workflow: " + workflow.toString());
|
|
if (workflow.nesessaryTransaction()) {
|
boolean transWorking = false;
|
|
try {
|
for (WorkStep step: workflow) {
|
if (!transWorking && step.getTransactionScore() > 0) {
|
SQLRunner.beginTrans();
|
transWorking = true;
|
}
|
|
if (workflow.isTerminated()) {
|
break;
|
}
|
|
execOneStep(workflow.isSimulate(), step);
|
}
|
|
if (!workflow.isTerminated()) {
|
SQLRunner.commit();
|
}
|
else {
|
SQLRunner.rollback();
|
logger.error("tase exec fail and trans rollbck, {}", workflow);
|
}
|
}
|
catch(Exception e) {
|
SQLRunner.rollback();
|
}
|
}
|
else {
|
for (WorkStep step: workflow) {
|
if (workflow.isTerminated()) {
|
logger.error("tase exec fail and no trans, {}", workflow);
|
break;
|
}
|
|
execOneStep(workflow.isSimulate(), step);
|
}
|
}
|
}
|
|
public static void onFireEvent(WorkStep step, Event event) throws Exception {
|
//1. get children steps
|
List<WorkStepMeta> childMetas = instance.workflowBuckets.getEventSteps(event);
|
step.addChildren(childMetas);
|
|
List<WorkStep> children = step.getChildren();
|
if (children == null) {
|
return;
|
}
|
|
//2. execute children
|
while (step.getCurrentStepNo() < children.size()){
|
WorkStep child = children.get(step.getCurrentStepNo());
|
WorkFlowRuntime workflow = step.getWorkFlow();
|
|
if (workflow.isTerminated()) {
|
logger.error("tase exec fail and no trans, {}", workflow);
|
break;
|
}
|
|
child.setEvent(event);
|
execOneStep(workflow.isSimulate(), child);
|
|
step.goNextStep();
|
}
|
}
|
|
private static void execOneStep(boolean simulate, WorkStep step) throws Exception{
|
IActionProvider provider = step.getActionProvider();
|
|
if (provider == null) {
|
return ;
|
}
|
|
if (provider instanceof ActionProvider) {
|
ActionProvider actionProvider = (ActionProvider)provider;
|
actionProvider.setStep(step);
|
}
|
|
String method = step.getMethodName();
|
|
if (!simulate) {
|
provider.exec(step, method);
|
}
|
|
if (step.isTerminate()) {
|
throw new Exception("tase exec fail and no trans" + step.getActionName());
|
}
|
}
|
|
public static void registerOneAction(ActionMeta action) {
|
instance.workflowBuckets.registerOneAction(action);
|
}
|
|
public static void registerOneStep(WorkStepMeta step) {
|
instance.workflowBuckets.registerOneStep(step);
|
}
|
|
public static void build() {
|
instance.workflowBuckets.build();
|
}
|
|
public WorkflowBuckets getWorkflowBuckets() {
|
return instance.workflowBuckets;
|
}
|
|
}
|