package foundation.icall.callout;
|
|
import java.sql.ResultSet;
|
|
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.Logger;
|
|
import foundation.action.ActionContext;
|
import foundation.dao.OrderBy;
|
import foundation.dao.Page;
|
import foundation.dao.bizlogic.IActionProvider;
|
import foundation.data.entity.EntitySet;
|
import foundation.data.getter.EntitySetGetter;
|
import foundation.data.mapping.Mappings;
|
import foundation.data.meta.field.FieldsRuntime;
|
import foundation.data.meta.field.TableFieldLoader;
|
import foundation.data.object.DataObject;
|
import foundation.icall.ICall;
|
import foundation.icall.OutboundResult;
|
import foundation.icall.RemoteContext;
|
import foundation.icall.RunType;
|
import foundation.icall.log.ICallLogger;
|
import foundation.icall.log.LogRecord;
|
import foundation.icall.stamp.Stamp;
|
import foundation.io.IOContext;
|
import foundation.io.define.DataIOContainer;
|
import foundation.io.define.IOTask;
|
import foundation.io.define.IOWorkStep;
|
import foundation.io.define.IOWorkflow;
|
import foundation.io.engine.IOEngine;
|
import foundation.persist.IStepLoadable;
|
import foundation.persist.NamedSQL;
|
import foundation.persist.SQLRunner;
|
import foundation.persist.source.NamedDataSource;
|
import foundation.util.Util;
|
|
public class RemoteDBCallProvider implements IActionProvider, IStepLoadable {
|
|
protected static Logger logger;
|
protected static int Page_Size = 5000;
|
protected ActionContext context;
|
protected RemoteContext remoteContext;
|
protected ICall iCall;
|
protected NamedDataSource dataSource;
|
protected DataObject mirrorDataObject;
|
protected DataObject historyDataObject;
|
protected FieldsRuntime remoteFields;
|
protected Stamp stamp;
|
protected NamedSQL getDataSQL;
|
protected Page page;
|
protected int totalLoaded;
|
protected int lastLoaded;
|
protected int workingCount;
|
|
static {
|
logger = LogManager.getLogger(RemoteDBCallProvider.class);
|
}
|
|
public RemoteDBCallProvider() {
|
stamp = new Stamp();
|
}
|
|
@Override
|
public void exec(ActionContext context, String method) throws Exception {
|
doExec(context);
|
}
|
|
@Override
|
public void test(ActionContext context, String method) throws Exception {
|
logger.info("icall {} test", iCall.getName());
|
}
|
|
public void doExec(ActionContext context) throws Exception {
|
this.context = context;
|
this.remoteContext = context.getContext(RemoteContext.class);
|
this.iCall = remoteContext.getICall();
|
String runType = context.getParam("run_type", RunType.Normal.name());
|
|
//1. 初始化
|
dataSource = remoteContext.getRemoteSource();
|
mirrorDataObject = DataObject.getInstance(iCall.getDataName());
|
historyDataObject = DataObject.getInstance(iCall.getHistoryTableName());
|
|
OutboundResult result = remoteContext.getOutboundResult();
|
LogRecord record = result.getRecord();
|
record.setUrl(iCall.getURL());
|
record.setRequestBody(iCall.getName());
|
ICallLogger.beginAction(record, runType);
|
|
stamp.init(iCall, dataSource);
|
|
//2. 清空表/镜像表
|
mirrorDataObject.emptyTable();
|
|
//3. 获取时间戳,如果是全量获取,需要清空历史表里面的数据
|
stamp.load();
|
|
if (stamp.isNeedClearHistory()) {
|
historyDataObject.emptyTable();
|
}
|
|
//4. 分批次获取数据
|
loadRemoteDataToLocalMirror();
|
|
//5. 删除重复数据(时间戳一样,ID一样)
|
deleteDoubleMirrorData();
|
|
//6. 保存本次数据到业务表
|
moveLocalMirrorToTarget();
|
|
//7. 保存本次数据到历史表
|
saveLocalMirrorToHistory();
|
|
//8. 保存时间戳
|
stamp.saveLocal();
|
|
//9. 输出总条数
|
String response = "从[" + iCall.getURL() + "]共获取 " + Math.max(totalLoaded, 0) + "条记录,排重生效" + workingCount + "条";
|
logger.debug(response);
|
|
result.setResponse(response);
|
remoteContext.setOutboundResult(result);
|
|
}
|
|
protected void loadRemoteDataToLocalMirror() throws Exception {
|
//1. 循环取数前初始化分页和继续加载标识
|
totalLoaded = 0;
|
lastLoaded = -1;
|
workingCount = 0;
|
page = new Page(dataSource.getDBaseType());
|
page.setPageSize(Page_Size);
|
|
//2. 根据是否有时间戳来获取取数SQL
|
String remoteTable = iCall.getURL();
|
String remoteFilter = iCall.getRemoteFilter();
|
String stampField = iCall.getStampField();
|
String orderByField = iCall.getOrderByField();
|
|
if (!stamp.isActive()) {
|
getDataSQL = NamedSQL.getInstance(dataSource.getDBaseType(), "getRemoteDataNoStep");
|
}
|
else {
|
if ("oa-3-dmsorder".equalsIgnoreCase(iCall.getId())) {
|
getDataSQL = NamedSQL.getInstance(dataSource.getDBaseType(), "getRemoteDataByStep_OA");
|
getDataSQL.setParam("timeStamp", stamp.getLocalValue(), false);
|
}
|
else {
|
getDataSQL = NamedSQL.getInstance(dataSource.getDBaseType(), "getRemoteDataByStep");
|
getDataSQL.setParam("timeStamp", stamp.getLocalValueString());
|
}
|
|
getDataSQL.setParam("fieldName", stampField);
|
|
}
|
|
//3. 设置初始参数
|
getDataSQL.setTableName(remoteTable);
|
getDataSQL.setFilter(remoteFilter);
|
getDataSQL.setOrderBy(new OrderBy(orderByField, " asc"));
|
|
if (iCall.isPageActive()) {
|
getDataSQL.setPage(page);
|
}
|
else {
|
getDataSQL.clearPage();
|
}
|
|
//4. 循环(分页)取数
|
SQLRunner.getData(dataSource, this);
|
}
|
|
private void deleteDoubleMirrorData() throws Exception {
|
//1. 如果没有时间戳,就不需要排重
|
if (!stamp.isActive()) {
|
workingCount = totalLoaded;
|
logger.debug("时间戳没有启用,不需要排重");
|
return;
|
}
|
|
//2. 获取ID字段(可能是多个字段)
|
String mirrorTable = mirrorDataObject.getTableName();
|
String historyTable = iCall.getHistoryTableName();
|
|
Fields idFields = iCall.getIdFields();
|
String idFieldsString = idFields.getFieldPairsString(mirrorTable, historyTable);
|
|
//3. 删除重复数据
|
NamedSQL namedSQL = NamedSQL.getInstance("deleteDoubleMirror");
|
namedSQL.setParam("mirrorTableName" , mirrorTable);
|
namedSQL.setParam("historyTableName" , historyTable);
|
namedSQL.setParam("idFieldPairs" , idFieldsString);
|
namedSQL.setParam("stampField" , iCall.getStampField());
|
|
int cnt = namedSQL.execute();
|
logger.debug("delete double mirror data {}", cnt);
|
|
//3. 获取剩余数据条数
|
workingCount = mirrorDataObject.getCount();
|
}
|
|
protected void moveLocalMirrorToTarget() throws Exception {
|
String taskName = iCall.getIOTaskName();
|
|
//1. 如果没有定义 搬运
|
if (Util.isEmpty(taskName)) {
|
logger.debug("没有配置搬运程序,将mirror数据搬运到业务表跳过");
|
return;
|
}
|
|
//2.运行IO Task -- 运行数据搬运
|
IOWorkflow ioWorkflow = IOWorkflow.getInstance(taskName);
|
|
if (ioWorkflow != null) {
|
for (IOWorkStep step: ioWorkflow) {
|
step.exec();
|
}
|
|
return;
|
}
|
|
//3.运行IO Task -- 运行数据搬运
|
DataIOContainer ioContainer = DataIOContainer.getInstance();
|
IOTask ioTask = ioContainer.getIOTask(taskName);
|
|
if (ioTask != null) {
|
IOEngine ioEngine = new IOEngine();
|
ioEngine.init(context.getDataReader(), context.getDataWriter(), ioTask, new IOContext());
|
|
ioEngine.runTableToTables();
|
}
|
}
|
|
private void saveLocalMirrorToHistory() throws Exception {
|
//1. 根据表结构,获取字段列表
|
FieldsRuntime historyFields = historyDataObject.getTableFieldMetas();
|
FieldsRuntime mirrorFields = mirrorDataObject.getTableFieldMetas();
|
Mappings mappings = Mappings.getInstance(mirrorFields, historyFields);
|
String fieldNames = mappings.getFromFieldsString();
|
|
//2. 搬运数据
|
NamedSQL namedSQL = NamedSQL.getInstance("moveMirrorToHistory");
|
namedSQL.setParam("historyTableName", historyDataObject.getTableName());
|
namedSQL.setParam("mirrorTableName", mirrorDataObject.getTableName());
|
namedSQL.setParam("fieldNames", fieldNames);
|
|
int cnt = namedSQL.execute();
|
logger.debug("move mirror to history {}", cnt);
|
}
|
|
@Override
|
public boolean hasNextForLoad() {
|
return (lastLoaded < 0) || ((lastLoaded >= Page_Size) && iCall.isPageActive());
|
}
|
|
@Override
|
public NamedSQL getStepSQL() {
|
if (iCall.isPageActive()) {
|
getDataSQL.setPage(page);
|
}
|
else {
|
getDataSQL.clearPage();
|
}
|
|
return getDataSQL;
|
}
|
|
@Override
|
public void loadResultSet(ResultSet rslt, Object... args) throws Exception {
|
try {
|
//1. 获取远程数据库的表字段
|
if (remoteFields == null) {
|
String rometeTable = iCall.getURL();
|
TableFieldLoader fieldsLoader = new TableFieldLoader(dataSource, rometeTable);
|
remoteFields = fieldsLoader.getFieldsByTableResult();
|
}
|
|
//2. 加载数据到 EntitySet
|
EntitySetGetter getter = new EntitySetGetter(remoteFields);
|
getter.load(rslt);
|
|
EntitySet entitySet = getter.getDataSet();
|
|
//3. 记录插入数量
|
lastLoaded = entitySet.size();
|
totalLoaded = totalLoaded + lastLoaded;
|
|
//4. 将本次数据保存到表中
|
mirrorDataObject.batchInsertEntitySet(entitySet);
|
}
|
finally {
|
page.next();
|
}
|
}
|
|
}
|