package foundation.icall.callout; import java.sql.ResultSet; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import foundation.action.ActionContext; import foundation.dao.OrderBy; import foundation.dao.Page; import foundation.dao.bizlogic.IActionProvider; import foundation.data.entity.EntitySet; import foundation.data.getter.EntitySetGetter; import foundation.data.mapping.Mappings; import foundation.data.meta.field.FieldsRuntime; import foundation.data.meta.field.TableFieldLoader; import foundation.data.object.DataObject; import foundation.icall.ICall; import foundation.icall.OutboundResult; import foundation.icall.RemoteContext; import foundation.icall.RunType; import foundation.icall.log.ICallLogger; import foundation.icall.log.LogRecord; import foundation.icall.stamp.Stamp; import foundation.io.IOContext; import foundation.io.define.DataIOContainer; import foundation.io.define.IOTask; import foundation.io.define.IOWorkStep; import foundation.io.define.IOWorkflow; import foundation.io.engine.IOEngine; import foundation.persist.IStepLoadable; import foundation.persist.NamedSQL; import foundation.persist.SQLRunner; import foundation.persist.source.NamedDataSource; import foundation.util.Util; public class RemoteDBCallProvider implements IActionProvider, IStepLoadable { protected static Logger logger; protected static int Page_Size = 5000; protected ActionContext context; protected RemoteContext remoteContext; protected ICall iCall; protected NamedDataSource dataSource; protected DataObject mirrorDataObject; protected DataObject historyDataObject; protected FieldsRuntime remoteFields; protected Stamp stamp; protected NamedSQL getDataSQL; protected Page page; protected int totalLoaded; protected int lastLoaded; protected int workingCount; static { logger = LogManager.getLogger(RemoteDBCallProvider.class); } public RemoteDBCallProvider() { stamp = new Stamp(); } @Override public void exec(ActionContext context, String method) throws Exception { doExec(context); } @Override public void test(ActionContext context, String method) throws Exception { logger.info("icall {} test", iCall.getName()); } public void doExec(ActionContext context) throws Exception { this.context = context; this.remoteContext = context.getContext(RemoteContext.class); this.iCall = remoteContext.getICall(); String runType = context.getParam("run_type", RunType.Normal.name()); //1. 初始化 dataSource = remoteContext.getRemoteSource(); mirrorDataObject = DataObject.getInstance(iCall.getDataName()); historyDataObject = DataObject.getInstance(iCall.getHistoryTableName()); OutboundResult result = remoteContext.getOutboundResult(); LogRecord record = result.getRecord(); record.setUrl(iCall.getURL()); record.setRequestBody(iCall.getName()); ICallLogger.beginAction(record, runType); stamp.init(iCall, dataSource); //2. 清空表/镜像表 mirrorDataObject.emptyTable(); //3. 获取时间戳,如果是全量获取,需要清空历史表里面的数据 stamp.load(); if (stamp.isNeedClearHistory()) { historyDataObject.emptyTable(); } //4. 分批次获取数据 loadRemoteDataToLocalMirror(); //5. 删除重复数据(时间戳一样,ID一样) deleteDoubleMirrorData(); //6. 保存本次数据到业务表 moveLocalMirrorToTarget(); //7. 保存本次数据到历史表 saveLocalMirrorToHistory(); //8. 保存时间戳 stamp.saveLocal(); //9. 输出总条数 String response = "从[" + iCall.getURL() + "]共获取 " + Math.max(totalLoaded, 0) + "条记录,排重生效" + workingCount + "条"; logger.debug(response); result.setResponse(response); remoteContext.setOutboundResult(result); } protected void loadRemoteDataToLocalMirror() throws Exception { //1. 循环取数前初始化分页和继续加载标识 totalLoaded = 0; lastLoaded = -1; workingCount = 0; page = new Page(dataSource.getDBaseType()); page.setPageSize(Page_Size); //2. 根据是否有时间戳来获取取数SQL String remoteTable = iCall.getURL(); String remoteFilter = iCall.getRemoteFilter(); String stampField = iCall.getStampField(); String orderByField = iCall.getOrderByField(); if (!stamp.isActive()) { getDataSQL = NamedSQL.getInstance(dataSource.getDBaseType(), "getRemoteDataNoStep"); } else { if ("oa-3-dmsorder".equalsIgnoreCase(iCall.getId())) { getDataSQL = NamedSQL.getInstance(dataSource.getDBaseType(), "getRemoteDataByStep_OA"); getDataSQL.setParam("timeStamp", stamp.getLocalValue(), false); } else { getDataSQL = NamedSQL.getInstance(dataSource.getDBaseType(), "getRemoteDataByStep"); getDataSQL.setParam("timeStamp", stamp.getLocalValueString()); } getDataSQL.setParam("fieldName", stampField); } //3. 设置初始参数 getDataSQL.setTableName(remoteTable); getDataSQL.setFilter(remoteFilter); getDataSQL.setOrderBy(new OrderBy(orderByField, " asc")); if (iCall.isPageActive()) { getDataSQL.setPage(page); } else { getDataSQL.clearPage(); } //4. 循环(分页)取数 SQLRunner.getData(dataSource, this); } private void deleteDoubleMirrorData() throws Exception { //1. 如果没有时间戳,就不需要排重 if (!stamp.isActive()) { workingCount = totalLoaded; logger.debug("时间戳没有启用,不需要排重"); return; } //2. 获取ID字段(可能是多个字段) String mirrorTable = mirrorDataObject.getTableName(); String historyTable = iCall.getHistoryTableName(); Fields idFields = iCall.getIdFields(); String idFieldsString = idFields.getFieldPairsString(mirrorTable, historyTable); //3. 删除重复数据 NamedSQL namedSQL = NamedSQL.getInstance("deleteDoubleMirror"); namedSQL.setParam("mirrorTableName" , mirrorTable); namedSQL.setParam("historyTableName" , historyTable); namedSQL.setParam("idFieldPairs" , idFieldsString); namedSQL.setParam("stampField" , iCall.getStampField()); int cnt = namedSQL.execute(); logger.debug("delete double mirror data {}", cnt); //3. 获取剩余数据条数 workingCount = mirrorDataObject.getCount(); } protected void moveLocalMirrorToTarget() throws Exception { String taskName = iCall.getIOTaskName(); //1. 如果没有定义 搬运 if (Util.isEmpty(taskName)) { logger.debug("没有配置搬运程序,将mirror数据搬运到业务表跳过"); return; } //2.运行IO Task -- 运行数据搬运 IOWorkflow ioWorkflow = IOWorkflow.getInstance(taskName); if (ioWorkflow != null) { for (IOWorkStep step: ioWorkflow) { step.exec(); } return; } //3.运行IO Task -- 运行数据搬运 DataIOContainer ioContainer = DataIOContainer.getInstance(); IOTask ioTask = ioContainer.getIOTask(taskName); if (ioTask != null) { IOEngine ioEngine = new IOEngine(); ioEngine.init(context.getDataReader(), context.getDataWriter(), ioTask, new IOContext()); ioEngine.runTableToTables(); } } private void saveLocalMirrorToHistory() throws Exception { //1. 根据表结构,获取字段列表 FieldsRuntime historyFields = historyDataObject.getTableFieldMetas(); FieldsRuntime mirrorFields = mirrorDataObject.getTableFieldMetas(); Mappings mappings = Mappings.getInstance(mirrorFields, historyFields); String fieldNames = mappings.getFromFieldsString(); //2. 搬运数据 NamedSQL namedSQL = NamedSQL.getInstance("moveMirrorToHistory"); namedSQL.setParam("historyTableName", historyDataObject.getTableName()); namedSQL.setParam("mirrorTableName", mirrorDataObject.getTableName()); namedSQL.setParam("fieldNames", fieldNames); int cnt = namedSQL.execute(); logger.debug("move mirror to history {}", cnt); } @Override public boolean hasNextForLoad() { return (lastLoaded < 0) || ((lastLoaded >= Page_Size) && iCall.isPageActive()); } @Override public NamedSQL getStepSQL() { if (iCall.isPageActive()) { getDataSQL.setPage(page); } else { getDataSQL.clearPage(); } return getDataSQL; } @Override public void loadResultSet(ResultSet rslt, Object... args) throws Exception { try { //1. 获取远程数据库的表字段 if (remoteFields == null) { String rometeTable = iCall.getURL(); TableFieldLoader fieldsLoader = new TableFieldLoader(dataSource, rometeTable); remoteFields = fieldsLoader.getFieldsByTableResult(); } //2. 加载数据到 EntitySet EntitySetGetter getter = new EntitySetGetter(remoteFields); getter.load(rslt); EntitySet entitySet = getter.getDataSet(); //3. 记录插入数量 lastLoaded = entitySet.size(); totalLoaded = totalLoaded + lastLoaded; //4. 将本次数据保存到表中 mirrorDataObject.batchInsertEntitySet(entitySet); } finally { page.next(); } } }